| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192 |
- from __future__ import annotations
- import logging
- import os
- import secrets
- from datetime import datetime, timedelta, timezone
- from pathlib import Path
- from typing import Annotated
- import jwt
- from fastapi import Depends, Header, HTTPException, status
- from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
- from jwt.exceptions import PyJWTError as JWTError
- from passlib.context import CryptContext
- from sqlalchemy import delete, func, select
- from sqlalchemy.ext.asyncio import AsyncSession
- from sqlalchemy.orm import selectinload
- from backend.app.core.database import async_session, get_db
- from backend.app.core.permissions import Permission
- from backend.app.models.api_key import APIKey
- from backend.app.models.auth_ephemeral import AuthEphemeralToken, TokenType
- from backend.app.models.settings import Settings
- from backend.app.models.user import User
- logger = logging.getLogger(__name__)
- # Password hashing
- # Use pbkdf2_sha256 instead of bcrypt to avoid 72-byte limit and passlib initialization issues
- # pbkdf2_sha256 is a secure password hashing algorithm without bcrypt's limitations
- pwd_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")
- def _get_jwt_secret() -> str:
- """Get the JWT secret key from environment, file, or generate a new one.
- Priority:
- 1. JWT_SECRET_KEY environment variable
- 2. .jwt_secret file in data directory
- 3. Generate new random secret and save to file
- Returns:
- The JWT secret key
- """
- # 1. Check environment variable first
- env_secret = os.environ.get("JWT_SECRET_KEY")
- if env_secret:
- logger.info("Using JWT secret from JWT_SECRET_KEY environment variable")
- return env_secret
- # 2. Check for secret file in data directory
- # Use DATA_DIR env var (same as rest of app), fallback to data/ subdirectory
- data_dir_env = os.environ.get("DATA_DIR")
- if data_dir_env:
- data_dir = Path(data_dir_env)
- else:
- # Fallback to data/ subdirectory under project root (not project root itself!)
- data_dir = Path(__file__).parent.parent.parent.parent / "data"
- secret_file = data_dir / ".jwt_secret"
- if secret_file.exists():
- try:
- secret = secret_file.read_text().strip()
- if secret and len(secret) >= 32:
- logger.info("Using JWT secret from %s", secret_file)
- return secret
- except OSError as e:
- logger.warning("Failed to read JWT secret file: %s", e)
- # 3. Generate new random secret
- new_secret = secrets.token_urlsafe(64)
- # Try to save it
- try:
- data_dir.mkdir(parents=True, exist_ok=True)
- # Note: CodeQL flags this as "clear-text storage of sensitive information" but this is
- # intentional and secure - JWT secrets must be readable by the app, we set 0600 permissions,
- # and this is standard practice for self-hosted applications (same as .env files).
- secret_file.write_text(new_secret) # nosec B105
- # Restrict permissions (owner read/write only)
- secret_file.chmod(0o600)
- logger.info("Generated new JWT secret and saved to %s", secret_file)
- except OSError as e:
- logger.warning(
- "Could not save JWT secret to file (%s). "
- "Secret will be regenerated on restart, invalidating existing tokens. "
- "Set JWT_SECRET_KEY environment variable for persistence.",
- e,
- )
- return new_secret
- # JWT settings
- SECRET_KEY = _get_jwt_secret()
- ALGORITHM = "HS256"
- ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 # 24 hours (M-2: reduced from 7 days)
- # HTTP Bearer token
- security = HTTPBearer(auto_error=False)
- # --- Slicer download tokens ---
- # Short-lived, single-use tokens for slicer protocol handlers that can't send
- # auth headers. Stored in AuthEphemeralToken (token_type=TokenType.SLICER_DOWNLOAD)
- # so they survive server restarts and work in multi-worker deployments (M-3).
- SLICER_TOKEN_EXPIRE_MINUTES = 5
- async def create_slicer_download_token(resource_type: str, resource_id: int) -> str:
- """Create a short-lived, single-use download token for slicer protocol handlers."""
- now = datetime.now(timezone.utc)
- expires_at = now + timedelta(minutes=SLICER_TOKEN_EXPIRE_MINUTES)
- token = secrets.token_urlsafe(24)
- resource_key = f"{resource_type}:{resource_id}"
- async with async_session() as db:
- # Prune expired tokens opportunistically
- await db.execute(
- delete(AuthEphemeralToken).where(
- AuthEphemeralToken.token_type == TokenType.SLICER_DOWNLOAD,
- AuthEphemeralToken.expires_at < now,
- )
- )
- db.add(
- AuthEphemeralToken(
- token=token,
- token_type=TokenType.SLICER_DOWNLOAD,
- nonce=resource_key,
- expires_at=expires_at,
- )
- )
- await db.commit()
- return token
- async def verify_slicer_download_token(token: str, resource_type: str, resource_id: int) -> bool:
- """Verify and atomically consume a slicer download token.
- Returns True only if the token is valid, unexpired, and bound to the given resource.
- DELETE...RETURNING ensures the token is single-use even under concurrent requests.
- M-NEW-1 fix: nonce (resource key) is included in the WHERE clause so the DELETE
- only succeeds when the token is presented to the *correct* resource endpoint.
- Previously the token was consumed (committed) even when stored_key != expected_key,
- permanently invalidating it while returning False to the caller.
- """
- expected_key = f"{resource_type}:{resource_id}"
- now = datetime.now(timezone.utc)
- async with async_session() as db:
- result = await db.execute(
- delete(AuthEphemeralToken)
- .where(
- AuthEphemeralToken.token == token,
- AuthEphemeralToken.token_type == TokenType.SLICER_DOWNLOAD,
- AuthEphemeralToken.nonce == expected_key,
- AuthEphemeralToken.expires_at > now,
- )
- .returning(AuthEphemeralToken.id)
- )
- if result.one_or_none() is None:
- return False
- await db.commit()
- return True
- # --- Camera stream tokens ---
- # Reusable tokens for camera stream/snapshot endpoints loaded via <img>/<video>
- # tags (these cannot send Authorization headers). Unlike slicer tokens they are
- # NOT single-use — streams reconnect on errors. Stored in AuthEphemeralToken
- # (token_type="camera_stream") for multi-worker compatibility (M-3).
- CAMERA_STREAM_TOKEN_EXPIRE_MINUTES = 60
- async def create_camera_stream_token() -> str:
- """Create a reusable token for camera stream/snapshot access."""
- now = datetime.now(timezone.utc)
- expires_at = now + timedelta(minutes=CAMERA_STREAM_TOKEN_EXPIRE_MINUTES)
- token = secrets.token_urlsafe(24)
- async with async_session() as db:
- # Prune expired tokens opportunistically
- await db.execute(
- delete(AuthEphemeralToken).where(
- AuthEphemeralToken.token_type == "camera_stream",
- AuthEphemeralToken.expires_at < now,
- )
- )
- db.add(
- AuthEphemeralToken(
- token=token,
- token_type="camera_stream",
- expires_at=expires_at,
- )
- )
- await db.commit()
- return token
- async def verify_camera_stream_token(token: str) -> bool:
- """Verify a camera stream token is valid (reusable — does not consume it).
- Tries the ephemeral 60-minute token first (the common, browser-bound case)
- and falls through to long-lived tokens (#1108) for HA / kiosk integrations
- that paste a token once and expect it to keep working for days.
- """
- now = datetime.now(timezone.utc)
- async with async_session() as db:
- result = await db.execute(
- select(AuthEphemeralToken).where(
- AuthEphemeralToken.token == token,
- AuthEphemeralToken.token_type == "camera_stream",
- AuthEphemeralToken.expires_at > now,
- )
- )
- if result.scalar_one_or_none() is not None:
- return True
- # Long-lived path. Imported lazily so the auth module stays importable
- # at startup before the long_lived_tokens model is registered.
- from backend.app.services.long_lived_tokens import verify_token as verify_long_lived
- record = await verify_long_lived(db, token, scope="camera_stream")
- return record is not None
- def verify_password(plain_password: str, hashed_password: str) -> bool:
- """Verify a password against a hash.
- Uses pbkdf2_sha256 which handles long passwords automatically.
- """
- return pwd_context.verify(plain_password, hashed_password)
- def get_password_hash(password: str) -> str:
- """Hash a password.
- Uses pbkdf2_sha256 which is secure and has no password length limit.
- """
- return pwd_context.hash(password)
- def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
- """Create a JWT access token with jti (revocation) and iat (freshness) claims."""
- to_encode = data.copy()
- now = datetime.now(timezone.utc)
- if expires_delta:
- expire = now + expires_delta
- else:
- expire = now + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
- jti = secrets.token_hex(16)
- to_encode.update({"exp": expire, "jti": jti, "iat": now})
- encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
- return encoded_jwt
- def _is_token_fresh(iat: int | float | None, user: User) -> bool:
- """Return False if the token was issued before the user's last password change.
- Used to invalidate all sessions after a password reset/change (M-R7-B).
- All tokens without an iat claim are unconditionally rejected — every token
- issued by this server carries iat, so absence means the token is forged or
- from a pre-iat code path whose max TTL (24 h) has long since expired.
- """
- if iat is None:
- return False
- if not hasattr(user, "password_changed_at") or user.password_changed_at is None:
- return True # No password change recorded yet (I2 migration handles this)
- token_issued_at = datetime.fromtimestamp(iat, tz=timezone.utc)
- pca = user.password_changed_at
- if pca.tzinfo is None:
- pca = pca.replace(tzinfo=timezone.utc)
- # JWT iat is whole seconds; truncate pca so tokens issued in the same second pass.
- pca = pca.replace(microsecond=0)
- return token_issued_at >= pca
- async def revoke_jti(jti: str, expires_at: datetime, username: str | None = None) -> None:
- """Store a revoked JWT jti so it is rejected on future requests.
- Silently ignores duplicate inserts (e.g. double-logout with the same token).
- """
- from sqlalchemy.exc import IntegrityError
- async with async_session() as db:
- revoked = AuthEphemeralToken(
- token=jti,
- token_type="revoked_jti",
- username=username,
- expires_at=expires_at,
- )
- db.add(revoked)
- try:
- await db.commit()
- except IntegrityError:
- await db.rollback() # jti already revoked — desired state, ignore
- async def is_jti_revoked(jti: str) -> bool:
- """Return True if the given jti has been revoked."""
- async with async_session() as db:
- result = await db.execute(
- select(AuthEphemeralToken).where(
- AuthEphemeralToken.token == jti,
- AuthEphemeralToken.token_type == "revoked_jti",
- )
- )
- return result.scalar_one_or_none() is not None
- async def get_user_by_username(db: AsyncSession, username: str) -> User | None:
- """Get a user by username (case-insensitive) with groups loaded for permission checks."""
- result = await db.execute(
- select(User).where(func.lower(User.username) == func.lower(username)).options(selectinload(User.groups))
- )
- return result.scalar_one_or_none()
- async def get_user_by_email(db: AsyncSession, email: str) -> User | None:
- """Get a user by email (case-insensitive) with groups loaded for permission checks."""
- result = await db.execute(
- select(User).where(func.lower(User.email) == func.lower(email)).options(selectinload(User.groups))
- )
- return result.scalar_one_or_none()
- async def authenticate_user(db: AsyncSession, username: str, password: str) -> User | None:
- """Authenticate a user by username and password.
- Username lookup is case-insensitive. Password is case-sensitive.
- LDAP and OIDC users must authenticate via their respective providers.
- """
- user = await get_user_by_username(db, username)
- if not user:
- return None
- if getattr(user, "auth_source", "local") in ("ldap", "oidc"):
- return None # LDAP/OIDC users must authenticate via their provider
- if not user.password_hash or not verify_password(password, user.password_hash):
- return None
- if not user.is_active:
- return None
- return user
- async def authenticate_user_by_email(db: AsyncSession, email: str, password: str) -> User | None:
- """Authenticate a user by email and password.
- Email lookup is case-insensitive. Password is case-sensitive.
- LDAP and OIDC users must authenticate via their respective providers.
- """
- user = await get_user_by_email(db, email)
- if not user:
- return None
- if getattr(user, "auth_source", "local") in ("ldap", "oidc"):
- return None # LDAP/OIDC users must authenticate via their provider
- if not user.password_hash or not verify_password(password, user.password_hash):
- return None
- if not user.is_active:
- return None
- return user
- async def is_auth_enabled(db: AsyncSession) -> bool:
- """Check if authentication is enabled."""
- try:
- result = await db.execute(select(Settings).where(Settings.key == "auth_enabled"))
- setting = result.scalar_one_or_none()
- if setting is None:
- return False
- return setting.value.lower() == "true"
- except Exception:
- # If settings table doesn't exist or query fails, assume auth is disabled
- return False
- async def _user_from_api_key(db: AsyncSession, api_key: APIKey) -> User | None:
- """Resolve the owner of a validated API key, or None for legacy ownerless keys.
- Cloud routes (and any route that needs caller identity) read the returned
- User to look up per-user state like ``cloud_token``. Legacy keys created
- before #1182 have ``user_id IS NULL`` and stay anonymous — they keep working
- against non-cloud routes for backward compatibility, but cloud routes will
- surface a "recreate this key" error rather than 200 with empty results.
- """
- if api_key.user_id is None:
- return None
- result = await db.execute(select(User).where(User.id == api_key.user_id))
- user = result.scalar_one_or_none()
- if user is None or not user.is_active:
- # CASCADE on user delete should prevent a dangling user_id, but if
- # someone manually deactivates the owner the key shouldn't suddenly
- # gain an "anonymous" identity — drop the request to None so cloud
- # access fails closed.
- return None
- return user
- async def _validate_api_key(db: AsyncSession, api_key_value: str) -> APIKey | None:
- """Validate an API key and return the APIKey object if valid, None otherwise.
- L-1: Pre-filter by key_prefix (first 8 chars) before running pbkdf2 so only
- O(1) candidate rows are hashed instead of the full key table. The prefix is
- not secret (it is shown in the admin UI), so this does not reduce security.
- """
- try:
- # key_prefix is stored as "<first-8-chars>..." (e.g. "bb_Abc12...").
- # Matching on the first 8 chars of the submitted key reduces the scan to
- # at most one row in practice (2^40 collision space for 5 base64 chars).
- key_lookup = api_key_value[:8] if len(api_key_value) >= 8 else api_key_value
- result = await db.execute(
- select(APIKey).where(
- APIKey.enabled.is_(True),
- APIKey.key_prefix.like(
- key_lookup.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_") + "%", escape="\\"
- ),
- )
- )
- api_keys = result.scalars().all()
- for api_key in api_keys:
- if verify_password(api_key_value, api_key.key_hash):
- # Check expiration
- if api_key.expires_at:
- expires = api_key.expires_at
- if expires.tzinfo is None:
- expires = expires.replace(tzinfo=timezone.utc)
- if expires < datetime.now(timezone.utc):
- return None # Expired
- # Update last_used timestamp
- api_key.last_used = datetime.now(timezone.utc)
- await db.commit()
- return api_key
- except Exception as e:
- logger.warning("API key validation error: %s", e)
- return None
- async def get_current_user_optional(
- credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(security)] = None,
- ) -> User | None:
- """Get the current authenticated user from JWT token, or None if not authenticated.
- Returns None only when NO credentials are supplied. If a token is supplied
- but invalid/revoked, raises 401 — a revoked token must not grant anonymous
- access (I6).
- """
- if credentials is None:
- return None
- _unauthorized = HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- try:
- token = credentials.credentials
- payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
- username: str = payload.get("sub")
- if username is None:
- raise _unauthorized
- jti: str | None = payload.get("jti")
- if not jti or await is_jti_revoked(jti):
- raise _unauthorized # I6: revoked token → 401, not anonymous
- iat: int | float | None = payload.get("iat")
- except JWTError:
- raise _unauthorized
- async with async_session() as db:
- user = await get_user_by_username(db, username)
- if user is None or not user.is_active:
- raise _unauthorized
- if not _is_token_fresh(iat, user):
- raise _unauthorized
- return user
- async def get_current_user(
- credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(security)] = None,
- ) -> User:
- """Get the current authenticated user from JWT token."""
- credentials_exception = HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- if credentials is None:
- raise credentials_exception
- try:
- token = credentials.credentials
- payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
- username: str = payload.get("sub")
- if username is None:
- raise credentials_exception
- jti: str | None = payload.get("jti")
- if not jti or await is_jti_revoked(jti):
- raise credentials_exception
- iat: int | float | None = payload.get("iat")
- except JWTError:
- raise credentials_exception
- async with async_session() as db:
- user = await get_user_by_username(db, username)
- if user is None:
- raise credentials_exception
- if not user.is_active:
- raise HTTPException(
- status_code=status.HTTP_403_FORBIDDEN,
- detail="User account is disabled",
- )
- if not _is_token_fresh(iat, user):
- raise credentials_exception
- return user
- async def get_current_active_user(current_user: Annotated[User, Depends(get_current_user)]) -> User:
- """Get the current active user (alias for clarity)."""
- return current_user
- async def require_auth_if_enabled(
- credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(security)] = None,
- x_api_key: Annotated[str | None, Header(alias="X-API-Key")] = None,
- ) -> User | None:
- """Require authentication if auth is enabled, otherwise return None.
- Accepts both JWT tokens (via Authorization: Bearer header) and API keys
- (via X-API-Key header or Authorization: Bearer bb_xxx). API keys return
- None for backward compatibility — routes that need the API-key owner (i.e.
- cloud routes for #1182) resolve it via their own router-level dependency
- that stashes ``request.state.api_key_owner``. Returning the owner here
- instead would silently grant API-keyed callers access to every route that
- fences via ``if current_user is None``, which is a wider surface than
- #1182 was designed to expose.
- """
- async with async_session() as db:
- auth_enabled = await is_auth_enabled(db)
- if not auth_enabled:
- return None
- # Check for API key first (X-API-Key header)
- if x_api_key:
- api_key = await _validate_api_key(db, x_api_key)
- if api_key:
- return None # API key valid, allow access
- # Check for Bearer token (could be JWT or API key)
- if credentials is not None:
- token = credentials.credentials
- # Check if it's an API key (starts with bb_)
- if token.startswith("bb_"):
- api_key = await _validate_api_key(db, token)
- if api_key:
- return None # API key valid, allow access
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Invalid API key",
- headers={"WWW-Authenticate": "Bearer"},
- )
- # Otherwise treat as JWT
- try:
- payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
- username: str = payload.get("sub")
- if username is None:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- jti: str | None = payload.get("jti")
- if not jti or await is_jti_revoked(jti):
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- iat: int | float | None = payload.get("iat")
- except JWTError:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- user = await get_user_by_username(db, username)
- if user is None or not user.is_active:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- if not _is_token_fresh(iat, user):
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- return user
- # No credentials provided
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Authentication required",
- headers={"WWW-Authenticate": "Bearer"},
- )
- def require_role(required_role: str):
- """Dependency factory for role-based access control."""
- async def role_checker(current_user: Annotated[User, Depends(get_current_user)]) -> User:
- if current_user.role != required_role:
- raise HTTPException(
- status_code=status.HTTP_403_FORBIDDEN,
- detail=f"Requires {required_role} role",
- )
- return current_user
- return role_checker
- def require_admin_if_auth_enabled():
- """Dependency factory that requires admin role if auth is enabled."""
- async def admin_checker(
- current_user: Annotated[User | None, Depends(require_auth_if_enabled)] = None,
- ) -> User | None:
- if current_user is None:
- return None # Auth not enabled, allow access
- if current_user.role != "admin":
- raise HTTPException(
- status_code=status.HTTP_403_FORBIDDEN,
- detail="Requires admin role",
- )
- return current_user
- return admin_checker
- def generate_api_key() -> tuple[str, str, str]:
- """Generate a new API key.
- Returns:
- tuple: (full_key, key_hash, key_prefix)
- - full_key: The complete API key (only shown once on creation)
- - key_hash: Hashed version for storage and verification
- - key_prefix: First 8 characters for display purposes
- """
- # Generate a secure random API key (32 bytes = 64 hex characters)
- full_key = f"bb_{secrets.token_urlsafe(32)}"
- key_hash = get_password_hash(full_key)
- key_prefix = full_key[:8] + "..." if len(full_key) > 8 else full_key
- return full_key, key_hash, key_prefix
- async def get_api_key(
- authorization: Annotated[str | None, Header(alias="Authorization")] = None,
- x_api_key: Annotated[str | None, Header(alias="X-API-Key")] = None,
- db: AsyncSession = Depends(get_db),
- ) -> APIKey:
- """Get and validate API key from request headers.
- Checks both 'Authorization: Bearer <key>' and 'X-API-Key: <key>' headers.
- """
- api_key_value = None
- if x_api_key:
- api_key_value = x_api_key
- elif authorization and authorization.startswith("Bearer "):
- api_key_value = authorization.replace("Bearer ", "")
- if not api_key_value:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="API key required. Provide 'X-API-Key' header or 'Authorization: Bearer <key>'",
- )
- # M-NEW-2: Pre-filter by key_prefix (first 8 chars) to avoid O(n) pbkdf2 over all
- # enabled keys — same fix as in _validate_api_key (L-1 from previous review).
- key_lookup = api_key_value[:8] if len(api_key_value) >= 8 else api_key_value
- result = await db.execute(
- select(APIKey).where(
- APIKey.enabled.is_(True),
- APIKey.key_prefix.like(
- key_lookup.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_") + "%",
- escape="\\",
- ),
- )
- )
- api_keys = result.scalars().all()
- for api_key in api_keys:
- # Check if key matches (verify against hash)
- if verify_password(api_key_value, api_key.key_hash):
- # Check expiration
- if api_key.expires_at:
- expires = api_key.expires_at
- if expires.tzinfo is None:
- expires = expires.replace(tzinfo=timezone.utc)
- if expires < datetime.now(timezone.utc):
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="API key has expired",
- )
- # Update last_used timestamp
- api_key.last_used = datetime.now(timezone.utc)
- await db.commit()
- return api_key
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Invalid API key",
- )
- def check_permission(api_key: APIKey, permission: str) -> None:
- """Check if API key has the required permission.
- Args:
- api_key: The API key object
- permission: One of 'queue', 'control_printer', 'read_status'
- Raises:
- HTTPException: If permission is not granted
- """
- permission_map = {
- "queue": "can_queue",
- "control_printer": "can_control_printer",
- "read_status": "can_read_status",
- }
- if permission not in permission_map:
- raise HTTPException(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail=f"Unknown permission: {permission}",
- )
- attr_name = permission_map[permission]
- if not getattr(api_key, attr_name, False):
- raise HTTPException(
- status_code=status.HTTP_403_FORBIDDEN,
- detail=f"API key does not have '{permission}' permission",
- )
- def check_printer_access(api_key: APIKey, printer_id: int) -> None:
- """Check if API key has access to the specified printer.
- Args:
- api_key: The API key object
- printer_id: The printer ID to check access for
- Raises:
- HTTPException: If access is denied
- """
- # None = global key, access to all printers
- if api_key.printer_ids is None:
- return
- # Empty list or printer not in allowed list = no access
- if printer_id not in api_key.printer_ids:
- raise HTTPException(
- status_code=status.HTTP_403_FORBIDDEN,
- detail=f"API key does not have access to printer {printer_id}",
- )
- # Convenience dependencies - these are functions that return Depends objects
- def RequireAdmin():
- """Dependency that requires admin role."""
- return Depends(require_role("admin"))
- def RequireAdminIfAuthEnabled():
- """Dependency that requires admin role if auth is enabled."""
- return Depends(require_admin_if_auth_enabled())
- def require_permission(*permissions: str | Permission):
- """Dependency factory that requires user to have ALL specified permissions.
- Accepts both JWT tokens (via Authorization: Bearer header) and API keys
- (via X-API-Key header or Authorization: Bearer bb_xxx).
- Args:
- *permissions: Permission strings or Permission enum values to require
- Returns:
- A dependency function that validates permissions
- """
- # Convert Permission enums to strings
- perm_strings = [p.value if isinstance(p, Permission) else p for p in permissions]
- async def permission_checker(
- credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(security)] = None,
- x_api_key: Annotated[str | None, Header(alias="X-API-Key")] = None,
- ) -> User | None:
- async with async_session() as db:
- # Check for API key first (X-API-Key header)
- if x_api_key:
- api_key = await _validate_api_key(db, x_api_key)
- if api_key:
- return None # API key valid, allow access
- credentials_exception = HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- if credentials is None:
- raise credentials_exception
- token = credentials.credentials
- # Check if it's an API key (starts with bb_)
- if token.startswith("bb_"):
- api_key = await _validate_api_key(db, token)
- if api_key:
- return None # API key valid, allow access
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Invalid API key",
- headers={"WWW-Authenticate": "Bearer"},
- )
- # Otherwise treat as JWT
- try:
- payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
- username: str = payload.get("sub")
- if username is None:
- raise credentials_exception
- jti: str | None = payload.get("jti")
- if not jti or await is_jti_revoked(jti):
- raise credentials_exception
- iat: int | float | None = payload.get("iat")
- except JWTError:
- raise credentials_exception
- user = await get_user_by_username(db, username)
- if user is None or not user.is_active:
- raise credentials_exception
- if not _is_token_fresh(iat, user):
- raise credentials_exception
- if not user.has_all_permissions(*perm_strings):
- raise HTTPException(
- status_code=status.HTTP_403_FORBIDDEN,
- detail=f"Missing required permissions: {', '.join(perm_strings)}",
- )
- return user
- return permission_checker
- def require_permission_if_auth_enabled(*permissions: str | Permission):
- """Dependency factory that checks permissions only if auth is enabled.
- This provides backward compatibility - when auth is disabled, all access is allowed.
- Accepts both JWT tokens (via Authorization: Bearer header) and API keys
- (via X-API-Key header or Authorization: Bearer bb_xxx).
- Args:
- *permissions: Permission strings or Permission enum values to require
- Returns:
- A dependency function that validates permissions if auth is enabled
- """
- # Convert Permission enums to strings
- perm_strings = [p.value if isinstance(p, Permission) else p for p in permissions]
- async def permission_checker(
- credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(security)] = None,
- x_api_key: Annotated[str | None, Header(alias="X-API-Key")] = None,
- ) -> User | None:
- async with async_session() as db:
- auth_enabled = await is_auth_enabled(db)
- if not auth_enabled:
- return None # Auth disabled, allow access
- # Check for API key first (X-API-Key header). API-keyed requests
- # bypass the JWT permission check entirely — their scopes live on
- # the APIKey row (can_queue / can_control_printer / can_read_status
- # / can_access_cloud / printer_ids), and the dep returns None so
- # routes don't gain a synthetic User identity that would grant
- # access to fenced surfaces like long-lived-token management.
- # Cloud routes (#1182) resolve the API-key owner separately via
- # their own router-level dependency; see ``cloud.py``.
- if x_api_key:
- api_key = await _validate_api_key(db, x_api_key)
- if api_key:
- return None # API key valid, allow access
- # Check for Bearer token (could be JWT or API key)
- if credentials is not None:
- token = credentials.credentials
- # Check if it's an API key (starts with bb_)
- if token.startswith("bb_"):
- api_key = await _validate_api_key(db, token)
- if api_key:
- return None # API key valid, allow access
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Invalid API key",
- headers={"WWW-Authenticate": "Bearer"},
- )
- # Otherwise treat as JWT
- try:
- payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
- username: str = payload.get("sub")
- if username is None:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- jti: str | None = payload.get("jti")
- if not jti or await is_jti_revoked(jti):
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- iat: int | float | None = payload.get("iat")
- except JWTError:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- user = await get_user_by_username(db, username)
- if user is None or not user.is_active:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- if not _is_token_fresh(iat, user):
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- if not user.has_all_permissions(*perm_strings):
- raise HTTPException(
- status_code=status.HTTP_403_FORBIDDEN,
- detail=f"Missing required permissions: {', '.join(perm_strings)}",
- )
- return user
- # No credentials provided
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Authentication required",
- headers={"WWW-Authenticate": "Bearer"},
- )
- return permission_checker
- def RequirePermission(*permissions: str | Permission):
- """Convenience dependency that requires ALL specified permissions."""
- return Depends(require_permission(*permissions))
- def RequirePermissionIfAuthEnabled(*permissions: str | Permission):
- """Convenience dependency that requires permissions if auth is enabled."""
- return Depends(require_permission_if_auth_enabled(*permissions))
- def require_any_permission_if_auth_enabled(*permissions: str | Permission):
- """Dependency factory that requires AT LEAST ONE of the given permissions when auth is enabled."""
- perm_strings = [p.value if isinstance(p, Permission) else p for p in permissions]
- async def checker(
- credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(security)] = None,
- x_api_key: Annotated[str | None, Header(alias="X-API-Key")] = None,
- ) -> User | None:
- async with async_session() as db:
- auth_enabled = await is_auth_enabled(db)
- if not auth_enabled:
- return None
- if x_api_key:
- api_key = await _validate_api_key(db, x_api_key)
- if api_key:
- return None
- if credentials is not None:
- token = credentials.credentials
- if token.startswith("bb_"):
- api_key = await _validate_api_key(db, token)
- if api_key:
- return None
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Invalid API key",
- headers={"WWW-Authenticate": "Bearer"},
- )
- try:
- payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
- username: str = payload.get("sub")
- if username is None:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- jti: str | None = payload.get("jti")
- if not jti or await is_jti_revoked(jti):
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- iat: int | float | None = payload.get("iat")
- except JWTError:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- user = await get_user_by_username(db, username)
- if user is None or not user.is_active:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- if not _is_token_fresh(iat, user):
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- if not user.has_any_permission(*perm_strings):
- raise HTTPException(
- status_code=status.HTTP_403_FORBIDDEN,
- detail=f"Missing required permissions: {', '.join(perm_strings)}",
- )
- return user
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Authentication required",
- headers={"WWW-Authenticate": "Bearer"},
- )
- return checker
- def RequireAnyPermissionIfAuthEnabled(*permissions: str | Permission):
- """Convenience dependency that requires AT LEAST ONE of the given permissions when auth is enabled."""
- return Depends(require_any_permission_if_auth_enabled(*permissions))
- def require_camera_stream_token_if_auth_enabled():
- """Dependency that validates a camera stream token query param when auth is enabled.
- Used for camera stream/snapshot endpoints that are loaded via <img> tags
- which cannot send Authorization headers. The frontend obtains a token from
- POST /printers/camera/stream-token and appends it as ?token=xxx.
- """
- async def checker(token: str | None = None) -> None:
- async with async_session() as db:
- if not await is_auth_enabled(db):
- return # Auth disabled, allow access
- if not token or not await verify_camera_stream_token(token):
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Valid camera stream token required. Obtain one from POST /api/v1/printers/camera/stream-token",
- )
- return checker
- RequireCameraStreamTokenIfAuthEnabled = Depends(require_camera_stream_token_if_auth_enabled())
- def require_ownership_permission(
- all_permission: str | Permission,
- own_permission: str | Permission,
- ):
- """Dependency factory for ownership-based permission checks.
- - User with `all_permission` can modify any item
- - User with `own_permission` can only modify items where created_by_id == user.id
- - Ownerless items (created_by_id = null) require `all_permission`
- - API keys (via X-API-Key header or Bearer bb_xxx) get full access (can_modify_all=True)
- Returns:
- A dependency function that returns (user, can_modify_all).
- - can_modify_all=True: user can modify any item
- - can_modify_all=False: user can only modify their own items
- """
- all_perm = all_permission.value if isinstance(all_permission, Permission) else all_permission
- own_perm = own_permission.value if isinstance(own_permission, Permission) else own_permission
- async def checker(
- credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(security)] = None,
- x_api_key: Annotated[str | None, Header(alias="X-API-Key")] = None,
- ) -> tuple[User | None, bool]:
- """Returns (user, can_modify_all).
- - can_modify_all=True: user can modify any item
- - can_modify_all=False: user can only modify their own items
- """
- async with async_session() as db:
- auth_enabled = await is_auth_enabled(db)
- if not auth_enabled:
- return None, True # Auth disabled, allow all
- # Check for API key first (X-API-Key header)
- if x_api_key:
- api_key = await _validate_api_key(db, x_api_key)
- if api_key:
- return None, True # API key valid, allow all
- # Check for Bearer token (could be JWT or API key)
- if credentials is not None:
- token = credentials.credentials
- # Check if it's an API key (starts with bb_)
- if token.startswith("bb_"):
- api_key = await _validate_api_key(db, token)
- if api_key:
- return None, True # API key valid, allow all
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Invalid API key",
- headers={"WWW-Authenticate": "Bearer"},
- )
- # Otherwise treat as JWT
- try:
- payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
- username: str = payload.get("sub")
- if username is None:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- jti: str | None = payload.get("jti")
- if not jti or await is_jti_revoked(jti):
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- iat: int | float | None = payload.get("iat")
- except JWTError:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- user = await get_user_by_username(db, username)
- if user is None or not user.is_active:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- if not _is_token_fresh(iat, user):
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- if user.has_permission(all_perm):
- return user, True
- if user.has_permission(own_perm):
- return user, False
- raise HTTPException(
- status_code=status.HTTP_403_FORBIDDEN,
- detail=f"Missing permission: {own_perm} or {all_perm}",
- )
- # No credentials provided
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Authentication required",
- headers={"WWW-Authenticate": "Bearer"},
- )
- return checker
|