| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863 |
- from __future__ import annotations
- import logging
- import os
- import secrets
- from datetime import datetime, timedelta, timezone
- from pathlib import Path
- from typing import Annotated
- import jwt
- from fastapi import Depends, Header, HTTPException, status
- from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
- from jwt.exceptions import PyJWTError as JWTError
- from passlib.context import CryptContext
- from sqlalchemy import func, select
- from sqlalchemy.ext.asyncio import AsyncSession
- from sqlalchemy.orm import selectinload
- from backend.app.core.database import async_session, get_db
- from backend.app.core.permissions import Permission
- from backend.app.models.api_key import APIKey
- from backend.app.models.settings import Settings
- from backend.app.models.user import User
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Password hashing
# Use pbkdf2_sha256 instead of bcrypt to avoid bcrypt's 72-byte limit and passlib initialization issues.
# pbkdf2_sha256 is a secure password hashing algorithm without bcrypt's limitations.
# The same context is also used in this module to hash and verify API keys.
pwd_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")
def _get_jwt_secret() -> str:
    """Get the JWT secret key from environment, file, or generate a new one.

    Priority:
    1. JWT_SECRET_KEY environment variable
    2. .jwt_secret file in data directory (must hold at least 32 characters)
    3. Generate new random secret and save to file

    A failure to read or write the secret file is logged and never raised;
    in the worst case a fresh in-memory secret is returned.

    Returns:
        The JWT secret key
    """
    # 1. Check environment variable first
    env_secret = os.environ.get("JWT_SECRET_KEY")
    if env_secret:
        logger.info("Using JWT secret from JWT_SECRET_KEY environment variable")
        return env_secret

    # 2. Check for secret file in data directory
    # Use DATA_DIR env var (same as rest of app), fallback to data/ subdirectory
    data_dir_env = os.environ.get("DATA_DIR")
    if data_dir_env:
        data_dir = Path(data_dir_env)
    else:
        # Fallback to data/ subdirectory under project root (not project root itself!)
        data_dir = Path(__file__).parent.parent.parent.parent / "data"
    secret_file = data_dir / ".jwt_secret"

    if secret_file.exists():
        try:
            secret = secret_file.read_text().strip()
        except (OSError, UnicodeDecodeError) as e:
            # UnicodeDecodeError: a corrupted/binary file must not crash module import.
            logger.warning("Failed to read JWT secret file: %s", e)
        else:
            if secret and len(secret) >= 32:
                logger.info("Using JWT secret from %s", secret_file)
                return secret
            logger.warning("JWT secret in %s is empty or too short; generating a new one", secret_file)

    # 3. Generate new random secret
    new_secret = secrets.token_urlsafe(64)

    # Try to save it
    try:
        data_dir.mkdir(parents=True, exist_ok=True)
        # Note: storing the secret in clear text is intentional - JWT secrets must be readable
        # by the app, and this is standard practice for self-hosted applications (same as .env files).
        # Create the file with 0600 from the start so the secret is never on disk
        # with the looser default (umask-derived) permissions.
        fd = os.open(secret_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w") as handle:
            handle.write(new_secret)
        # The mode passed to os.open only applies to newly created files;
        # tighten a pre-existing file as well (owner read/write only).
        secret_file.chmod(0o600)
        logger.info("Generated new JWT secret and saved to %s", secret_file)
    except OSError as e:
        logger.warning(
            "Could not save JWT secret to file (%s). "
            "Secret will be regenerated on restart, invalidating existing tokens. "
            "Set JWT_SECRET_KEY environment variable for persistence.",
            e,
        )
    return new_secret
# JWT settings
# SECRET_KEY is resolved once, when this module is imported.
SECRET_KEY = _get_jwt_secret()
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 * 7  # 7 days

# HTTP Bearer token
# auto_error=False: the dependencies in this module check for a missing
# credential (None) themselves and decide how to respond.
security = HTTPBearer(auto_error=False)
# --- Slicer download tokens ---
# Short-lived tokens for slicer protocol handlers that can't send auth headers.
# Maps token → (resource_key, expiry). resource_key = "archive:{id}" or "library:{id}".
# Held in a module-level dict, i.e. in this process's memory only.
_slicer_tokens: dict[str, tuple[str, datetime]] = {}
SLICER_TOKEN_EXPIRE_MINUTES = 5
def create_slicer_download_token(resource_type: str, resource_id: int) -> str:
    """Issue a short-lived download token bound to one resource.

    Expired entries are purged from the in-memory token table before the new
    token is stored. The token is bound to ``"{resource_type}:{resource_id}"``
    and expires SLICER_TOKEN_EXPIRE_MINUTES minutes after creation.

    Returns:
        The newly generated URL-safe token string.
    """
    issued_at = datetime.now(timezone.utc)

    # Purge stale entries; iterate over a snapshot so deleting is safe.
    for known_token, (_, deadline) in list(_slicer_tokens.items()):
        if deadline < issued_at:
            del _slicer_tokens[known_token]

    fresh_token = secrets.token_urlsafe(24)
    valid_until = issued_at + timedelta(minutes=SLICER_TOKEN_EXPIRE_MINUTES)
    _slicer_tokens[fresh_token] = (f"{resource_type}:{resource_id}", valid_until)
    return fresh_token
def verify_slicer_download_token(token: str, resource_type: str, resource_id: int) -> bool:
    """Check a slicer download token against the requested resource.

    The token is removed from the table when it has expired and when it is
    successfully redeemed (single use). A token presented for a different
    resource is rejected but left in place.

    Returns:
        True only for a known, unexpired token bound to this resource.
    """
    record = _slicer_tokens.get(token)
    if not record:
        return False

    bound_resource, deadline = record
    is_expired = datetime.now(timezone.utc) > deadline
    is_match = bound_resource == f"{resource_type}:{resource_id}"

    # Drop the entry when it is stale or when it is being consumed.
    if is_expired or is_match:
        del _slicer_tokens[token]
    return is_match and not is_expired
# --- Camera stream tokens ---
# Reusable tokens for camera stream/snapshot endpoints loaded via <img> tags.
# Unlike slicer tokens, these are NOT single-use (streams reconnect on errors)
# and have a longer expiry. Maps token → expiry.
# Held in a module-level dict, i.e. in this process's memory only.
_camera_stream_tokens: dict[str, datetime] = {}
CAMERA_STREAM_TOKEN_EXPIRE_MINUTES = 60
def create_camera_stream_token() -> str:
    """Issue a reusable camera stream/snapshot token.

    Expired entries are purged from the in-memory table first. The new token
    expires CAMERA_STREAM_TOKEN_EXPIRE_MINUTES minutes after creation.

    Returns:
        The newly generated URL-safe token string.
    """
    issued_at = datetime.now(timezone.utc)

    # Purge stale entries; iterate over a snapshot so deleting is safe.
    for known_token, deadline in list(_camera_stream_tokens.items()):
        if deadline < issued_at:
            del _camera_stream_tokens[known_token]

    fresh_token = secrets.token_urlsafe(24)
    lifetime = timedelta(minutes=CAMERA_STREAM_TOKEN_EXPIRE_MINUTES)
    _camera_stream_tokens[fresh_token] = issued_at + lifetime
    return fresh_token
def verify_camera_stream_token(token: str) -> bool:
    """Check whether a camera stream token is known and unexpired.

    An expired token is removed from the table; a valid one is left in place
    so it can be reused.
    """
    deadline = _camera_stream_tokens.get(token)
    if not deadline:
        return False

    still_valid = datetime.now(timezone.utc) <= deadline
    if not still_valid:
        del _camera_stream_tokens[token]
    return still_valid
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plain-text password against a stored hash.

    Verification is delegated to the module's passlib context, which is
    configured for pbkdf2_sha256.

    Returns:
        Whatever the passlib context reports for this password/hash pair.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def get_password_hash(password: str) -> str:
    """Hash a password for storage.

    Hashing is delegated to the module's passlib context, which is configured
    for pbkdf2_sha256 (chosen to avoid bcrypt's password length limit).

    Returns:
        The hash string produced by the passlib context.
    """
    hashed = pwd_context.hash(password)
    return hashed
def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
    """Create a signed JWT access token.

    Args:
        data: Claims to embed. The dict is copied; the caller's dict is not mutated.
        expires_delta: Token lifetime. When None, the default of
            ACCESS_TOKEN_EXPIRE_MINUTES is used. An explicit zero-length
            timedelta is honoured rather than replaced by the default.

    Returns:
        The encoded JWT, signed with SECRET_KEY using ALGORITHM.
    """
    # Compare against None explicitly: timedelta(0) is falsy, so a truthiness
    # test would silently swap an explicit zero lifetime for the 7-day default.
    if expires_delta is None:
        expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    to_encode = data.copy()
    to_encode["exp"] = datetime.now(timezone.utc) + expires_delta
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
async def get_user_by_username(db: AsyncSession, username: str) -> User | None:
    """Look up a user by username, ignoring case.

    The user's groups are eagerly loaded (selectinload) so permission checks
    can use them.

    Returns:
        The matching User, or None when there is no match.
    """
    name_matches = func.lower(User.username) == func.lower(username)
    query = select(User).where(name_matches).options(selectinload(User.groups))
    rows = await db.execute(query)
    return rows.scalar_one_or_none()
async def get_user_by_email(db: AsyncSession, email: str) -> User | None:
    """Look up a user by email address, ignoring case.

    The user's groups are eagerly loaded (selectinload) so permission checks
    can use them.

    Returns:
        The matching User, or None when there is no match.
    """
    email_matches = func.lower(User.email) == func.lower(email)
    query = select(User).where(email_matches).options(selectinload(User.groups))
    rows = await db.execute(query)
    return rows.scalar_one_or_none()
async def authenticate_user(db: AsyncSession, username: str, password: str) -> User | None:
    """Authenticate a local user by username and password.

    The username lookup is case-insensitive; the password is case-sensitive.

    Returns:
        The User on success. None when the user is unknown, has
        ``auth_source == "ldap"``, has no password hash, supplies a wrong
        password, or is inactive.
    """
    candidate = await get_user_by_username(db, username)
    if not candidate:
        return None

    # LDAP users authenticate via LDAP, not local password
    if getattr(candidate, "auth_source", "local") == "ldap":
        return None

    stored_hash = candidate.password_hash
    password_ok = bool(stored_hash) and verify_password(password, stored_hash)
    if password_ok and candidate.is_active:
        return candidate
    return None
async def authenticate_user_by_email(db: AsyncSession, email: str, password: str) -> User | None:
    """Authenticate a local user by email address and password.

    The email lookup is case-insensitive; the password is case-sensitive.

    Returns:
        The User on success. None when the user is unknown, has
        ``auth_source == "ldap"``, has no password hash, supplies a wrong
        password, or is inactive.
    """
    candidate = await get_user_by_email(db, email)
    if not candidate:
        return None

    # Accounts sourced from LDAP are never checked against a local password.
    if getattr(candidate, "auth_source", "local") == "ldap":
        return None

    stored_hash = candidate.password_hash
    password_ok = bool(stored_hash) and verify_password(password, stored_hash)
    if password_ok and candidate.is_active:
        return candidate
    return None
async def is_auth_enabled(db: AsyncSession) -> bool:
    """Check if authentication is enabled.

    Reads the ``auth_enabled`` row from the settings table.

    Returns:
        True only when the setting exists and its value equals "true"
        (case-insensitive). False when the row is missing or when the lookup
        fails for any reason; a failure is logged, not raised.
    """
    try:
        result = await db.execute(select(Settings).where(Settings.key == "auth_enabled"))
        setting = result.scalar_one_or_none()
        if setting is None:
            return False
        return setting.value.lower() == "true"
    except Exception as e:
        # If settings table doesn't exist or query fails, assume auth is disabled.
        # This fails open, so leave a trace instead of swallowing the error silently.
        logger.warning("Could not read auth_enabled setting; treating auth as disabled: %s", e)
        return False
async def _validate_api_key(db: AsyncSession, api_key_value: str) -> APIKey | None:
    """Resolve a raw API key string to its enabled, unexpired APIKey row.

    Internal helper for the auth dependencies. Every enabled key is loaded
    and the supplied value is verified against each stored hash in turn.
    On a match the key's ``last_used`` timestamp is updated and committed.

    Returns:
        The matching APIKey, or None when nothing matches, the matching key
        has expired, or any error occurs (errors are logged, not raised).
    """
    try:
        rows = await db.execute(select(APIKey).where(APIKey.enabled.is_(True)))
        for candidate in rows.scalars().all():
            if not verify_password(api_key_value, candidate.key_hash):
                continue

            # Check expiration; a naive timestamp is interpreted as UTC.
            expires_at = candidate.expires_at
            if expires_at:
                if expires_at.tzinfo is None:
                    expires_at = expires_at.replace(tzinfo=timezone.utc)
                if expires_at < datetime.now(timezone.utc):
                    return None  # Expired

            # Update last_used timestamp
            candidate.last_used = datetime.now(timezone.utc)
            await db.commit()
            return candidate
    except Exception as e:
        logger.warning("API key validation error: %s", e)
    return None
async def get_current_user_optional(
    credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(security)] = None,
) -> User | None:
    """Resolve the bearer JWT to a user without ever raising.

    Returns:
        The active User named by the token's ``sub`` claim, or None when no
        credentials were sent, the token cannot be decoded, the claim is
        missing, or the user is unknown or inactive.
    """
    if credentials is None:
        return None

    try:
        claims = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        return None

    subject = claims.get("sub")
    if subject is None:
        return None

    async with async_session() as db:
        account = await get_user_by_username(db, subject)
        if account is not None and account.is_active:
            return account
        return None
async def get_current_user(
    credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(security)] = None,
) -> User:
    """Resolve the bearer JWT to a user, rejecting the request on failure.

    Returns:
        The active User named by the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when credentials are missing, the token cannot be
            decoded, the ``sub`` claim is absent, or the user does not exist;
            403 when the user exists but is inactive.
    """

    def _rejected() -> HTTPException:
        return HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

    if credentials is None:
        raise _rejected()

    try:
        claims = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise _rejected()

    subject = claims.get("sub")
    if subject is None:
        raise _rejected()

    async with async_session() as db:
        account = await get_user_by_username(db, subject)
        if account is None:
            raise _rejected()
        if account.is_active:
            return account
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="User account is disabled",
        )
async def get_current_active_user(current_user: Annotated[User, Depends(get_current_user)]) -> User:
    """Return the user resolved by ``get_current_user`` unchanged.

    Exists as a more descriptive alias; it performs no extra checks itself.
    """
    active_user = current_user
    return active_user
async def require_auth_if_enabled(
    credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(security)] = None,
    x_api_key: Annotated[str | None, Header(alias="X-API-Key")] = None,
) -> User | None:
    """Enforce authentication only when the auth setting is switched on.

    Accepts JWTs (Authorization: Bearer <jwt>) and API keys (X-API-Key header,
    or Authorization: Bearer bb_xxx).

    Returns:
        None when auth is disabled or a valid API key was presented;
        otherwise the active User named by the JWT's ``sub`` claim.

    Raises:
        HTTPException: 401 when a ``bb_`` bearer token is not a valid API key,
            the JWT cannot be validated, the user is unknown or inactive, or
            no usable credentials were supplied.
    """

    def _unauthorized(detail: str) -> HTTPException:
        return HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=detail,
            headers={"WWW-Authenticate": "Bearer"},
        )

    async with async_session() as db:
        if not await is_auth_enabled(db):
            return None

        # X-API-Key header takes precedence; an invalid one falls through to the bearer check.
        if x_api_key and await _validate_api_key(db, x_api_key):
            return None  # API key valid, allow access

        if credentials is None:
            # No credentials provided
            raise _unauthorized("Authentication required")

        bearer = credentials.credentials

        # A bearer token starting with bb_ is treated as an API key, never as a JWT.
        if bearer.startswith("bb_"):
            if await _validate_api_key(db, bearer):
                return None  # API key valid, allow access
            raise _unauthorized("Invalid API key")

        # Otherwise treat as JWT
        try:
            claims = jwt.decode(bearer, SECRET_KEY, algorithms=[ALGORITHM])
        except JWTError:
            raise _unauthorized("Could not validate credentials")

        subject = claims.get("sub")
        if subject is None:
            raise _unauthorized("Could not validate credentials")

        account = await get_user_by_username(db, subject)
        if account is None or not account.is_active:
            raise _unauthorized("Could not validate credentials")
        return account
def require_role(required_role: str):
    """Build a dependency that only admits users whose role equals ``required_role``.

    The returned dependency resolves the user via ``get_current_user`` and
    raises a 403 HTTPException when the role differs.
    """

    async def role_checker(current_user: Annotated[User, Depends(get_current_user)]) -> User:
        if current_user.role == required_role:
            return current_user
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Requires {required_role} role",
        )

    return role_checker
def require_admin_if_auth_enabled():
    """Build a dependency that demands the admin role when a user is present.

    The returned dependency relies on ``require_auth_if_enabled``. When that
    yields None the request is allowed through; otherwise the user's role
    must be "admin" or a 403 HTTPException is raised.

    NOTE(review): ``require_auth_if_enabled`` returns None both when auth is
    disabled and when a valid API key was presented, so API-key requests are
    admitted here without a role check - confirm this is intended.
    """

    async def admin_checker(
        current_user: Annotated[User | None, Depends(require_auth_if_enabled)] = None,
    ) -> User | None:
        if current_user is not None and current_user.role != "admin":
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Requires admin role",
            )
        # Either None (no user to check) or a verified admin.
        return current_user

    return admin_checker
def generate_api_key() -> tuple[str, str, str]:
    """Create a fresh API key together with its stored and display forms.

    Returns:
        tuple: (full_key, key_hash, key_prefix)
            - full_key: "bb_" followed by a URL-safe token built from 32 random bytes
            - key_hash: hash of full_key, for storage and later verification
            - key_prefix: first 8 characters of full_key followed by "...", for display
    """
    random_part = secrets.token_urlsafe(32)
    full_key = "bb_" + random_part

    if len(full_key) > 8:
        key_prefix = f"{full_key[:8]}..."
    else:
        key_prefix = full_key

    return full_key, get_password_hash(full_key), key_prefix
async def get_api_key(
    authorization: Annotated[str | None, Header(alias="Authorization")] = None,
    x_api_key: Annotated[str | None, Header(alias="X-API-Key")] = None,
    db: AsyncSession = Depends(get_db),
) -> APIKey:
    """Get and validate API key from request headers.

    Checks both 'X-API-Key: <key>' and 'Authorization: Bearer <key>' headers;
    X-API-Key wins when both are present. Every enabled key is loaded and the
    supplied value is verified against each stored hash in turn. On a match
    the key's ``last_used`` timestamp is updated and committed.

    Returns:
        The matching, unexpired APIKey.

    Raises:
        HTTPException: 401 when no key is supplied, the key matches nothing,
            or the matching key has expired.
    """
    bearer_prefix = "Bearer "

    api_key_value = None
    if x_api_key:
        api_key_value = x_api_key
    elif authorization and authorization.startswith(bearer_prefix):
        # Strip only the leading scheme. str.replace would also remove any
        # later "Bearer " occurrence inside the credential itself.
        api_key_value = authorization[len(bearer_prefix):]

    if not api_key_value:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="API key required. Provide 'X-API-Key' header or 'Authorization: Bearer <key>'",
        )

    # Get all API keys and check them
    result = await db.execute(select(APIKey).where(APIKey.enabled.is_(True)))
    for api_key in result.scalars().all():
        # Check if key matches (verify against hash)
        if not verify_password(api_key_value, api_key.key_hash):
            continue

        # Check expiration; a naive timestamp is interpreted as UTC.
        if api_key.expires_at:
            expires = api_key.expires_at
            if expires.tzinfo is None:
                expires = expires.replace(tzinfo=timezone.utc)
            if expires < datetime.now(timezone.utc):
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="API key has expired",
                )

        # Update last_used timestamp
        api_key.last_used = datetime.now(timezone.utc)
        await db.commit()
        return api_key

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid API key",
    )
def check_permission(api_key: APIKey, permission: str) -> None:
    """Ensure an API key carries the named permission flag.

    Args:
        api_key: The API key object
        permission: One of 'queue', 'control_printer', 'read_status'

    Raises:
        HTTPException: 500 when ``permission`` is not one of the known names;
            403 when the corresponding flag on ``api_key`` is missing or falsy.
    """
    flag_by_permission = {
        "queue": "can_queue",
        "control_printer": "can_control_printer",
        "read_status": "can_read_status",
    }

    flag_name = flag_by_permission.get(permission)
    if flag_name is None:
        # An unknown name is a programming error in the caller, hence 500.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Unknown permission: {permission}",
        )

    if getattr(api_key, flag_name, False):
        return
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail=f"API key does not have '{permission}' permission",
    )
def check_printer_access(api_key: APIKey, printer_id: int) -> None:
    """Ensure an API key is allowed to act on the given printer.

    Args:
        api_key: The API key object
        printer_id: The printer ID to check access for

    Raises:
        HTTPException: 403 when ``printer_ids`` is set and does not contain
            ``printer_id`` (an empty list therefore denies every printer).
    """
    allowed_ids = api_key.printer_ids

    # None = global key, access to all printers
    if allowed_ids is None or printer_id in allowed_ids:
        return

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail=f"API key does not have access to printer {printer_id}",
    )
# Convenience dependencies - these are functions that return Depends objects
def RequireAdmin():
    """Return a ``Depends`` wrapping the admin-only role check."""
    admin_only = require_role("admin")
    return Depends(admin_only)
def RequireAdminIfAuthEnabled():
    """Return a ``Depends`` wrapping the admin check that applies only when auth is enabled."""
    admin_if_enabled = require_admin_if_auth_enabled()
    return Depends(admin_if_enabled)
def require_permission(*permissions: str | Permission):
    """Build a dependency that demands ALL of the given permissions.

    Accepts JWTs (Authorization: Bearer <jwt>) and API keys (X-API-Key header,
    or Authorization: Bearer bb_xxx). Unlike the ``*_if_auth_enabled``
    variant, the check is applied without consulting the auth setting.

    A valid API key is admitted without a permission check and yields None.

    Args:
        *permissions: Permission strings or Permission enum values to require

    Returns:
        A dependency function that returns the authenticated User (or None
        for API keys) and raises HTTPException 401/403 otherwise.
    """
    # Normalise Permission enums to their string values once, up front.
    required = [perm.value if isinstance(perm, Permission) else perm for perm in permissions]

    def _unauthorized(detail: str) -> HTTPException:
        return HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=detail,
            headers={"WWW-Authenticate": "Bearer"},
        )

    async def permission_checker(
        credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(security)] = None,
        x_api_key: Annotated[str | None, Header(alias="X-API-Key")] = None,
    ) -> User | None:
        async with async_session() as db:
            # X-API-Key header takes precedence; an invalid one falls through to the bearer check.
            if x_api_key and await _validate_api_key(db, x_api_key):
                return None  # API key valid, allow access

            if credentials is None:
                raise _unauthorized("Could not validate credentials")

            bearer = credentials.credentials

            # A bearer token starting with bb_ is treated as an API key, never as a JWT.
            if bearer.startswith("bb_"):
                if await _validate_api_key(db, bearer):
                    return None  # API key valid, allow access
                raise _unauthorized("Invalid API key")

            # Otherwise treat as JWT
            try:
                claims = jwt.decode(bearer, SECRET_KEY, algorithms=[ALGORITHM])
            except JWTError:
                raise _unauthorized("Could not validate credentials")

            subject = claims.get("sub")
            if subject is None:
                raise _unauthorized("Could not validate credentials")

            account = await get_user_by_username(db, subject)
            if account is None or not account.is_active:
                raise _unauthorized("Could not validate credentials")

            if account.has_all_permissions(*required):
                return account
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Missing required permissions: {', '.join(required)}",
            )

    return permission_checker
def require_permission_if_auth_enabled(*permissions: str | Permission):
    """Build a dependency that demands permissions only while auth is enabled.

    This keeps backward compatibility: with auth disabled every request is
    allowed. Accepts JWTs (Authorization: Bearer <jwt>) and API keys
    (X-API-Key header, or Authorization: Bearer bb_xxx).

    A valid API key is admitted without a permission check and yields None.

    Args:
        *permissions: Permission strings or Permission enum values to require

    Returns:
        A dependency function that returns None (auth disabled or API key) or
        the authenticated User, and raises HTTPException 401/403 otherwise.
    """
    # Normalise Permission enums to their string values once, up front.
    required = [perm.value if isinstance(perm, Permission) else perm for perm in permissions]

    def _unauthorized(detail: str) -> HTTPException:
        return HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=detail,
            headers={"WWW-Authenticate": "Bearer"},
        )

    async def permission_checker(
        credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(security)] = None,
        x_api_key: Annotated[str | None, Header(alias="X-API-Key")] = None,
    ) -> User | None:
        async with async_session() as db:
            if not await is_auth_enabled(db):
                return None  # Auth disabled, allow access

            # X-API-Key header takes precedence; an invalid one falls through to the bearer check.
            if x_api_key and await _validate_api_key(db, x_api_key):
                return None  # API key valid, allow access

            if credentials is None:
                # No credentials provided
                raise _unauthorized("Authentication required")

            bearer = credentials.credentials

            # A bearer token starting with bb_ is treated as an API key, never as a JWT.
            if bearer.startswith("bb_"):
                if await _validate_api_key(db, bearer):
                    return None  # API key valid, allow access
                raise _unauthorized("Invalid API key")

            # Otherwise treat as JWT
            try:
                claims = jwt.decode(bearer, SECRET_KEY, algorithms=[ALGORITHM])
            except JWTError:
                raise _unauthorized("Could not validate credentials")

            subject = claims.get("sub")
            if subject is None:
                raise _unauthorized("Could not validate credentials")

            account = await get_user_by_username(db, subject)
            if account is None or not account.is_active:
                raise _unauthorized("Could not validate credentials")

            if account.has_all_permissions(*required):
                return account
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Missing required permissions: {', '.join(required)}",
            )

    return permission_checker
def RequirePermission(*permissions: str | Permission):
    """Return a ``Depends`` that requires ALL of the given permissions."""
    checker = require_permission(*permissions)
    return Depends(checker)
def RequirePermissionIfAuthEnabled(*permissions: str | Permission):
    """Return a ``Depends`` that requires the given permissions only when auth is enabled."""
    checker = require_permission_if_auth_enabled(*permissions)
    return Depends(checker)
def require_camera_stream_token_if_auth_enabled():
    """Build a dependency that checks the ``token`` query parameter while auth is enabled.

    Intended for camera stream/snapshot endpoints loaded via <img> tags, which
    cannot send Authorization headers. The frontend obtains a token from
    POST /printers/camera/stream-token and appends it as ?token=xxx.

    The returned dependency yields None on success and raises a 401
    HTTPException when auth is enabled and the token is missing or invalid.
    """

    async def checker(token: str | None = None) -> None:
        async with async_session() as db:
            auth_enabled = await is_auth_enabled(db)
            # With auth disabled the token is not inspected at all.
            if auth_enabled and not (token and verify_camera_stream_token(token)):
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Valid camera stream token required. Obtain one from POST /api/v1/printers/camera/stream-token",
                )

    return checker
# Ready-made dependency: a Depends instance built once at import time.
# Use it as-is; unlike the Require* helper functions in this module, it is not called.
RequireCameraStreamTokenIfAuthEnabled = Depends(require_camera_stream_token_if_auth_enabled())
def require_ownership_permission(
    all_permission: str | Permission,
    own_permission: str | Permission,
):
    """Build a dependency for ownership-aware permission checks.

    - A user holding `all_permission` may modify any item
    - A user holding only `own_permission` may modify items where created_by_id == user.id
    - Ownerless items (created_by_id = null) require `all_permission`
    - API keys (X-API-Key header or Bearer bb_xxx) get full access (can_modify_all=True)

    This dependency only reports what the caller may do; comparing
    created_by_id against the user is left to the endpoint.

    Returns:
        A dependency function that returns (user, can_modify_all).
        - can_modify_all=True: user can modify any item
        - can_modify_all=False: user can only modify their own items
        ``user`` is None when auth is disabled or an API key was used.
        The dependency raises HTTPException 401/403 otherwise.
    """
    unrestricted_perm = all_permission.value if isinstance(all_permission, Permission) else all_permission
    owner_perm = own_permission.value if isinstance(own_permission, Permission) else own_permission

    def _unauthorized(detail: str) -> HTTPException:
        return HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=detail,
            headers={"WWW-Authenticate": "Bearer"},
        )

    async def checker(
        credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(security)] = None,
        x_api_key: Annotated[str | None, Header(alias="X-API-Key")] = None,
    ) -> tuple[User | None, bool]:
        """Resolve the caller and report whether they may modify any item.

        Returns (user, can_modify_all); see the enclosing factory for details.
        """
        async with async_session() as db:
            if not await is_auth_enabled(db):
                return None, True  # Auth disabled, allow all

            # X-API-Key header takes precedence; an invalid one falls through to the bearer check.
            if x_api_key and await _validate_api_key(db, x_api_key):
                return None, True  # API key valid, allow all

            if credentials is None:
                # No credentials provided
                raise _unauthorized("Authentication required")

            bearer = credentials.credentials

            # A bearer token starting with bb_ is treated as an API key, never as a JWT.
            if bearer.startswith("bb_"):
                if await _validate_api_key(db, bearer):
                    return None, True  # API key valid, allow all
                raise _unauthorized("Invalid API key")

            # Otherwise treat as JWT
            try:
                claims = jwt.decode(bearer, SECRET_KEY, algorithms=[ALGORITHM])
            except JWTError:
                raise _unauthorized("Could not validate credentials")

            subject = claims.get("sub")
            if subject is None:
                raise _unauthorized("Could not validate credentials")

            account = await get_user_by_username(db, subject)
            if account is None or not account.is_active:
                raise _unauthorized("Could not validate credentials")

            # The broader permission wins when the user holds both.
            if account.has_permission(unrestricted_perm):
                return account, True
            if account.has_permission(owner_perm):
                return account, False
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Missing permission: {owner_perm} or {unrestricted_perm}",
            )

    return checker
|