from __future__ import annotations import logging import os import secrets from datetime import datetime, timedelta, timezone from typing import Annotated import jwt from fastapi import Depends, Header, HTTPException, status from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from jwt.exceptions import PyJWTError as JWTError from passlib.context import CryptContext from sqlalchemy import delete, func, select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from backend.app.core.database import async_session, get_db from backend.app.core.permissions import Permission from backend.app.models.api_key import APIKey from backend.app.models.auth_ephemeral import AuthEphemeralToken, TokenType from backend.app.models.settings import Settings from backend.app.models.user import User logger = logging.getLogger(__name__) # GHSA-r2qv-8222-hqg3 (CVSS 9.9) — API key permission enforcement is allowlist-based. # # Until 0.2.4.x, ``_check_apikey_permissions`` only consulted the admin denylist # below. The three documented scope flags on ``APIKey`` # (``can_read_status`` / ``can_queue`` / ``can_control_printer`` / ``can_manage_library``) # were enforced only by ``check_permission()`` inside ``routes/webhook.py``; # every other route used ``require_permission_if_auth_enabled`` which fell # through to the denylist-only path, so an API key with all flags unchecked # could still stop prints, edit queue items, and read every endpoint not in # this set. ``require_any_permission_if_auth_enabled`` and # ``require_ownership_permission`` did not call this helper at all, so admin # "any-of" routes and ownership-modify routes were entirely ungated for API keys. # # Fix: ``_check_apikey_permissions`` now requires every requested permission to # be present in ``_APIKEY_SCOPE_BY_PERMISSION`` (allowlist), and gates on the # corresponding scope flag on the API key. Unmapped permissions = 403. This # means a Permission added to ``core/permissions.py`` without a matching entry # in ``_APIKEY_SCOPE_BY_PERMISSION`` is automatically denied for API keys — # the previous denylist shape allowed every new Permission to silently widen # the API-key surface. # # The denylist is retained for documentation / drift-detection only — its # entries also satisfy "not in the allowlist", so they fail closed regardless. # # Mapping rationale (see wiki/features/api-keys.md): # can_read_status → every ``*_READ`` + camera + stats + system + websocket # can_queue → queue write ops + archive reprint # can_control_printer → physical printer + smart-plug control # can_manage_library → library upload/own + MakerWorld import (separate # trust level from queue management, hence its own flag) # admin-only → unmapped (default-deny); covers all create/update/ # delete of admin resources, settings writes, user/ # group/api-key/backup admin ops, discovery scan, # cloud auth, library ALL-ownership perms, purges _APIKEY_SCOPE_BY_PERMISSION: dict[Permission, str] = { # can_read_status — read-only access to status, history, and configuration Permission.PRINTERS_READ: "can_read_status", Permission.ARCHIVES_READ: "can_read_status", Permission.QUEUE_READ: "can_read_status", Permission.LIBRARY_READ: "can_read_status", Permission.PROJECTS_READ: "can_read_status", Permission.FILAMENTS_READ: "can_read_status", Permission.INVENTORY_READ: "can_read_status", Permission.INVENTORY_VIEW_ASSIGNMENTS: "can_read_status", Permission.INVENTORY_FORECAST_READ: "can_read_status", Permission.SMART_PLUGS_READ: "can_read_status", Permission.CAMERA_VIEW: "can_read_status", Permission.MAINTENANCE_READ: "can_read_status", Permission.KPROFILES_READ: "can_read_status", Permission.NOTIFICATIONS_READ: "can_read_status", Permission.NOTIFICATION_TEMPLATES_READ: "can_read_status", Permission.EXTERNAL_LINKS_READ: "can_read_status", Permission.FIRMWARE_READ: "can_read_status", Permission.AMS_HISTORY_READ: "can_read_status", Permission.STATS_READ: "can_read_status", Permission.STATS_FILTER_BY_USER: "can_read_status", Permission.SYSTEM_READ: "can_read_status", # SETTINGS_READ stays allowed via read-status so SpoolBuddy kiosks keep # working (they need the UI-language setting via API key). Permission.SETTINGS_READ: "can_read_status", Permission.MAKERWORLD_VIEW: "can_read_status", Permission.WEBSOCKET_CONNECT: "can_read_status", # can_queue — queue write ops + reprint (which enqueues an existing archive) Permission.QUEUE_CREATE: "can_queue", Permission.QUEUE_UPDATE_OWN: "can_queue", Permission.QUEUE_UPDATE_ALL: "can_queue", Permission.QUEUE_DELETE_OWN: "can_queue", Permission.QUEUE_DELETE_ALL: "can_queue", Permission.QUEUE_REORDER: "can_queue", Permission.ARCHIVES_REPRINT_OWN: "can_queue", Permission.ARCHIVES_REPRINT_ALL: "can_queue", # can_control_printer — physical-world side effects on hardware Permission.PRINTERS_CONTROL: "can_control_printer", Permission.PRINTERS_FILES: "can_control_printer", Permission.PRINTERS_AMS_RFID: "can_control_printer", Permission.PRINTERS_CLEAR_PLATE: "can_control_printer", Permission.SMART_PLUGS_CONTROL: "can_control_printer", # can_manage_library — file-manager scope (upload/rename/delete OWN library # entries + MakerWorld import which downloads files into the library). # Bulk/ALL-ownership library ops (UPDATE_ALL / DELETE_ALL / PURGE) stay # admin-only because they cross the user boundary. Permission.LIBRARY_UPLOAD: "can_manage_library", Permission.LIBRARY_UPDATE_OWN: "can_manage_library", Permission.LIBRARY_DELETE_OWN: "can_manage_library", Permission.MAKERWORLD_IMPORT: "can_manage_library", # can_manage_inventory — inventory write scope. Covers the documented # spool/catalog/forecast write surface AND the SpoolBuddy kiosk endpoints # (NFC scan, scale reading, system command/update) which used # INVENTORY_UPDATE as a stand-in for "kiosk write" under the prior # denylist model. Read-only inventory (INVENTORY_READ etc.) stays under # can_read_status. Permission.INVENTORY_CREATE: "can_manage_inventory", Permission.INVENTORY_UPDATE: "can_manage_inventory", Permission.INVENTORY_DELETE: "can_manage_inventory", Permission.INVENTORY_FORECAST_WRITE: "can_manage_inventory", # can_access_cloud — narrow opt-in scope, gated by the router-level # ``_cloud_api_key_gate`` and additionally enforced here so the route- # level ``cloud_caller(Permission.CLOUD_AUTH)`` dep also fails closed # when the flag is off (defence-in-depth). Permission.CLOUD_AUTH: "can_access_cloud", } # Retained for documentation, drift-detection, and the prior "administrative # operations" error string. Entries here are also absent from # ``_APIKEY_SCOPE_BY_PERMISSION``, so they fail closed via the allowlist; the # denylist is a redundant explicit "these are admin" marker, not the load- # bearing security check. _APIKEY_DENIED_PERMISSIONS: frozenset[Permission] = frozenset( { # Settings administration (cred storage; rewriting these reaches SMTP/LDAP/MQTT). Permission.SETTINGS_UPDATE, Permission.SETTINGS_BACKUP, Permission.SETTINGS_RESTORE, # User / group / API-key administration. Permission.USERS_READ, Permission.USERS_CREATE, Permission.USERS_UPDATE, Permission.USERS_DELETE, Permission.GROUPS_READ, Permission.GROUPS_CREATE, Permission.GROUPS_UPDATE, Permission.GROUPS_DELETE, Permission.API_KEYS_CREATE, Permission.API_KEYS_UPDATE, Permission.API_KEYS_DELETE, Permission.API_KEYS_READ, # GitHub backup admin + firmware OTA. Permission.GITHUB_BACKUP, Permission.GITHUB_RESTORE, Permission.FIRMWARE_UPDATE, # Resource administration (printer/project/filament/maintenance/k-profile/etc CRUD). # API keys with the operational scopes can read these resources via # *_READ permissions but cannot mutate the catalog/registry itself. Permission.PRINTERS_CREATE, Permission.PRINTERS_UPDATE, Permission.PRINTERS_DELETE, Permission.ARCHIVES_CREATE, Permission.ARCHIVES_UPDATE_OWN, Permission.ARCHIVES_UPDATE_ALL, Permission.ARCHIVES_DELETE_OWN, Permission.ARCHIVES_DELETE_ALL, Permission.ARCHIVES_PURGE, Permission.LIBRARY_UPDATE_ALL, Permission.LIBRARY_DELETE_ALL, Permission.LIBRARY_PURGE, Permission.PROJECTS_CREATE, Permission.PROJECTS_UPDATE, Permission.PROJECTS_DELETE, Permission.FILAMENTS_CREATE, Permission.FILAMENTS_UPDATE, Permission.FILAMENTS_DELETE, Permission.MAINTENANCE_CREATE, Permission.MAINTENANCE_UPDATE, Permission.MAINTENANCE_DELETE, Permission.KPROFILES_CREATE, Permission.KPROFILES_UPDATE, Permission.KPROFILES_DELETE, Permission.NOTIFICATIONS_CREATE, Permission.NOTIFICATIONS_UPDATE, Permission.NOTIFICATIONS_DELETE, Permission.NOTIFICATIONS_USER_EMAIL, Permission.NOTIFICATION_TEMPLATES_UPDATE, Permission.EXTERNAL_LINKS_CREATE, Permission.EXTERNAL_LINKS_UPDATE, Permission.EXTERNAL_LINKS_DELETE, Permission.SMART_PLUGS_CREATE, Permission.SMART_PLUGS_UPDATE, Permission.SMART_PLUGS_DELETE, # Network scanning — operator only (no API-key scope for this). Permission.DISCOVERY_SCAN, } ) def _resolve_apikey_scope(perm_string: str) -> str | None: """Return the scope-flag attribute name gating ``perm_string`` for API keys. None when the permission is unmapped (= admin-only / not API-key-usable). """ try: perm = Permission(perm_string) except ValueError: return None return _APIKEY_SCOPE_BY_PERMISSION.get(perm) def _check_apikey_permissions(api_key: APIKey, perm_strings: list[str], *, require_any: bool = False) -> None: """Raise 403 unless ``api_key`` is allowed to use ``perm_strings``. Allowlist semantics: every requested permission MUST be present in ``_APIKEY_SCOPE_BY_PERMISSION`` AND its scope flag must be True on ``api_key``. Unmapped permissions = administrative = 403. By default ALL requested permissions must pass (mirrors ``require_permission`` / ``require_permission_if_auth_enabled``). When ``require_any=True``, only one needs to pass (mirrors ``require_any_permission_if_auth_enabled``). """ if not perm_strings: # Defensive: empty perm list means the dep is auth-only, not perm-gated. # Routes never call us with [] today, but if they did, returning here # would silently allow — instead, fail closed. raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="API keys cannot be used for unspecified permissions", ) last_failure: HTTPException | None = None for perm_str in perm_strings: scope_attr = _resolve_apikey_scope(perm_str) if scope_attr is None: failure = HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="API keys cannot be used for administrative operations", ) elif not getattr(api_key, scope_attr, False): failure = HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"API key does not have '{scope_attr}' permission", ) else: failure = None if failure is None and require_any: return # at least one passed if failure is not None and not require_any: raise failure last_failure = failure if require_any and last_failure is not None: raise last_failure def require_energy_cost_update(): """Dependency for ``POST /settings/electricity-price`` (#1356). Bypasses the ``_APIKEY_DENIED_PERMISSIONS`` ``SETTINGS_UPDATE`` block for API keys that explicitly opt into ``can_update_energy_cost``. Full ``SETTINGS_UPDATE`` for API keys stays denied — this is a narrowly-scoped door for the Home Assistant dynamic-tariff use case documented in ``wiki/features/energy.md``, not a general settings-write capability. Accepts: * Auth disabled → always allowed (matches other settings routes) * JWT user with ``SETTINGS_UPDATE`` permission * API key with ``can_update_energy_cost = True`` """ async def permission_checker( credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(security)] = None, x_api_key: Annotated[str | None, Header(alias="X-API-Key")] = None, ) -> User | None: async with async_session() as db: if not await is_auth_enabled(db): return None credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) # API key path — X-API-Key header or Bearer bb_xxx api_key_value: str | None = None if x_api_key: api_key_value = x_api_key elif credentials is not None and credentials.credentials.startswith("bb_"): api_key_value = credentials.credentials if api_key_value is not None: api_key = await _validate_api_key(db, api_key_value) if api_key is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid API key", headers={"WWW-Authenticate": "Bearer"}, ) if not api_key.can_update_energy_cost: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="API key does not have 'update_energy_cost' permission", ) return None # JWT path if credentials is None: raise credentials_exception try: payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception jti: str | None = payload.get("jti") if not jti or await is_jti_revoked(jti): raise credentials_exception iat: int | float | None = payload.get("iat") except JWTError: raise credentials_exception user = await get_user_by_username(db, username) if user is None or not user.is_active: raise credentials_exception if not _is_token_fresh(iat, user): raise credentials_exception if not user.has_all_permissions(Permission.SETTINGS_UPDATE.value): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Missing required permissions: {Permission.SETTINGS_UPDATE.value}", ) return user return permission_checker # Password hashing # Use pbkdf2_sha256 instead of bcrypt to avoid 72-byte limit and passlib initialization issues # pbkdf2_sha256 is a secure password hashing algorithm without bcrypt's limitations pwd_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto") def _get_jwt_secret() -> str: """Get the JWT secret key from environment, file, or generate a new one. Priority: 1. JWT_SECRET_KEY environment variable 2. .jwt_secret file in data directory 3. Generate new random secret and save to file Returns: The JWT secret key """ # 1. Check environment variable first env_secret = os.environ.get("JWT_SECRET_KEY") if env_secret: logger.info("Using JWT secret from JWT_SECRET_KEY environment variable") return env_secret # 2. Check for secret file in data directory from backend.app.core.paths import resolve_data_dir data_dir = resolve_data_dir() secret_file = data_dir / ".jwt_secret" if secret_file.exists(): try: secret = secret_file.read_text().strip() if secret and len(secret) >= 32: logger.info("Using JWT secret from %s", secret_file) return secret except OSError as e: logger.warning("Failed to read JWT secret file: %s", e) # 3. Generate new random secret new_secret = secrets.token_urlsafe(64) # Try to save it try: data_dir.mkdir(parents=True, exist_ok=True) # Note: CodeQL flags this as "clear-text storage of sensitive information" but this is # intentional and secure - JWT secrets must be readable by the app, we set 0600 permissions, # and this is standard practice for self-hosted applications (same as .env files). secret_file.write_text(new_secret) # nosec B105 # Restrict permissions (owner read/write only) secret_file.chmod(0o600) logger.info("Generated new JWT secret and saved to %s", secret_file) except OSError as e: logger.warning( "Could not save JWT secret to file (%s). " "Secret will be regenerated on restart, invalidating existing tokens. " "Set JWT_SECRET_KEY environment variable for persistence.", e, ) return new_secret # JWT settings SECRET_KEY = _get_jwt_secret() ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 # 24 hours (M-2: reduced from 7 days) # HTTP Bearer token security = HTTPBearer(auto_error=False) # --- Slicer download tokens --- # Short-lived, single-use tokens for slicer protocol handlers that can't send # auth headers. Stored in AuthEphemeralToken (token_type=TokenType.SLICER_DOWNLOAD) # so they survive server restarts and work in multi-worker deployments (M-3). SLICER_TOKEN_EXPIRE_MINUTES = 5 async def create_slicer_download_token(resource_type: str, resource_id: int) -> str: """Create a short-lived, single-use download token for slicer protocol handlers.""" now = datetime.now(timezone.utc) expires_at = now + timedelta(minutes=SLICER_TOKEN_EXPIRE_MINUTES) token = secrets.token_urlsafe(24) resource_key = f"{resource_type}:{resource_id}" async with async_session() as db: # Prune expired tokens opportunistically await db.execute( delete(AuthEphemeralToken).where( AuthEphemeralToken.token_type == TokenType.SLICER_DOWNLOAD, AuthEphemeralToken.expires_at < now, ) ) db.add( AuthEphemeralToken( token=token, token_type=TokenType.SLICER_DOWNLOAD, nonce=resource_key, expires_at=expires_at, ) ) await db.commit() return token async def verify_slicer_download_token(token: str, resource_type: str, resource_id: int) -> bool: """Verify and atomically consume a slicer download token. Returns True only if the token is valid, unexpired, and bound to the given resource. DELETE...RETURNING ensures the token is single-use even under concurrent requests. M-NEW-1 fix: nonce (resource key) is included in the WHERE clause so the DELETE only succeeds when the token is presented to the *correct* resource endpoint. Previously the token was consumed (committed) even when stored_key != expected_key, permanently invalidating it while returning False to the caller. """ expected_key = f"{resource_type}:{resource_id}" now = datetime.now(timezone.utc) async with async_session() as db: result = await db.execute( delete(AuthEphemeralToken) .where( AuthEphemeralToken.token == token, AuthEphemeralToken.token_type == TokenType.SLICER_DOWNLOAD, AuthEphemeralToken.nonce == expected_key, AuthEphemeralToken.expires_at > now, ) .returning(AuthEphemeralToken.id) ) if result.one_or_none() is None: return False await db.commit() return True # --- Camera stream tokens --- # Reusable tokens for camera stream/snapshot endpoints loaded via /