# scale_reader.py (3.0 KB)

  1. """Scale reader wrapper with stability detection and calibration."""
  2. import logging
  3. import time
  4. from collections import deque
  5. logger = logging.getLogger(__name__)
  6. MOVING_AVG_SIZE = 20
  7. class ScaleReader:
  8. def __init__(self, tare_offset: int = 0, calibration_factor: float = 1.0):
  9. self._scale = None
  10. self._tare_offset = tare_offset
  11. self._calibration_factor = calibration_factor
  12. self._samples: deque[float] = deque(maxlen=MOVING_AVG_SIZE)
  13. self._stability_history: deque[tuple[float, float]] = deque(maxlen=20)
  14. self._ok = False
  15. self._last_raw = 0
  16. try:
  17. from scale_diag import NAU7802
  18. self._scale = NAU7802()
  19. self._scale.init()
  20. self._ok = True
  21. logger.info("Scale initialized (tare=%d, cal=%.6f)", tare_offset, calibration_factor)
  22. except Exception as e:
  23. logger.info("Scale not available: %s", e)
  24. @property
  25. def ok(self) -> bool:
  26. return self._ok
  27. @property
  28. def last_raw(self) -> int:
  29. return self._last_raw
  30. def close(self):
  31. try:
  32. if self._scale:
  33. self._scale.close()
  34. except Exception:
  35. pass
  36. def update_calibration(self, tare_offset: int, calibration_factor: float):
  37. self._tare_offset = tare_offset
  38. self._calibration_factor = calibration_factor
  39. logger.info("Calibration updated: tare=%d, factor=%.6f", tare_offset, calibration_factor)
  40. def tare(self):
  41. """Set current raw reading as tare offset."""
  42. if self._last_raw:
  43. self._tare_offset = self._last_raw
  44. self._samples.clear()
  45. self._stability_history.clear()
  46. logger.info("Tared at raw=%d", self._tare_offset)
  47. return self._tare_offset
  48. def read(self) -> tuple[float, bool, int] | None:
  49. """Read current weight. Returns (grams, stable, raw_adc) or None."""
  50. try:
  51. if not self._scale.data_ready():
  52. return None
  53. raw = self._scale.read_raw()
  54. self._last_raw = raw
  55. self._ok = True
  56. grams = (raw - self._tare_offset) * self._calibration_factor
  57. self._samples.append(grams)
  58. # Moving average
  59. avg_grams = sum(self._samples) / len(self._samples)
  60. # Stability: track readings over time
  61. now = time.monotonic()
  62. self._stability_history.append((now, avg_grams))
  63. # Stable if all readings within 1s window are within 2g of each other
  64. stable = False
  65. if len(self._stability_history) >= 5:
  66. cutoff = now - 1.0
  67. recent = [g for t, g in self._stability_history if t >= cutoff]
  68. if len(recent) >= 3:
  69. spread = max(recent) - min(recent)
  70. stable = spread < 2.0
  71. return round(avg_grams, 1), stable, raw
  72. except Exception as e:
  73. logger.debug("Scale read error: %s", e)
  74. self._ok = False
  75. return None