| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105 |
- """Temporal smoothing for Obico ML detection scores.
- Ports Obico's failure-detection math:
- - per-frame `current_p` = sum of detection confidences
- - `ewm_mean` = exponentially weighted mean (alpha = 2 / (span + 1), span = 12)
- - `rolling_mean_short` = ~310 frames of recent activity (≈52 min at 10s/frame)
- - `rolling_mean_long` = ~7200 frames of long-term baseline noise
- - First `WARMUP_FRAMES` frames always report "safe" while the state settles
- - Final score = max(ewm_mean, rolling_mean_short - rolling_mean_long)
- - Thresholds: LOW <= score < HIGH is "warning", score >= HIGH is "failure"
- """
- import math
- from collections import deque
- from dataclasses import dataclass, field
# Exponentially weighted mean: span in frames, and the alpha derived from it.
EWM_SPAN = 12
EWM_ALPHA = 2.0 / (1 + EWM_SPAN)

# Rolling-window lengths, in frames.
ROLLING_SHORT = 310
ROLLING_LONG = 7200

# Number of initial frames for which the smoothed score is reported as 0.0.
WARMUP_FRAMES = 30

# Base (low, high) thresholds; the sensitivity multipliers below scale both.
BASE_LOW, BASE_HIGH = 0.38, 0.78

# A multiplier above 1 raises the thresholds (harder to trigger); a
# multiplier below 1 lowers them (easier to trigger).
SENSITIVITY_MULT = {"low": 1.25, "medium": 1.0, "high": 0.75}
def thresholds(sensitivity: str) -> tuple[float, float]:
    """Return the `(low, high)` score thresholds for a sensitivity level.

    The base thresholds are scaled by the multiplier registered for
    `sensitivity` in `SENSITIVITY_MULT`. A level that is not registered
    leaves the base thresholds unscaled (multiplier 1.0).
    """
    try:
        scale = SENSITIVITY_MULT[sensitivity]
    except KeyError:
        # Unknown level: fall back to the unscaled base thresholds.
        scale = 1.0
    scaled_low = BASE_LOW * scale
    scaled_high = BASE_HIGH * scale
    return scaled_low, scaled_high
@dataclass
class PrintState:
    """Per-print smoothing state. Reset when a new print starts.

    Keeps an exponentially weighted mean plus a short and a long rolling
    window of the per-frame scores passed to `update`. A running sum is
    stored next to each window so the window means can be computed without
    re-summing the buffers on every frame.
    """

    # Number of frames accepted so far; drives the warmup gate.
    frame_count: int = 0
    # Exponentially weighted mean of the accepted per-frame scores.
    ewm_mean: float = 0.0
    # Running sums of the values currently held in short_buf / long_buf.
    short_sum: float = 0.0
    long_sum: float = 0.0
    # Bounded windows; the oldest value is evicted once maxlen is reached.
    short_buf: deque = field(default_factory=lambda: deque(maxlen=ROLLING_SHORT))
    long_buf: deque = field(default_factory=lambda: deque(maxlen=ROLLING_LONG))

    def update(self, current_p: float) -> float:
        """Feed a new per-frame score and return the smoothed score.

        Returns 0.0 during warmup so early noise doesn't trigger actions.

        A non-finite `current_p` (NaN or +/-inf) is dropped without touching
        any state, and the score for the unchanged state is returned. Once
        such a value entered `ewm_mean` or a running sum it would never
        leave (`inf - inf` is NaN), and every later score would be NaN.

        Raises:
            TypeError: if `current_p` is not a real number. The check runs
                before any state is modified.
        """
        if not math.isfinite(current_p):
            return self._smoothed()

        self.frame_count += 1
        if self.frame_count == 1:
            # Seed the EWM with the first sample instead of decaying from 0.
            self.ewm_mean = current_p
        else:
            self.ewm_mean = EWM_ALPHA * current_p + (1 - EWM_ALPHA) * self.ewm_mean

        self.short_sum = self._push(self.short_buf, self.short_sum, current_p)
        self.long_sum = self._push(self.long_buf, self.long_sum, current_p)
        return self._smoothed()

    @staticmethod
    def _push(buf: deque, total: float, value: float) -> float:
        """Append `value` to `buf` and return the updated running sum."""
        if len(buf) == buf.maxlen:
            # The append below evicts buf[0]; remove it from the sum first.
            total -= buf[0]
        buf.append(value)
        return total + value

    def _smoothed(self) -> float:
        """Return the smoothed score for the current state."""
        if self.frame_count <= WARMUP_FRAMES:
            return 0.0
        if not self.short_buf or not self.long_buf:
            # Only reachable when the state was constructed by hand with a
            # frame count but no samples; avoid dividing by zero.
            return 0.0
        short_mean = self.short_sum / len(self.short_buf)
        long_mean = self.long_sum / len(self.long_buf)
        return max(self.ewm_mean, short_mean - long_mean)
def classify(score: float, sensitivity: str) -> str:
    """Map a smoothed score to 'safe', 'warning', or 'failure'.

    The cut-offs come from `thresholds(sensitivity)`; both bounds are
    inclusive, so a score equal to a threshold gets the more severe label.
    """
    warn_at, fail_at = thresholds(sensitivity)
    # Start from the mildest label and escalate; the later check wins.
    label = "safe"
    if score >= warn_at:
        label = "warning"
    if score >= fail_at:
        label = "failure"
    return label
def score_from_detections(detections: list) -> float:
    """Sum confidences from the ML API `detections` array.

    Each detection is `[label, confidence, [x, y, w, h]]`. We only care about
    the confidence column — label is always "failure" for the single-class model.

    Malformed entries are skipped rather than raised on: entries that cannot
    be indexed at position 1, confidences that cannot be converted to a
    float, and confidences that are NaN or infinite. A falsy `detections`
    (None or empty) yields 0.0.
    """
    total = 0.0
    for det in detections or []:
        try:
            value = float(det[1])
        except (LookupError, TypeError, ValueError, OverflowError):
            # LookupError covers a short sequence (IndexError) and a
            # dict-shaped entry (KeyError); OverflowError covers an integer
            # too large to be represented as a float.
            continue
        if math.isfinite(value):
            total += value
    return total
|