nfc_reader.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265
  1. """NFC reader wrapper with state machine for tag presence detection."""
  2. import logging
  3. import time
  4. from enum import Enum, auto
  5. logger = logging.getLogger(__name__)
  6. MISS_THRESHOLD = 3 # Consecutive misses before declaring tag removed
  7. ERROR_RECOVERY_THRESHOLD = 10 # Consecutive errors before attempting RF reset
  8. class NFCState(Enum):
  9. IDLE = auto()
  10. TAG_PRESENT = auto()
  11. class NFCReader:
  12. def __init__(self):
  13. self._nfc = None
  14. self._state = NFCState.IDLE
  15. self._current_uid: str | None = None
  16. self._current_sak: int | None = None
  17. self._miss_count = 0
  18. self._ok = False
  19. self._error_count = 0
  20. self._poll_count = 0
  21. self._last_status_log = 0.0
  22. try:
  23. from .pn5180 import PN5180
  24. self._nfc = PN5180()
  25. self._init_rf()
  26. self._ok = True
  27. logger.info("NFC reader initialized")
  28. except Exception as e:
  29. logger.warning("NFC not available: %s", e)
  30. def _init_rf(self):
  31. """Full RF initialization sequence."""
  32. self._nfc.reset()
  33. self._nfc.load_rf_config(0x00, 0x80)
  34. time.sleep(0.010)
  35. self._nfc.rf_on()
  36. time.sleep(0.030)
  37. self._nfc.set_transceive_mode()
  38. def _full_reset(self):
  39. """Full hardware reset + RF init to recover from stuck state."""
  40. try:
  41. self._init_rf()
  42. self._error_count = 0
  43. logger.info("NFC reader recovered after full reset")
  44. return True
  45. except Exception as e:
  46. logger.warning("NFC full reset failed: %s", e)
  47. return False
  48. @property
  49. def reader_type(self) -> str:
  50. """Return NFC reader hardware type."""
  51. return "PN5180" if self._nfc is not None else "Unknown"
  52. @property
  53. def connection(self) -> str:
  54. """Return NFC reader connection type."""
  55. return "SPI" if self._nfc is not None else "None"
  56. @property
  57. def ok(self) -> bool:
  58. return self._ok
  59. @property
  60. def state(self) -> NFCState:
  61. return self._state
  62. @property
  63. def current_uid(self) -> str | None:
  64. return self._current_uid
  65. def close(self):
  66. try:
  67. self._nfc.rf_off()
  68. self._nfc.close()
  69. except Exception:
  70. pass
  71. @property
  72. def current_sak(self) -> int | None:
  73. return self._current_sak
  74. def write_ntag(self, data: bytes) -> tuple[bool, str]:
  75. """Write raw NDEF bytes to currently present NTAG tag.
  76. Requires tag in TAG_PRESENT state with SAK=0x00.
  77. Returns (success, message).
  78. """
  79. if self._state != NFCState.TAG_PRESENT:
  80. return False, "No tag present"
  81. if self._current_sak != 0x00:
  82. return False, f"Not an NTAG (SAK=0x{self._current_sak:02X})"
  83. if not self._nfc:
  84. return False, "NFC reader not available"
  85. try:
  86. # Reactivate card before writing
  87. result = self._nfc.reactivate_card()
  88. if result is None:
  89. return False, "Failed to reactivate card for write"
  90. uid_bytes, sak = result
  91. if uid_bytes.hex().upper() != self._current_uid:
  92. return False, "Tag UID changed during write"
  93. # Write starting at page 4
  94. success = self._nfc.ntag_write_pages(start_page=4, data=data)
  95. if success:
  96. logger.info("NTAG write successful: %d bytes to tag %s", len(data), self._current_uid)
  97. return True, "Write successful"
  98. else:
  99. return False, "Write or verification failed"
  100. except Exception as e:
  101. logger.error("NTAG write error: %s", e)
  102. return False, f"Write error: {e}"
  103. def poll(self) -> tuple[str, dict | None]:
  104. """Poll for tag. Returns (event_type, event_data).
  105. event_type: "none", "tag_detected", "tag_removed"
  106. """
  107. self._poll_count += 1
  108. # Periodic status log (every 60s)
  109. now = time.monotonic()
  110. if now - self._last_status_log >= 60.0:
  111. logger.info(
  112. "NFC status: state=%s, polls=%d, errors=%d, ok=%s",
  113. self._state.name,
  114. self._poll_count,
  115. self._error_count,
  116. self._ok,
  117. )
  118. self._last_status_log = now
  119. if self._state == NFCState.IDLE:
  120. # Full hardware reset before every idle poll. Each activate_type_a()
  121. # call that returns None corrupts the PN5180 state — subsequent calls
  122. # silently fail even when a tag is present. Only a full RST pin toggle
  123. # recovers the reader. ~240ms overhead per poll, giving ~1.8 Hz poll
  124. # rate which is fine for a spool tag reader.
  125. try:
  126. self._init_rf()
  127. except Exception as e:
  128. logger.warning("NFC pre-poll reset failed: %s", e)
  129. else:
  130. # Tag present: light RF cycle to reset card from ACTIVE back to IDLE
  131. # state after previous SELECT, so it responds to the next WUPA/REQA.
  132. try:
  133. self._nfc.rf_off()
  134. time.sleep(0.003)
  135. self._nfc.rf_on()
  136. time.sleep(0.010)
  137. except Exception:
  138. pass # Will be caught by activate_type_a() error handling below
  139. try:
  140. result = self._nfc.activate_type_a()
  141. except Exception as e:
  142. self._error_count += 1
  143. self._ok = False
  144. if self._error_count == 1:
  145. logger.warning("NFC poll error: %s", e)
  146. elif self._error_count == ERROR_RECOVERY_THRESHOLD:
  147. logger.warning(
  148. "NFC reader stuck (%d consecutive errors), attempting recovery...",
  149. self._error_count,
  150. )
  151. if self._full_reset():
  152. return "none", None
  153. # Reset failed — will keep trying on next threshold
  154. self._error_count = 0
  155. elif self._error_count % ERROR_RECOVERY_THRESHOLD == 0:
  156. logger.warning("NFC recovery attempt #%d", self._error_count // ERROR_RECOVERY_THRESHOLD)
  157. self._full_reset()
  158. return "none", None
  159. # Successful poll — clear error streak
  160. if self._error_count > 0:
  161. logger.info("NFC reader recovered after %d errors", self._error_count)
  162. self._error_count = 0
  163. self._ok = True
  164. if result is not None:
  165. uid_bytes, sak = result
  166. uid_hex = uid_bytes.hex().upper()
  167. self._miss_count = 0
  168. if self._state == NFCState.IDLE:
  169. self._state = NFCState.TAG_PRESENT
  170. self._current_uid = uid_hex
  171. self._current_sak = sak
  172. # Try reading Bambu tag data
  173. tray_uuid = None
  174. tag_type = "mifare_classic" if sak in (0x08, 0x18) else "ntag" if sak == 0x00 else "unknown"
  175. if sak in (0x08, 0x18):
  176. blocks = self._nfc.read_bambu_tag(uid_bytes)
  177. if blocks:
  178. tray_uuid = _extract_tray_uuid(blocks)
  179. logger.info("Tag detected: %s (SAK=0x%02X, type=%s)", uid_hex, sak, tag_type)
  180. return "tag_detected", {
  181. "tag_uid": uid_hex,
  182. "sak": sak,
  183. "tag_type": tag_type,
  184. "tray_uuid": tray_uuid,
  185. }
  186. # Tag still present — no event
  187. return "none", None
  188. # No tag found
  189. if self._state == NFCState.TAG_PRESENT:
  190. self._miss_count += 1
  191. if self._miss_count >= MISS_THRESHOLD:
  192. old_uid = self._current_uid
  193. self._state = NFCState.IDLE
  194. self._current_uid = None
  195. self._current_sak = None
  196. self._miss_count = 0
  197. logger.info("Tag removed: %s", old_uid)
  198. return "tag_removed", {"tag_uid": old_uid}
  199. return "none", None
  200. def _extract_tray_uuid(blocks: dict[int, bytes]) -> str | None:
  201. """Extract tray_uuid from Bambu MIFARE Classic data blocks."""
  202. # Block 4-5 contain the tray UUID as 32 ASCII hex chars across 32 bytes.
  203. if 4 in blocks and 5 in blocks:
  204. raw = blocks[4] + blocks[5]
  205. try:
  206. # Preferred path: decode full ASCII payload, keep only hex chars.
  207. ascii_candidate = raw.decode("ascii", errors="ignore")
  208. hex_chars = "".join(ch for ch in ascii_candidate if ch in "0123456789abcdefABCDEF")
  209. if len(hex_chars) >= 32:
  210. uuid_str = hex_chars[:32].upper()
  211. if uuid_str != "0" * 32:
  212. return uuid_str
  213. except Exception:
  214. pass
  215. try:
  216. # Fallback for partially decoded payloads: use first 16 raw bytes as hex.
  217. # This preserves compatibility with older decoding behavior.
  218. uuid_str = raw[:16].hex().upper()
  219. if uuid_str and uuid_str != "0" * 32:
  220. return uuid_str
  221. except Exception:
  222. pass
  223. return None