test_long_lived_tokens.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. """Unit tests for the long-lived camera-token service (#1108).
  2. Drives the service directly against a real SQLAlchemy session so the
  3. hash/lookup/expiry/revoke logic is exercised end-to-end with no HTTP.
  4. """
  5. from __future__ import annotations
  6. from datetime import datetime, timedelta, timezone
  7. import pytest
  8. from backend.app.models.long_lived_token import LongLivedToken
  9. from backend.app.models.user import User
  10. from backend.app.services.long_lived_tokens import (
  11. ALLOWED_SCOPES,
  12. MAX_TOKEN_LIFETIME_DAYS,
  13. create_token,
  14. list_all_tokens,
  15. list_user_tokens,
  16. revoke_token,
  17. verify_token,
  18. )
  # Module-wide markers: every test here is tagged asyncio + integration.
  pytestmark = [
      pytest.mark.asyncio,
      pytest.mark.integration,
  ]
  20. # ---------------------------------------------------------------------------
  21. # Fixtures
  22. # ---------------------------------------------------------------------------
  @pytest.fixture
  async def alice(db_session) -> User:
      """Add an active user named ``alice`` to the session, commit, and return it."""
      fields = {
          "username": "alice",
          "email": "alice@example.test",
          "password_hash": "x",
          "is_active": True,
      }
      account = User(**fields)
      db_session.add(account)
      await db_session.commit()
      # Re-read the row after the commit; the tests rely on ``alice.id``.
      await db_session.refresh(account)
      return account
  @pytest.fixture
  async def bob(db_session) -> User:
      """Add an active user named ``bob`` to the session, commit, and return it."""
      fields = {
          "username": "bob",
          "email": "bob@example.test",
          "password_hash": "x",
          "is_active": True,
      }
      account = User(**fields)
      db_session.add(account)
      await db_session.commit()
      # Re-read the row after the commit; the tests rely on ``bob.id``.
      await db_session.refresh(account)
      return account
  47. # ---------------------------------------------------------------------------
  48. # Create
  49. # ---------------------------------------------------------------------------
  async def test_create_returns_plaintext_once_and_stores_hash(db_session, alice: User):
      """Create returns the plaintext token; the DB only stores its hash.

      Checks the ``bblt_`` prefix, the stored owner/name/scope, that the lookup
      prefix is embedded in the plaintext, that the stored hash differs from
      the plaintext, and that expiry lands roughly 30 days out.
      """
      created = await create_token(
          db_session,
          user_id=alice.id,
          name="Home Assistant",
          expires_in_days=30,
      )
      assert created.plaintext.startswith("bblt_")
      assert created.record.id is not None
      assert created.record.user_id == alice.id
      assert created.record.name == "Home Assistant"
      assert created.record.scope == "camera_stream"
      assert created.record.lookup_prefix in created.plaintext
      # Hash never matches plaintext.
      assert created.record.secret_hash != created.plaintext
      # Expiry roughly 30 days from now (allow a few seconds of clock drift).
      # Compare in timezone-aware UTC: ``datetime.utcnow()`` is deprecated and
      # naive, and subtracting it from an aware ``expires_at`` raises TypeError.
      # A naive ``expires_at`` is treated as UTC, matching the previous
      # ``utcnow()`` comparison.
      expires_at = created.record.expires_at
      if expires_at.tzinfo is None:
          expires_at = expires_at.replace(tzinfo=timezone.utc)
      delta = expires_at - datetime.now(timezone.utc)
      assert timedelta(days=29, hours=23) < delta < timedelta(days=30, minutes=1)
  async def test_create_rejects_zero_or_negative_expiry(db_session, alice: User):
      """Issue #1108 explicitly forbids ``expire_in: 0``; a negative value is rejected the same way."""
      for bad_days in (0, -5):
          with pytest.raises(ValueError, match="positive"):
              await create_token(
                  db_session,
                  user_id=alice.id,
                  name="x",
                  expires_in_days=bad_days,
              )
  async def test_create_rejects_expiry_above_policy_cap(db_session, alice: User):
      """One day past ``MAX_TOKEN_LIFETIME_DAYS`` must be rejected.

      The UI layer also clamps, but the service is the canonical guard.
      """
      one_day_over = MAX_TOKEN_LIFETIME_DAYS + 1
      with pytest.raises(ValueError, match="exceeds policy maximum"):
          await create_token(db_session, user_id=alice.id, name="x", expires_in_days=one_day_over)
  async def test_create_rejects_unsupported_scope(db_session, alice: User):
      """V1 only allows ``camera_stream``; any other scope raises ``ValueError``."""
      # Pin the allow-list so adding a scope forces this test to be revisited.
      assert set(ALLOWED_SCOPES) == {"camera_stream"}
      with pytest.raises(ValueError, match="unsupported scope"):
          await create_token(db_session, user_id=alice.id, name="x", expires_in_days=7, scope="anything_else")
  async def test_create_rejects_blank_or_oversize_name(db_session, alice: User):
      """A whitespace-only name and a 101-character name both raise ``ValueError``."""
      # (candidate name, pattern the error message must match)
      cases = (
          (" ", "name is required"),
          ("x" * 101, "100"),
      )
      for bad_name, expected_message in cases:
          with pytest.raises(ValueError, match=expected_message):
              await create_token(db_session, user_id=alice.id, name=bad_name, expires_in_days=7)
  102. # ---------------------------------------------------------------------------
  103. # Verify
  104. # ---------------------------------------------------------------------------
  async def test_verify_happy_path_returns_record_and_updates_last_used(db_session, alice: User):
      """A fresh token verifies to its own record and gains a ``last_used_at`` value."""
      issued = await create_token(
          db_session,
          user_id=alice.id,
          name="Frigate",
          expires_in_days=7,
      )
      # Never used yet, so the timestamp starts out unset.
      assert issued.record.last_used_at is None

      verified = await verify_token(db_session, issued.plaintext)

      assert verified is not None
      assert verified.id == issued.record.id
      assert verified.last_used_at is not None
  async def test_verify_returns_none_for_garbage_token(db_session, alice: User):
      """Malformed or unknown token strings verify to ``None``."""
      # A real token exists so the lookups run against a non-empty table.
      await create_token(db_session, user_id=alice.id, name="x", expires_in_days=7)
      bogus_tokens = (
          "not-a-real-token",
          "bblt_short",
          "abc_12345678_zzz",  # Wrong prefix entirely.
      )
      for candidate in bogus_tokens:
          assert await verify_token(db_session, candidate) is None
  async def test_verify_returns_none_for_expired_token(db_session, alice: User):
      """A token whose ``expires_at`` is in the past verifies to ``None``."""
      issued = await create_token(db_session, user_id=alice.id, name="x", expires_in_days=1)
      # Force expiry into the past.
      one_second_ago = datetime.now(timezone.utc) - timedelta(seconds=1)
      issued.record.expires_at = one_second_ago
      await db_session.commit()

      outcome = await verify_token(db_session, issued.plaintext)
      assert outcome is None
  async def test_verify_returns_none_for_revoked_token(db_session, alice: User):
      """Once revoked, a token's plaintext verifies to ``None``."""
      issued = await create_token(db_session, user_id=alice.id, name="x", expires_in_days=7)

      was_revoked = await revoke_token(db_session, issued.record.id)
      assert was_revoked is True

      outcome = await verify_token(db_session, issued.plaintext)
      assert outcome is None
  async def test_verify_returns_none_when_scope_mismatched(db_session, alice: User):
      """A token created with the default scope must not verify under another scope.

      No other scopes exist today, but if/when they do, this guard prevents a
      camera token from being accepted by, say, a control endpoint.
      """
      issued = await create_token(db_session, user_id=alice.id, name="x", expires_in_days=7)

      outcome = await verify_token(db_session, issued.plaintext, scope="other")
      assert outcome is None
  async def test_verify_does_not_collide_across_users_with_same_prefix(db_session, alice: User, bob: User, monkeypatch):
      """Each plaintext must verify only against the record it was issued for.

      The token-part generator is patched so the two tokens receive known
      lookup prefixes (``aliceaaa`` for the first call, ``bobbbbbb`` for the
      second), then each plaintext is verified against its own record.

      NOTE(review): despite the test name, the forced prefixes are distinct
      (the sanity assert below requires it), so a genuine same-prefix
      collision is not exercised here. Confirm whether a true collision case
      should be added.
      """
      from backend.app.services import long_lived_tokens

      real = long_lived_tokens._generate_token_parts
      forced_prefixes = iter(["aliceaaa", "bobbbbbb"])

      def _fixed_prefix():
          # Keep the real generator's plaintext, but overwrite the 8 characters
          # that follow "bblt_" with the next forced prefix.
          plaintext, _, _ = real()
          prefix = next(forced_prefixes)
          new_plaintext = "bblt_" + prefix + plaintext[len("bblt_") + 8 :]
          # The spliced plaintext is returned in the third slot as well (the
          # slot the original code unpacked as ``hash_input``).
          return new_plaintext, prefix, new_plaintext

      monkeypatch.setattr(long_lived_tokens, "_generate_token_parts", _fixed_prefix)

      a = await create_token(db_session, user_id=alice.id, name="a", expires_in_days=7)
      b = await create_token(db_session, user_id=bob.id, name="b", expires_in_days=7)
      assert a.record.lookup_prefix != b.record.lookup_prefix  # sanity

      # Cross-verify: alice's plaintext must only match alice's record.
      # Assert not-None first so a failed verification reports clearly instead
      # of raising AttributeError on ``None.id``.
      verified_a = await verify_token(db_session, a.plaintext)
      assert verified_a is not None
      assert verified_a.id == a.record.id

      verified_b = await verify_token(db_session, b.plaintext)
      assert verified_b is not None
      assert verified_b.id == b.record.id
  159. # ---------------------------------------------------------------------------
  160. # List + revoke
  161. # ---------------------------------------------------------------------------
  async def test_list_user_tokens_returns_only_owners_active_tokens(db_session, alice: User, bob: User):
      """Listing for alice omits her revoked token and every token owned by bob."""
      to_revoke = await create_token(db_session, user_id=alice.id, name="a1", expires_in_days=7)
      await create_token(db_session, user_id=alice.id, name="a2", expires_in_days=7)
      await create_token(db_session, user_id=bob.id, name="b1", expires_in_days=7)
      await revoke_token(db_session, to_revoke.record.id)

      listed = await list_user_tokens(db_session, alice.id)

      # a1 revoked, b1 belongs to bob
      assert {token.name for token in listed} == {"a2"}
  async def test_list_all_tokens_returns_every_active_token(db_session, alice: User, bob: User):
      """The global listing includes a live token and leaves out a revoked one."""
      await create_token(db_session, user_id=alice.id, name="a", expires_in_days=7)
      to_revoke = await create_token(db_session, user_id=bob.id, name="b", expires_in_days=7)
      await revoke_token(db_session, to_revoke.record.id)

      listed = await list_all_tokens(db_session)
      listed_names = {token.name for token in listed}

      assert "a" in listed_names
      # revoked excluded
      assert "b" not in listed_names
  async def test_revoke_is_idempotent(db_session, alice: User):
      """Revoking twice returns ``True`` the first time and ``False`` the second."""
      issued = await create_token(db_session, user_id=alice.id, name="x", expires_in_days=7)

      first_attempt = await revoke_token(db_session, issued.record.id)
      assert first_attempt is True

      # Second revoke is a no-op (returns False, never raises).
      second_attempt = await revoke_token(db_session, issued.record.id)
      assert second_attempt is False
  async def test_revoke_unknown_id_returns_false(db_session):
      """Revoking an id with no matching token (assumed absent: 99_999) returns ``False``."""
      missing_id = 99_999
      outcome = await revoke_token(db_session, missing_id)
      assert outcome is False