plate_detection.py 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801
  1. """Build plate empty detection using OpenCV.
  2. Analyzes camera frames to detect if there are objects on the build plate.
  3. Uses calibration-based difference detection - compares current frame to
  4. a reference image of the empty plate.
  5. """
  6. from __future__ import annotations
  7. import logging
  8. from pathlib import Path
  9. logger = logging.getLogger(__name__)
  10. # Optional OpenCV import - feature disabled if not available
  11. try:
  12. import cv2
  13. import numpy as np
  14. OPENCV_AVAILABLE = True
  15. except ImportError:
  16. OPENCV_AVAILABLE = False
  17. logger.info("OpenCV not available - plate detection feature disabled")
  18. def _get_calibration_dir() -> Path:
  19. """Get the calibration directory from settings (ensures persistence in Docker)."""
  20. from backend.app.core.config import settings
  21. return settings.plate_calibration_dir
  22. class PlateDetectionResult:
  23. """Result of plate detection analysis."""
  24. def __init__(
  25. self,
  26. is_empty: bool,
  27. confidence: float,
  28. difference_percent: float,
  29. message: str,
  30. debug_image: bytes | None = None,
  31. needs_calibration: bool = False,
  32. ):
  33. self.is_empty = is_empty
  34. self.confidence = confidence # 0.0 to 1.0
  35. self.difference_percent = difference_percent # How different from reference
  36. self.message = message
  37. self.debug_image = debug_image # Optional annotated image for debugging
  38. self.needs_calibration = needs_calibration # True if no reference image exists
  39. def to_dict(self) -> dict:
  40. return {
  41. "is_empty": bool(self.is_empty),
  42. "confidence": float(round(self.confidence, 2)),
  43. "difference_percent": float(round(self.difference_percent, 2)),
  44. "message": self.message,
  45. "has_debug_image": self.debug_image is not None,
  46. "needs_calibration": bool(self.needs_calibration),
  47. }
  48. class PlateDetector:
  49. """Detects if the build plate is empty using calibration-based difference detection."""
  50. # Default region of interest (ROI) as percentage of image dimensions
  51. # These define where the build plate typically appears in the camera view
  52. # Format: (x_start%, y_start%, width%, height%)
  53. DEFAULT_ROI = (0.15, 0.35, 0.70, 0.55) # Center-lower portion of frame
  54. # Detection thresholds for difference detection
  55. # Using mean pixel difference (0-100% scale)
  56. # Small objects may only cause 1-2% mean difference
  57. DEFAULT_DIFFERENCE_THRESHOLD = 1.0
  58. DEFAULT_BLUR_SIZE = 21 # Gaussian blur kernel size (must be odd) - unused with edge detection
  59. def __init__(
  60. self,
  61. roi: tuple[float, float, float, float] | None = None,
  62. difference_threshold: float = DEFAULT_DIFFERENCE_THRESHOLD,
  63. blur_size: int = DEFAULT_BLUR_SIZE,
  64. ):
  65. """Initialize the plate detector.
  66. Args:
  67. roi: Region of interest as (x%, y%, w%, h%) - percentages of image size
  68. difference_threshold: Percentage of pixels that must differ to trigger "not empty"
  69. blur_size: Gaussian blur kernel size for noise reduction
  70. """
  71. if not OPENCV_AVAILABLE:
  72. raise RuntimeError("OpenCV is not installed. Install with: pip install opencv-python-headless")
  73. self.roi = roi or self.DEFAULT_ROI
  74. self.difference_threshold = difference_threshold
  75. self.blur_size = blur_size if blur_size % 2 == 1 else blur_size + 1 # Must be odd
  76. # Maximum number of reference images to store per printer
  77. MAX_REFERENCES = 5
  78. def _get_metadata_path(self, printer_id: int) -> Path:
  79. """Get the path to the metadata JSON file for a printer."""
  80. _get_calibration_dir().mkdir(parents=True, exist_ok=True)
  81. return _get_calibration_dir() / f"printer_{printer_id}_metadata.json"
  82. def _load_metadata(self, printer_id: int) -> dict:
  83. """Load metadata for a printer's references."""
  84. import json
  85. meta_path = self._get_metadata_path(printer_id)
  86. if meta_path.exists():
  87. try:
  88. with open(meta_path) as f:
  89. return json.load(f)
  90. except Exception:
  91. pass
  92. return {"references": {}}
  93. def _save_metadata(self, printer_id: int, metadata: dict) -> None:
  94. """Save metadata for a printer's references."""
  95. import json
  96. meta_path = self._get_metadata_path(printer_id)
  97. with open(meta_path, "w") as f:
  98. json.dump(metadata, f, indent=2)
  99. def _get_reference_paths(self, printer_id: int) -> list[Path]:
  100. """Get all existing reference image paths for a printer."""
  101. _get_calibration_dir().mkdir(parents=True, exist_ok=True)
  102. paths = []
  103. for i in range(self.MAX_REFERENCES):
  104. path = _get_calibration_dir() / f"printer_{printer_id}_ref_{i}.jpg"
  105. if path.exists():
  106. paths.append(path)
  107. return paths
  108. def _get_next_reference_slot(self, printer_id: int) -> Path:
  109. """Get the path for the next reference image slot (cycles through slots)."""
  110. _get_calibration_dir().mkdir(parents=True, exist_ok=True)
  111. # Find first empty slot, or use oldest (slot 0) and shift others
  112. for i in range(self.MAX_REFERENCES):
  113. path = _get_calibration_dir() / f"printer_{printer_id}_ref_{i}.jpg"
  114. if not path.exists():
  115. return path
  116. # All slots full - return slot 0 (will be overwritten, but we rotate first)
  117. return _get_calibration_dir() / f"printer_{printer_id}_ref_0.jpg"
  118. def _rotate_references(self, printer_id: int) -> None:
  119. """Rotate references: delete oldest (0), shift others down."""
  120. # Delete slot 0
  121. slot0 = _get_calibration_dir() / f"printer_{printer_id}_ref_0.jpg"
  122. if slot0.exists():
  123. logger.info(f"Rotating references: removing oldest {slot0}")
  124. slot0.unlink()
  125. # Shift others down
  126. for i in range(1, self.MAX_REFERENCES):
  127. old_path = _get_calibration_dir() / f"printer_{printer_id}_ref_{i}.jpg"
  128. new_path = _get_calibration_dir() / f"printer_{printer_id}_ref_{i - 1}.jpg"
  129. if old_path.exists():
  130. old_path.rename(new_path)
  131. # Also rotate metadata
  132. metadata = self._load_metadata(printer_id)
  133. refs = metadata.get("references", {})
  134. new_refs = {}
  135. for i in range(1, self.MAX_REFERENCES):
  136. if str(i) in refs:
  137. new_refs[str(i - 1)] = refs[str(i)]
  138. metadata["references"] = new_refs
  139. self._save_metadata(printer_id, metadata)
  140. def get_references(self, printer_id: int) -> list[dict]:
  141. """Get all references with metadata for a printer.
  142. Returns list of dicts with: index, label, timestamp, has_image
  143. """
  144. metadata = self._load_metadata(printer_id)
  145. refs = metadata.get("references", {})
  146. result = []
  147. for i in range(self.MAX_REFERENCES):
  148. path = _get_calibration_dir() / f"printer_{printer_id}_ref_{i}.jpg"
  149. if path.exists():
  150. ref_meta = refs.get(str(i), {})
  151. result.append(
  152. {
  153. "index": i,
  154. "label": ref_meta.get("label", ""),
  155. "timestamp": ref_meta.get("timestamp", ""),
  156. "has_image": True,
  157. }
  158. )
  159. return result
  160. def update_reference_label(self, printer_id: int, index: int, label: str) -> bool:
  161. """Update the label for a reference."""
  162. if index < 0 or index >= self.MAX_REFERENCES:
  163. return False
  164. path = _get_calibration_dir() / f"printer_{printer_id}_ref_{index}.jpg"
  165. if not path.exists():
  166. return False
  167. metadata = self._load_metadata(printer_id)
  168. if "references" not in metadata:
  169. metadata["references"] = {}
  170. if str(index) not in metadata["references"]:
  171. metadata["references"][str(index)] = {}
  172. metadata["references"][str(index)]["label"] = label
  173. self._save_metadata(printer_id, metadata)
  174. return True
  175. def delete_reference(self, printer_id: int, index: int) -> bool:
  176. """Delete a specific reference by index."""
  177. if index < 0 or index >= self.MAX_REFERENCES:
  178. return False
  179. path = _get_calibration_dir() / f"printer_{printer_id}_ref_{index}.jpg"
  180. if not path.exists():
  181. return False
  182. # Delete image
  183. logger.info(f"Deleting reference {index} for printer {printer_id}: {path}")
  184. path.unlink()
  185. # Remove from metadata
  186. metadata = self._load_metadata(printer_id)
  187. refs = metadata.get("references", {})
  188. if str(index) in refs:
  189. del refs[str(index)]
  190. metadata["references"] = refs
  191. self._save_metadata(printer_id, metadata)
  192. # Shift remaining references down to fill the gap
  193. for i in range(index + 1, self.MAX_REFERENCES):
  194. old_img = _get_calibration_dir() / f"printer_{printer_id}_ref_{i}.jpg"
  195. new_img = _get_calibration_dir() / f"printer_{printer_id}_ref_{i - 1}.jpg"
  196. if old_img.exists():
  197. old_img.rename(new_img)
  198. # Also shift metadata
  199. if str(i) in refs:
  200. refs[str(i - 1)] = refs[str(i)]
  201. del refs[str(i)]
  202. metadata["references"] = refs
  203. self._save_metadata(printer_id, metadata)
  204. return True
  205. def get_reference_thumbnail(self, printer_id: int, index: int, max_size: int = 150) -> bytes | None:
  206. """Get a thumbnail of a reference image.
  207. Returns JPEG bytes or None if not found.
  208. """
  209. path = _get_calibration_dir() / f"printer_{printer_id}_ref_{index}.jpg"
  210. if not path.exists():
  211. return None
  212. try:
  213. img = cv2.imread(str(path))
  214. if img is None:
  215. return None
  216. # Calculate thumbnail size maintaining aspect ratio
  217. h, w = img.shape[:2]
  218. if w > h:
  219. new_w = max_size
  220. new_h = int(h * max_size / w)
  221. else:
  222. new_h = max_size
  223. new_w = int(w * max_size / h)
  224. thumb = cv2.resize(img, (new_w, new_h), interpolation=cv2.INTER_AREA)
  225. _, buffer = cv2.imencode(".jpg", thumb, [cv2.IMWRITE_JPEG_QUALITY, 80])
  226. return buffer.tobytes()
  227. except Exception as e:
  228. logger.error(f"Error creating thumbnail: {e}")
  229. return None
  230. def _extract_roi(self, frame: np.ndarray) -> tuple[np.ndarray, int, int, int, int]:
  231. """Extract the region of interest from a frame.
  232. Returns:
  233. Tuple of (roi_frame, x_start, y_start, roi_width, roi_height)
  234. """
  235. height, width = frame.shape[:2]
  236. x_start = int(width * self.roi[0])
  237. y_start = int(height * self.roi[1])
  238. roi_width = int(width * self.roi[2])
  239. roi_height = int(height * self.roi[3])
  240. roi_frame = frame[y_start : y_start + roi_height, x_start : x_start + roi_width]
  241. return roi_frame, x_start, y_start, roi_width, roi_height
  242. def _preprocess_for_comparison(self, frame: np.ndarray) -> np.ndarray:
  243. """Preprocess a frame for comparison.
  244. Uses heavy blur to create "blob" representation - smooths out texture
  245. and noise while preserving large objects. Then normalizes brightness
  246. to reduce lighting sensitivity.
  247. """
  248. gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
  249. # Very heavy blur to smooth texture, keep only large shapes
  250. blurred = cv2.GaussianBlur(gray, (51, 51), 0)
  251. # Normalize to 0-255 range to reduce brightness sensitivity
  252. normalized = cv2.normalize(blurred, None, 0, 255, cv2.NORM_MINMAX)
  253. return normalized
  254. def calibrate(self, image_data: bytes, printer_id: int, label: str | None = None) -> tuple[bool, str, int]:
  255. """Calibrate by saving a reference image of the empty plate.
  256. Stores up to MAX_REFERENCES (5) images per printer. When all slots are full,
  257. the oldest reference is removed and others are shifted.
  258. Args:
  259. image_data: JPEG image data as bytes
  260. printer_id: Printer database ID
  261. label: Optional label for this reference (e.g., "High Temp Plate")
  262. Returns:
  263. Tuple of (success, message, index) where index is the slot used
  264. """
  265. from datetime import datetime
  266. try:
  267. # Decode image
  268. nparr = np.frombuffer(image_data, np.uint8)
  269. frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
  270. if frame is None:
  271. return False, "Failed to decode image", -1
  272. # Get existing references count
  273. existing_refs = self._get_reference_paths(printer_id)
  274. num_existing = len(existing_refs)
  275. # If all slots are full, rotate (remove oldest)
  276. if num_existing >= self.MAX_REFERENCES:
  277. self._rotate_references(printer_id)
  278. num_existing = self.MAX_REFERENCES - 1
  279. # Save to next available slot
  280. slot_index = num_existing
  281. reference_path = _get_calibration_dir() / f"printer_{printer_id}_ref_{slot_index}.jpg"
  282. write_success = cv2.imwrite(str(reference_path), frame, [cv2.IMWRITE_JPEG_QUALITY, 95])
  283. if not write_success:
  284. logger.error(f"cv2.imwrite failed for {reference_path}")
  285. return False, "Failed to save reference image", -1
  286. # Verify the file actually exists and has content
  287. if not reference_path.exists():
  288. logger.error(f"Reference image not found after save: {reference_path}")
  289. return False, "Reference image not found after save", -1
  290. file_size = reference_path.stat().st_size
  291. if file_size < 1000: # JPEG should be at least 1KB
  292. logger.error(f"Reference image too small ({file_size} bytes): {reference_path}")
  293. reference_path.unlink() # Clean up invalid file
  294. return False, f"Reference image corrupted (only {file_size} bytes)", -1
  295. logger.info(f"Saved reference image: {reference_path} ({file_size} bytes)")
  296. # Save metadata
  297. metadata = self._load_metadata(printer_id)
  298. if "references" not in metadata:
  299. metadata["references"] = {}
  300. metadata["references"][str(slot_index)] = {
  301. "label": label or "",
  302. "timestamp": datetime.now().isoformat(),
  303. }
  304. self._save_metadata(printer_id, metadata)
  305. logger.info(
  306. f"Saved plate calibration reference {slot_index + 1}/{self.MAX_REFERENCES} for printer {printer_id}"
  307. )
  308. return True, f"Calibration saved ({slot_index + 1}/{self.MAX_REFERENCES} references)", slot_index
  309. except Exception as e:
  310. logger.exception("Error during plate calibration")
  311. # Don't expose exception details to user - log has full info
  312. error_type = type(e).__name__
  313. return False, f"Calibration error: {error_type}", -1
  314. def get_calibration_count(self, printer_id: int) -> int:
  315. """Get the number of calibration references for a printer."""
  316. return len(self._get_reference_paths(printer_id))
  317. def has_calibration(self, printer_id: int, plate_type: str | None = None) -> bool:
  318. """Check if a printer has any calibration reference images."""
  319. return len(self._get_reference_paths(printer_id)) > 0
  320. def delete_calibration(self, printer_id: int, plate_type: str | None = None) -> bool:
  321. """Delete all calibration reference images for a printer."""
  322. paths = self._get_reference_paths(printer_id)
  323. if not paths:
  324. return False
  325. for path in paths:
  326. path.unlink()
  327. logger.info(f"Deleted {len(paths)} plate calibration reference(s) for printer {printer_id}")
  328. return True
  329. def analyze_frame(
  330. self, image_data: bytes, printer_id: int, plate_type: str | None = None, include_debug_image: bool = False
  331. ) -> PlateDetectionResult:
  332. """Analyze a camera frame to detect if the plate is empty.
  333. Compares the current frame to all calibration reference images and uses
  334. the best match (lowest difference) for the final result.
  335. Args:
  336. image_data: JPEG image data as bytes
  337. printer_id: Printer database ID (for reference lookup)
  338. plate_type: Unused - kept for API compatibility
  339. include_debug_image: If True, include annotated image in result
  340. Returns:
  341. PlateDetectionResult with analysis results
  342. """
  343. try:
  344. # Check for calibration
  345. reference_paths = self._get_reference_paths(printer_id)
  346. if not reference_paths:
  347. return PlateDetectionResult(
  348. is_empty=True, # Default to empty when not calibrated
  349. confidence=0.0,
  350. difference_percent=0.0,
  351. message="No calibration - please calibrate with empty plate first",
  352. needs_calibration=True,
  353. )
  354. # Decode current image
  355. nparr = np.frombuffer(image_data, np.uint8)
  356. current_frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
  357. if current_frame is None:
  358. return PlateDetectionResult(
  359. is_empty=True,
  360. confidence=0.0,
  361. difference_percent=0.0,
  362. message="Failed to decode current image",
  363. )
  364. # Extract ROI from current frame
  365. current_roi, x_start, y_start, roi_width, roi_height = self._extract_roi(current_frame)
  366. current_processed = self._preprocess_for_comparison(current_roi)
  367. # Compare against all references, find best match (lowest difference)
  368. best_difference_percent = float("inf")
  369. best_ref_idx = -1
  370. best_diff = None
  371. for idx, ref_path in enumerate(reference_paths):
  372. # Load reference image
  373. reference_frame = cv2.imread(str(ref_path), cv2.IMREAD_COLOR)
  374. if reference_frame is None:
  375. continue
  376. # Ensure same dimensions
  377. if current_frame.shape != reference_frame.shape:
  378. reference_frame = cv2.resize(reference_frame, (current_frame.shape[1], current_frame.shape[0]))
  379. # Extract ROI and preprocess
  380. reference_roi, _, _, _, _ = self._extract_roi(reference_frame)
  381. reference_processed = self._preprocess_for_comparison(reference_roi)
  382. # Calculate absolute difference
  383. diff = cv2.absdiff(current_processed, reference_processed)
  384. # Calculate mean difference as percentage
  385. mean_diff = np.mean(diff)
  386. difference_percent = (mean_diff / 255.0) * 100
  387. if difference_percent < best_difference_percent:
  388. best_difference_percent = difference_percent
  389. best_ref_idx = idx
  390. best_diff = diff
  391. if best_ref_idx == -1:
  392. return PlateDetectionResult(
  393. is_empty=True,
  394. confidence=0.0,
  395. difference_percent=0.0,
  396. message="Failed to load any reference images - please recalibrate",
  397. needs_calibration=True,
  398. )
  399. difference_percent = best_difference_percent
  400. # Determine if plate is empty (use best match)
  401. is_empty = difference_percent < self.difference_threshold
  402. # Calculate confidence
  403. if is_empty:
  404. # Higher confidence when very little difference
  405. confidence = 1.0 - min(1.0, difference_percent / self.difference_threshold)
  406. else:
  407. # Higher confidence when clearly different
  408. confidence = min(1.0, difference_percent / (self.difference_threshold * 2))
  409. # Generate message
  410. num_refs = len(reference_paths)
  411. if is_empty:
  412. message = (
  413. f"Plate appears empty (difference: {difference_percent:.1f}%, ref {best_ref_idx + 1}/{num_refs})"
  414. )
  415. else:
  416. message = f"Objects detected on plate (difference: {difference_percent:.1f}%, best ref {best_ref_idx + 1}/{num_refs})"
  417. # Generate debug image if requested
  418. debug_image = None
  419. if include_debug_image and best_diff is not None:
  420. debug_frame = current_frame.copy()
  421. # Draw ROI rectangle
  422. cv2.rectangle(
  423. debug_frame,
  424. (x_start, y_start),
  425. (x_start + roi_width, y_start + roi_height),
  426. (0, 255, 0),
  427. 2,
  428. )
  429. # Create colored difference overlay
  430. # Red = areas that are different from reference
  431. # Amplify diff for visibility (multiply by 3, cap at 255)
  432. diff_amplified = np.minimum(best_diff * 3, 255).astype(np.uint8)
  433. diff_colored = cv2.cvtColor(diff_amplified, cv2.COLOR_GRAY2BGR)
  434. diff_colored[:, :, 0] = 0 # Remove blue
  435. diff_colored[:, :, 1] = 0 # Remove green
  436. # Red channel has the diff
  437. # Overlay difference on ROI
  438. roi_overlay = debug_frame[y_start : y_start + roi_height, x_start : x_start + roi_width]
  439. cv2.addWeighted(diff_colored, 0.5, roi_overlay, 0.5, 0, roi_overlay)
  440. # Add status text
  441. status_text = "EMPTY" if is_empty else "OBJECTS DETECTED"
  442. color = (0, 255, 0) if is_empty else (0, 0, 255)
  443. cv2.putText(debug_frame, status_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2)
  444. cv2.putText(
  445. debug_frame,
  446. f"Diff: {difference_percent:.1f}% (ref {best_ref_idx + 1}/{num_refs})",
  447. (10, 60),
  448. cv2.FONT_HERSHEY_SIMPLEX,
  449. 0.7,
  450. color,
  451. 2,
  452. )
  453. cv2.putText(
  454. debug_frame,
  455. f"Confidence: {confidence:.0%}",
  456. (10, 90),
  457. cv2.FONT_HERSHEY_SIMPLEX,
  458. 0.7,
  459. color,
  460. 2,
  461. )
  462. # Encode debug image as JPEG
  463. _, buffer = cv2.imencode(".jpg", debug_frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
  464. debug_image = buffer.tobytes()
  465. return PlateDetectionResult(
  466. is_empty=is_empty,
  467. confidence=confidence,
  468. difference_percent=difference_percent,
  469. message=message,
  470. debug_image=debug_image,
  471. )
  472. except Exception as e:
  473. logger.exception("Error analyzing frame for plate detection")
  474. return PlateDetectionResult(
  475. is_empty=True, # Default to empty on error (don't block prints)
  476. confidence=0.0,
  477. difference_percent=0.0,
  478. message=f"Analysis error: {e!s}",
  479. )
  480. async def capture_camera_image(
  481. printer_id: int,
  482. ip_address: str,
  483. access_code: str,
  484. model: str,
  485. external_camera_url: str | None = None,
  486. external_camera_type: str | None = None,
  487. use_external: bool = False,
  488. ) -> tuple[bytes | None, str]:
  489. """Capture an image from the printer camera.
  490. If there's an active camera stream, uses the buffered frame instead of
  491. creating a new connection (which would fail while stream is active).
  492. Returns:
  493. Tuple of (image_data, camera_source) or (None, error_message)
  494. """
  495. image_data: bytes | None = None
  496. camera_source = "unknown"
  497. # Try external camera first if requested and available
  498. if use_external and external_camera_url and external_camera_type:
  499. try:
  500. from backend.app.services.external_camera import capture_frame
  501. image_data = await capture_frame(external_camera_url, external_camera_type)
  502. if image_data:
  503. camera_source = "external"
  504. logger.debug(f"Captured frame from external camera for printer {printer_id}")
  505. except Exception as e:
  506. logger.warning(f"Failed to capture from external camera: {e}")
  507. # Fall back to built-in camera
  508. if image_data is None:
  509. # First, check if there's an active stream with a buffered frame
  510. # This avoids blocking when camera viewer is open
  511. try:
  512. from backend.app.api.routes.camera import get_buffered_frame
  513. buffered = get_buffered_frame(printer_id)
  514. if buffered:
  515. image_data = buffered
  516. camera_source = "built-in (buffered)"
  517. logger.debug(f"Using buffered frame from active stream for printer {printer_id}")
  518. except Exception as e:
  519. logger.debug(f"Could not get buffered frame: {e}")
  520. # If no buffered frame, try to capture a new one
  521. if image_data is None:
  522. import tempfile
  523. from backend.app.services.camera import capture_camera_frame
  524. with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
  525. tmp_path = Path(tmp.name)
  526. try:
  527. success = await capture_camera_frame(ip_address, access_code, model, tmp_path, timeout=10)
  528. if success:
  529. with open(tmp_path, "rb") as f:
  530. image_data = f.read()
  531. camera_source = "built-in"
  532. logger.debug(f"Captured frame from built-in camera for printer {printer_id}")
  533. finally:
  534. try:
  535. tmp_path.unlink()
  536. except Exception:
  537. pass
  538. return image_data, camera_source
  539. async def check_plate_empty(
  540. printer_id: int,
  541. ip_address: str,
  542. access_code: str,
  543. model: str,
  544. plate_type: str | None = None,
  545. include_debug_image: bool = False,
  546. external_camera_url: str | None = None,
  547. external_camera_type: str | None = None,
  548. use_external: bool = False,
  549. roi: tuple[float, float, float, float] | None = None,
  550. ) -> PlateDetectionResult:
  551. """Check if the build plate is empty for a printer.
  552. Args:
  553. printer_id: Printer database ID
  554. ip_address: Printer IP address
  555. access_code: Printer access code
  556. model: Printer model string
  557. plate_type: Type of build plate for calibration lookup
  558. include_debug_image: If True, include annotated image in result
  559. external_camera_url: URL of external camera (if configured)
  560. external_camera_type: Type of external camera (mjpeg, rtsp, snapshot)
  561. use_external: If True, prefer external camera over built-in
  562. roi: Region of interest as (x%, y%, w%, h%) - percentages of image size
  563. Returns:
  564. PlateDetectionResult with analysis results
  565. """
  566. if not OPENCV_AVAILABLE:
  567. return PlateDetectionResult(
  568. is_empty=True,
  569. confidence=0.0,
  570. difference_percent=0.0,
  571. message="OpenCV not available - plate detection disabled",
  572. )
  573. image_data, camera_source = await capture_camera_image(
  574. printer_id, ip_address, access_code, model, external_camera_url, external_camera_type, use_external
  575. )
  576. if image_data is None:
  577. return PlateDetectionResult(
  578. is_empty=True, # Default to empty on error
  579. confidence=0.0,
  580. difference_percent=0.0,
  581. message="Failed to capture camera frame from any source",
  582. )
  583. # Analyze the captured frame
  584. detector = PlateDetector(roi=roi)
  585. result = detector.analyze_frame(image_data, printer_id, plate_type, include_debug_image)
  586. # Add camera source to message
  587. result.message = f"[{camera_source}] {result.message}"
  588. return result
  589. async def calibrate_plate(
  590. printer_id: int,
  591. ip_address: str,
  592. access_code: str,
  593. model: str,
  594. label: str | None = None,
  595. external_camera_url: str | None = None,
  596. external_camera_type: str | None = None,
  597. use_external: bool = False,
  598. ) -> tuple[bool, str, int]:
  599. """Calibrate plate detection by capturing a reference image of the empty plate.
  600. Args:
  601. printer_id: Printer database ID
  602. ip_address: Printer IP address
  603. access_code: Printer access code
  604. model: Printer model string
  605. label: Optional label for this reference (e.g., "High Temp Plate")
  606. external_camera_url: URL of external camera (if configured)
  607. external_camera_type: Type of external camera (mjpeg, rtsp, snapshot)
  608. use_external: If True, prefer external camera over built-in
  609. Returns:
  610. Tuple of (success, message, index)
  611. """
  612. if not OPENCV_AVAILABLE:
  613. return False, "OpenCV not available - plate detection disabled", -1
  614. image_data, camera_source = await capture_camera_image(
  615. printer_id, ip_address, access_code, model, external_camera_url, external_camera_type, use_external
  616. )
  617. if image_data is None:
  618. return False, "Failed to capture camera frame for calibration", -1
  619. detector = PlateDetector()
  620. success, message, index = detector.calibrate(image_data, printer_id, label)
  621. if success:
  622. message = f"[{camera_source}] {message}"
  623. return success, message, index
  624. def get_calibration_status(printer_id: int, plate_type: str | None = None) -> dict:
  625. """Get calibration status for a printer.
  626. Returns:
  627. Dict with calibration info including reference count
  628. """
  629. if not OPENCV_AVAILABLE:
  630. return {
  631. "available": False,
  632. "calibrated": False,
  633. "reference_count": 0,
  634. "max_references": 5,
  635. "message": "OpenCV not available",
  636. }
  637. detector = PlateDetector()
  638. calibrated = detector.has_calibration(printer_id)
  639. ref_count = detector.get_calibration_count(printer_id)
  640. if calibrated:
  641. message = f"Calibrated with {ref_count}/{detector.MAX_REFERENCES} reference(s)"
  642. else:
  643. message = "Not calibrated - please calibrate with empty plate"
  644. return {
  645. "available": True,
  646. "calibrated": calibrated,
  647. "reference_count": ref_count,
  648. "max_references": detector.MAX_REFERENCES,
  649. "message": message,
  650. }
  651. def delete_calibration(printer_id: int, plate_type: str | None = None) -> bool:
  652. """Delete calibration for a printer and plate type."""
  653. if not OPENCV_AVAILABLE:
  654. return False
  655. detector = PlateDetector()
  656. return detector.delete_calibration(printer_id, plate_type)
  657. def is_plate_detection_available() -> bool:
  658. """Check if plate detection feature is available (OpenCV installed)."""
  659. return OPENCV_AVAILABLE