from __future__ import annotations
import logging
import os
import secrets
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Annotated
import jwt
from fastapi import Depends, Header, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jwt.exceptions import PyJWTError as JWTError
from passlib.context import CryptContext
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from backend.app.core.database import async_session, get_db
from backend.app.core.permissions import Permission
from backend.app.models.api_key import APIKey
from backend.app.models.settings import Settings
from backend.app.models.user import User
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Password hashing
# Use pbkdf2_sha256 instead of bcrypt to avoid bcrypt's 72-byte limit and passlib initialization issues.
# pbkdf2_sha256 is a secure password hashing algorithm without bcrypt's limitations.
# The same context hashes and verifies API keys (see generate_api_key and _validate_api_key).
pwd_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")
def _get_jwt_secret() -> str:
    """Return the JWT signing secret, generating and persisting one if needed.

    Lookup order:
        1. ``JWT_SECRET_KEY`` environment variable.
        2. ``.jwt_secret`` file in the data directory (``DATA_DIR`` environment
           variable, otherwise the ``data/`` directory four levels above this file).
        3. A freshly generated random secret, saved to that file when possible.

    A stored secret shorter than 32 characters is ignored and replaced.

    Returns:
        The JWT secret key.
    """
    # 1. Environment variable takes precedence.
    env_secret = os.environ.get("JWT_SECRET_KEY")
    if env_secret:
        logger.info("Using JWT secret from JWT_SECRET_KEY environment variable")
        return env_secret

    # 2. Secret file in the data directory.
    # Use DATA_DIR env var (same as rest of app), fallback to data/ subdirectory.
    data_dir_env = os.environ.get("DATA_DIR")
    if data_dir_env:
        data_dir = Path(data_dir_env)
    else:
        # Fallback to data/ subdirectory under project root (not project root itself!)
        data_dir = Path(__file__).parent.parent.parent.parent / "data"
    secret_file = data_dir / ".jwt_secret"

    if secret_file.exists():
        try:
            secret = secret_file.read_text().strip()
        except (OSError, UnicodeDecodeError) as e:
            # A corrupt (non-text) file must not abort module import; fall
            # through and generate a replacement instead.
            logger.warning("Failed to read JWT secret file: %s", e)
        else:
            if len(secret) >= 32:
                logger.info("Using JWT secret from %s", secret_file)
                return secret
            logger.warning("Ignoring JWT secret in %s: shorter than 32 characters", secret_file)

    # 3. Generate a new random secret and try to persist it.
    new_secret = secrets.token_urlsafe(64)
    try:
        data_dir.mkdir(parents=True, exist_ok=True)
        # Storing the secret in clear text is intentional: the app must be able
        # to read it back, and access is restricted to the owner (same model as
        # a .env file). Create the file with 0600 from the start so the secret
        # is never on disk with wider permissions.
        fd = os.open(secret_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w") as handle:
            # The mode passed to os.open only applies when the file is created;
            # tighten a pre-existing file before any secret bytes are written.
            secret_file.chmod(0o600)
            handle.write(new_secret)  # nosec B105
        logger.info("Generated new JWT secret and saved to %s", secret_file)
    except OSError as e:
        logger.warning(
            "Could not save JWT secret to file (%s). "
            "Secret will be regenerated on restart, invalidating existing tokens. "
            "Set JWT_SECRET_KEY environment variable for persistence.",
            e,
        )
    return new_secret
# JWT settings
# The secret is resolved once, when this module is imported.
SECRET_KEY = _get_jwt_secret()
ALGORITHM = "HS256"
# Default access-token lifetime, used by create_access_token when no expires_delta is given.
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 * 7 # 7 days
# HTTP Bearer token
# auto_error=False: the dependencies in this module check for missing credentials
# (credentials is None) themselves.
security = HTTPBearer(auto_error=False)
# --- Slicer download tokens ---
# Slicer protocol handlers can't send auth headers, so they are given a
# short-lived token instead. The store maps token -> (resource_key, expiry),
# where resource_key is "archive:{id}" or "library:{id}".
_slicer_tokens: dict[str, tuple[str, datetime]] = {}
SLICER_TOKEN_EXPIRE_MINUTES = 5


def create_slicer_download_token(resource_type: str, resource_id: int) -> str:
    """Issue a short-lived download token bound to a single resource.

    Expired entries are purged from the in-memory store on every call.
    """
    issued_at = datetime.now(timezone.utc)

    # Purge stale entries; iterate over a snapshot so the dict can be mutated.
    for stale_token, (_, expires_at) in list(_slicer_tokens.items()):
        if expires_at < issued_at:
            del _slicer_tokens[stale_token]

    token = secrets.token_urlsafe(24)
    lifetime = timedelta(minutes=SLICER_TOKEN_EXPIRE_MINUTES)
    _slicer_tokens[token] = (f"{resource_type}:{resource_id}", issued_at + lifetime)
    return token


def verify_slicer_download_token(token: str, resource_type: str, resource_id: int) -> bool:
    """Return True if *token* is unexpired and was issued for this resource.

    A successful check consumes the token. An expired token is removed; a token
    presented for a different resource is rejected but left in the store.
    """
    entry = _slicer_tokens.get(token)
    if entry is None:
        return False

    bound_resource, expires_at = entry
    if datetime.now(timezone.utc) > expires_at:
        _slicer_tokens.pop(token)
        return False

    if bound_resource != f"{resource_type}:{resource_id}":
        return False

    # Token is single-use
    _slicer_tokens.pop(token)
    return True
# --- Camera stream tokens ---
# Reusable tokens for camera stream/snapshot endpoints that are loaded directly
# by HTML tags (e.g. <img>) rather than by authenticated API calls.
# Unlike slicer tokens, these are NOT single-use (streams reconnect on errors)
# and have a longer expiry. The store maps token -> expiry.
_camera_stream_tokens: dict[str, datetime] = {}
CAMERA_STREAM_TOKEN_EXPIRE_MINUTES = 60


def create_camera_stream_token() -> str:
    """Issue a reusable token granting camera stream/snapshot access.

    Expired entries are purged from the in-memory store on every call.
    """
    issued_at = datetime.now(timezone.utc)

    # Purge stale entries; iterate over a snapshot so the dict can be mutated.
    for stale_token, expires_at in list(_camera_stream_tokens.items()):
        if expires_at < issued_at:
            del _camera_stream_tokens[stale_token]

    token = secrets.token_urlsafe(24)
    lifetime = timedelta(minutes=CAMERA_STREAM_TOKEN_EXPIRE_MINUTES)
    _camera_stream_tokens[token] = issued_at + lifetime
    return token


def verify_camera_stream_token(token: str) -> bool:
    """Return True if *token* is known and has not expired.

    The token is not consumed by a successful check; an expired token is
    removed from the store.
    """
    expires_at = _camera_stream_tokens.get(token)
    if expires_at is None:
        return False
    if datetime.now(timezone.utc) > expires_at:
        _camera_stream_tokens.pop(token)
        return False
    return True
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plain-text password against a stored hash.

    Delegates to the module's passlib context, which is configured for
    pbkdf2_sha256 and therefore handles long passwords.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def get_password_hash(password: str) -> str:
    """Return the hash of *password* for storage.

    Delegates to the module's passlib context, which is configured for
    pbkdf2_sha256 (secure, and without a password length limit).
    """
    hashed = pwd_context.hash(password)
    return hashed
def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
    """Create a signed JWT access token.

    Args:
        data: Claims to embed in the token. The dict is copied, not mutated.
        expires_delta: Token lifetime. When None, ACCESS_TOKEN_EXPIRE_MINUTES
            is used.

    Returns:
        The encoded JWT, signed with SECRET_KEY using ALGORITHM.
    """
    # Compare against None explicitly: timedelta(0) is falsy, and a truthiness
    # test would silently replace a zero lifetime with the multi-day default.
    if expires_delta is None:
        expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    to_encode = data.copy()
    to_encode["exp"] = datetime.now(timezone.utc) + expires_delta
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
async def get_user_by_username(db: AsyncSession, username: str) -> User | None:
    """Look up a user by username, ignoring case.

    The user's groups are loaded along with the user so that permission checks
    can run without further queries. Returns None when no user matches.
    """
    name_matches = func.lower(User.username) == func.lower(username)
    query = select(User).where(name_matches).options(selectinload(User.groups))
    rows = await db.execute(query)
    return rows.scalar_one_or_none()
async def get_user_by_email(db: AsyncSession, email: str) -> User | None:
    """Look up a user by email address, ignoring case.

    The user's groups are loaded along with the user so that permission checks
    can run without further queries. Returns None when no user matches.
    """
    email_matches = func.lower(User.email) == func.lower(email)
    query = select(User).where(email_matches).options(selectinload(User.groups))
    rows = await db.execute(query)
    return rows.scalar_one_or_none()
async def authenticate_user(db: AsyncSession, username: str, password: str) -> User | None:
    """Authenticate by username and password.

    The username lookup is case-insensitive; the password is case-sensitive.
    Returns the user only when the user exists, the password matches, and the
    account is active; otherwise returns None.
    """
    account = await get_user_by_username(db, username)
    # Short-circuit order matters: the password is only checked for an existing
    # user, and the active flag only after the password matched.
    if account and verify_password(password, account.password_hash) and account.is_active:
        return account
    return None
async def authenticate_user_by_email(db: AsyncSession, email: str, password: str) -> User | None:
    """Authenticate by email address and password.

    The email lookup is case-insensitive; the password is case-sensitive.
    Returns the user only when the user exists, the password matches, and the
    account is active; otherwise returns None.
    """
    account = await get_user_by_email(db, email)
    # Short-circuit order matters: the password is only checked for an existing
    # user, and the active flag only after the password matched.
    if account and verify_password(password, account.password_hash) and account.is_active:
        return account
    return None
async def is_auth_enabled(db: AsyncSession) -> bool:
    """Report whether authentication is switched on.

    Reads the ``auth_enabled`` row from the settings table and returns True
    only when its value is the string "true" (any letter case). A missing row
    counts as disabled.

    If the lookup fails for any reason (for example the settings table does
    not exist yet), authentication is treated as disabled. Because that is a
    fail-open outcome, the failure is logged rather than swallowed silently.
    """
    try:
        result = await db.execute(select(Settings).where(Settings.key == "auth_enabled"))
        setting = result.scalar_one_or_none()
        if setting is None:
            return False
        return setting.value.lower() == "true"
    except Exception as e:
        # If settings table doesn't exist or query fails, assume auth is disabled
        logger.warning("Could not read auth_enabled setting; treating authentication as disabled: %s", e)
        return False
async def _validate_api_key(db: AsyncSession, api_key_value: str) -> APIKey | None:
    """Return the enabled, unexpired APIKey matching *api_key_value*, else None.

    Internal helper for the auth dependencies. Every enabled key's hash is
    checked against the supplied value. On a match the key's ``last_used``
    timestamp is updated and committed. Any error during validation is logged
    and reported as "no match".
    """
    try:
        query = select(APIKey).where(APIKey.enabled.is_(True))
        enabled_keys = (await db.execute(query)).scalars().all()

        for candidate in enabled_keys:
            if not verify_password(api_key_value, candidate.key_hash):
                continue

            expires_at = candidate.expires_at
            if expires_at:
                # Stored timestamps without tzinfo are interpreted as UTC.
                if expires_at.tzinfo is None:
                    expires_at = expires_at.replace(tzinfo=timezone.utc)
                if expires_at < datetime.now(timezone.utc):
                    return None  # Expired

            # Record the use of the key.
            candidate.last_used = datetime.now(timezone.utc)
            await db.commit()
            return candidate
    except Exception as e:
        logger.warning("API key validation error: %s", e)
    return None
async def get_current_user_optional(
    credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(security)] = None,
) -> User | None:
    """Resolve the user from a Bearer JWT, or return None if that is not possible.

    None is returned when no credentials were sent, the token cannot be
    decoded, it carries no ``sub`` claim, or the named user is missing or
    inactive. This dependency never raises for an authentication failure.
    """
    if credentials is None:
        return None

    try:
        claims = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        return None

    username: str = claims.get("sub")
    if username is None:
        return None

    async with async_session() as db:
        user = await get_user_by_username(db, username)
        if user is not None and user.is_active:
            return user
        return None
async def get_current_user(
    credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(security)] = None,
) -> User:
    """Resolve the authenticated user from a Bearer JWT.

    Raises:
        HTTPException: 401 when credentials are missing, the token cannot be
            decoded, it has no ``sub`` claim, or the user does not exist;
            403 when the user exists but is not active.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    if credentials is None:
        raise unauthorized

    try:
        claims = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise unauthorized

    username: str = claims.get("sub")
    if username is None:
        raise unauthorized

    async with async_session() as db:
        user = await get_user_by_username(db, username)
        if user is None:
            raise unauthorized
        if user.is_active:
            return user
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="User account is disabled",
        )
async def get_current_active_user(
    current_user: Annotated[User, Depends(get_current_user)],
) -> User:
    """Return the user resolved by get_current_user (alias kept for clarity).

    get_current_user already rejects inactive accounts, so nothing further is
    checked here.
    """
    active_user = current_user
    return active_user
async def require_auth_if_enabled(
    credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(security)] = None,
    x_api_key: Annotated[str | None, Header(alias="X-API-Key")] = None,
) -> User | None:
    """Require authentication when auth is enabled; otherwise allow the request.

    Accepts JWT tokens (Authorization: Bearer header) and API keys (X-API-Key
    header, or Authorization: Bearer bb_xxx).

    Returns:
        The authenticated User for a valid JWT. None when auth is disabled or
        when a valid API key was presented.

    Raises:
        HTTPException: 401 when auth is enabled and no acceptable credential
            was supplied.
    """

    def unauthorized(detail: str) -> HTTPException:
        # All rejections here share status code and challenge header.
        return HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=detail,
            headers={"WWW-Authenticate": "Bearer"},
        )

    async with async_session() as db:
        if not await is_auth_enabled(db):
            return None

        # X-API-Key header is tried first; an invalid value falls through to
        # the Bearer credentials below.
        if x_api_key and await _validate_api_key(db, x_api_key):
            return None  # API key valid, allow access

        if credentials is None:
            raise unauthorized("Authentication required")

        token = credentials.credentials

        # A Bearer value starting with bb_ is an API key, not a JWT.
        if token.startswith("bb_"):
            if await _validate_api_key(db, token):
                return None  # API key valid, allow access
            raise unauthorized("Invalid API key")

        # Otherwise treat as JWT
        try:
            claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        except JWTError:
            raise unauthorized("Could not validate credentials")

        username: str = claims.get("sub")
        if username is None:
            raise unauthorized("Could not validate credentials")

        user = await get_user_by_username(db, username)
        if user is None or not user.is_active:
            raise unauthorized("Could not validate credentials")
        return user
def require_role(required_role: str):
    """Build a dependency that only admits users whose role equals *required_role*.

    The returned dependency resolves the user via get_current_user and raises
    a 403 HTTPException when the role differs.
    """

    async def role_checker(current_user: Annotated[User, Depends(get_current_user)]) -> User:
        if current_user.role == required_role:
            return current_user
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Requires {required_role} role",
        )

    return role_checker
def require_admin_if_auth_enabled():
    """Build a dependency that requires the admin role when auth is enabled.

    The returned dependency relies on require_auth_if_enabled: a None result
    is passed through unchanged, and a User must have role "admin" or a 403
    HTTPException is raised.

    NOTE(review): require_auth_if_enabled returns None both when auth is
    disabled and when a valid API key was presented, so requests carrying any
    valid API key pass this check without a role test. Confirm this is intended.
    """

    async def admin_checker(
        current_user: Annotated[User | None, Depends(require_auth_if_enabled)] = None,
    ) -> User | None:
        # None: nothing to check a role on (see the note in the factory docstring).
        if current_user is None or current_user.role == "admin":
            return current_user
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Requires admin role",
        )

    return admin_checker
def generate_api_key() -> tuple[str, str, str]:
    """Generate a new API key.

    Returns:
        tuple: (full_key, key_hash, key_prefix)
            - full_key: The complete API key, "bb_" followed by a random
              URL-safe token (only shown once on creation)
            - key_hash: Hashed version for storage and verification
            - key_prefix: First 8 characters of the key followed by "...",
              for display purposes
    """
    # token_urlsafe(32) draws 32 random bytes and encodes them as URL-safe
    # base64 text (about 43 characters), not hex.
    full_key = f"bb_{secrets.token_urlsafe(32)}"
    key_hash = get_password_hash(full_key)
    # The key is always longer than 8 characters, so the prefix is always
    # truncated and marked with an ellipsis.
    key_prefix = f"{full_key[:8]}..."
    return full_key, key_hash, key_prefix
async def get_api_key(
    authorization: Annotated[str | None, Header(alias="Authorization")] = None,
    x_api_key: Annotated[str | None, Header(alias="X-API-Key")] = None,
    db: AsyncSession = Depends(get_db),
) -> APIKey:
    """Get and validate the API key from the request headers.

    Checks both the ``X-API-Key: <key>`` and ``Authorization: Bearer <key>``
    headers; X-API-Key wins when both are present. On success the key's
    ``last_used`` timestamp is updated and committed.

    Returns:
        The matching enabled, unexpired APIKey.

    Raises:
        HTTPException: 401 when no key was supplied, the key matches no
            enabled key, or the matching key has expired.
    """
    api_key_value = None
    if x_api_key:
        api_key_value = x_api_key
    elif authorization and authorization.startswith("Bearer "):
        # Strip only the leading scheme; str.replace would also remove any
        # later "Bearer " occurrence inside the credential itself.
        api_key_value = authorization.removeprefix("Bearer ")

    if not api_key_value:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="API key required. Provide 'X-API-Key' header or 'Authorization: Bearer '",
        )

    # Only hashes are stored, so every enabled key has to be checked in turn.
    result = await db.execute(select(APIKey).where(APIKey.enabled.is_(True)))
    api_keys = result.scalars().all()

    for api_key in api_keys:
        if not verify_password(api_key_value, api_key.key_hash):
            continue

        # Check expiration
        if api_key.expires_at:
            expires = api_key.expires_at
            # Stored timestamps without tzinfo are interpreted as UTC.
            if expires.tzinfo is None:
                expires = expires.replace(tzinfo=timezone.utc)
            if expires < datetime.now(timezone.utc):
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="API key has expired",
                )

        # Update last_used timestamp
        api_key.last_used = datetime.now(timezone.utc)
        await db.commit()
        return api_key

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid API key",
    )
def check_permission(api_key: APIKey, permission: str) -> None:
    """Ensure the API key grants *permission*.

    Args:
        api_key: The API key object
        permission: One of 'queue', 'control_printer', 'read_status'

    Raises:
        HTTPException: 500 for an unknown permission name, 403 when the key
            does not grant the permission.
    """
    # Permission name -> boolean attribute on the API key.
    flag_by_permission = {
        "queue": "can_queue",
        "control_printer": "can_control_printer",
        "read_status": "can_read_status",
    }

    flag_name = flag_by_permission.get(permission)
    if flag_name is None:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Unknown permission: {permission}",
        )

    # A missing attribute counts as "not granted".
    granted = getattr(api_key, flag_name, False)
    if not granted:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"API key does not have '{permission}' permission",
        )
def check_printer_access(api_key: APIKey, printer_id: int) -> None:
    """Ensure the API key may access the given printer.

    Args:
        api_key: The API key object
        printer_id: The printer ID to check access for

    Raises:
        HTTPException: 403 when the key is restricted to a set of printers
            that does not include *printer_id*.
    """
    allowed_printers = api_key.printer_ids
    # None = global key (all printers). An empty list grants access to none.
    if allowed_printers is None or printer_id in allowed_printers:
        return
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail=f"API key does not have access to printer {printer_id}",
    )
# Convenience dependencies - each of these functions returns a Depends object
def RequireAdmin():
    """Return a Depends that admits only users with the admin role."""
    admin_only = require_role("admin")
    return Depends(admin_only)
def RequireAdminIfAuthEnabled():
    """Return a Depends that requires the admin role when auth is enabled."""
    admin_checker = require_admin_if_auth_enabled()
    return Depends(admin_checker)
def require_permission(*permissions: str | Permission):
    """Build a dependency that requires the user to hold ALL given permissions.

    Accepts JWT tokens (Authorization: Bearer header) and API keys (X-API-Key
    header, or Authorization: Bearer bb_xxx). Unlike the "if auth enabled"
    variants, this dependency does not consult the auth_enabled setting.

    Args:
        *permissions: Permission strings or Permission enum values to require

    Returns:
        A dependency function. It returns the User for a valid JWT whose user
        holds every permission, and None for a valid API key (API keys are
        not checked against the permission list here).
    """
    # Normalise Permission enum members to their string values.
    perm_strings = []
    for perm in permissions:
        perm_strings.append(perm.value if isinstance(perm, Permission) else perm)

    async def permission_checker(
        credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(security)] = None,
        x_api_key: Annotated[str | None, Header(alias="X-API-Key")] = None,
    ) -> User | None:
        async with async_session() as db:
            # X-API-Key header is tried first; an invalid value falls through
            # to the Bearer credentials below.
            if x_api_key and await _validate_api_key(db, x_api_key):
                return None  # API key valid, allow access

            credentials_exception = HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Could not validate credentials",
                headers={"WWW-Authenticate": "Bearer"},
            )
            if credentials is None:
                raise credentials_exception

            token = credentials.credentials

            # A Bearer value starting with bb_ is an API key, not a JWT.
            if token.startswith("bb_"):
                if await _validate_api_key(db, token):
                    return None  # API key valid, allow access
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Invalid API key",
                    headers={"WWW-Authenticate": "Bearer"},
                )

            # Otherwise treat as JWT
            try:
                claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
            except JWTError:
                raise credentials_exception

            username: str = claims.get("sub")
            if username is None:
                raise credentials_exception

            user = await get_user_by_username(db, username)
            if user is None or not user.is_active:
                raise credentials_exception

            if user.has_all_permissions(*perm_strings):
                return user
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Missing required permissions: {', '.join(perm_strings)}",
            )

    return permission_checker
def require_permission_if_auth_enabled(*permissions: str | Permission):
    """Build a dependency that checks permissions only when auth is enabled.

    This provides backward compatibility: when auth is disabled, all access
    is allowed. Accepts JWT tokens (Authorization: Bearer header) and API keys
    (X-API-Key header, or Authorization: Bearer bb_xxx).

    Args:
        *permissions: Permission strings or Permission enum values to require

    Returns:
        A dependency function. It returns the User for a valid JWT whose user
        holds every permission, and None when auth is disabled or a valid API
        key was presented.
    """
    # Normalise Permission enum members to their string values.
    perm_strings = []
    for perm in permissions:
        perm_strings.append(perm.value if isinstance(perm, Permission) else perm)

    def unauthorized(detail: str) -> HTTPException:
        # All 401 rejections here share status code and challenge header.
        return HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=detail,
            headers={"WWW-Authenticate": "Bearer"},
        )

    async def permission_checker(
        credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(security)] = None,
        x_api_key: Annotated[str | None, Header(alias="X-API-Key")] = None,
    ) -> User | None:
        async with async_session() as db:
            if not await is_auth_enabled(db):
                return None  # Auth disabled, allow access

            # X-API-Key header is tried first; an invalid value falls through
            # to the Bearer credentials below.
            if x_api_key and await _validate_api_key(db, x_api_key):
                return None  # API key valid, allow access

            if credentials is None:
                raise unauthorized("Authentication required")

            token = credentials.credentials

            # A Bearer value starting with bb_ is an API key, not a JWT.
            if token.startswith("bb_"):
                if await _validate_api_key(db, token):
                    return None  # API key valid, allow access
                raise unauthorized("Invalid API key")

            # Otherwise treat as JWT
            try:
                claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
            except JWTError:
                raise unauthorized("Could not validate credentials")

            username: str = claims.get("sub")
            if username is None:
                raise unauthorized("Could not validate credentials")

            user = await get_user_by_username(db, username)
            if user is None or not user.is_active:
                raise unauthorized("Could not validate credentials")

            if user.has_all_permissions(*perm_strings):
                return user
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Missing required permissions: {', '.join(perm_strings)}",
            )

    return permission_checker
def RequirePermission(*permissions: str | Permission):
    """Return a Depends that requires ALL of the given permissions."""
    permission_checker = require_permission(*permissions)
    return Depends(permission_checker)
def RequirePermissionIfAuthEnabled(*permissions: str | Permission):
    """Return a Depends that requires the given permissions when auth is enabled."""
    permission_checker = require_permission_if_auth_enabled(*permissions)
    return Depends(permission_checker)
def require_camera_stream_token_if_auth_enabled():
    """Build a dependency that validates a camera stream token when auth is enabled.

    Intended for camera stream/snapshot endpoints that are loaded directly by
    HTML tags (e.g. <img>), which cannot send Authorization headers. The
    frontend obtains a token from POST /printers/camera/stream-token and
    appends it as ?token=xxx.

    The returned dependency raises a 401 HTTPException when auth is enabled
    and the ``token`` query parameter is missing or not a valid stream token.
    """

    async def checker(token: str | None = None) -> None:
        async with async_session() as db:
            auth_enabled = await is_auth_enabled(db)
            # With auth disabled nothing is checked; otherwise a non-empty,
            # currently valid stream token is mandatory.
            if auth_enabled and not (token and verify_camera_stream_token(token)):
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Valid camera stream token required. Obtain one from POST /api/v1/printers/camera/stream-token",
                )

    return checker
# Ready-made Depends instance for camera stream/snapshot routes. Unlike RequireAdmin()
# and RequirePermission(), which are functions returning a Depends, this name is the
# Depends object itself and is used without calling it.
RequireCameraStreamTokenIfAuthEnabled = Depends(require_camera_stream_token_if_auth_enabled())
def require_ownership_permission(
    all_permission: str | Permission,
    own_permission: str | Permission,
):
    """Build a dependency for ownership-based permission checks.

    - A user with `all_permission` can modify any item
    - A user with `own_permission` can only modify items where created_by_id == user.id
    - Ownerless items (created_by_id = null) require `all_permission`
    - API keys (via X-API-Key header or Bearer bb_xxx) get full access (can_modify_all=True)

    The ownership comparison itself is left to the caller; this dependency
    only reports which of the two permissions applies.

    Returns:
        A dependency function that returns (user, can_modify_all).
        - can_modify_all=True: user can modify any item
        - can_modify_all=False: user can only modify their own items
    """
    # Normalise Permission enum members to their string values.
    if isinstance(all_permission, Permission):
        all_perm = all_permission.value
    else:
        all_perm = all_permission
    if isinstance(own_permission, Permission):
        own_perm = own_permission.value
    else:
        own_perm = own_permission

    def unauthorized(detail: str) -> HTTPException:
        # All 401 rejections here share status code and challenge header.
        return HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=detail,
            headers={"WWW-Authenticate": "Bearer"},
        )

    async def checker(
        credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(security)] = None,
        x_api_key: Annotated[str | None, Header(alias="X-API-Key")] = None,
    ) -> tuple[User | None, bool]:
        """Return (user, can_modify_all).

        - can_modify_all=True: user can modify any item
        - can_modify_all=False: user can only modify their own items

        user is None when auth is disabled or an API key was used.
        """
        async with async_session() as db:
            if not await is_auth_enabled(db):
                return None, True  # Auth disabled, allow all

            # X-API-Key header is tried first; an invalid value falls through
            # to the Bearer credentials below.
            if x_api_key and await _validate_api_key(db, x_api_key):
                return None, True  # API key valid, allow all

            if credentials is None:
                raise unauthorized("Authentication required")

            token = credentials.credentials

            # A Bearer value starting with bb_ is an API key, not a JWT.
            if token.startswith("bb_"):
                if await _validate_api_key(db, token):
                    return None, True  # API key valid, allow all
                raise unauthorized("Invalid API key")

            # Otherwise treat as JWT
            try:
                claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
            except JWTError:
                raise unauthorized("Could not validate credentials")

            username: str = claims.get("sub")
            if username is None:
                raise unauthorized("Could not validate credentials")

            user = await get_user_by_username(db, username)
            if user is None or not user.is_active:
                raise unauthorized("Could not validate credentials")

            # The broader permission wins; the "own" permission is only
            # consulted when the user lacks it.
            if user.has_permission(all_perm):
                return user, True
            if user.has_permission(own_perm):
                return user, False
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Missing permission: {own_perm} or {all_perm}",
            )

    return checker