user_totp.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. from __future__ import annotations
  2. import json
  3. from datetime import datetime
  4. from fastapi import HTTPException, status
  5. from sqlalchemy import BigInteger, Boolean, DateTime, ForeignKey, Integer, String, Text, func
  6. from sqlalchemy.orm import Mapped, mapped_column
  7. from backend.app.core.database import Base
  8. from backend.app.core.encryption import mfa_decrypt, mfa_encrypt
  9. class UserTOTP(Base):
  10. """TOTP (Time-based One-Time Password) secret for a user.
  11. Stores the TOTP secret used by authenticator apps (Google Authenticator,
  12. Proton Authenticator, Aegis, etc.). One record per user; is_enabled=False
  13. while the setup is pending confirmation.
  14. """
  15. __tablename__ = "user_totp"
  16. id: Mapped[int] = mapped_column(primary_key=True)
  17. user_id: Mapped[int] = mapped_column(Integer, ForeignKey("users.id", ondelete="CASCADE"), unique=True, index=True)
  18. # TOTP secret — encrypted at rest when MFA_ENCRYPTION_KEY is set.
  19. # Use .secret / .set_secret() rather than accessing _secret_enc directly.
  20. _secret_enc: Mapped[str] = mapped_column("secret", String(512))
  21. is_enabled: Mapped[bool] = mapped_column(Boolean, default=False)
  22. # Hashed backup codes stored as JSON array of strings
  23. # Each entry is a hashed one-time-use recovery code
  24. backup_codes_json: Mapped[str | None] = mapped_column(Text, nullable=True, default=None)
  25. # TOTP replay protection: stores the 30-second time-step counter of the last
  26. # accepted code so the same code cannot be used twice within one window.
  27. last_totp_counter: Mapped[int | None] = mapped_column(BigInteger, nullable=True, default=None)
  28. created_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now())
  29. updated_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now(), onupdate=func.now())
  30. @property
  31. def secret(self) -> str:
  32. """Return the decrypted TOTP secret."""
  33. return mfa_decrypt(self._secret_enc)
  34. @secret.setter
  35. def secret(self, value: str) -> None:
  36. """Store the TOTP secret, encrypting it when MFA_ENCRYPTION_KEY is set."""
  37. self._secret_enc = mfa_encrypt(value)
  38. @property
  39. def backup_code_hashes(self) -> list[str]:
  40. """T5: Get stored backup-code hashes as a list.
  41. The name makes clear that these are *hashes*, not plaintext codes,
  42. so callers know they must verify with a password-hashing library
  43. rather than compare directly.
  44. """
  45. if not self.backup_codes_json:
  46. return []
  47. return json.loads(self.backup_codes_json)
  48. @backup_code_hashes.setter
  49. def backup_code_hashes(self, hashes: list[str]) -> None:
  50. """Persist backup-code hashes as a JSON array."""
  51. self.backup_codes_json = json.dumps(hashes)
  52. def accept_counter(self, new_counter: int) -> None:
  53. """T4: Record an accepted TOTP time-step counter, rejecting backward movement.
  54. Raises ``HTTPException(400)`` if ``new_counter`` is not strictly greater
  55. than ``last_totp_counter``, preventing counter roll-back attacks (e.g. an
  56. attacker who replays a previously accepted code after the counter wraps or
  57. the clock is skewed backward).
  58. The caller is responsible for flushing/committing the change to the DB.
  59. """
  60. if self.last_totp_counter is not None and new_counter <= self.last_totp_counter:
  61. raise HTTPException(
  62. status_code=status.HTTP_400_BAD_REQUEST,
  63. detail="TOTP code already used",
  64. )
  65. self.last_totp_counter = new_counter
  66. def __repr__(self) -> str:
  67. return f"<UserTOTP user_id={self.user_id} enabled={self.is_enabled}>"