nfc_reader.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. """NFC reader wrapper with state machine for tag presence detection."""
  2. import logging
  3. import time
  4. from enum import Enum, auto
  5. logger = logging.getLogger(__name__)
  6. MISS_THRESHOLD = 3 # Consecutive misses before declaring tag removed
  7. ERROR_RECOVERY_THRESHOLD = 10 # Consecutive errors before attempting RF reset
  8. RF_CYCLE_INTERVAL = 60.0 # Seconds between preventive RF cycles (when idle)
  9. class NFCState(Enum):
  10. IDLE = auto()
  11. TAG_PRESENT = auto()
  12. class NFCReader:
  13. def __init__(self):
  14. self._nfc = None
  15. self._state = NFCState.IDLE
  16. self._current_uid: str | None = None
  17. self._current_sak: int | None = None
  18. self._miss_count = 0
  19. self._ok = False
  20. self._error_count = 0
  21. self._last_rf_cycle = 0.0
  22. self._poll_count = 0
  23. self._last_status_log = 0.0
  24. try:
  25. from read_tag import PN5180
  26. self._nfc = PN5180()
  27. self._init_rf()
  28. self._ok = True
  29. logger.info("NFC reader initialized")
  30. except Exception as e:
  31. logger.warning("NFC not available: %s", e)
  32. def _init_rf(self):
  33. """Full RF initialization sequence."""
  34. self._nfc.reset()
  35. self._nfc.load_rf_config(0x00, 0x80)
  36. time.sleep(0.010)
  37. self._nfc.rf_on()
  38. time.sleep(0.030)
  39. self._nfc.set_transceive_mode()
  40. self._last_rf_cycle = time.monotonic()
  41. def _rf_cycle(self):
  42. """RF off/on cycle to recover from stuck state."""
  43. try:
  44. self._nfc.rf_off()
  45. time.sleep(0.010)
  46. self._nfc.load_rf_config(0x00, 0x80)
  47. time.sleep(0.005)
  48. self._nfc.rf_on()
  49. time.sleep(0.020)
  50. self._nfc.set_transceive_mode()
  51. self._last_rf_cycle = time.monotonic()
  52. return True
  53. except Exception as e:
  54. logger.warning("NFC RF cycle failed: %s", e)
  55. return False
  56. def _full_reset(self):
  57. """Full hardware reset + RF init to recover from stuck state."""
  58. try:
  59. self._init_rf()
  60. self._error_count = 0
  61. logger.info("NFC reader recovered after full reset")
  62. return True
  63. except Exception as e:
  64. logger.warning("NFC full reset failed: %s", e)
  65. return False
  66. @property
  67. def ok(self) -> bool:
  68. return self._ok
  69. @property
  70. def state(self) -> NFCState:
  71. return self._state
  72. @property
  73. def current_uid(self) -> str | None:
  74. return self._current_uid
  75. def close(self):
  76. try:
  77. self._nfc.rf_off()
  78. self._nfc.close()
  79. except Exception:
  80. pass
  81. def poll(self) -> tuple[str, dict | None]:
  82. """Poll for tag. Returns (event_type, event_data).
  83. event_type: "none", "tag_detected", "tag_removed"
  84. """
  85. self._poll_count += 1
  86. # Periodic status log (every 60s)
  87. now = time.monotonic()
  88. if now - self._last_status_log >= 60.0:
  89. logger.info(
  90. "NFC status: state=%s, polls=%d, errors=%d, ok=%s",
  91. self._state.name,
  92. self._poll_count,
  93. self._error_count,
  94. self._ok,
  95. )
  96. self._last_status_log = now
  97. # Preventive full hardware reset when idle (prevents reader drift into
  98. # stuck state where activate_type_a() silently returns None without errors)
  99. if self._state == NFCState.IDLE and now - self._last_rf_cycle >= RF_CYCLE_INTERVAL:
  100. try:
  101. self._init_rf()
  102. logger.debug("Preventive NFC hardware reset completed")
  103. except Exception as e:
  104. logger.warning("Preventive NFC reset failed: %s", e)
  105. try:
  106. result = self._nfc.activate_type_a()
  107. except Exception as e:
  108. self._error_count += 1
  109. self._ok = False
  110. if self._error_count == 1:
  111. logger.warning("NFC poll error: %s", e)
  112. elif self._error_count == ERROR_RECOVERY_THRESHOLD:
  113. logger.warning(
  114. "NFC reader stuck (%d consecutive errors), attempting recovery...",
  115. self._error_count,
  116. )
  117. if self._full_reset():
  118. return "none", None
  119. # Reset failed — will keep trying on next threshold
  120. self._error_count = 0
  121. elif self._error_count % ERROR_RECOVERY_THRESHOLD == 0:
  122. logger.warning("NFC recovery attempt #%d", self._error_count // ERROR_RECOVERY_THRESHOLD)
  123. self._full_reset()
  124. return "none", None
  125. # Successful poll — clear error streak
  126. if self._error_count > 0:
  127. logger.info("NFC reader recovered after %d errors", self._error_count)
  128. self._error_count = 0
  129. self._ok = True
  130. if result is not None:
  131. uid_bytes, sak = result
  132. uid_hex = uid_bytes.hex().upper()
  133. self._miss_count = 0
  134. if self._state == NFCState.IDLE:
  135. self._state = NFCState.TAG_PRESENT
  136. self._current_uid = uid_hex
  137. self._current_sak = sak
  138. # Try reading Bambu tag data
  139. tray_uuid = None
  140. tag_type = "mifare_classic" if sak in (0x08, 0x18) else "ntag" if sak == 0x00 else "unknown"
  141. if sak in (0x08, 0x18):
  142. blocks = self._nfc.read_bambu_tag(uid_bytes)
  143. if blocks:
  144. tray_uuid = _extract_tray_uuid(blocks)
  145. logger.info("Tag detected: %s (SAK=0x%02X, type=%s)", uid_hex, sak, tag_type)
  146. return "tag_detected", {
  147. "tag_uid": uid_hex,
  148. "sak": sak,
  149. "tag_type": tag_type,
  150. "tray_uuid": tray_uuid,
  151. }
  152. # Tag still present — no event
  153. return "none", None
  154. # No tag found
  155. if self._state == NFCState.TAG_PRESENT:
  156. self._miss_count += 1
  157. if self._miss_count >= MISS_THRESHOLD:
  158. old_uid = self._current_uid
  159. self._state = NFCState.IDLE
  160. self._current_uid = None
  161. self._current_sak = None
  162. self._miss_count = 0
  163. logger.info("Tag removed: %s", old_uid)
  164. return "tag_removed", {"tag_uid": old_uid}
  165. return "none", None
  166. def _extract_tray_uuid(blocks: dict[int, bytes]) -> str | None:
  167. """Extract tray_uuid from Bambu MIFARE Classic data blocks."""
  168. # Block 4-5 contain the 32-char tray UUID (first 16 bytes from block 4 + 5)
  169. if 4 in blocks and 5 in blocks:
  170. raw = blocks[4] + blocks[5]
  171. # UUID is stored as ASCII hex in the first 16 bytes of blocks 4-5
  172. uuid_bytes = raw[:16]
  173. try:
  174. uuid_str = uuid_bytes.hex().upper()
  175. if uuid_str and uuid_str != "0" * 32:
  176. return uuid_str
  177. except Exception:
  178. pass
  179. return None