| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243 |
- """Unit tests for the long-lived camera-token service (#1108).
- Drives the service directly against a real SQLAlchemy session so the
- hash/lookup/expiry/revoke logic is exercised end-to-end with no HTTP.
- """
- from __future__ import annotations
- from datetime import datetime, timedelta, timezone
- import pytest
- from backend.app.models.long_lived_token import LongLivedToken
- from backend.app.models.user import User
- from backend.app.services.long_lived_tokens import (
- ALLOWED_SCOPES,
- MAX_TOKEN_LIFETIME_DAYS,
- create_token,
- list_all_tokens,
- list_user_tokens,
- revoke_token,
- verify_token,
- )
- pytestmark = [pytest.mark.asyncio, pytest.mark.integration]
- # ---------------------------------------------------------------------------
- # Fixtures
- # ---------------------------------------------------------------------------
- @pytest.fixture
- async def alice(db_session) -> User:
- user = User(
- username="alice",
- email="alice@example.test",
- password_hash="x",
- is_active=True,
- )
- db_session.add(user)
- await db_session.commit()
- await db_session.refresh(user)
- return user
- @pytest.fixture
- async def bob(db_session) -> User:
- user = User(
- username="bob",
- email="bob@example.test",
- password_hash="x",
- is_active=True,
- )
- db_session.add(user)
- await db_session.commit()
- await db_session.refresh(user)
- return user
- # ---------------------------------------------------------------------------
- # Create
- # ---------------------------------------------------------------------------
- async def test_create_returns_plaintext_once_and_stores_hash(db_session, alice: User):
- """Create returns the plaintext token; the DB only stores its hash."""
- created = await create_token(
- db_session,
- user_id=alice.id,
- name="Home Assistant",
- expires_in_days=30,
- )
- assert created.plaintext.startswith("bblt_")
- assert created.record.id is not None
- assert created.record.user_id == alice.id
- assert created.record.name == "Home Assistant"
- assert created.record.scope == "camera_stream"
- assert created.record.lookup_prefix in created.plaintext
- # Hash never matches plaintext.
- assert created.record.secret_hash != created.plaintext
- # Expiry roughly 30 days from now (allow a few seconds of clock drift).
- delta = created.record.expires_at - datetime.utcnow()
- assert timedelta(days=29, hours=23) < delta < timedelta(days=30, minutes=1)
- async def test_create_rejects_zero_or_negative_expiry(db_session, alice: User):
- """Issue #1108 explicitly forbids ``expire_in: 0``."""
- with pytest.raises(ValueError, match="positive"):
- await create_token(db_session, user_id=alice.id, name="x", expires_in_days=0)
- with pytest.raises(ValueError, match="positive"):
- await create_token(db_session, user_id=alice.id, name="x", expires_in_days=-5)
- async def test_create_rejects_expiry_above_policy_cap(db_session, alice: User):
- """Above the 365-day ceiling → reject. UI layer also clamps but the
- service is the canonical guard.
- """
- with pytest.raises(ValueError, match="exceeds policy maximum"):
- await create_token(
- db_session,
- user_id=alice.id,
- name="x",
- expires_in_days=MAX_TOKEN_LIFETIME_DAYS + 1,
- )
- async def test_create_rejects_unsupported_scope(db_session, alice: User):
- """V1 only allows ``camera_stream``."""
- assert {"camera_stream"} == set(ALLOWED_SCOPES)
- with pytest.raises(ValueError, match="unsupported scope"):
- await create_token(
- db_session,
- user_id=alice.id,
- name="x",
- expires_in_days=7,
- scope="anything_else",
- )
- async def test_create_rejects_blank_or_oversize_name(db_session, alice: User):
- with pytest.raises(ValueError, match="name is required"):
- await create_token(db_session, user_id=alice.id, name=" ", expires_in_days=7)
- with pytest.raises(ValueError, match="100"):
- await create_token(db_session, user_id=alice.id, name="x" * 101, expires_in_days=7)
- # ---------------------------------------------------------------------------
- # Verify
- # ---------------------------------------------------------------------------
- async def test_verify_happy_path_returns_record_and_updates_last_used(db_session, alice: User):
- created = await create_token(db_session, user_id=alice.id, name="Frigate", expires_in_days=7)
- assert created.record.last_used_at is None
- record = await verify_token(db_session, created.plaintext)
- assert record is not None
- assert record.id == created.record.id
- assert record.last_used_at is not None
- async def test_verify_returns_none_for_garbage_token(db_session, alice: User):
- await create_token(db_session, user_id=alice.id, name="x", expires_in_days=7)
- assert await verify_token(db_session, "not-a-real-token") is None
- assert await verify_token(db_session, "bblt_short") is None
- # Wrong prefix entirely.
- assert await verify_token(db_session, "abc_12345678_zzz") is None
- async def test_verify_returns_none_for_expired_token(db_session, alice: User):
- created = await create_token(db_session, user_id=alice.id, name="x", expires_in_days=1)
- # Force expiry into the past.
- created.record.expires_at = datetime.now(timezone.utc) - timedelta(seconds=1)
- await db_session.commit()
- assert await verify_token(db_session, created.plaintext) is None
- async def test_verify_returns_none_for_revoked_token(db_session, alice: User):
- created = await create_token(db_session, user_id=alice.id, name="x", expires_in_days=7)
- revoked = await revoke_token(db_session, created.record.id)
- assert revoked is True
- assert await verify_token(db_session, created.plaintext) is None
- async def test_verify_returns_none_when_scope_mismatched(db_session, alice: User):
- """A camera_stream-scoped token must NOT validate against any other scope.
- No other scopes exist today, but if/when they do, this guard prevents a
- camera token from being accepted by, say, a control endpoint.
- """
- created = await create_token(db_session, user_id=alice.id, name="x", expires_in_days=7)
- assert await verify_token(db_session, created.plaintext, scope="other") is None
- async def test_verify_does_not_collide_across_users_with_same_prefix(db_session, alice: User, bob: User, monkeypatch):
- """If two tokens happened to land on the same lookup_prefix, only the
- one whose hash matches must verify. We force the collision by patching
- the token-part generator and asserting verify returns the right record.
- """
- from backend.app.services import long_lived_tokens
- real = long_lived_tokens._generate_token_parts
- sequence = iter(["aliceaaa", "bobbbbbb"])
- def _fixed_prefix():
- # First call (alice's token) gets the real generator output but with
- # the prefix forced to a known value.
- plaintext, _, hash_input = real()
- prefix = next(sequence)
- # Splice the forced prefix into the plaintext + hash_input.
- new_plaintext = "bblt_" + prefix + plaintext[len("bblt_") + 8 :]
- return new_plaintext, prefix, new_plaintext
- monkeypatch.setattr(long_lived_tokens, "_generate_token_parts", _fixed_prefix)
- a = await create_token(db_session, user_id=alice.id, name="a", expires_in_days=7)
- b = await create_token(db_session, user_id=bob.id, name="b", expires_in_days=7)
- assert a.record.lookup_prefix != b.record.lookup_prefix # sanity
- # Cross-verify: alice's plaintext must only match alice's record.
- assert (await verify_token(db_session, a.plaintext)).id == a.record.id
- assert (await verify_token(db_session, b.plaintext)).id == b.record.id
- # ---------------------------------------------------------------------------
- # List + revoke
- # ---------------------------------------------------------------------------
- async def test_list_user_tokens_returns_only_owners_active_tokens(db_session, alice: User, bob: User):
- a1 = await create_token(db_session, user_id=alice.id, name="a1", expires_in_days=7)
- await create_token(db_session, user_id=alice.id, name="a2", expires_in_days=7)
- await create_token(db_session, user_id=bob.id, name="b1", expires_in_days=7)
- await revoke_token(db_session, a1.record.id)
- alice_tokens = await list_user_tokens(db_session, alice.id)
- names = {t.name for t in alice_tokens}
- assert names == {"a2"} # a1 revoked, b1 belongs to bob
- async def test_list_all_tokens_returns_every_active_token(db_session, alice: User, bob: User):
- await create_token(db_session, user_id=alice.id, name="a", expires_in_days=7)
- b = await create_token(db_session, user_id=bob.id, name="b", expires_in_days=7)
- await revoke_token(db_session, b.record.id)
- all_tokens = await list_all_tokens(db_session)
- names = {t.name for t in all_tokens}
- assert "a" in names
- assert "b" not in names # revoked excluded
- async def test_revoke_is_idempotent(db_session, alice: User):
- created = await create_token(db_session, user_id=alice.id, name="x", expires_in_days=7)
- assert await revoke_token(db_session, created.record.id) is True
- # Second revoke is a no-op (returns False, never raises).
- assert await revoke_token(db_session, created.record.id) is False
- async def test_revoke_unknown_id_returns_false(db_session):
- assert await revoke_token(db_session, 99_999) is False
|