from __future__ import annotations import logging import os import secrets from datetime import datetime, timedelta, timezone from pathlib import Path from typing import Annotated import jwt from fastapi import Depends, Header, HTTPException, status from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from jwt.exceptions import PyJWTError as JWTError from passlib.context import CryptContext from sqlalchemy import delete, func, select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from backend.app.core.database import async_session, get_db from backend.app.core.permissions import Permission from backend.app.models.api_key import APIKey from backend.app.models.auth_ephemeral import AuthEphemeralToken, TokenType from backend.app.models.settings import Settings from backend.app.models.user import User logger = logging.getLogger(__name__) # Permissions that cannot be accessed via API key. # API keys are limited to device/inventory operations. The following capabilities # are restricted to fully-authenticated users: user/group/API-key management, # settings read/backup/restore, firmware updates, and GitHub-backed backup/restore. _APIKEY_DENIED_PERMISSIONS: frozenset[Permission] = frozenset( { Permission.SETTINGS_READ, Permission.SETTINGS_UPDATE, Permission.SETTINGS_BACKUP, Permission.SETTINGS_RESTORE, Permission.USERS_READ, Permission.USERS_CREATE, Permission.USERS_UPDATE, Permission.USERS_DELETE, Permission.GROUPS_READ, Permission.GROUPS_CREATE, Permission.GROUPS_UPDATE, Permission.GROUPS_DELETE, Permission.API_KEYS_CREATE, Permission.API_KEYS_UPDATE, Permission.API_KEYS_DELETE, Permission.API_KEYS_READ, Permission.GITHUB_BACKUP, Permission.GITHUB_RESTORE, Permission.FIRMWARE_UPDATE, } ) def _check_apikey_permissions(perm_strings: list[str]) -> None: """Raise 403 if any required permission is admin-only (not accessible via API key).""" denied = _APIKEY_DENIED_PERMISSIONS.intersection(perm_strings) if denied: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="API keys cannot be used for administrative operations", ) # Password hashing # Use pbkdf2_sha256 instead of bcrypt to avoid 72-byte limit and passlib initialization issues # pbkdf2_sha256 is a secure password hashing algorithm without bcrypt's limitations pwd_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto") def _get_jwt_secret() -> str: """Get the JWT secret key from environment, file, or generate a new one. Priority: 1. JWT_SECRET_KEY environment variable 2. .jwt_secret file in data directory 3. Generate new random secret and save to file Returns: The JWT secret key """ # 1. Check environment variable first env_secret = os.environ.get("JWT_SECRET_KEY") if env_secret: logger.info("Using JWT secret from JWT_SECRET_KEY environment variable") return env_secret # 2. Check for secret file in data directory # Use DATA_DIR env var (same as rest of app), fallback to data/ subdirectory data_dir_env = os.environ.get("DATA_DIR") if data_dir_env: data_dir = Path(data_dir_env) else: # Fallback to data/ subdirectory under project root (not project root itself!) data_dir = Path(__file__).parent.parent.parent.parent / "data" secret_file = data_dir / ".jwt_secret" if secret_file.exists(): try: secret = secret_file.read_text().strip() if secret and len(secret) >= 32: logger.info("Using JWT secret from %s", secret_file) return secret except OSError as e: logger.warning("Failed to read JWT secret file: %s", e) # 3. Generate new random secret new_secret = secrets.token_urlsafe(64) # Try to save it try: data_dir.mkdir(parents=True, exist_ok=True) # Note: CodeQL flags this as "clear-text storage of sensitive information" but this is # intentional and secure - JWT secrets must be readable by the app, we set 0600 permissions, # and this is standard practice for self-hosted applications (same as .env files). secret_file.write_text(new_secret) # nosec B105 # Restrict permissions (owner read/write only) secret_file.chmod(0o600) logger.info("Generated new JWT secret and saved to %s", secret_file) except OSError as e: logger.warning( "Could not save JWT secret to file (%s). " "Secret will be regenerated on restart, invalidating existing tokens. " "Set JWT_SECRET_KEY environment variable for persistence.", e, ) return new_secret # JWT settings SECRET_KEY = _get_jwt_secret() ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 # 24 hours (M-2: reduced from 7 days) # HTTP Bearer token security = HTTPBearer(auto_error=False) # --- Slicer download tokens --- # Short-lived, single-use tokens for slicer protocol handlers that can't send # auth headers. Stored in AuthEphemeralToken (token_type=TokenType.SLICER_DOWNLOAD) # so they survive server restarts and work in multi-worker deployments (M-3). SLICER_TOKEN_EXPIRE_MINUTES = 5 async def create_slicer_download_token(resource_type: str, resource_id: int) -> str: """Create a short-lived, single-use download token for slicer protocol handlers.""" now = datetime.now(timezone.utc) expires_at = now + timedelta(minutes=SLICER_TOKEN_EXPIRE_MINUTES) token = secrets.token_urlsafe(24) resource_key = f"{resource_type}:{resource_id}" async with async_session() as db: # Prune expired tokens opportunistically await db.execute( delete(AuthEphemeralToken).where( AuthEphemeralToken.token_type == TokenType.SLICER_DOWNLOAD, AuthEphemeralToken.expires_at < now, ) ) db.add( AuthEphemeralToken( token=token, token_type=TokenType.SLICER_DOWNLOAD, nonce=resource_key, expires_at=expires_at, ) ) await db.commit() return token async def verify_slicer_download_token(token: str, resource_type: str, resource_id: int) -> bool: """Verify and atomically consume a slicer download token. Returns True only if the token is valid, unexpired, and bound to the given resource. DELETE...RETURNING ensures the token is single-use even under concurrent requests. M-NEW-1 fix: nonce (resource key) is included in the WHERE clause so the DELETE only succeeds when the token is presented to the *correct* resource endpoint. Previously the token was consumed (committed) even when stored_key != expected_key, permanently invalidating it while returning False to the caller. """ expected_key = f"{resource_type}:{resource_id}" now = datetime.now(timezone.utc) async with async_session() as db: result = await db.execute( delete(AuthEphemeralToken) .where( AuthEphemeralToken.token == token, AuthEphemeralToken.token_type == TokenType.SLICER_DOWNLOAD, AuthEphemeralToken.nonce == expected_key, AuthEphemeralToken.expires_at > now, ) .returning(AuthEphemeralToken.id) ) if result.one_or_none() is None: return False await db.commit() return True # --- Camera stream tokens --- # Reusable tokens for camera stream/snapshot endpoints loaded via /