# scale_reader.py (3.2 KB)
  1. """Scale reader wrapper with stability detection and calibration."""
  2. import logging
  3. import time
  4. from collections import deque
  5. logger = logging.getLogger(__name__)
  6. MOVING_AVG_SIZE = 20
  7. class ScaleReader:
  8. def __init__(self, tare_offset: int = 0, calibration_factor: float = 1.0):
  9. self._scale = None
  10. self._tare_offset = tare_offset
  11. self._calibration_factor = calibration_factor
  12. self._samples: deque[float] = deque(maxlen=MOVING_AVG_SIZE)
  13. self._stability_history: deque[tuple[float, float]] = deque(maxlen=20)
  14. self._ok = False
  15. self._last_raw = 0
  16. try:
  17. from .nau7802 import NAU7802
  18. self._scale = NAU7802()
  19. self._scale.init()
  20. self._ok = True
  21. bus_num = getattr(self._scale, "_bus_num", "?")
  22. logger.info(
  23. "Scale initialized on I2C bus %s (tare=%d, cal=%.6f)",
  24. bus_num,
  25. tare_offset,
  26. calibration_factor,
  27. )
  28. except Exception as e:
  29. logger.info("Scale not available: %s", e)
  30. @property
  31. def ok(self) -> bool:
  32. return self._ok
  33. @property
  34. def last_raw(self) -> int:
  35. return self._last_raw
  36. def close(self):
  37. try:
  38. if self._scale:
  39. self._scale.close()
  40. except Exception:
  41. pass
  42. def update_calibration(self, tare_offset: int, calibration_factor: float):
  43. self._tare_offset = tare_offset
  44. self._calibration_factor = calibration_factor
  45. logger.info("Calibration updated: tare=%d, factor=%.6f", tare_offset, calibration_factor)
  46. def tare(self):
  47. """Set current raw reading as tare offset."""
  48. if self._last_raw:
  49. self._tare_offset = self._last_raw
  50. self._samples.clear()
  51. self._stability_history.clear()
  52. logger.info("Tared at raw=%d", self._tare_offset)
  53. return self._tare_offset
  54. def read(self) -> tuple[float, bool, int] | None:
  55. """Read current weight. Returns (grams, stable, raw_adc) or None."""
  56. try:
  57. if not self._scale.data_ready():
  58. return None
  59. raw = self._scale.read_raw()
  60. self._last_raw = raw
  61. self._ok = True
  62. grams = (raw - self._tare_offset) * self._calibration_factor
  63. self._samples.append(grams)
  64. # Moving average
  65. avg_grams = sum(self._samples) / len(self._samples)
  66. # Stability: track readings over time
  67. now = time.monotonic()
  68. self._stability_history.append((now, avg_grams))
  69. # Stable if all readings within 1s window are within 2g of each other
  70. stable = False
  71. if len(self._stability_history) >= 5:
  72. cutoff = now - 1.0
  73. recent = [g for t, g in self._stability_history if t >= cutoff]
  74. if len(recent) >= 3:
  75. spread = max(recent) - min(recent)
  76. stable = spread < 2.0
  77. return round(avg_grams, 1), stable, raw
  78. except Exception as e:
  79. logger.debug("Scale read error: %s", e)
  80. self._ok = False
  81. return None