| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236 |
- from __future__ import annotations
- import logging
- import os
- import secrets
- from datetime import datetime, timedelta, timezone
- from typing import Annotated
- import jwt
- from fastapi import Depends, Header, HTTPException, status
- from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
- from jwt.exceptions import PyJWTError as JWTError
- from passlib.context import CryptContext
- from sqlalchemy import delete, func, select
- from sqlalchemy.ext.asyncio import AsyncSession
- from sqlalchemy.orm import selectinload
- from backend.app.core.database import async_session, get_db
- from backend.app.core.permissions import Permission
- from backend.app.models.api_key import APIKey
- from backend.app.models.auth_ephemeral import AuthEphemeralToken, TokenType
- from backend.app.models.settings import Settings
- from backend.app.models.user import User
- logger = logging.getLogger(__name__)
- # SETTINGS_READ is intentionally not denied — the SpoolBuddy kiosk reads settings
- # via API key (e.g. to sync the UI language).
- _APIKEY_DENIED_PERMISSIONS: frozenset[Permission] = frozenset(
- {
- Permission.SETTINGS_UPDATE,
- Permission.SETTINGS_BACKUP,
- Permission.SETTINGS_RESTORE,
- Permission.USERS_READ,
- Permission.USERS_CREATE,
- Permission.USERS_UPDATE,
- Permission.USERS_DELETE,
- Permission.GROUPS_READ,
- Permission.GROUPS_CREATE,
- Permission.GROUPS_UPDATE,
- Permission.GROUPS_DELETE,
- Permission.API_KEYS_CREATE,
- Permission.API_KEYS_UPDATE,
- Permission.API_KEYS_DELETE,
- Permission.API_KEYS_READ,
- Permission.GITHUB_BACKUP,
- Permission.GITHUB_RESTORE,
- Permission.FIRMWARE_UPDATE,
- }
- )
- def _check_apikey_permissions(perm_strings: list[str]) -> None:
- """Raise 403 if any required permission is admin-only (not accessible via API key)."""
- denied = _APIKEY_DENIED_PERMISSIONS.intersection(perm_strings)
- if denied:
- raise HTTPException(
- status_code=status.HTTP_403_FORBIDDEN,
- detail="API keys cannot be used for administrative operations",
- )
- # Password hashing
- # Use pbkdf2_sha256 instead of bcrypt to avoid 72-byte limit and passlib initialization issues
- # pbkdf2_sha256 is a secure password hashing algorithm without bcrypt's limitations
- pwd_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")
- def _get_jwt_secret() -> str:
- """Get the JWT secret key from environment, file, or generate a new one.
- Priority:
- 1. JWT_SECRET_KEY environment variable
- 2. .jwt_secret file in data directory
- 3. Generate new random secret and save to file
- Returns:
- The JWT secret key
- """
- # 1. Check environment variable first
- env_secret = os.environ.get("JWT_SECRET_KEY")
- if env_secret:
- logger.info("Using JWT secret from JWT_SECRET_KEY environment variable")
- return env_secret
- # 2. Check for secret file in data directory
- from backend.app.core.paths import resolve_data_dir
- data_dir = resolve_data_dir()
- secret_file = data_dir / ".jwt_secret"
- if secret_file.exists():
- try:
- secret = secret_file.read_text().strip()
- if secret and len(secret) >= 32:
- logger.info("Using JWT secret from %s", secret_file)
- return secret
- except OSError as e:
- logger.warning("Failed to read JWT secret file: %s", e)
- # 3. Generate new random secret
- new_secret = secrets.token_urlsafe(64)
- # Try to save it
- try:
- data_dir.mkdir(parents=True, exist_ok=True)
- # Note: CodeQL flags this as "clear-text storage of sensitive information" but this is
- # intentional and secure - JWT secrets must be readable by the app, we set 0600 permissions,
- # and this is standard practice for self-hosted applications (same as .env files).
- secret_file.write_text(new_secret) # nosec B105
- # Restrict permissions (owner read/write only)
- secret_file.chmod(0o600)
- logger.info("Generated new JWT secret and saved to %s", secret_file)
- except OSError as e:
- logger.warning(
- "Could not save JWT secret to file (%s). "
- "Secret will be regenerated on restart, invalidating existing tokens. "
- "Set JWT_SECRET_KEY environment variable for persistence.",
- e,
- )
- return new_secret
- # JWT settings
- SECRET_KEY = _get_jwt_secret()
- ALGORITHM = "HS256"
- ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 # 24 hours (M-2: reduced from 7 days)
- # HTTP Bearer token
- security = HTTPBearer(auto_error=False)
- # --- Slicer download tokens ---
- # Short-lived, single-use tokens for slicer protocol handlers that can't send
- # auth headers. Stored in AuthEphemeralToken (token_type=TokenType.SLICER_DOWNLOAD)
- # so they survive server restarts and work in multi-worker deployments (M-3).
- SLICER_TOKEN_EXPIRE_MINUTES = 5
- async def create_slicer_download_token(resource_type: str, resource_id: int) -> str:
- """Create a short-lived, single-use download token for slicer protocol handlers."""
- now = datetime.now(timezone.utc)
- expires_at = now + timedelta(minutes=SLICER_TOKEN_EXPIRE_MINUTES)
- token = secrets.token_urlsafe(24)
- resource_key = f"{resource_type}:{resource_id}"
- async with async_session() as db:
- # Prune expired tokens opportunistically
- await db.execute(
- delete(AuthEphemeralToken).where(
- AuthEphemeralToken.token_type == TokenType.SLICER_DOWNLOAD,
- AuthEphemeralToken.expires_at < now,
- )
- )
- db.add(
- AuthEphemeralToken(
- token=token,
- token_type=TokenType.SLICER_DOWNLOAD,
- nonce=resource_key,
- expires_at=expires_at,
- )
- )
- await db.commit()
- return token
- async def verify_slicer_download_token(token: str, resource_type: str, resource_id: int) -> bool:
- """Verify and atomically consume a slicer download token.
- Returns True only if the token is valid, unexpired, and bound to the given resource.
- DELETE...RETURNING ensures the token is single-use even under concurrent requests.
- M-NEW-1 fix: nonce (resource key) is included in the WHERE clause so the DELETE
- only succeeds when the token is presented to the *correct* resource endpoint.
- Previously the token was consumed (committed) even when stored_key != expected_key,
- permanently invalidating it while returning False to the caller.
- """
- expected_key = f"{resource_type}:{resource_id}"
- now = datetime.now(timezone.utc)
- async with async_session() as db:
- result = await db.execute(
- delete(AuthEphemeralToken)
- .where(
- AuthEphemeralToken.token == token,
- AuthEphemeralToken.token_type == TokenType.SLICER_DOWNLOAD,
- AuthEphemeralToken.nonce == expected_key,
- AuthEphemeralToken.expires_at > now,
- )
- .returning(AuthEphemeralToken.id)
- )
- if result.one_or_none() is None:
- return False
- await db.commit()
- return True
- # --- Camera stream tokens ---
- # Reusable tokens for camera stream/snapshot endpoints loaded via <img>/<video>
- # tags (these cannot send Authorization headers). Unlike slicer tokens they are
- # NOT single-use — streams reconnect on errors. Stored in AuthEphemeralToken
- # (token_type="camera_stream") for multi-worker compatibility (M-3).
- CAMERA_STREAM_TOKEN_EXPIRE_MINUTES = 60
- async def create_camera_stream_token() -> str:
- """Create a reusable token for camera stream/snapshot access."""
- now = datetime.now(timezone.utc)
- expires_at = now + timedelta(minutes=CAMERA_STREAM_TOKEN_EXPIRE_MINUTES)
- token = secrets.token_urlsafe(24)
- async with async_session() as db:
- # Prune expired tokens opportunistically
- await db.execute(
- delete(AuthEphemeralToken).where(
- AuthEphemeralToken.token_type == "camera_stream",
- AuthEphemeralToken.expires_at < now,
- )
- )
- db.add(
- AuthEphemeralToken(
- token=token,
- token_type="camera_stream",
- expires_at=expires_at,
- )
- )
- await db.commit()
- return token
- async def verify_camera_stream_token(token: str) -> bool:
- """Verify a camera stream token is valid (reusable — does not consume it).
- Tries the ephemeral 60-minute token first (the common, browser-bound case)
- and falls through to long-lived tokens (#1108) for HA / kiosk integrations
- that paste a token once and expect it to keep working for days.
- """
- now = datetime.now(timezone.utc)
- async with async_session() as db:
- result = await db.execute(
- select(AuthEphemeralToken).where(
- AuthEphemeralToken.token == token,
- AuthEphemeralToken.token_type == "camera_stream",
- AuthEphemeralToken.expires_at > now,
- )
- )
- if result.scalar_one_or_none() is not None:
- return True
- # Long-lived path. Imported lazily so the auth module stays importable
- # at startup before the long_lived_tokens model is registered.
- from backend.app.services.long_lived_tokens import verify_token as verify_long_lived
- record = await verify_long_lived(db, token, scope="camera_stream")
- return record is not None
- def verify_password(plain_password: str, hashed_password: str) -> bool:
- """Verify a password against a hash.
- Uses pbkdf2_sha256 which handles long passwords automatically.
- """
- return pwd_context.verify(plain_password, hashed_password)
- def get_password_hash(password: str) -> str:
- """Hash a password.
- Uses pbkdf2_sha256 which is secure and has no password length limit.
- """
- return pwd_context.hash(password)
- def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
- """Create a JWT access token with jti (revocation) and iat (freshness) claims."""
- to_encode = data.copy()
- now = datetime.now(timezone.utc)
- if expires_delta:
- expire = now + expires_delta
- else:
- expire = now + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
- jti = secrets.token_hex(16)
- to_encode.update({"exp": expire, "jti": jti, "iat": now})
- encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
- return encoded_jwt
- def _is_token_fresh(iat: int | float | None, user: User) -> bool:
- """Return False if the token was issued before the user's last password change.
- Used to invalidate all sessions after a password reset/change (M-R7-B).
- All tokens without an iat claim are unconditionally rejected — every token
- issued by this server carries iat, so absence means the token is forged or
- from a pre-iat code path whose max TTL (24 h) has long since expired.
- """
- if iat is None:
- return False
- if not hasattr(user, "password_changed_at") or user.password_changed_at is None:
- return True # No password change recorded yet (I2 migration handles this)
- token_issued_at = datetime.fromtimestamp(iat, tz=timezone.utc)
- pca = user.password_changed_at
- if pca.tzinfo is None:
- pca = pca.replace(tzinfo=timezone.utc)
- # JWT iat is whole seconds; truncate pca so tokens issued in the same second pass.
- pca = pca.replace(microsecond=0)
- return token_issued_at >= pca
- async def revoke_jti(jti: str, expires_at: datetime, username: str | None = None) -> None:
- """Store a revoked JWT jti so it is rejected on future requests.
- Silently ignores duplicate inserts (e.g. double-logout with the same token).
- """
- from sqlalchemy.exc import IntegrityError
- async with async_session() as db:
- revoked = AuthEphemeralToken(
- token=jti,
- token_type="revoked_jti",
- username=username,
- expires_at=expires_at,
- )
- db.add(revoked)
- try:
- await db.commit()
- except IntegrityError:
- await db.rollback() # jti already revoked — desired state, ignore
- async def is_jti_revoked(jti: str) -> bool:
- """Return True if the given jti has been revoked."""
- async with async_session() as db:
- result = await db.execute(
- select(AuthEphemeralToken).where(
- AuthEphemeralToken.token == jti,
- AuthEphemeralToken.token_type == "revoked_jti",
- )
- )
- return result.scalar_one_or_none() is not None
- async def get_user_by_username(db: AsyncSession, username: str) -> User | None:
- """Get a user by username (case-insensitive) with groups loaded for permission checks."""
- result = await db.execute(
- select(User).where(func.lower(User.username) == func.lower(username)).options(selectinload(User.groups))
- )
- return result.scalar_one_or_none()
- async def get_user_by_email(db: AsyncSession, email: str) -> User | None:
- """Get a user by email (case-insensitive) with groups loaded for permission checks."""
- result = await db.execute(
- select(User).where(func.lower(User.email) == func.lower(email)).options(selectinload(User.groups))
- )
- return result.scalar_one_or_none()
- async def authenticate_user(db: AsyncSession, username: str, password: str) -> User | None:
- """Authenticate a user by username and password.
- Username lookup is case-insensitive. Password is case-sensitive.
- LDAP and OIDC users must authenticate via their respective providers.
- """
- user = await get_user_by_username(db, username)
- if not user:
- return None
- if getattr(user, "auth_source", "local") in ("ldap", "oidc"):
- return None # LDAP/OIDC users must authenticate via their provider
- if not user.password_hash or not verify_password(password, user.password_hash):
- return None
- if not user.is_active:
- return None
- return user
- async def authenticate_user_by_email(db: AsyncSession, email: str, password: str) -> User | None:
- """Authenticate a user by email and password.
- Email lookup is case-insensitive. Password is case-sensitive.
- LDAP and OIDC users must authenticate via their respective providers.
- """
- user = await get_user_by_email(db, email)
- if not user:
- return None
- if getattr(user, "auth_source", "local") in ("ldap", "oidc"):
- return None # LDAP/OIDC users must authenticate via their provider
- if not user.password_hash or not verify_password(password, user.password_hash):
- return None
- if not user.is_active:
- return None
- return user
- async def is_auth_enabled(db: AsyncSession) -> bool:
- """Check if authentication is enabled."""
- try:
- result = await db.execute(select(Settings).where(Settings.key == "auth_enabled"))
- setting = result.scalar_one_or_none()
- if setting is None:
- return False
- return setting.value.lower() == "true"
- except Exception:
- # If settings table doesn't exist or query fails, assume auth is disabled
- return False
- async def _user_from_api_key(db: AsyncSession, api_key: APIKey) -> User | None:
- """Resolve the owner of a validated API key, or None for legacy ownerless keys.
- Cloud routes (and any route that needs caller identity) read the returned
- User to look up per-user state like ``cloud_token``. Legacy keys created
- before #1182 have ``user_id IS NULL`` and stay anonymous — they keep working
- against non-cloud routes for backward compatibility, but cloud routes will
- surface a "recreate this key" error rather than 200 with empty results.
- """
- if api_key.user_id is None:
- return None
- result = await db.execute(select(User).where(User.id == api_key.user_id))
- user = result.scalar_one_or_none()
- if user is None or not user.is_active:
- # CASCADE on user delete should prevent a dangling user_id, but if
- # someone manually deactivates the owner the key shouldn't suddenly
- # gain an "anonymous" identity — drop the request to None so cloud
- # access fails closed.
- return None
- return user
- async def _validate_api_key(db: AsyncSession, api_key_value: str) -> APIKey | None:
- """Validate an API key and return the APIKey object if valid, None otherwise.
- L-1: Pre-filter by key_prefix (first 8 chars) before running pbkdf2 so only
- O(1) candidate rows are hashed instead of the full key table. The prefix is
- not secret (it is shown in the admin UI), so this does not reduce security.
- """
- try:
- # key_prefix is stored as "<first-8-chars>..." (e.g. "bb_Abc12...").
- # Matching on the first 8 chars of the submitted key reduces the scan to
- # at most one row in practice (2^40 collision space for 5 base64 chars).
- key_lookup = api_key_value[:8] if len(api_key_value) >= 8 else api_key_value
- result = await db.execute(
- select(APIKey).where(
- APIKey.enabled.is_(True),
- APIKey.key_prefix.like(
- key_lookup.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_") + "%", escape="\\"
- ),
- )
- )
- api_keys = result.scalars().all()
- for api_key in api_keys:
- if verify_password(api_key_value, api_key.key_hash):
- # Check expiration
- if api_key.expires_at:
- expires = api_key.expires_at
- if expires.tzinfo is None:
- expires = expires.replace(tzinfo=timezone.utc)
- if expires < datetime.now(timezone.utc):
- return None # Expired
- # Update last_used timestamp
- api_key.last_used = datetime.now(timezone.utc)
- await db.commit()
- return api_key
- except Exception as e:
- logger.warning("API key validation error: %s", e)
- return None
- async def get_current_user_optional(
- credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(security)] = None,
- ) -> User | None:
- """Get the current authenticated user from JWT token, or None if not authenticated.
- Returns None only when NO credentials are supplied. If a token is supplied
- but invalid/revoked, raises 401 — a revoked token must not grant anonymous
- access (I6).
- """
- if credentials is None:
- return None
- _unauthorized = HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- try:
- token = credentials.credentials
- payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
- username: str = payload.get("sub")
- if username is None:
- raise _unauthorized
- jti: str | None = payload.get("jti")
- if not jti or await is_jti_revoked(jti):
- raise _unauthorized # I6: revoked token → 401, not anonymous
- iat: int | float | None = payload.get("iat")
- except JWTError:
- raise _unauthorized
- async with async_session() as db:
- user = await get_user_by_username(db, username)
- if user is None or not user.is_active:
- raise _unauthorized
- if not _is_token_fresh(iat, user):
- raise _unauthorized
- return user
- async def get_current_user(
- credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(security)] = None,
- ) -> User:
- """Get the current authenticated user from JWT token."""
- credentials_exception = HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- if credentials is None:
- raise credentials_exception
- try:
- token = credentials.credentials
- payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
- username: str = payload.get("sub")
- if username is None:
- raise credentials_exception
- jti: str | None = payload.get("jti")
- if not jti or await is_jti_revoked(jti):
- raise credentials_exception
- iat: int | float | None = payload.get("iat")
- except JWTError:
- raise credentials_exception
- async with async_session() as db:
- user = await get_user_by_username(db, username)
- if user is None:
- raise credentials_exception
- if not user.is_active:
- raise HTTPException(
- status_code=status.HTTP_403_FORBIDDEN,
- detail="User account is disabled",
- )
- if not _is_token_fresh(iat, user):
- raise credentials_exception
- return user
- async def get_current_active_user(current_user: Annotated[User, Depends(get_current_user)]) -> User:
- """Get the current active user (alias for clarity)."""
- return current_user
- async def require_auth_if_enabled(
- credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(security)] = None,
- x_api_key: Annotated[str | None, Header(alias="X-API-Key")] = None,
- ) -> User | None:
- """Require authentication if auth is enabled, otherwise return None.
- Accepts both JWT tokens (via Authorization: Bearer header) and API keys
- (via X-API-Key header or Authorization: Bearer bb_xxx). API keys return
- None for backward compatibility — routes that need the API-key owner (i.e.
- cloud routes for #1182) resolve it via their own router-level dependency
- that stashes ``request.state.api_key_owner``. Returning the owner here
- instead would silently grant API-keyed callers access to every route that
- fences via ``if current_user is None``, which is a wider surface than
- #1182 was designed to expose.
- """
- async with async_session() as db:
- auth_enabled = await is_auth_enabled(db)
- if not auth_enabled:
- return None
- # Check for API key first (X-API-Key header)
- if x_api_key:
- api_key = await _validate_api_key(db, x_api_key)
- if api_key:
- return None # API key valid, allow access
- # Check for Bearer token (could be JWT or API key)
- if credentials is not None:
- token = credentials.credentials
- # Check if it's an API key (starts with bb_)
- if token.startswith("bb_"):
- api_key = await _validate_api_key(db, token)
- if api_key:
- return None # API key valid, allow access
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Invalid API key",
- headers={"WWW-Authenticate": "Bearer"},
- )
- # Otherwise treat as JWT
- try:
- payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
- username: str = payload.get("sub")
- if username is None:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- jti: str | None = payload.get("jti")
- if not jti or await is_jti_revoked(jti):
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- iat: int | float | None = payload.get("iat")
- except JWTError:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- user = await get_user_by_username(db, username)
- if user is None or not user.is_active:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- if not _is_token_fresh(iat, user):
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- return user
- # No credentials provided
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Authentication required",
- headers={"WWW-Authenticate": "Bearer"},
- )
- def require_role(required_role: str):
- """Dependency factory for role-based access control."""
- async def role_checker(current_user: Annotated[User, Depends(get_current_user)]) -> User:
- if current_user.role != required_role:
- raise HTTPException(
- status_code=status.HTTP_403_FORBIDDEN,
- detail=f"Requires {required_role} role",
- )
- return current_user
- return role_checker
- def require_admin_if_auth_enabled():
- """Dependency factory that requires admin role if auth is enabled."""
- async def admin_checker(
- current_user: Annotated[User | None, Depends(require_auth_if_enabled)] = None,
- ) -> User | None:
- if current_user is None:
- return None # Auth not enabled, allow access
- if current_user.role != "admin":
- raise HTTPException(
- status_code=status.HTTP_403_FORBIDDEN,
- detail="Requires admin role",
- )
- return current_user
- return admin_checker
- def generate_api_key() -> tuple[str, str, str]:
- """Generate a new API key.
- Returns:
- tuple: (full_key, key_hash, key_prefix)
- - full_key: The complete API key (only shown once on creation)
- - key_hash: Hashed version for storage and verification
- - key_prefix: First 8 characters for display purposes
- """
- # Generate a secure random API key (32 bytes = 64 hex characters)
- full_key = f"bb_{secrets.token_urlsafe(32)}"
- key_hash = get_password_hash(full_key)
- key_prefix = full_key[:8] + "..." if len(full_key) > 8 else full_key
- return full_key, key_hash, key_prefix
- async def get_api_key(
- authorization: Annotated[str | None, Header(alias="Authorization")] = None,
- x_api_key: Annotated[str | None, Header(alias="X-API-Key")] = None,
- db: AsyncSession = Depends(get_db),
- ) -> APIKey:
- """Get and validate API key from request headers.
- Checks both 'Authorization: Bearer <key>' and 'X-API-Key: <key>' headers.
- """
- api_key_value = None
- if x_api_key:
- api_key_value = x_api_key
- elif authorization and authorization.startswith("Bearer "):
- api_key_value = authorization.replace("Bearer ", "")
- if not api_key_value:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="API key required. Provide 'X-API-Key' header or 'Authorization: Bearer <key>'",
- )
- # Pre-filter by key_prefix to avoid O(n) pbkdf2 hashes across all enabled keys.
- key_lookup = api_key_value[:8] if len(api_key_value) >= 8 else api_key_value
- result = await db.execute(
- select(APIKey).where(
- APIKey.enabled.is_(True),
- APIKey.key_prefix.like(
- key_lookup.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_") + "%",
- escape="\\",
- ),
- )
- )
- api_keys = result.scalars().all()
- for api_key in api_keys:
- # Check if key matches (verify against hash)
- if verify_password(api_key_value, api_key.key_hash):
- # Check expiration
- if api_key.expires_at:
- expires = api_key.expires_at
- if expires.tzinfo is None:
- expires = expires.replace(tzinfo=timezone.utc)
- if expires < datetime.now(timezone.utc):
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="API key has expired",
- )
- # Update last_used timestamp
- api_key.last_used = datetime.now(timezone.utc)
- await db.commit()
- return api_key
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Invalid API key",
- )
- async def caller_is_api_key(
- credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(security)] = None,
- x_api_key: Annotated[str | None, Header(alias="X-API-Key")] = None,
- ) -> bool:
- """Return True when the request is authenticated via API key (X-API-Key or Bearer bb_xxx)."""
- if x_api_key:
- return True
- return credentials is not None and credentials.credentials.startswith("bb_")
- def check_permission(api_key: APIKey, permission: str) -> None:
- """Check if API key has the required permission.
- Args:
- api_key: The API key object
- permission: One of 'queue', 'control_printer', 'read_status'
- Raises:
- HTTPException: If permission is not granted
- """
- permission_map = {
- "queue": "can_queue",
- "control_printer": "can_control_printer",
- "read_status": "can_read_status",
- }
- if permission not in permission_map:
- raise HTTPException(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail=f"Unknown permission: {permission}",
- )
- attr_name = permission_map[permission]
- if not getattr(api_key, attr_name, False):
- raise HTTPException(
- status_code=status.HTTP_403_FORBIDDEN,
- detail=f"API key does not have '{permission}' permission",
- )
- def check_printer_access(api_key: APIKey, printer_id: int) -> None:
- """Check if API key has access to the specified printer.
- Args:
- api_key: The API key object
- printer_id: The printer ID to check access for
- Raises:
- HTTPException: If access is denied
- """
- # None = global key, access to all printers
- if api_key.printer_ids is None:
- return
- # Empty list or printer not in allowed list = no access
- if printer_id not in api_key.printer_ids:
- raise HTTPException(
- status_code=status.HTTP_403_FORBIDDEN,
- detail=f"API key does not have access to printer {printer_id}",
- )
- # Convenience dependencies - these are functions that return Depends objects
- def RequireAdmin():
- """Dependency that requires admin role."""
- return Depends(require_role("admin"))
- def RequireAdminIfAuthEnabled():
- """Dependency that requires admin role if auth is enabled."""
- return Depends(require_admin_if_auth_enabled())
- def require_permission(*permissions: str | Permission):
- """Dependency factory that requires user to have ALL specified permissions.
- Accepts both JWT tokens (via Authorization: Bearer header) and API keys
- (via X-API-Key header or Authorization: Bearer bb_xxx).
- Args:
- *permissions: Permission strings or Permission enum values to require
- Returns:
- A dependency function that validates permissions
- """
- # Convert Permission enums to strings
- perm_strings = [p.value if isinstance(p, Permission) else p for p in permissions]
- async def permission_checker(
- credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(security)] = None,
- x_api_key: Annotated[str | None, Header(alias="X-API-Key")] = None,
- ) -> User | None:
- async with async_session() as db:
- # Check for API key first (X-API-Key header)
- if x_api_key:
- api_key = await _validate_api_key(db, x_api_key)
- if api_key:
- _check_apikey_permissions(perm_strings)
- return None # API key valid, allow access
- credentials_exception = HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- if credentials is None:
- raise credentials_exception
- token = credentials.credentials
- # Check if it's an API key (starts with bb_)
- if token.startswith("bb_"):
- api_key = await _validate_api_key(db, token)
- if api_key:
- _check_apikey_permissions(perm_strings)
- return None # API key valid, allow access
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Invalid API key",
- headers={"WWW-Authenticate": "Bearer"},
- )
- # Otherwise treat as JWT
- try:
- payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
- username: str = payload.get("sub")
- if username is None:
- raise credentials_exception
- jti: str | None = payload.get("jti")
- if not jti or await is_jti_revoked(jti):
- raise credentials_exception
- iat: int | float | None = payload.get("iat")
- except JWTError:
- raise credentials_exception
- user = await get_user_by_username(db, username)
- if user is None or not user.is_active:
- raise credentials_exception
- if not _is_token_fresh(iat, user):
- raise credentials_exception
- if not user.has_all_permissions(*perm_strings):
- raise HTTPException(
- status_code=status.HTTP_403_FORBIDDEN,
- detail=f"Missing required permissions: {', '.join(perm_strings)}",
- )
- return user
- return permission_checker
- def require_permission_if_auth_enabled(*permissions: str | Permission):
- """Dependency factory that checks permissions only if auth is enabled.
- This provides backward compatibility - when auth is disabled, all access is allowed.
- Accepts both JWT tokens (via Authorization: Bearer header) and API keys
- (via X-API-Key header or Authorization: Bearer bb_xxx).
- Args:
- *permissions: Permission strings or Permission enum values to require
- Returns:
- A dependency function that validates permissions if auth is enabled
- """
- # Convert Permission enums to strings
- perm_strings = [p.value if isinstance(p, Permission) else p for p in permissions]
- async def permission_checker(
- credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(security)] = None,
- x_api_key: Annotated[str | None, Header(alias="X-API-Key")] = None,
- ) -> User | None:
- async with async_session() as db:
- auth_enabled = await is_auth_enabled(db)
- if not auth_enabled:
- return None # Auth disabled, allow access
- # Check for API key first (X-API-Key header). API-keyed requests
- # bypass the JWT permission check entirely — their scopes live on
- # the APIKey row (can_queue / can_control_printer / can_read_status
- # / can_access_cloud / printer_ids), and the dep returns None so
- # routes don't gain a synthetic User identity that would grant
- # access to fenced surfaces like long-lived-token management.
- # Cloud routes (#1182) resolve the API-key owner separately via
- # their own router-level dependency; see ``cloud.py``.
- if x_api_key:
- api_key = await _validate_api_key(db, x_api_key)
- if api_key:
- _check_apikey_permissions(perm_strings)
- return None # API key valid, allow access
- # Check for Bearer token (could be JWT or API key)
- if credentials is not None:
- token = credentials.credentials
- # Check if it's an API key (starts with bb_)
- if token.startswith("bb_"):
- api_key = await _validate_api_key(db, token)
- if api_key:
- _check_apikey_permissions(perm_strings)
- return None # API key valid, allow access
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Invalid API key",
- headers={"WWW-Authenticate": "Bearer"},
- )
- # Otherwise treat as JWT
- try:
- payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
- username: str = payload.get("sub")
- if username is None:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- jti: str | None = payload.get("jti")
- if not jti or await is_jti_revoked(jti):
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- iat: int | float | None = payload.get("iat")
- except JWTError:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- user = await get_user_by_username(db, username)
- if user is None or not user.is_active:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- if not _is_token_fresh(iat, user):
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- if not user.has_all_permissions(*perm_strings):
- raise HTTPException(
- status_code=status.HTTP_403_FORBIDDEN,
- detail=f"Missing required permissions: {', '.join(perm_strings)}",
- )
- return user
- # No credentials provided
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Authentication required",
- headers={"WWW-Authenticate": "Bearer"},
- )
- return permission_checker
- def RequirePermission(*permissions: str | Permission):
- """Convenience dependency that requires ALL specified permissions."""
- return Depends(require_permission(*permissions))
- def RequirePermissionIfAuthEnabled(*permissions: str | Permission):
- """Convenience dependency that requires permissions if auth is enabled."""
- return Depends(require_permission_if_auth_enabled(*permissions))
- def require_any_permission_if_auth_enabled(*permissions: str | Permission):
- """Dependency factory that requires AT LEAST ONE of the given permissions when auth is enabled."""
- perm_strings = [p.value if isinstance(p, Permission) else p for p in permissions]
- async def checker(
- credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(security)] = None,
- x_api_key: Annotated[str | None, Header(alias="X-API-Key")] = None,
- ) -> User | None:
- async with async_session() as db:
- auth_enabled = await is_auth_enabled(db)
- if not auth_enabled:
- return None
- if x_api_key:
- api_key = await _validate_api_key(db, x_api_key)
- if api_key:
- return None
- if credentials is not None:
- token = credentials.credentials
- if token.startswith("bb_"):
- api_key = await _validate_api_key(db, token)
- if api_key:
- return None
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Invalid API key",
- headers={"WWW-Authenticate": "Bearer"},
- )
- try:
- payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
- username: str = payload.get("sub")
- if username is None:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- jti: str | None = payload.get("jti")
- if not jti or await is_jti_revoked(jti):
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- iat: int | float | None = payload.get("iat")
- except JWTError:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- user = await get_user_by_username(db, username)
- if user is None or not user.is_active:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- if not _is_token_fresh(iat, user):
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- if not user.has_any_permission(*perm_strings):
- raise HTTPException(
- status_code=status.HTTP_403_FORBIDDEN,
- detail=f"Missing required permissions: {', '.join(perm_strings)}",
- )
- return user
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Authentication required",
- headers={"WWW-Authenticate": "Bearer"},
- )
- return checker
- def RequireAnyPermissionIfAuthEnabled(*permissions: str | Permission):
- """Convenience dependency that requires AT LEAST ONE of the given permissions when auth is enabled."""
- return Depends(require_any_permission_if_auth_enabled(*permissions))
- def require_camera_stream_token_if_auth_enabled():
- """Dependency that validates a camera stream token query param when auth is enabled.
- Used for camera stream/snapshot endpoints that are loaded via <img> tags
- which cannot send Authorization headers. The frontend obtains a token from
- POST /printers/camera/stream-token and appends it as ?token=xxx.
- """
- async def checker(token: str | None = None) -> None:
- async with async_session() as db:
- if not await is_auth_enabled(db):
- return # Auth disabled, allow access
- if not token or not await verify_camera_stream_token(token):
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Valid camera stream token required. Obtain one from POST /api/v1/printers/camera/stream-token",
- )
- return checker
- RequireCameraStreamTokenIfAuthEnabled = Depends(require_camera_stream_token_if_auth_enabled())
- def require_ownership_permission(
- all_permission: str | Permission,
- own_permission: str | Permission,
- ):
- """Dependency factory for ownership-based permission checks.
- - User with `all_permission` can modify any item
- - User with `own_permission` can only modify items where created_by_id == user.id
- - Ownerless items (created_by_id = null) require `all_permission`
- - API keys (via X-API-Key header or Bearer bb_xxx) get full access (can_modify_all=True)
- Returns:
- A dependency function that returns (user, can_modify_all).
- - can_modify_all=True: user can modify any item
- - can_modify_all=False: user can only modify their own items
- """
- all_perm = all_permission.value if isinstance(all_permission, Permission) else all_permission
- own_perm = own_permission.value if isinstance(own_permission, Permission) else own_permission
- async def checker(
- credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(security)] = None,
- x_api_key: Annotated[str | None, Header(alias="X-API-Key")] = None,
- ) -> tuple[User | None, bool]:
- """Returns (user, can_modify_all).
- - can_modify_all=True: user can modify any item
- - can_modify_all=False: user can only modify their own items
- """
- async with async_session() as db:
- auth_enabled = await is_auth_enabled(db)
- if not auth_enabled:
- return None, True # Auth disabled, allow all
- # Check for API key first (X-API-Key header)
- if x_api_key:
- api_key = await _validate_api_key(db, x_api_key)
- if api_key:
- return None, True # API key valid, allow all
- # Check for Bearer token (could be JWT or API key)
- if credentials is not None:
- token = credentials.credentials
- # Check if it's an API key (starts with bb_)
- if token.startswith("bb_"):
- api_key = await _validate_api_key(db, token)
- if api_key:
- return None, True # API key valid, allow all
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Invalid API key",
- headers={"WWW-Authenticate": "Bearer"},
- )
- # Otherwise treat as JWT
- try:
- payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
- username: str = payload.get("sub")
- if username is None:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- jti: str | None = payload.get("jti")
- if not jti or await is_jti_revoked(jti):
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- iat: int | float | None = payload.get("iat")
- except JWTError:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- user = await get_user_by_username(db, username)
- if user is None or not user.is_active:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- if not _is_token_fresh(iat, user):
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- if user.has_permission(all_perm):
- return user, True
- if user.has_permission(own_perm):
- return user, False
- raise HTTPException(
- status_code=status.HTTP_403_FORBIDDEN,
- detail=f"Missing permission: {own_perm} or {all_perm}",
- )
- # No credentials provided
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Authentication required",
- headers={"WWW-Authenticate": "Bearer"},
- )
- return checker
|