# plate_detection.py
"""Build plate empty detection using OpenCV.

Analyzes camera frames to detect whether there are objects on the build plate.
Uses calibration-based difference detection: the current frame is compared to
reference images of the empty plate.
"""
  6. import logging
  7. from pathlib import Path
  8. logger = logging.getLogger(__name__)
  9. # Optional OpenCV import - feature disabled if not available
  10. try:
  11. import cv2
  12. import numpy as np
  13. OPENCV_AVAILABLE = True
  14. except ImportError:
  15. OPENCV_AVAILABLE = False
  16. logger.info("OpenCV not available - plate detection feature disabled")
  17. def _get_calibration_dir() -> Path:
  18. """Get the calibration directory from settings (ensures persistence in Docker)."""
  19. from backend.app.core.config import settings
  20. return settings.plate_calibration_dir
  21. class PlateDetectionResult:
  22. """Result of plate detection analysis."""
  23. def __init__(
  24. self,
  25. is_empty: bool,
  26. confidence: float,
  27. difference_percent: float,
  28. message: str,
  29. debug_image: bytes | None = None,
  30. needs_calibration: bool = False,
  31. ):
  32. self.is_empty = is_empty
  33. self.confidence = confidence # 0.0 to 1.0
  34. self.difference_percent = difference_percent # How different from reference
  35. self.message = message
  36. self.debug_image = debug_image # Optional annotated image for debugging
  37. self.needs_calibration = needs_calibration # True if no reference image exists
  38. def to_dict(self) -> dict:
  39. return {
  40. "is_empty": bool(self.is_empty),
  41. "confidence": float(round(self.confidence, 2)),
  42. "difference_percent": float(round(self.difference_percent, 2)),
  43. "message": self.message,
  44. "has_debug_image": self.debug_image is not None,
  45. "needs_calibration": bool(self.needs_calibration),
  46. }
  47. class PlateDetector:
  48. """Detects if the build plate is empty using calibration-based difference detection."""
  49. # Default region of interest (ROI) as percentage of image dimensions
  50. # These define where the build plate typically appears in the camera view
  51. # Format: (x_start%, y_start%, width%, height%)
  52. DEFAULT_ROI = (0.15, 0.35, 0.70, 0.55) # Center-lower portion of frame
  53. # Detection thresholds for difference detection
  54. # Using mean pixel difference (0-100% scale)
  55. # Small objects may only cause 1-2% mean difference
  56. DEFAULT_DIFFERENCE_THRESHOLD = 1.0
  57. DEFAULT_BLUR_SIZE = 21 # Gaussian blur kernel size (must be odd) - unused with edge detection
  58. def __init__(
  59. self,
  60. roi: tuple[float, float, float, float] | None = None,
  61. difference_threshold: float = DEFAULT_DIFFERENCE_THRESHOLD,
  62. blur_size: int = DEFAULT_BLUR_SIZE,
  63. ):
  64. """Initialize the plate detector.
  65. Args:
  66. roi: Region of interest as (x%, y%, w%, h%) - percentages of image size
  67. difference_threshold: Percentage of pixels that must differ to trigger "not empty"
  68. blur_size: Gaussian blur kernel size for noise reduction
  69. """
  70. if not OPENCV_AVAILABLE:
  71. raise RuntimeError("OpenCV is not installed. Install with: pip install opencv-python-headless")
  72. self.roi = roi or self.DEFAULT_ROI
  73. self.difference_threshold = difference_threshold
  74. self.blur_size = blur_size if blur_size % 2 == 1 else blur_size + 1 # Must be odd
  75. # Maximum number of reference images to store per printer
  76. MAX_REFERENCES = 5
  77. def _get_metadata_path(self, printer_id: int) -> Path:
  78. """Get the path to the metadata JSON file for a printer."""
  79. _get_calibration_dir().mkdir(parents=True, exist_ok=True)
  80. return _get_calibration_dir() / f"printer_{printer_id}_metadata.json"
  81. def _load_metadata(self, printer_id: int) -> dict:
  82. """Load metadata for a printer's references."""
  83. import json
  84. meta_path = self._get_metadata_path(printer_id)
  85. if meta_path.exists():
  86. try:
  87. with open(meta_path) as f:
  88. return json.load(f)
  89. except Exception:
  90. pass
  91. return {"references": {}}
  92. def _save_metadata(self, printer_id: int, metadata: dict) -> None:
  93. """Save metadata for a printer's references."""
  94. import json
  95. meta_path = self._get_metadata_path(printer_id)
  96. with open(meta_path, "w") as f:
  97. json.dump(metadata, f, indent=2)
  98. def _get_reference_paths(self, printer_id: int) -> list[Path]:
  99. """Get all existing reference image paths for a printer."""
  100. _get_calibration_dir().mkdir(parents=True, exist_ok=True)
  101. paths = []
  102. for i in range(self.MAX_REFERENCES):
  103. path = _get_calibration_dir() / f"printer_{printer_id}_ref_{i}.jpg"
  104. if path.exists():
  105. paths.append(path)
  106. return paths
  107. def _get_next_reference_slot(self, printer_id: int) -> Path:
  108. """Get the path for the next reference image slot (cycles through slots)."""
  109. _get_calibration_dir().mkdir(parents=True, exist_ok=True)
  110. # Find first empty slot, or use oldest (slot 0) and shift others
  111. for i in range(self.MAX_REFERENCES):
  112. path = _get_calibration_dir() / f"printer_{printer_id}_ref_{i}.jpg"
  113. if not path.exists():
  114. return path
  115. # All slots full - return slot 0 (will be overwritten, but we rotate first)
  116. return _get_calibration_dir() / f"printer_{printer_id}_ref_0.jpg"
  117. def _rotate_references(self, printer_id: int) -> None:
  118. """Rotate references: delete oldest (0), shift others down."""
  119. # Delete slot 0
  120. slot0 = _get_calibration_dir() / f"printer_{printer_id}_ref_0.jpg"
  121. if slot0.exists():
  122. logger.info(f"Rotating references: removing oldest {slot0}")
  123. slot0.unlink()
  124. # Shift others down
  125. for i in range(1, self.MAX_REFERENCES):
  126. old_path = _get_calibration_dir() / f"printer_{printer_id}_ref_{i}.jpg"
  127. new_path = _get_calibration_dir() / f"printer_{printer_id}_ref_{i - 1}.jpg"
  128. if old_path.exists():
  129. old_path.rename(new_path)
  130. # Also rotate metadata
  131. metadata = self._load_metadata(printer_id)
  132. refs = metadata.get("references", {})
  133. new_refs = {}
  134. for i in range(1, self.MAX_REFERENCES):
  135. if str(i) in refs:
  136. new_refs[str(i - 1)] = refs[str(i)]
  137. metadata["references"] = new_refs
  138. self._save_metadata(printer_id, metadata)
  139. def get_references(self, printer_id: int) -> list[dict]:
  140. """Get all references with metadata for a printer.
  141. Returns list of dicts with: index, label, timestamp, has_image
  142. """
  143. metadata = self._load_metadata(printer_id)
  144. refs = metadata.get("references", {})
  145. result = []
  146. for i in range(self.MAX_REFERENCES):
  147. path = _get_calibration_dir() / f"printer_{printer_id}_ref_{i}.jpg"
  148. if path.exists():
  149. ref_meta = refs.get(str(i), {})
  150. result.append(
  151. {
  152. "index": i,
  153. "label": ref_meta.get("label", ""),
  154. "timestamp": ref_meta.get("timestamp", ""),
  155. "has_image": True,
  156. }
  157. )
  158. return result
  159. def update_reference_label(self, printer_id: int, index: int, label: str) -> bool:
  160. """Update the label for a reference."""
  161. if index < 0 or index >= self.MAX_REFERENCES:
  162. return False
  163. path = _get_calibration_dir() / f"printer_{printer_id}_ref_{index}.jpg"
  164. if not path.exists():
  165. return False
  166. metadata = self._load_metadata(printer_id)
  167. if "references" not in metadata:
  168. metadata["references"] = {}
  169. if str(index) not in metadata["references"]:
  170. metadata["references"][str(index)] = {}
  171. metadata["references"][str(index)]["label"] = label
  172. self._save_metadata(printer_id, metadata)
  173. return True
  174. def delete_reference(self, printer_id: int, index: int) -> bool:
  175. """Delete a specific reference by index."""
  176. if index < 0 or index >= self.MAX_REFERENCES:
  177. return False
  178. path = _get_calibration_dir() / f"printer_{printer_id}_ref_{index}.jpg"
  179. if not path.exists():
  180. return False
  181. # Delete image
  182. logger.info(f"Deleting reference {index} for printer {printer_id}: {path}")
  183. path.unlink()
  184. # Remove from metadata
  185. metadata = self._load_metadata(printer_id)
  186. refs = metadata.get("references", {})
  187. if str(index) in refs:
  188. del refs[str(index)]
  189. metadata["references"] = refs
  190. self._save_metadata(printer_id, metadata)
  191. # Shift remaining references down to fill the gap
  192. for i in range(index + 1, self.MAX_REFERENCES):
  193. old_img = _get_calibration_dir() / f"printer_{printer_id}_ref_{i}.jpg"
  194. new_img = _get_calibration_dir() / f"printer_{printer_id}_ref_{i - 1}.jpg"
  195. if old_img.exists():
  196. old_img.rename(new_img)
  197. # Also shift metadata
  198. if str(i) in refs:
  199. refs[str(i - 1)] = refs[str(i)]
  200. del refs[str(i)]
  201. metadata["references"] = refs
  202. self._save_metadata(printer_id, metadata)
  203. return True
  204. def get_reference_thumbnail(self, printer_id: int, index: int, max_size: int = 150) -> bytes | None:
  205. """Get a thumbnail of a reference image.
  206. Returns JPEG bytes or None if not found.
  207. """
  208. path = _get_calibration_dir() / f"printer_{printer_id}_ref_{index}.jpg"
  209. if not path.exists():
  210. return None
  211. try:
  212. img = cv2.imread(str(path))
  213. if img is None:
  214. return None
  215. # Calculate thumbnail size maintaining aspect ratio
  216. h, w = img.shape[:2]
  217. if w > h:
  218. new_w = max_size
  219. new_h = int(h * max_size / w)
  220. else:
  221. new_h = max_size
  222. new_w = int(w * max_size / h)
  223. thumb = cv2.resize(img, (new_w, new_h), interpolation=cv2.INTER_AREA)
  224. _, buffer = cv2.imencode(".jpg", thumb, [cv2.IMWRITE_JPEG_QUALITY, 80])
  225. return buffer.tobytes()
  226. except Exception as e:
  227. logger.error(f"Error creating thumbnail: {e}")
  228. return None
  229. def _extract_roi(self, frame: np.ndarray) -> tuple[np.ndarray, int, int, int, int]:
  230. """Extract the region of interest from a frame.
  231. Returns:
  232. Tuple of (roi_frame, x_start, y_start, roi_width, roi_height)
  233. """
  234. height, width = frame.shape[:2]
  235. x_start = int(width * self.roi[0])
  236. y_start = int(height * self.roi[1])
  237. roi_width = int(width * self.roi[2])
  238. roi_height = int(height * self.roi[3])
  239. roi_frame = frame[y_start : y_start + roi_height, x_start : x_start + roi_width]
  240. return roi_frame, x_start, y_start, roi_width, roi_height
  241. def _preprocess_for_comparison(self, frame: np.ndarray) -> np.ndarray:
  242. """Preprocess a frame for comparison.
  243. Uses heavy blur to create "blob" representation - smooths out texture
  244. and noise while preserving large objects. Then normalizes brightness
  245. to reduce lighting sensitivity.
  246. """
  247. gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
  248. # Very heavy blur to smooth texture, keep only large shapes
  249. blurred = cv2.GaussianBlur(gray, (51, 51), 0)
  250. # Normalize to 0-255 range to reduce brightness sensitivity
  251. normalized = cv2.normalize(blurred, None, 0, 255, cv2.NORM_MINMAX)
  252. return normalized
  253. def calibrate(self, image_data: bytes, printer_id: int, label: str | None = None) -> tuple[bool, str, int]:
  254. """Calibrate by saving a reference image of the empty plate.
  255. Stores up to MAX_REFERENCES (5) images per printer. When all slots are full,
  256. the oldest reference is removed and others are shifted.
  257. Args:
  258. image_data: JPEG image data as bytes
  259. printer_id: Printer database ID
  260. label: Optional label for this reference (e.g., "High Temp Plate")
  261. Returns:
  262. Tuple of (success, message, index) where index is the slot used
  263. """
  264. from datetime import datetime
  265. try:
  266. # Decode image
  267. nparr = np.frombuffer(image_data, np.uint8)
  268. frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
  269. if frame is None:
  270. return False, "Failed to decode image", -1
  271. # Get existing references count
  272. existing_refs = self._get_reference_paths(printer_id)
  273. num_existing = len(existing_refs)
  274. # If all slots are full, rotate (remove oldest)
  275. if num_existing >= self.MAX_REFERENCES:
  276. self._rotate_references(printer_id)
  277. num_existing = self.MAX_REFERENCES - 1
  278. # Save to next available slot
  279. slot_index = num_existing
  280. reference_path = _get_calibration_dir() / f"printer_{printer_id}_ref_{slot_index}.jpg"
  281. write_success = cv2.imwrite(str(reference_path), frame, [cv2.IMWRITE_JPEG_QUALITY, 95])
  282. if not write_success:
  283. logger.error(f"cv2.imwrite failed for {reference_path}")
  284. return False, "Failed to save reference image", -1
  285. # Verify the file actually exists and has content
  286. if not reference_path.exists():
  287. logger.error(f"Reference image not found after save: {reference_path}")
  288. return False, "Reference image not found after save", -1
  289. file_size = reference_path.stat().st_size
  290. if file_size < 1000: # JPEG should be at least 1KB
  291. logger.error(f"Reference image too small ({file_size} bytes): {reference_path}")
  292. reference_path.unlink() # Clean up invalid file
  293. return False, f"Reference image corrupted (only {file_size} bytes)", -1
  294. logger.info(f"Saved reference image: {reference_path} ({file_size} bytes)")
  295. # Save metadata
  296. metadata = self._load_metadata(printer_id)
  297. if "references" not in metadata:
  298. metadata["references"] = {}
  299. metadata["references"][str(slot_index)] = {
  300. "label": label or "",
  301. "timestamp": datetime.now().isoformat(),
  302. }
  303. self._save_metadata(printer_id, metadata)
  304. logger.info(
  305. f"Saved plate calibration reference {slot_index + 1}/{self.MAX_REFERENCES} for printer {printer_id}"
  306. )
  307. return True, f"Calibration saved ({slot_index + 1}/{self.MAX_REFERENCES} references)", slot_index
  308. except Exception as e:
  309. logger.exception("Error during plate calibration")
  310. return False, f"Calibration error: {e!s}", -1
  311. def get_calibration_count(self, printer_id: int) -> int:
  312. """Get the number of calibration references for a printer."""
  313. return len(self._get_reference_paths(printer_id))
  314. def has_calibration(self, printer_id: int, plate_type: str | None = None) -> bool:
  315. """Check if a printer has any calibration reference images."""
  316. return len(self._get_reference_paths(printer_id)) > 0
  317. def delete_calibration(self, printer_id: int, plate_type: str | None = None) -> bool:
  318. """Delete all calibration reference images for a printer."""
  319. paths = self._get_reference_paths(printer_id)
  320. if not paths:
  321. return False
  322. for path in paths:
  323. path.unlink()
  324. logger.info(f"Deleted {len(paths)} plate calibration reference(s) for printer {printer_id}")
  325. return True
  326. def analyze_frame(
  327. self, image_data: bytes, printer_id: int, plate_type: str | None = None, include_debug_image: bool = False
  328. ) -> PlateDetectionResult:
  329. """Analyze a camera frame to detect if the plate is empty.
  330. Compares the current frame to all calibration reference images and uses
  331. the best match (lowest difference) for the final result.
  332. Args:
  333. image_data: JPEG image data as bytes
  334. printer_id: Printer database ID (for reference lookup)
  335. plate_type: Unused - kept for API compatibility
  336. include_debug_image: If True, include annotated image in result
  337. Returns:
  338. PlateDetectionResult with analysis results
  339. """
  340. try:
  341. # Check for calibration
  342. reference_paths = self._get_reference_paths(printer_id)
  343. if not reference_paths:
  344. return PlateDetectionResult(
  345. is_empty=True, # Default to empty when not calibrated
  346. confidence=0.0,
  347. difference_percent=0.0,
  348. message="No calibration - please calibrate with empty plate first",
  349. needs_calibration=True,
  350. )
  351. # Decode current image
  352. nparr = np.frombuffer(image_data, np.uint8)
  353. current_frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
  354. if current_frame is None:
  355. return PlateDetectionResult(
  356. is_empty=True,
  357. confidence=0.0,
  358. difference_percent=0.0,
  359. message="Failed to decode current image",
  360. )
  361. # Extract ROI from current frame
  362. current_roi, x_start, y_start, roi_width, roi_height = self._extract_roi(current_frame)
  363. current_processed = self._preprocess_for_comparison(current_roi)
  364. # Compare against all references, find best match (lowest difference)
  365. best_difference_percent = float("inf")
  366. best_ref_idx = -1
  367. best_diff = None
  368. for idx, ref_path in enumerate(reference_paths):
  369. # Load reference image
  370. reference_frame = cv2.imread(str(ref_path), cv2.IMREAD_COLOR)
  371. if reference_frame is None:
  372. continue
  373. # Ensure same dimensions
  374. if current_frame.shape != reference_frame.shape:
  375. reference_frame = cv2.resize(reference_frame, (current_frame.shape[1], current_frame.shape[0]))
  376. # Extract ROI and preprocess
  377. reference_roi, _, _, _, _ = self._extract_roi(reference_frame)
  378. reference_processed = self._preprocess_for_comparison(reference_roi)
  379. # Calculate absolute difference
  380. diff = cv2.absdiff(current_processed, reference_processed)
  381. # Calculate mean difference as percentage
  382. mean_diff = np.mean(diff)
  383. difference_percent = (mean_diff / 255.0) * 100
  384. if difference_percent < best_difference_percent:
  385. best_difference_percent = difference_percent
  386. best_ref_idx = idx
  387. best_diff = diff
  388. if best_ref_idx == -1:
  389. return PlateDetectionResult(
  390. is_empty=True,
  391. confidence=0.0,
  392. difference_percent=0.0,
  393. message="Failed to load any reference images - please recalibrate",
  394. needs_calibration=True,
  395. )
  396. difference_percent = best_difference_percent
  397. # Determine if plate is empty (use best match)
  398. is_empty = difference_percent < self.difference_threshold
  399. # Calculate confidence
  400. if is_empty:
  401. # Higher confidence when very little difference
  402. confidence = 1.0 - min(1.0, difference_percent / self.difference_threshold)
  403. else:
  404. # Higher confidence when clearly different
  405. confidence = min(1.0, difference_percent / (self.difference_threshold * 2))
  406. # Generate message
  407. num_refs = len(reference_paths)
  408. if is_empty:
  409. message = (
  410. f"Plate appears empty (difference: {difference_percent:.1f}%, ref {best_ref_idx + 1}/{num_refs})"
  411. )
  412. else:
  413. message = f"Objects detected on plate (difference: {difference_percent:.1f}%, best ref {best_ref_idx + 1}/{num_refs})"
  414. # Generate debug image if requested
  415. debug_image = None
  416. if include_debug_image and best_diff is not None:
  417. debug_frame = current_frame.copy()
  418. # Draw ROI rectangle
  419. cv2.rectangle(
  420. debug_frame,
  421. (x_start, y_start),
  422. (x_start + roi_width, y_start + roi_height),
  423. (0, 255, 0),
  424. 2,
  425. )
  426. # Create colored difference overlay
  427. # Red = areas that are different from reference
  428. # Amplify diff for visibility (multiply by 3, cap at 255)
  429. diff_amplified = np.minimum(best_diff * 3, 255).astype(np.uint8)
  430. diff_colored = cv2.cvtColor(diff_amplified, cv2.COLOR_GRAY2BGR)
  431. diff_colored[:, :, 0] = 0 # Remove blue
  432. diff_colored[:, :, 1] = 0 # Remove green
  433. # Red channel has the diff
  434. # Overlay difference on ROI
  435. roi_overlay = debug_frame[y_start : y_start + roi_height, x_start : x_start + roi_width]
  436. cv2.addWeighted(diff_colored, 0.5, roi_overlay, 0.5, 0, roi_overlay)
  437. # Add status text
  438. status_text = "EMPTY" if is_empty else "OBJECTS DETECTED"
  439. color = (0, 255, 0) if is_empty else (0, 0, 255)
  440. cv2.putText(debug_frame, status_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2)
  441. cv2.putText(
  442. debug_frame,
  443. f"Diff: {difference_percent:.1f}% (ref {best_ref_idx + 1}/{num_refs})",
  444. (10, 60),
  445. cv2.FONT_HERSHEY_SIMPLEX,
  446. 0.7,
  447. color,
  448. 2,
  449. )
  450. cv2.putText(
  451. debug_frame,
  452. f"Confidence: {confidence:.0%}",
  453. (10, 90),
  454. cv2.FONT_HERSHEY_SIMPLEX,
  455. 0.7,
  456. color,
  457. 2,
  458. )
  459. # Encode debug image as JPEG
  460. _, buffer = cv2.imencode(".jpg", debug_frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
  461. debug_image = buffer.tobytes()
  462. return PlateDetectionResult(
  463. is_empty=is_empty,
  464. confidence=confidence,
  465. difference_percent=difference_percent,
  466. message=message,
  467. debug_image=debug_image,
  468. )
  469. except Exception as e:
  470. logger.exception("Error analyzing frame for plate detection")
  471. return PlateDetectionResult(
  472. is_empty=True, # Default to empty on error (don't block prints)
  473. confidence=0.0,
  474. difference_percent=0.0,
  475. message=f"Analysis error: {e!s}",
  476. )
  477. async def capture_camera_image(
  478. printer_id: int,
  479. ip_address: str,
  480. access_code: str,
  481. model: str,
  482. external_camera_url: str | None = None,
  483. external_camera_type: str | None = None,
  484. use_external: bool = False,
  485. ) -> tuple[bytes | None, str]:
  486. """Capture an image from the printer camera.
  487. If there's an active camera stream, uses the buffered frame instead of
  488. creating a new connection (which would fail while stream is active).
  489. Returns:
  490. Tuple of (image_data, camera_source) or (None, error_message)
  491. """
  492. image_data: bytes | None = None
  493. camera_source = "unknown"
  494. # Try external camera first if requested and available
  495. if use_external and external_camera_url and external_camera_type:
  496. try:
  497. from backend.app.services.external_camera import capture_frame
  498. image_data = await capture_frame(external_camera_url, external_camera_type)
  499. if image_data:
  500. camera_source = "external"
  501. logger.debug(f"Captured frame from external camera for printer {printer_id}")
  502. except Exception as e:
  503. logger.warning(f"Failed to capture from external camera: {e}")
  504. # Fall back to built-in camera
  505. if image_data is None:
  506. # First, check if there's an active stream with a buffered frame
  507. # This avoids blocking when camera viewer is open
  508. try:
  509. from backend.app.api.routes.camera import get_buffered_frame
  510. buffered = get_buffered_frame(printer_id)
  511. if buffered:
  512. image_data = buffered
  513. camera_source = "built-in (buffered)"
  514. logger.debug(f"Using buffered frame from active stream for printer {printer_id}")
  515. except Exception as e:
  516. logger.debug(f"Could not get buffered frame: {e}")
  517. # If no buffered frame, try to capture a new one
  518. if image_data is None:
  519. import tempfile
  520. from backend.app.services.camera import capture_camera_frame
  521. with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
  522. tmp_path = Path(tmp.name)
  523. try:
  524. success = await capture_camera_frame(ip_address, access_code, model, tmp_path, timeout=10)
  525. if success:
  526. with open(tmp_path, "rb") as f:
  527. image_data = f.read()
  528. camera_source = "built-in"
  529. logger.debug(f"Captured frame from built-in camera for printer {printer_id}")
  530. finally:
  531. try:
  532. tmp_path.unlink()
  533. except Exception:
  534. pass
  535. return image_data, camera_source
  536. async def check_plate_empty(
  537. printer_id: int,
  538. ip_address: str,
  539. access_code: str,
  540. model: str,
  541. plate_type: str | None = None,
  542. include_debug_image: bool = False,
  543. external_camera_url: str | None = None,
  544. external_camera_type: str | None = None,
  545. use_external: bool = False,
  546. roi: tuple[float, float, float, float] | None = None,
  547. ) -> PlateDetectionResult:
  548. """Check if the build plate is empty for a printer.
  549. Args:
  550. printer_id: Printer database ID
  551. ip_address: Printer IP address
  552. access_code: Printer access code
  553. model: Printer model string
  554. plate_type: Type of build plate for calibration lookup
  555. include_debug_image: If True, include annotated image in result
  556. external_camera_url: URL of external camera (if configured)
  557. external_camera_type: Type of external camera (mjpeg, rtsp, snapshot)
  558. use_external: If True, prefer external camera over built-in
  559. roi: Region of interest as (x%, y%, w%, h%) - percentages of image size
  560. Returns:
  561. PlateDetectionResult with analysis results
  562. """
  563. if not OPENCV_AVAILABLE:
  564. return PlateDetectionResult(
  565. is_empty=True,
  566. confidence=0.0,
  567. difference_percent=0.0,
  568. message="OpenCV not available - plate detection disabled",
  569. )
  570. image_data, camera_source = await capture_camera_image(
  571. printer_id, ip_address, access_code, model, external_camera_url, external_camera_type, use_external
  572. )
  573. if image_data is None:
  574. return PlateDetectionResult(
  575. is_empty=True, # Default to empty on error
  576. confidence=0.0,
  577. difference_percent=0.0,
  578. message="Failed to capture camera frame from any source",
  579. )
  580. # Analyze the captured frame
  581. detector = PlateDetector(roi=roi)
  582. result = detector.analyze_frame(image_data, printer_id, plate_type, include_debug_image)
  583. # Add camera source to message
  584. result.message = f"[{camera_source}] {result.message}"
  585. return result
  586. async def calibrate_plate(
  587. printer_id: int,
  588. ip_address: str,
  589. access_code: str,
  590. model: str,
  591. label: str | None = None,
  592. external_camera_url: str | None = None,
  593. external_camera_type: str | None = None,
  594. use_external: bool = False,
  595. ) -> tuple[bool, str, int]:
  596. """Calibrate plate detection by capturing a reference image of the empty plate.
  597. Args:
  598. printer_id: Printer database ID
  599. ip_address: Printer IP address
  600. access_code: Printer access code
  601. model: Printer model string
  602. label: Optional label for this reference (e.g., "High Temp Plate")
  603. external_camera_url: URL of external camera (if configured)
  604. external_camera_type: Type of external camera (mjpeg, rtsp, snapshot)
  605. use_external: If True, prefer external camera over built-in
  606. Returns:
  607. Tuple of (success, message, index)
  608. """
  609. if not OPENCV_AVAILABLE:
  610. return False, "OpenCV not available - plate detection disabled", -1
  611. image_data, camera_source = await capture_camera_image(
  612. printer_id, ip_address, access_code, model, external_camera_url, external_camera_type, use_external
  613. )
  614. if image_data is None:
  615. return False, "Failed to capture camera frame for calibration", -1
  616. detector = PlateDetector()
  617. success, message, index = detector.calibrate(image_data, printer_id, label)
  618. if success:
  619. message = f"[{camera_source}] {message}"
  620. return success, message, index
  621. def get_calibration_status(printer_id: int, plate_type: str | None = None) -> dict:
  622. """Get calibration status for a printer.
  623. Returns:
  624. Dict with calibration info including reference count
  625. """
  626. if not OPENCV_AVAILABLE:
  627. return {
  628. "available": False,
  629. "calibrated": False,
  630. "reference_count": 0,
  631. "max_references": 5,
  632. "message": "OpenCV not available",
  633. }
  634. detector = PlateDetector()
  635. calibrated = detector.has_calibration(printer_id)
  636. ref_count = detector.get_calibration_count(printer_id)
  637. if calibrated:
  638. message = f"Calibrated with {ref_count}/{detector.MAX_REFERENCES} reference(s)"
  639. else:
  640. message = "Not calibrated - please calibrate with empty plate"
  641. return {
  642. "available": True,
  643. "calibrated": calibrated,
  644. "reference_count": ref_count,
  645. "max_references": detector.MAX_REFERENCES,
  646. "message": message,
  647. }
  648. def delete_calibration(printer_id: int, plate_type: str | None = None) -> bool:
  649. """Delete calibration for a printer and plate type."""
  650. if not OPENCV_AVAILABLE:
  651. return False
  652. detector = PlateDetector()
  653. return detector.delete_calibration(printer_id, plate_type)
  654. def is_plate_detection_available() -> bool:
  655. """Check if plate detection feature is available (OpenCV installed)."""
  656. return OPENCV_AVAILABLE