from __future__ import annotations
import logging
import os
import secrets
from datetime import datetime, timedelta, timezone
from typing import Annotated
import jwt
from fastapi import Depends, Header, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jwt.exceptions import PyJWTError as JWTError
from passlib.context import CryptContext
from sqlalchemy import delete, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from backend.app.core.database import async_session, get_db
from backend.app.core.permissions import Permission
from backend.app.models.api_key import APIKey
from backend.app.models.auth_ephemeral import AuthEphemeralToken, TokenType
from backend.app.models.settings import Settings
from backend.app.models.user import User
logger = logging.getLogger(__name__)
# GHSA-r2qv-8222-hqg3 (CVSS 9.9) — API key permission enforcement is allowlist-based.
#
# Until 0.2.4.x, ``_check_apikey_permissions`` only consulted the admin denylist
# below. The documented scope flags on ``APIKey``
# (``can_read_status`` / ``can_queue`` / ``can_control_printer`` / ``can_manage_library``)
# were enforced only by ``check_permission()`` inside ``routes/webhook.py``;
# every other route used ``require_permission_if_auth_enabled`` which fell
# through to the denylist-only path, so an API key with all flags unchecked
# could still stop prints, edit queue items, and read every endpoint not in
# this set. ``require_any_permission_if_auth_enabled`` and
# ``require_ownership_permission`` did not call this helper at all, so admin
# "any-of" routes and ownership-modify routes were entirely ungated for API keys.
#
# Fix: ``_check_apikey_permissions`` now requires every requested permission to
# be present in ``_APIKEY_SCOPE_BY_PERMISSION`` (allowlist), and gates on the
# corresponding scope flag on the API key. Unmapped permissions = 403. This
# means a Permission added to ``core/permissions.py`` without a matching entry
# in ``_APIKEY_SCOPE_BY_PERMISSION`` is automatically denied for API keys —
# the previous denylist shape allowed every new Permission to silently widen
# the API-key surface.
#
# The denylist is retained for documentation / drift-detection only — its
# entries also satisfy "not in the allowlist", so they fail closed regardless.
#
# Mapping rationale (see wiki/features/api-keys.md):
#   can_read_status      → every ``*_READ`` + camera + stats + system + websocket
#   can_queue            → queue write ops + archive reprint
#   can_control_printer  → physical printer + smart-plug control
#   can_manage_library   → library upload/own + MakerWorld import (separate
#                          trust level from queue management, hence its own flag)
#   can_manage_inventory → inventory create/update/delete + forecast writes
#   can_access_cloud     → ``CLOUD_AUTH`` only (narrow opt-in scope)
#   admin-only           → unmapped (default-deny); covers all create/update/
#                          delete of admin resources, settings writes, user/
#                          group/api-key/backup admin ops, discovery scan,
#                          library ALL-ownership perms, purges
# Allowlist lookup: ``Permission`` -> name of the ``APIKey`` scope-flag
# attribute that gates it. Declared as per-scope groups and flattened by the
# comprehension; a permission that appears in no group is unmapped and is
# therefore denied for API keys.
_APIKEY_SCOPE_BY_PERMISSION: dict[Permission, str] = {
    permission: scope_flag
    for scope_flag, permissions in (
        (
            # can_read_status — read-only access to status, history, and configuration
            "can_read_status",
            (
                Permission.PRINTERS_READ,
                Permission.ARCHIVES_READ,
                Permission.QUEUE_READ,
                Permission.LIBRARY_READ,
                Permission.PROJECTS_READ,
                Permission.FILAMENTS_READ,
                Permission.INVENTORY_READ,
                Permission.INVENTORY_VIEW_ASSIGNMENTS,
                Permission.INVENTORY_FORECAST_READ,
                Permission.SMART_PLUGS_READ,
                Permission.CAMERA_VIEW,
                Permission.MAINTENANCE_READ,
                Permission.KPROFILES_READ,
                Permission.NOTIFICATIONS_READ,
                Permission.NOTIFICATION_TEMPLATES_READ,
                Permission.EXTERNAL_LINKS_READ,
                Permission.FIRMWARE_READ,
                Permission.AMS_HISTORY_READ,
                Permission.STATS_READ,
                Permission.STATS_FILTER_BY_USER,
                Permission.SYSTEM_READ,
                # SETTINGS_READ stays allowed via read-status so SpoolBuddy kiosks keep
                # working (they need the UI-language setting via API key).
                Permission.SETTINGS_READ,
                Permission.MAKERWORLD_VIEW,
                Permission.WEBSOCKET_CONNECT,
            ),
        ),
        (
            # can_queue — queue write ops + reprint (which enqueues an existing archive)
            "can_queue",
            (
                Permission.QUEUE_CREATE,
                Permission.QUEUE_UPDATE_OWN,
                Permission.QUEUE_UPDATE_ALL,
                Permission.QUEUE_DELETE_OWN,
                Permission.QUEUE_DELETE_ALL,
                Permission.QUEUE_REORDER,
                Permission.ARCHIVES_REPRINT_OWN,
                Permission.ARCHIVES_REPRINT_ALL,
            ),
        ),
        (
            # can_control_printer — physical-world side effects on hardware
            "can_control_printer",
            (
                Permission.PRINTERS_CONTROL,
                Permission.PRINTERS_FILES,
                Permission.PRINTERS_AMS_RFID,
                Permission.PRINTERS_CLEAR_PLATE,
                Permission.SMART_PLUGS_CONTROL,
            ),
        ),
        (
            # can_manage_library — file-manager scope (upload/rename/delete OWN library
            # entries + MakerWorld import which downloads files into the library).
            # Bulk/ALL-ownership library ops (UPDATE_ALL / DELETE_ALL / PURGE) stay
            # admin-only because they cross the user boundary.
            "can_manage_library",
            (
                Permission.LIBRARY_UPLOAD,
                Permission.LIBRARY_UPDATE_OWN,
                Permission.LIBRARY_DELETE_OWN,
                Permission.MAKERWORLD_IMPORT,
            ),
        ),
        (
            # can_manage_inventory — inventory write scope. Covers the documented
            # spool/catalog/forecast write surface AND the SpoolBuddy kiosk endpoints
            # (NFC scan, scale reading, system command/update) which used
            # INVENTORY_UPDATE as a stand-in for "kiosk write" under the prior
            # denylist model. Read-only inventory (INVENTORY_READ etc.) stays under
            # can_read_status.
            "can_manage_inventory",
            (
                Permission.INVENTORY_CREATE,
                Permission.INVENTORY_UPDATE,
                Permission.INVENTORY_DELETE,
                Permission.INVENTORY_FORECAST_WRITE,
            ),
        ),
        (
            # can_access_cloud — narrow opt-in scope, gated by the router-level
            # ``_cloud_api_key_gate`` and additionally enforced here so the route-
            # level ``cloud_caller(Permission.CLOUD_AUTH)`` dep also fails closed
            # when the flag is off (defence-in-depth).
            "can_access_cloud",
            (Permission.CLOUD_AUTH,),
        ),
    )
    for permission in permissions
}
# Retained for documentation, drift-detection, and the prior "administrative
# operations" error string. Entries here are also absent from
# ``_APIKEY_SCOPE_BY_PERMISSION``, so they fail closed via the allowlist; the
# denylist is a redundant explicit "these are admin" marker, not the load-
# bearing security check.
# Explicit "these are admin" marker set, assembled as the union of one set
# literal per resource family. The resulting frozenset has the same members
# regardless of how the families are split.
_APIKEY_DENIED_PERMISSIONS: frozenset[Permission] = frozenset(
    # Settings administration (cred storage; rewriting these reaches SMTP/LDAP/MQTT).
    {
        Permission.SETTINGS_UPDATE,
        Permission.SETTINGS_BACKUP,
        Permission.SETTINGS_RESTORE,
    }
    # User / group / API-key administration.
    | {
        Permission.USERS_READ,
        Permission.USERS_CREATE,
        Permission.USERS_UPDATE,
        Permission.USERS_DELETE,
        Permission.GROUPS_READ,
        Permission.GROUPS_CREATE,
        Permission.GROUPS_UPDATE,
        Permission.GROUPS_DELETE,
        Permission.API_KEYS_CREATE,
        Permission.API_KEYS_UPDATE,
        Permission.API_KEYS_DELETE,
        Permission.API_KEYS_READ,
    }
    # GitHub backup admin + firmware OTA.
    | {
        Permission.GITHUB_BACKUP,
        Permission.GITHUB_RESTORE,
        Permission.FIRMWARE_UPDATE,
    }
    # Resource administration (printer/project/filament/maintenance/k-profile/etc CRUD).
    # API keys with the operational scopes can read these resources via
    # *_READ permissions but cannot mutate the catalog/registry itself.
    | {
        Permission.PRINTERS_CREATE,
        Permission.PRINTERS_UPDATE,
        Permission.PRINTERS_DELETE,
        Permission.ARCHIVES_CREATE,
        Permission.ARCHIVES_UPDATE_OWN,
        Permission.ARCHIVES_UPDATE_ALL,
        Permission.ARCHIVES_DELETE_OWN,
        Permission.ARCHIVES_DELETE_ALL,
        Permission.ARCHIVES_PURGE,
        Permission.LIBRARY_UPDATE_ALL,
        Permission.LIBRARY_DELETE_ALL,
        Permission.LIBRARY_PURGE,
        Permission.PROJECTS_CREATE,
        Permission.PROJECTS_UPDATE,
        Permission.PROJECTS_DELETE,
        Permission.FILAMENTS_CREATE,
        Permission.FILAMENTS_UPDATE,
        Permission.FILAMENTS_DELETE,
        Permission.MAINTENANCE_CREATE,
        Permission.MAINTENANCE_UPDATE,
        Permission.MAINTENANCE_DELETE,
        Permission.KPROFILES_CREATE,
        Permission.KPROFILES_UPDATE,
        Permission.KPROFILES_DELETE,
        Permission.NOTIFICATIONS_CREATE,
        Permission.NOTIFICATIONS_UPDATE,
        Permission.NOTIFICATIONS_DELETE,
        Permission.NOTIFICATIONS_USER_EMAIL,
        Permission.NOTIFICATION_TEMPLATES_UPDATE,
        Permission.EXTERNAL_LINKS_CREATE,
        Permission.EXTERNAL_LINKS_UPDATE,
        Permission.EXTERNAL_LINKS_DELETE,
        Permission.SMART_PLUGS_CREATE,
        Permission.SMART_PLUGS_UPDATE,
        Permission.SMART_PLUGS_DELETE,
    }
    # Network scanning — operator only (no API-key scope for this).
    | {
        Permission.DISCOVERY_SCAN,
    }
)
def _resolve_apikey_scope(perm_string: str) -> str | None:
    """Look up which API-key scope flag gates ``perm_string``.

    Returns the attribute name of the scope flag from
    ``_APIKEY_SCOPE_BY_PERMISSION``, or ``None`` when ``perm_string`` is not a
    valid ``Permission`` value or has no allowlist entry (i.e. it is
    admin-only / not usable with an API key).
    """
    try:
        permission = Permission(perm_string)
    except ValueError:
        # Not a known Permission value: treat exactly like an unmapped one.
        scope_attr = None
    else:
        scope_attr = _APIKEY_SCOPE_BY_PERMISSION.get(permission)
    return scope_attr
def _check_apikey_permissions(api_key: APIKey, perm_strings: list[str], *, require_any: bool = False) -> None:
    """Enforce the API-key allowlist for ``perm_strings``; raise 403 on denial.

    A permission is granted only when it has an entry in
    ``_APIKEY_SCOPE_BY_PERMISSION`` AND the mapped scope flag is truthy on
    ``api_key``. An unmapped permission counts as administrative and is denied.

    Modes:
        * ``require_any=False`` (default): every permission must be granted;
          the first denial is raised immediately.
        * ``require_any=True``: one granted permission is enough; if none is
          granted, the denial for the last permission checked is raised.

    Raises:
        HTTPException: 403 when ``perm_strings`` is empty, or when the
            permissions are not satisfied under the selected mode.
    """
    if not perm_strings:
        # An empty list would otherwise fall through the loop and silently
        # allow the request, so fail closed instead.
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="API keys cannot be used for unspecified permissions",
        )

    denial: HTTPException | None = None
    for perm_str in perm_strings:
        scope_attr = _resolve_apikey_scope(perm_str)

        # Granted: mapped in the allowlist and the flag is set on this key.
        if scope_attr is not None and getattr(api_key, scope_attr, False):
            if require_any:
                return  # at least one passed
            continue

        if scope_attr is None:
            denial = HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="API keys cannot be used for administrative operations",
            )
        else:
            denial = HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"API key does not have '{scope_attr}' permission",
            )

        if not require_any:
            raise denial

    # Only reachable with a recorded denial in any-of mode when nothing was
    # granted; in all-of mode every denial has already been raised above.
    if denial is not None:
        raise denial
def require_energy_cost_update():
    """Dependency for ``POST /settings/electricity-price`` (#1356).

    Bypasses the ``_APIKEY_DENIED_PERMISSIONS`` ``SETTINGS_UPDATE`` block for
    API keys that explicitly opt into ``can_update_energy_cost``. Full
    ``SETTINGS_UPDATE`` for API keys stays denied — this is a narrowly-scoped
    door for the Home Assistant dynamic-tariff use case documented in
    ``wiki/features/energy.md``, not a general settings-write capability.

    Accepts:
        * Auth disabled → always allowed (matches other settings routes)
        * JWT user with ``SETTINGS_UPDATE`` permission
        * API key with ``can_update_energy_cost = True``

    Returns:
        The ``permission_checker`` coroutine function. When awaited it yields
        ``None`` if auth is disabled or an API key was accepted, and the
        authenticated ``User`` on the JWT path.

    Raises (from the returned checker):
        HTTPException: 401 for an invalid API key, missing bearer credentials,
            an undecodable JWT, a JWT without ``sub``/``jti``, a revoked
            ``jti``, an unknown or inactive user, or a token rejected by
            ``_is_token_fresh``. 403 for an API key without
            ``can_update_energy_cost`` or a user without ``SETTINGS_UPDATE``.
    """

    async def permission_checker(
        credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(security)] = None,
        x_api_key: Annotated[str | None, Header(alias="X-API-Key")] = None,
    ) -> User | None:
        async with async_session() as db:
            # Auth switched off: nothing to check.
            if not await is_auth_enabled(db):
                return None
            # Shared 401 used for every JWT-path failure below.
            credentials_exception = HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Could not validate credentials",
                headers={"WWW-Authenticate": "Bearer"},
            )
            # API key path — X-API-Key header or Bearer bb_xxx
            # The X-API-Key header takes precedence over a bearer credential.
            api_key_value: str | None = None
            if x_api_key:
                api_key_value = x_api_key
            elif credentials is not None and credentials.credentials.startswith("bb_"):
                api_key_value = credentials.credentials
            if api_key_value is not None:
                api_key = await _validate_api_key(db, api_key_value)
                if api_key is None:
                    raise HTTPException(
                        status_code=status.HTTP_401_UNAUTHORIZED,
                        detail="Invalid API key",
                        headers={"WWW-Authenticate": "Bearer"},
                    )
                # Only the dedicated opt-in flag is consulted here; the general
                # scope allowlist is deliberately not involved.
                if not api_key.can_update_energy_cost:
                    raise HTTPException(
                        status_code=status.HTTP_403_FORBIDDEN,
                        detail="API key does not have 'update_energy_cost' permission",
                    )
                # An accepted API key yields no User object.
                return None
            # JWT path
            if credentials is None:
                raise credentials_exception
            try:
                payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM])
                username: str = payload.get("sub")
                if username is None:
                    raise credentials_exception
                # Tokens without a jti, or with a revoked one, are rejected.
                jti: str | None = payload.get("jti")
                if not jti or await is_jti_revoked(jti):
                    raise credentials_exception
                iat: int | float | None = payload.get("iat")
            except JWTError:
                raise credentials_exception
            user = await get_user_by_username(db, username)
            if user is None or not user.is_active:
                raise credentials_exception
            # NOTE(review): presumably compares ``iat`` against a per-user
            # invalidation timestamp — confirm against ``_is_token_fresh``.
            if not _is_token_fresh(iat, user):
                raise credentials_exception
            if not user.has_all_permissions(Permission.SETTINGS_UPDATE.value):
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail=f"Missing required permissions: {Permission.SETTINGS_UPDATE.value}",
                )
            return user

    return permission_checker
# Password hashing
# Use pbkdf2_sha256 instead of bcrypt to avoid bcrypt's 72-byte password limit
# and passlib's bcrypt initialization issues. pbkdf2_sha256 is a secure
# password hashing algorithm without those limitations.
pwd_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")
def _get_jwt_secret() -> str:
    """Get the JWT secret key from environment, file, or generate a new one.

    Priority:
        1. ``JWT_SECRET_KEY`` environment variable
        2. ``.jwt_secret`` file in the data directory (must hold at least
           32 characters after stripping whitespace)
        3. Generate a new random secret and try to save it to that file

    Returns:
        The JWT secret key. If the generated secret cannot be persisted, it is
        still returned and a warning is logged; it will then differ after the
        next restart.
    """
    # 1. Check environment variable first
    env_secret = os.environ.get("JWT_SECRET_KEY")
    if env_secret:
        logger.info("Using JWT secret from JWT_SECRET_KEY environment variable")
        return env_secret

    # 2. Check for secret file in data directory
    from backend.app.core.paths import resolve_data_dir

    data_dir = resolve_data_dir()
    secret_file = data_dir / ".jwt_secret"
    if secret_file.exists():
        try:
            secret = secret_file.read_text().strip()
            if secret and len(secret) >= 32:
                logger.info("Using JWT secret from %s", secret_file)
                return secret
            # Say why the stored secret is being replaced (never log the value).
            logger.warning(
                "Ignoring JWT secret in %s: empty or shorter than 32 characters; generating a new one",
                secret_file,
            )
        except OSError as e:
            logger.warning("Failed to read JWT secret file: %s", e)

    # 3. Generate new random secret
    new_secret = secrets.token_urlsafe(64)

    # Try to save it
    try:
        data_dir.mkdir(parents=True, exist_ok=True)
        # Note: CodeQL flags this as "clear-text storage of sensitive information" but this is
        # intentional and secure - JWT secrets must be readable by the app, we set 0600 permissions,
        # and this is standard practice for self-hosted applications (same as .env files).
        #
        # Create the file with mode 0600 up front instead of writing first and
        # chmod-ing afterwards, so the secret is never on disk with the
        # default (umask-derived) permissions.
        fd = os.open(secret_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            handle.write(new_secret)  # nosec B105
        # The mode passed to os.open only applies to newly created files;
        # tighten a pre-existing file as well (owner read/write only).
        secret_file.chmod(0o600)
        logger.info("Generated new JWT secret and saved to %s", secret_file)
    except OSError as e:
        logger.warning(
            "Could not save JWT secret to file (%s). "
            "Secret will be regenerated on restart, invalidating existing tokens. "
            "Set JWT_SECRET_KEY environment variable for persistence.",
            e,
        )
    return new_secret
# JWT settings
# SECRET_KEY is resolved once, when this module is imported, via
# ``_get_jwt_secret()``.
SECRET_KEY = _get_jwt_secret()
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24  # 24 hours (M-2: reduced from 7 days)
# HTTP Bearer token
# NOTE(review): with ``auto_error=False`` dependants appear to receive ``None``
# when no bearer credentials are sent and handle that case themselves — confirm
# against the FastAPI ``HTTPBearer`` reference.
security = HTTPBearer(auto_error=False)
# --- Slicer download tokens ---
# Short-lived, single-use tokens for slicer protocol handlers that can't send
# auth headers. Stored in AuthEphemeralToken (token_type=TokenType.SLICER_DOWNLOAD)
# so they survive server restarts and work in multi-worker deployments (M-3).
# Lifetime, in minutes, of a token issued by ``create_slicer_download_token``.
SLICER_TOKEN_EXPIRE_MINUTES = 5
async def create_slicer_download_token(resource_type: str, resource_id: int) -> str:
    """Issue a short-lived, single-use slicer download token for one resource.

    The token is stored as an ``AuthEphemeralToken`` row whose ``nonce`` is
    ``"<resource_type>:<resource_id>"`` and which expires
    ``SLICER_TOKEN_EXPIRE_MINUTES`` minutes from now. Already-expired slicer
    tokens are deleted in the same transaction.

    Returns:
        The newly generated token string.
    """
    issued_at = datetime.now(timezone.utc)
    token = secrets.token_urlsafe(24)

    token_row = AuthEphemeralToken(
        token=token,
        token_type=TokenType.SLICER_DOWNLOAD,
        nonce=f"{resource_type}:{resource_id}",
        expires_at=issued_at + timedelta(minutes=SLICER_TOKEN_EXPIRE_MINUTES),
    )
    # Opportunistic cleanup of slicer tokens that have already expired.
    prune_expired = delete(AuthEphemeralToken).where(
        AuthEphemeralToken.token_type == TokenType.SLICER_DOWNLOAD,
        AuthEphemeralToken.expires_at < issued_at,
    )

    async with async_session() as db:
        await db.execute(prune_expired)
        db.add(token_row)
        await db.commit()
    return token
async def verify_slicer_download_token(token: str, resource_type: str, resource_id: int) -> bool:
    """Check a slicer download token and consume it in the same statement.

    Returns ``True`` only when a row matching the token, the slicer token type,
    the ``"<resource_type>:<resource_id>"`` nonce and a future ``expires_at``
    was deleted; otherwise ``False``. Using DELETE ... RETURNING keeps the
    token single-use even under concurrent requests.

    M-NEW-1 fix: the nonce (resource key) is part of the WHERE clause, so the
    DELETE only succeeds when the token is presented to the *correct* resource
    endpoint. Previously the token was consumed (committed) even when the
    stored key differed from the expected key, permanently invalidating it
    while returning False to the caller.
    """
    now = datetime.now(timezone.utc)
    consume_stmt = (
        delete(AuthEphemeralToken)
        .where(
            AuthEphemeralToken.token == token,
            AuthEphemeralToken.token_type == TokenType.SLICER_DOWNLOAD,
            AuthEphemeralToken.nonce == f"{resource_type}:{resource_id}",
            AuthEphemeralToken.expires_at > now,
        )
        .returning(AuthEphemeralToken.id)
    )

    async with async_session() as db:
        consumed_row = (await db.execute(consume_stmt)).one_or_none()
        if consumed_row is not None:
            # A row was deleted: persist the consumption.
            await db.commit()
            return True
        # Nothing matched, so there is nothing to commit.
        return False
# --- Camera stream tokens ---
# Reusable tokens for camera stream/snapshot endpoints loaded via
/