from __future__ import annotations import logging import os import secrets from datetime import datetime, timedelta, timezone from typing import Annotated import jwt from fastapi import Depends, Header, HTTPException, status from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from jwt.exceptions import PyJWTError as JWTError from passlib.context import CryptContext from sqlalchemy import delete, func, select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from backend.app.core.database import async_session, get_db from backend.app.core.permissions import Permission from backend.app.models.api_key import APIKey from backend.app.models.auth_ephemeral import AuthEphemeralToken, TokenType from backend.app.models.settings import Settings from backend.app.models.user import User logger = logging.getLogger(__name__) # SETTINGS_READ is intentionally not denied — the SpoolBuddy kiosk reads settings # via API key (e.g. to sync the UI language). 
# Permissions that an API key may never exercise; routes requiring any of
# these must be called with a user (JWT) login instead.
_APIKEY_DENIED_PERMISSIONS: frozenset[Permission] = frozenset(
    {
        Permission.SETTINGS_UPDATE,
        Permission.SETTINGS_BACKUP,
        Permission.SETTINGS_RESTORE,
        Permission.USERS_READ,
        Permission.USERS_CREATE,
        Permission.USERS_UPDATE,
        Permission.USERS_DELETE,
        Permission.GROUPS_READ,
        Permission.GROUPS_CREATE,
        Permission.GROUPS_UPDATE,
        Permission.GROUPS_DELETE,
        Permission.API_KEYS_CREATE,
        Permission.API_KEYS_UPDATE,
        Permission.API_KEYS_DELETE,
        Permission.API_KEYS_READ,
        Permission.GITHUB_BACKUP,
        Permission.GITHUB_RESTORE,
        Permission.FIRMWARE_UPDATE,
    }
)


def _check_apikey_permissions(perm_strings: list[str]) -> None:
    """Raise 403 if any required permission is admin-only (not accessible via API key).

    Args:
        perm_strings: Permission identifiers required by the route being accessed.

    Raises:
        HTTPException: 403 when at least one entry is in ``_APIKEY_DENIED_PERMISSIONS``.
    """
    # NOTE(review): this intersects a set of ``Permission`` members with plain
    # strings, which only matches if ``Permission`` members compare/hash equal
    # to their string values (e.g. a ``str``-based enum) — confirm.
    denied = _APIKEY_DENIED_PERMISSIONS.intersection(perm_strings)
    if denied:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="API keys cannot be used for administrative operations",
        )


def require_energy_cost_update():
    """Dependency for ``POST /settings/electricity-price`` (#1356).

    Bypasses the ``_APIKEY_DENIED_PERMISSIONS`` ``SETTINGS_UPDATE`` block for
    API keys that explicitly opt into ``can_update_energy_cost``. Full
    ``SETTINGS_UPDATE`` for API keys stays denied — this is a narrowly-scoped
    door for the Home Assistant dynamic-tariff use case documented in
    ``wiki/features/energy.md``, not a general settings-write capability.

    Accepts:

    * Auth disabled → always allowed (matches other settings routes)
    * JWT user with ``SETTINGS_UPDATE`` permission
    * API key with ``can_update_energy_cost = True``

    Returns:
        An async dependency callable. It returns the authenticated ``User`` on
        the JWT path and ``None`` when auth is disabled or a valid API key was
        used; it raises ``HTTPException`` (401/403) otherwise.
    """

    async def permission_checker(
        credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(security)] = None,
        x_api_key: Annotated[str | None, Header(alias="X-API-Key")] = None,
    ) -> User | None:
        async with async_session() as db:
            if not await is_auth_enabled(db):
                return None

            credentials_exception = HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Could not validate credentials",
                headers={"WWW-Authenticate": "Bearer"},
            )

            # API key path — X-API-Key header or Bearer bb_xxx
            # The explicit header wins over a Bearer credential when both are sent.
            api_key_value: str | None = None
            if x_api_key:
                api_key_value = x_api_key
            elif credentials is not None and credentials.credentials.startswith("bb_"):
                api_key_value = credentials.credentials

            if api_key_value is not None:
                api_key = await _validate_api_key(db, api_key_value)
                if api_key is None:
                    raise HTTPException(
                        status_code=status.HTTP_401_UNAUTHORIZED,
                        detail="Invalid API key",
                        headers={"WWW-Authenticate": "Bearer"},
                    )
                if not api_key.can_update_energy_cost:
                    raise HTTPException(
                        status_code=status.HTTP_403_FORBIDDEN,
                        detail="API key does not have 'update_energy_cost' permission",
                    )
                # API-key callers have no associated user object.
                return None

            # JWT path
            if credentials is None:
                raise credentials_exception
            try:
                payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM])
                username: str = payload.get("sub")
                if username is None:
                    raise credentials_exception
                # Tokens without a jti, or whose jti has been revoked, are rejected.
                jti: str | None = payload.get("jti")
                if not jti or await is_jti_revoked(jti):
                    raise credentials_exception
                iat: int | float | None = payload.get("iat")
            except JWTError:
                raise credentials_exception

            user = await get_user_by_username(db, username)
            if user is None or not user.is_active:
                raise credentials_exception
            if not _is_token_fresh(iat, user):
                raise credentials_exception
            if not user.has_all_permissions(Permission.SETTINGS_UPDATE.value):
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail=f"Missing required permissions: {Permission.SETTINGS_UPDATE.value}",
                )
            return user

    return permission_checker


# Password hashing
# Use pbkdf2_sha256 instead of bcrypt to avoid the 72-byte limit and passlib
# initialization issues. pbkdf2_sha256 is a secure password hashing algorithm
# without bcrypt's limitations.
pwd_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")


def _get_jwt_secret() -> str:
    """Get the JWT secret key from environment, file, or generate a new one.

    Priority:
    1. JWT_SECRET_KEY environment variable
    2. .jwt_secret file in data directory (used only if it holds >= 32 characters)
    3. Generate new random secret and save to file

    Failing to persist a newly generated secret is not fatal: a warning is
    logged and the in-memory secret is returned.

    Returns:
        The JWT secret key
    """
    # 1. Check environment variable first
    env_secret = os.environ.get("JWT_SECRET_KEY")
    if env_secret:
        logger.info("Using JWT secret from JWT_SECRET_KEY environment variable")
        return env_secret

    # 2. Check for secret file in data directory
    from backend.app.core.paths import resolve_data_dir

    data_dir = resolve_data_dir()
    secret_file = data_dir / ".jwt_secret"

    if secret_file.exists():
        try:
            secret = secret_file.read_text().strip()
            if secret and len(secret) >= 32:
                logger.info("Using JWT secret from %s", secret_file)
                return secret
        except OSError as e:
            logger.warning("Failed to read JWT secret file: %s", e)

    # 3. Generate new random secret
    new_secret = secrets.token_urlsafe(64)

    # Try to save it
    try:
        data_dir.mkdir(parents=True, exist_ok=True)
        # Note: CodeQL flags this as "clear-text storage of sensitive information" but this is
        # intentional and secure - JWT secrets must be readable by the app, we set 0600 permissions,
        # and this is standard practice for self-hosted applications (same as .env files).
        #
        # Create the file with owner-only permissions from the start. Writing
        # first and chmod-ing afterwards would leave the secret readable under
        # the process umask (typically 0644) for a short window.
        fd = os.open(secret_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w") as fh:
            fh.write(new_secret)  # nosec B105
        # The mode passed to os.open only applies when the file is created;
        # tighten a pre-existing (e.g. too-short or unreadable) file as well.
        secret_file.chmod(0o600)
        logger.info("Generated new JWT secret and saved to %s", secret_file)
    except OSError as e:
        logger.warning(
            "Could not save JWT secret to file (%s). "
            "Secret will be regenerated on restart, invalidating existing tokens. "
            "Set JWT_SECRET_KEY environment variable for persistence.",
            e,
        )

    return new_secret


# JWT settings
SECRET_KEY = _get_jwt_secret()
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24  # 24 hours (M-2: reduced from 7 days)

# HTTP Bearer token
security = HTTPBearer(auto_error=False)

# --- Slicer download tokens ---
# Short-lived, single-use tokens for slicer protocol handlers that can't send
# auth headers. Stored in AuthEphemeralToken (token_type=TokenType.SLICER_DOWNLOAD)
# so they survive server restarts and work in multi-worker deployments (M-3).
SLICER_TOKEN_EXPIRE_MINUTES = 5


async def create_slicer_download_token(resource_type: str, resource_id: int) -> str:
    """Create a short-lived, single-use download token for slicer protocol handlers.

    The token is bound to ``"{resource_type}:{resource_id}"`` (stored in the
    row's ``nonce`` column) and expires after ``SLICER_TOKEN_EXPIRE_MINUTES``.
    Expired slicer tokens are deleted in the same transaction.

    Args:
        resource_type: Kind of resource the token grants access to.
        resource_id: Identifier of that resource.

    Returns:
        The newly generated URL-safe token string.
    """
    now = datetime.now(timezone.utc)
    expires_at = now + timedelta(minutes=SLICER_TOKEN_EXPIRE_MINUTES)
    token = secrets.token_urlsafe(24)
    resource_key = f"{resource_type}:{resource_id}"
    async with async_session() as db:
        # Prune expired tokens opportunistically
        await db.execute(
            delete(AuthEphemeralToken).where(
                AuthEphemeralToken.token_type == TokenType.SLICER_DOWNLOAD,
                AuthEphemeralToken.expires_at < now,
            )
        )
        db.add(
            AuthEphemeralToken(
                token=token,
                token_type=TokenType.SLICER_DOWNLOAD,
                nonce=resource_key,
                expires_at=expires_at,
            )
        )
        await db.commit()
    return token


async def verify_slicer_download_token(token: str, resource_type: str, resource_id: int) -> bool:
    """Verify and atomically consume a slicer download token.

    Returns True only if the token is valid, unexpired, and bound to the given
    resource. DELETE...RETURNING ensures the token is single-use even under
    concurrent requests.

    M-NEW-1 fix: nonce (resource key) is included in the WHERE clause so the
    DELETE only succeeds when the token is presented to the *correct* resource
    endpoint. Previously the token was consumed (committed) even when
    stored_key != expected_key, permanently invalidating it while returning
    False to the caller.

    Args:
        token: Token string presented by the client.
        resource_type: Kind of resource being requested.
        resource_id: Identifier of the resource being requested.

    Returns:
        True if a matching token row was deleted and the deletion committed,
        False otherwise.
    """
    expected_key = f"{resource_type}:{resource_id}"
    now = datetime.now(timezone.utc)
    async with async_session() as db:
        result = await db.execute(
            delete(AuthEphemeralToken)
            .where(
                AuthEphemeralToken.token == token,
                AuthEphemeralToken.token_type == TokenType.SLICER_DOWNLOAD,
                AuthEphemeralToken.nonce == expected_key,
                AuthEphemeralToken.expires_at > now,
            )
            .returning(AuthEphemeralToken.id)
        )
        # No row returned means nothing matched; leave without committing.
        if result.one_or_none() is None:
            return False
        await db.commit()
    return True


# --- Camera stream tokens ---
# Reusable tokens for camera stream/snapshot endpoints loaded via /