plate_detection.py 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825
  1. """Build plate empty detection using OpenCV.
  2. Analyzes camera frames to detect if there are objects on the build plate.
  3. Uses calibration-based difference detection - compares current frame to
  4. a reference image of the empty plate.
  5. """
  6. from __future__ import annotations
  7. import logging
  8. import os
  9. from pathlib import Path
  10. logger = logging.getLogger(__name__)
  11. # Optional OpenCV import - feature disabled if not available
  12. try:
  13. import cv2
  14. import numpy as np
  15. OPENCV_AVAILABLE = True
  16. except ImportError:
  17. OPENCV_AVAILABLE = False
  18. logger.info("OpenCV not available - plate detection feature disabled")
  19. def _get_calibration_dir() -> Path:
  20. """Get the calibration directory from settings (ensures persistence in Docker)."""
  21. from backend.app.core.config import settings
  22. return settings.plate_calibration_dir
  23. class PlateDetectionResult:
  24. """Result of plate detection analysis."""
  25. def __init__(
  26. self,
  27. is_empty: bool,
  28. confidence: float,
  29. difference_percent: float,
  30. message: str,
  31. debug_image: bytes | None = None,
  32. needs_calibration: bool = False,
  33. ):
  34. self.is_empty = is_empty
  35. self.confidence = confidence # 0.0 to 1.0
  36. self.difference_percent = difference_percent # How different from reference
  37. self.message = message
  38. self.debug_image = debug_image # Optional annotated image for debugging
  39. self.needs_calibration = needs_calibration # True if no reference image exists
  40. def to_dict(self) -> dict:
  41. return {
  42. "is_empty": bool(self.is_empty),
  43. "confidence": float(round(self.confidence, 2)),
  44. "difference_percent": float(round(self.difference_percent, 2)),
  45. "message": self.message,
  46. "has_debug_image": self.debug_image is not None,
  47. "needs_calibration": bool(self.needs_calibration),
  48. }
  49. class PlateDetector:
  50. """Detects if the build plate is empty using calibration-based difference detection."""
  51. # Default region of interest (ROI) as percentage of image dimensions
  52. # These define where the build plate typically appears in the camera view
  53. # Format: (x_start%, y_start%, width%, height%)
  54. DEFAULT_ROI = (0.15, 0.35, 0.70, 0.55) # Center-lower portion of frame
  55. # Detection thresholds for difference detection
  56. # Using mean pixel difference (0-100% scale)
  57. # Small objects may only cause 1-2% mean difference
  58. DEFAULT_DIFFERENCE_THRESHOLD = 1.0
  59. DEFAULT_BLUR_SIZE = 21 # Gaussian blur kernel size (must be odd) - unused with edge detection
  60. def __init__(
  61. self,
  62. roi: tuple[float, float, float, float] | None = None,
  63. difference_threshold: float = DEFAULT_DIFFERENCE_THRESHOLD,
  64. blur_size: int = DEFAULT_BLUR_SIZE,
  65. ):
  66. """Initialize the plate detector.
  67. Args:
  68. roi: Region of interest as (x%, y%, w%, h%) - percentages of image size
  69. difference_threshold: Percentage of pixels that must differ to trigger "not empty"
  70. blur_size: Gaussian blur kernel size for noise reduction
  71. """
  72. if not OPENCV_AVAILABLE:
  73. raise RuntimeError("OpenCV is not installed. Install with: pip install opencv-python-headless")
  74. self.roi = roi or self.DEFAULT_ROI
  75. self.difference_threshold = difference_threshold
  76. self.blur_size = blur_size if blur_size % 2 == 1 else blur_size + 1 # Must be odd
  77. # Maximum number of reference images to store per printer
  78. MAX_REFERENCES = 5
  79. def _get_metadata_path(self, printer_id: int) -> Path:
  80. """Get the path to the metadata JSON file for a printer."""
  81. _get_calibration_dir().mkdir(parents=True, exist_ok=True)
  82. return _get_calibration_dir() / f"printer_{printer_id}_metadata.json"
  83. def _load_metadata(self, printer_id: int) -> dict:
  84. """Load metadata for a printer's references."""
  85. import json
  86. meta_path = self._get_metadata_path(printer_id)
  87. if meta_path.exists():
  88. try:
  89. with open(meta_path) as f:
  90. return json.load(f)
  91. except (json.JSONDecodeError, OSError, KeyError, ValueError):
  92. pass
  93. return {"references": {}}
  94. def _save_metadata(self, printer_id: int, metadata: dict) -> None:
  95. """Save metadata for a printer's references."""
  96. import json
  97. meta_path = self._get_metadata_path(printer_id)
  98. with open(meta_path, "w") as f:
  99. json.dump(metadata, f, indent=2)
  100. def _get_reference_paths(self, printer_id: int) -> list[Path]:
  101. """Get all existing reference image paths for a printer."""
  102. _get_calibration_dir().mkdir(parents=True, exist_ok=True)
  103. paths = []
  104. for i in range(self.MAX_REFERENCES):
  105. path = _get_calibration_dir() / f"printer_{printer_id}_ref_{i}.jpg"
  106. if path.exists():
  107. paths.append(path)
  108. return paths
  109. def _get_next_reference_slot(self, printer_id: int) -> Path:
  110. """Get the path for the next reference image slot (cycles through slots)."""
  111. _get_calibration_dir().mkdir(parents=True, exist_ok=True)
  112. # Find first empty slot, or use oldest (slot 0) and shift others
  113. for i in range(self.MAX_REFERENCES):
  114. path = _get_calibration_dir() / f"printer_{printer_id}_ref_{i}.jpg"
  115. if not path.exists():
  116. return path
  117. # All slots full - return slot 0 (will be overwritten, but we rotate first)
  118. return _get_calibration_dir() / f"printer_{printer_id}_ref_0.jpg"
  119. def _rotate_references(self, printer_id: int) -> None:
  120. """Rotate references: delete oldest (0), shift others down."""
  121. # Delete slot 0
  122. slot0 = _get_calibration_dir() / f"printer_{printer_id}_ref_0.jpg"
  123. if slot0.exists():
  124. logger.info("Rotating references: removing oldest %s", slot0)
  125. slot0.unlink()
  126. # Shift others down
  127. for i in range(1, self.MAX_REFERENCES):
  128. old_path = _get_calibration_dir() / f"printer_{printer_id}_ref_{i}.jpg"
  129. new_path = _get_calibration_dir() / f"printer_{printer_id}_ref_{i - 1}.jpg"
  130. if old_path.exists():
  131. old_path.rename(new_path)
  132. # Also rotate metadata
  133. metadata = self._load_metadata(printer_id)
  134. refs = metadata.get("references", {})
  135. new_refs = {}
  136. for i in range(1, self.MAX_REFERENCES):
  137. if str(i) in refs:
  138. new_refs[str(i - 1)] = refs[str(i)]
  139. metadata["references"] = new_refs
  140. self._save_metadata(printer_id, metadata)
  141. def get_references(self, printer_id: int) -> list[dict]:
  142. """Get all references with metadata for a printer.
  143. Returns list of dicts with: index, label, timestamp, has_image
  144. """
  145. metadata = self._load_metadata(printer_id)
  146. refs = metadata.get("references", {})
  147. result = []
  148. for i in range(self.MAX_REFERENCES):
  149. path = _get_calibration_dir() / f"printer_{printer_id}_ref_{i}.jpg"
  150. if path.exists():
  151. ref_meta = refs.get(str(i), {})
  152. result.append(
  153. {
  154. "index": i,
  155. "label": ref_meta.get("label", ""),
  156. "timestamp": ref_meta.get("timestamp", ""),
  157. "has_image": True,
  158. }
  159. )
  160. return result
  161. def update_reference_label(self, printer_id: int, index: int, label: str) -> bool:
  162. """Update the label for a reference."""
  163. if index < 0 or index >= self.MAX_REFERENCES:
  164. return False
  165. path = _get_calibration_dir() / f"printer_{printer_id}_ref_{index}.jpg"
  166. if not path.exists():
  167. return False
  168. metadata = self._load_metadata(printer_id)
  169. if "references" not in metadata:
  170. metadata["references"] = {}
  171. if str(index) not in metadata["references"]:
  172. metadata["references"][str(index)] = {}
  173. metadata["references"][str(index)]["label"] = label
  174. self._save_metadata(printer_id, metadata)
  175. return True
  176. def delete_reference(self, printer_id: int, index: int) -> bool:
  177. """Delete a specific reference by index."""
  178. if index < 0 or index >= self.MAX_REFERENCES:
  179. return False
  180. path = _get_calibration_dir() / f"printer_{printer_id}_ref_{index}.jpg"
  181. if not path.exists():
  182. return False
  183. # Delete image
  184. logger.info("Deleting reference %s for printer %s: %s", index, printer_id, path)
  185. path.unlink()
  186. # Remove from metadata
  187. metadata = self._load_metadata(printer_id)
  188. refs = metadata.get("references", {})
  189. if str(index) in refs:
  190. del refs[str(index)]
  191. metadata["references"] = refs
  192. self._save_metadata(printer_id, metadata)
  193. # Shift remaining references down to fill the gap
  194. for i in range(index + 1, self.MAX_REFERENCES):
  195. old_img = _get_calibration_dir() / f"printer_{printer_id}_ref_{i}.jpg"
  196. new_img = _get_calibration_dir() / f"printer_{printer_id}_ref_{i - 1}.jpg"
  197. if old_img.exists():
  198. old_img.rename(new_img)
  199. # Also shift metadata
  200. if str(i) in refs:
  201. refs[str(i - 1)] = refs[str(i)]
  202. del refs[str(i)]
  203. metadata["references"] = refs
  204. self._save_metadata(printer_id, metadata)
  205. return True
  206. def get_reference_thumbnail(self, printer_id: int, index: int, max_size: int = 150) -> bytes | None:
  207. """Get a thumbnail of a reference image.
  208. Returns JPEG bytes or None if not found.
  209. """
  210. path = _get_calibration_dir() / f"printer_{printer_id}_ref_{index}.jpg"
  211. if not path.exists():
  212. return None
  213. try:
  214. img = cv2.imread(str(path))
  215. if img is None:
  216. return None
  217. # Calculate thumbnail size maintaining aspect ratio
  218. h, w = img.shape[:2]
  219. if w > h:
  220. new_w = max_size
  221. new_h = int(h * max_size / w)
  222. else:
  223. new_h = max_size
  224. new_w = int(w * max_size / h)
  225. thumb = cv2.resize(img, (new_w, new_h), interpolation=cv2.INTER_AREA)
  226. _, buffer = cv2.imencode(".jpg", thumb, [cv2.IMWRITE_JPEG_QUALITY, 80])
  227. return buffer.tobytes()
  228. except Exception as e:
  229. logger.error("Error creating thumbnail: %s", e)
  230. return None
  231. def _extract_roi(self, frame: np.ndarray) -> tuple[np.ndarray, int, int, int, int]:
  232. """Extract the region of interest from a frame.
  233. Returns:
  234. Tuple of (roi_frame, x_start, y_start, roi_width, roi_height)
  235. """
  236. height, width = frame.shape[:2]
  237. x_start = int(width * self.roi[0])
  238. y_start = int(height * self.roi[1])
  239. roi_width = int(width * self.roi[2])
  240. roi_height = int(height * self.roi[3])
  241. roi_frame = frame[y_start : y_start + roi_height, x_start : x_start + roi_width]
  242. return roi_frame, x_start, y_start, roi_width, roi_height
  243. def _preprocess_for_comparison(self, frame: np.ndarray) -> np.ndarray:
  244. """Preprocess a frame for comparison.
  245. Uses heavy blur to create "blob" representation - smooths out texture
  246. and noise while preserving large objects. Then normalizes brightness
  247. to reduce lighting sensitivity.
  248. """
  249. gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
  250. # Very heavy blur to smooth texture, keep only large shapes
  251. blurred = cv2.GaussianBlur(gray, (51, 51), 0)
  252. # Normalize to 0-255 range to reduce brightness sensitivity
  253. normalized = cv2.normalize(blurred, None, 0, 255, cv2.NORM_MINMAX)
  254. return normalized
  255. def calibrate(self, image_data: bytes, printer_id: int, label: str | None = None) -> tuple[bool, str, int]:
  256. """Calibrate by saving a reference image of the empty plate.
  257. Stores up to MAX_REFERENCES (5) images per printer. When all slots are full,
  258. the oldest reference is removed and others are shifted.
  259. Args:
  260. image_data: JPEG image data as bytes
  261. printer_id: Printer database ID
  262. label: Optional label for this reference (e.g., "High Temp Plate")
  263. Returns:
  264. Tuple of (success, message, index) where index is the slot used
  265. """
  266. from datetime import datetime
  267. try:
  268. # Decode image
  269. nparr = np.frombuffer(image_data, np.uint8)
  270. frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
  271. if frame is None:
  272. return False, "Failed to decode image", -1
  273. # Get existing references count
  274. existing_refs = self._get_reference_paths(printer_id)
  275. num_existing = len(existing_refs)
  276. # If all slots are full, rotate (remove oldest)
  277. if num_existing >= self.MAX_REFERENCES:
  278. self._rotate_references(printer_id)
  279. num_existing = self.MAX_REFERENCES - 1
  280. # Save to next available slot
  281. slot_index = num_existing
  282. reference_path = _get_calibration_dir() / f"printer_{printer_id}_ref_{slot_index}.jpg"
  283. write_success = cv2.imwrite(str(reference_path), frame, [cv2.IMWRITE_JPEG_QUALITY, 95])
  284. if not write_success:
  285. logger.error("cv2.imwrite failed for %s", reference_path)
  286. return False, "Failed to save reference image", -1
  287. # Verify the file actually exists and has content
  288. if not reference_path.exists():
  289. logger.error("Reference image not found after save: %s", reference_path)
  290. return False, "Reference image not found after save", -1
  291. file_size = reference_path.stat().st_size
  292. if file_size < 1000: # JPEG should be at least 1KB
  293. logger.error("Reference image too small (%s bytes): %s", file_size, reference_path)
  294. reference_path.unlink() # Clean up invalid file
  295. return False, f"Reference image corrupted (only {file_size} bytes)", -1
  296. logger.info("Saved reference image: %s (%s bytes)", reference_path, file_size)
  297. # Save metadata
  298. metadata = self._load_metadata(printer_id)
  299. if "references" not in metadata:
  300. metadata["references"] = {}
  301. metadata["references"][str(slot_index)] = {
  302. "label": label or "",
  303. "timestamp": datetime.now().isoformat(),
  304. }
  305. self._save_metadata(printer_id, metadata)
  306. logger.info(
  307. f"Saved plate calibration reference {slot_index + 1}/{self.MAX_REFERENCES} for printer {printer_id}"
  308. )
  309. return True, f"Calibration saved ({slot_index + 1}/{self.MAX_REFERENCES} references)", slot_index
  310. except Exception as e:
  311. logger.exception("Error during plate calibration")
  312. # Don't expose exception details to user - log has full info
  313. error_type = type(e).__name__
  314. return False, f"Calibration error: {error_type}", -1
  315. def get_calibration_count(self, printer_id: int) -> int:
  316. """Get the number of calibration references for a printer."""
  317. return len(self._get_reference_paths(printer_id))
  318. def has_calibration(self, printer_id: int, plate_type: str | None = None) -> bool:
  319. """Check if a printer has any calibration reference images."""
  320. return len(self._get_reference_paths(printer_id)) > 0
  321. def delete_calibration(self, printer_id: int, plate_type: str | None = None) -> bool:
  322. """Delete all calibration reference images for a printer."""
  323. paths = self._get_reference_paths(printer_id)
  324. if not paths:
  325. return False
  326. for path in paths:
  327. path.unlink()
  328. logger.info("Deleted %s plate calibration reference(s) for printer %s", len(paths), printer_id)
  329. return True
  330. def analyze_frame(
  331. self, image_data: bytes, printer_id: int, plate_type: str | None = None, include_debug_image: bool = False
  332. ) -> PlateDetectionResult:
  333. """Analyze a camera frame to detect if the plate is empty.
  334. Compares the current frame to all calibration reference images and uses
  335. the best match (lowest difference) for the final result.
  336. Args:
  337. image_data: JPEG image data as bytes
  338. printer_id: Printer database ID (for reference lookup)
  339. plate_type: Unused - kept for API compatibility
  340. include_debug_image: If True, include annotated image in result
  341. Returns:
  342. PlateDetectionResult with analysis results
  343. """
  344. try:
  345. # Check for calibration
  346. reference_paths = self._get_reference_paths(printer_id)
  347. if not reference_paths:
  348. return PlateDetectionResult(
  349. is_empty=True, # Default to empty when not calibrated
  350. confidence=0.0,
  351. difference_percent=0.0,
  352. message="No calibration - please calibrate with empty plate first",
  353. needs_calibration=True,
  354. )
  355. # Decode current image
  356. nparr = np.frombuffer(image_data, np.uint8)
  357. current_frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
  358. if current_frame is None:
  359. return PlateDetectionResult(
  360. is_empty=True,
  361. confidence=0.0,
  362. difference_percent=0.0,
  363. message="Failed to decode current image",
  364. )
  365. # Extract ROI from current frame
  366. current_roi, x_start, y_start, roi_width, roi_height = self._extract_roi(current_frame)
  367. current_processed = self._preprocess_for_comparison(current_roi)
  368. # Compare against all references, find best match (lowest difference)
  369. best_difference_percent = float("inf")
  370. best_ref_idx = -1
  371. best_diff = None
  372. for idx, ref_path in enumerate(reference_paths):
  373. # Load reference image
  374. reference_frame = cv2.imread(str(ref_path), cv2.IMREAD_COLOR)
  375. if reference_frame is None:
  376. continue
  377. # Ensure same dimensions
  378. if current_frame.shape != reference_frame.shape:
  379. reference_frame = cv2.resize(reference_frame, (current_frame.shape[1], current_frame.shape[0]))
  380. # Extract ROI and preprocess
  381. reference_roi, _, _, _, _ = self._extract_roi(reference_frame)
  382. reference_processed = self._preprocess_for_comparison(reference_roi)
  383. # Calculate absolute difference
  384. diff = cv2.absdiff(current_processed, reference_processed)
  385. # Calculate mean difference as percentage
  386. mean_diff = np.mean(diff)
  387. difference_percent = (mean_diff / 255.0) * 100
  388. if difference_percent < best_difference_percent:
  389. best_difference_percent = difference_percent
  390. best_ref_idx = idx
  391. best_diff = diff
  392. if best_ref_idx == -1:
  393. return PlateDetectionResult(
  394. is_empty=True,
  395. confidence=0.0,
  396. difference_percent=0.0,
  397. message="Failed to load any reference images - please recalibrate",
  398. needs_calibration=True,
  399. )
  400. difference_percent = best_difference_percent
  401. # Determine if plate is empty (use best match)
  402. is_empty = difference_percent < self.difference_threshold
  403. # Calculate confidence
  404. if is_empty:
  405. # Higher confidence when very little difference
  406. confidence = 1.0 - min(1.0, difference_percent / self.difference_threshold)
  407. else:
  408. # Higher confidence when clearly different
  409. confidence = min(1.0, difference_percent / (self.difference_threshold * 2))
  410. # Generate message
  411. num_refs = len(reference_paths)
  412. if is_empty:
  413. message = (
  414. f"Plate appears empty (difference: {difference_percent:.1f}%, ref {best_ref_idx + 1}/{num_refs})"
  415. )
  416. else:
  417. message = f"Objects detected on plate (difference: {difference_percent:.1f}%, best ref {best_ref_idx + 1}/{num_refs})"
  418. # Generate debug image if requested
  419. debug_image = None
  420. if include_debug_image and best_diff is not None:
  421. debug_frame = current_frame.copy()
  422. # Draw ROI rectangle
  423. cv2.rectangle(
  424. debug_frame,
  425. (x_start, y_start),
  426. (x_start + roi_width, y_start + roi_height),
  427. (0, 255, 0),
  428. 2,
  429. )
  430. # Create colored difference overlay
  431. # Red = areas that are different from reference
  432. # Amplify diff for visibility (multiply by 3, cap at 255)
  433. diff_amplified = np.minimum(best_diff * 3, 255).astype(np.uint8)
  434. diff_colored = cv2.cvtColor(diff_amplified, cv2.COLOR_GRAY2BGR)
  435. diff_colored[:, :, 0] = 0 # Remove blue
  436. diff_colored[:, :, 1] = 0 # Remove green
  437. # Red channel has the diff
  438. # Overlay difference on ROI
  439. roi_overlay = debug_frame[y_start : y_start + roi_height, x_start : x_start + roi_width]
  440. cv2.addWeighted(diff_colored, 0.5, roi_overlay, 0.5, 0, roi_overlay)
  441. # Add status text
  442. status_text = "EMPTY" if is_empty else "OBJECTS DETECTED"
  443. color = (0, 255, 0) if is_empty else (0, 0, 255)
  444. cv2.putText(debug_frame, status_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2)
  445. cv2.putText(
  446. debug_frame,
  447. f"Diff: {difference_percent:.1f}% (ref {best_ref_idx + 1}/{num_refs})",
  448. (10, 60),
  449. cv2.FONT_HERSHEY_SIMPLEX,
  450. 0.7,
  451. color,
  452. 2,
  453. )
  454. cv2.putText(
  455. debug_frame,
  456. f"Confidence: {confidence:.0%}",
  457. (10, 90),
  458. cv2.FONT_HERSHEY_SIMPLEX,
  459. 0.7,
  460. color,
  461. 2,
  462. )
  463. # Encode debug image as JPEG
  464. _, buffer = cv2.imencode(".jpg", debug_frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
  465. debug_image = buffer.tobytes()
  466. return PlateDetectionResult(
  467. is_empty=is_empty,
  468. confidence=confidence,
  469. difference_percent=difference_percent,
  470. message=message,
  471. debug_image=debug_image,
  472. )
  473. except Exception as e:
  474. logger.exception("Error analyzing frame for plate detection")
  475. return PlateDetectionResult(
  476. is_empty=True, # Default to empty on error (don't block prints)
  477. confidence=0.0,
  478. difference_percent=0.0,
  479. message=f"Analysis error: {e!s}",
  480. )
  481. async def capture_camera_image(
  482. printer_id: int,
  483. ip_address: str,
  484. access_code: str,
  485. model: str,
  486. external_camera_url: str | None = None,
  487. external_camera_type: str | None = None,
  488. use_external: bool = False,
  489. external_camera_snapshot_url: str | None = None,
  490. ) -> tuple[bytes | None, str]:
  491. """Capture an image from the printer camera.
  492. If there's an active camera stream, uses the buffered frame instead of
  493. creating a new connection (which would fail while stream is active).
  494. Returns:
  495. Tuple of (image_data, camera_source) or (None, error_message)
  496. """
  497. image_data: bytes | None = None
  498. camera_source = "unknown"
  499. # Try external camera first if requested and available
  500. if use_external and external_camera_url and external_camera_type:
  501. try:
  502. from backend.app.services.external_camera import capture_frame
  503. image_data = await capture_frame(
  504. external_camera_url,
  505. external_camera_type,
  506. snapshot_url=external_camera_snapshot_url,
  507. )
  508. if image_data:
  509. camera_source = "external"
  510. logger.debug("Captured frame from external camera for printer %s", printer_id)
  511. except Exception as e:
  512. logger.warning("Failed to capture from external camera: %s", e)
  513. # Fall back to built-in camera
  514. if image_data is None:
  515. # First, check if there's an active stream with a buffered frame
  516. # This avoids blocking when camera viewer is open
  517. try:
  518. from backend.app.api.routes.camera import get_buffered_frame
  519. buffered = get_buffered_frame(printer_id)
  520. if buffered:
  521. image_data = buffered
  522. camera_source = "built-in (buffered)"
  523. logger.debug("Using buffered frame from active stream for printer %s", printer_id)
  524. except Exception as e:
  525. logger.debug("Could not get buffered frame: %s", e)
  526. # If no buffered frame, try to capture a new one
  527. if image_data is None:
  528. import tempfile
  529. from backend.app.services.camera import capture_camera_frame
  530. fd, tmp_name = tempfile.mkstemp(suffix=".jpg")
  531. os.close(fd)
  532. tmp_path = Path(tmp_name)
  533. tmp_path.chmod(0o600)
  534. try:
  535. success = await capture_camera_frame(ip_address, access_code, model, tmp_path, timeout=10)
  536. if success:
  537. with open(tmp_path, "rb") as f:
  538. image_data = f.read()
  539. camera_source = "built-in"
  540. logger.debug("Captured frame from built-in camera for printer %s", printer_id)
  541. finally:
  542. try:
  543. tmp_path.unlink()
  544. except OSError:
  545. pass # Best-effort cleanup of temporary camera capture file
  546. return image_data, camera_source
  547. async def check_plate_empty(
  548. printer_id: int,
  549. ip_address: str,
  550. access_code: str,
  551. model: str,
  552. plate_type: str | None = None,
  553. include_debug_image: bool = False,
  554. external_camera_url: str | None = None,
  555. external_camera_type: str | None = None,
  556. use_external: bool = False,
  557. roi: tuple[float, float, float, float] | None = None,
  558. external_camera_snapshot_url: str | None = None,
  559. ) -> PlateDetectionResult:
  560. """Check if the build plate is empty for a printer.
  561. Args:
  562. printer_id: Printer database ID
  563. ip_address: Printer IP address
  564. access_code: Printer access code
  565. model: Printer model string
  566. plate_type: Type of build plate for calibration lookup
  567. include_debug_image: If True, include annotated image in result
  568. external_camera_url: URL of external camera (if configured)
  569. external_camera_type: Type of external camera (mjpeg, rtsp, snapshot)
  570. use_external: If True, prefer external camera over built-in
  571. roi: Region of interest as (x%, y%, w%, h%) - percentages of image size
  572. Returns:
  573. PlateDetectionResult with analysis results
  574. """
  575. if not OPENCV_AVAILABLE:
  576. return PlateDetectionResult(
  577. is_empty=True,
  578. confidence=0.0,
  579. difference_percent=0.0,
  580. message="OpenCV not available - plate detection disabled",
  581. )
  582. image_data, camera_source = await capture_camera_image(
  583. printer_id,
  584. ip_address,
  585. access_code,
  586. model,
  587. external_camera_url,
  588. external_camera_type,
  589. use_external,
  590. external_camera_snapshot_url=external_camera_snapshot_url,
  591. )
  592. if image_data is None:
  593. return PlateDetectionResult(
  594. is_empty=True, # Default to empty on error
  595. confidence=0.0,
  596. difference_percent=0.0,
  597. message="Failed to capture camera frame from any source",
  598. )
  599. # Analyze the captured frame
  600. detector = PlateDetector(roi=roi)
  601. result = detector.analyze_frame(image_data, printer_id, plate_type, include_debug_image)
  602. # Add camera source to message
  603. result.message = f"[{camera_source}] {result.message}"
  604. return result
  605. async def calibrate_plate(
  606. printer_id: int,
  607. ip_address: str,
  608. access_code: str,
  609. model: str,
  610. label: str | None = None,
  611. external_camera_url: str | None = None,
  612. external_camera_type: str | None = None,
  613. use_external: bool = False,
  614. external_camera_snapshot_url: str | None = None,
  615. ) -> tuple[bool, str, int]:
  616. """Calibrate plate detection by capturing a reference image of the empty plate.
  617. Args:
  618. printer_id: Printer database ID
  619. ip_address: Printer IP address
  620. access_code: Printer access code
  621. model: Printer model string
  622. label: Optional label for this reference (e.g., "High Temp Plate")
  623. external_camera_url: URL of external camera (if configured)
  624. external_camera_type: Type of external camera (mjpeg, rtsp, snapshot)
  625. use_external: If True, prefer external camera over built-in
  626. Returns:
  627. Tuple of (success, message, index)
  628. """
  629. if not OPENCV_AVAILABLE:
  630. return False, "OpenCV not available - plate detection disabled", -1
  631. image_data, camera_source = await capture_camera_image(
  632. printer_id,
  633. ip_address,
  634. access_code,
  635. model,
  636. external_camera_url,
  637. external_camera_type,
  638. use_external,
  639. external_camera_snapshot_url=external_camera_snapshot_url,
  640. )
  641. if image_data is None:
  642. return False, "Failed to capture camera frame for calibration", -1
  643. detector = PlateDetector()
  644. success, message, index = detector.calibrate(image_data, printer_id, label)
  645. if success:
  646. message = f"[{camera_source}] {message}"
  647. return success, message, index
  648. def get_calibration_status(printer_id: int, plate_type: str | None = None) -> dict:
  649. """Get calibration status for a printer.
  650. Returns:
  651. Dict with calibration info including reference count
  652. """
  653. if not OPENCV_AVAILABLE:
  654. return {
  655. "available": False,
  656. "calibrated": False,
  657. "reference_count": 0,
  658. "max_references": 5,
  659. "message": "OpenCV not available",
  660. }
  661. detector = PlateDetector()
  662. calibrated = detector.has_calibration(printer_id)
  663. ref_count = detector.get_calibration_count(printer_id)
  664. if calibrated:
  665. message = f"Calibrated with {ref_count}/{detector.MAX_REFERENCES} reference(s)"
  666. else:
  667. message = "Not calibrated - please calibrate with empty plate"
  668. return {
  669. "available": True,
  670. "calibrated": calibrated,
  671. "reference_count": ref_count,
  672. "max_references": detector.MAX_REFERENCES,
  673. "message": message,
  674. }
  675. def delete_calibration(printer_id: int, plate_type: str | None = None) -> bool:
  676. """Delete calibration for a printer and plate type."""
  677. if not OPENCV_AVAILABLE:
  678. return False
  679. detector = PlateDetector()
  680. return detector.delete_calibration(printer_id, plate_type)
  681. def is_plate_detection_available() -> bool:
  682. """Check if plate detection feature is available (OpenCV installed)."""
  683. return OPENCV_AVAILABLE