| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690 |
- from datetime import timedelta
- from typing import Annotated
- from fastapi import APIRouter, Depends, Header, HTTPException, status
- from fastapi.security import HTTPAuthorizationCredentials
- from sqlalchemy import select
- from sqlalchemy.ext.asyncio import AsyncSession
- from sqlalchemy.orm import selectinload
- from backend.app.api.routes.settings import get_external_login_url
- from backend.app.core.auth import (
- ACCESS_TOKEN_EXPIRE_MINUTES,
- ALGORITHM,
- SECRET_KEY,
- Permission,
- RequirePermissionIfAuthEnabled,
- _validate_api_key,
- authenticate_user,
- authenticate_user_by_email,
- create_access_token,
- get_current_active_user,
- get_password_hash,
- get_user_by_email,
- get_user_by_username,
- security,
- )
- from backend.app.core.database import get_db
- from backend.app.core.permissions import ALL_PERMISSIONS
- from backend.app.models.group import Group
- from backend.app.models.settings import Settings
- from backend.app.models.user import User
- from backend.app.schemas.auth import (
- ForgotPasswordRequest,
- ForgotPasswordResponse,
- GroupBrief,
- LoginRequest,
- LoginResponse,
- ResetPasswordRequest,
- ResetPasswordResponse,
- SetupRequest,
- SetupResponse,
- SMTPSettings,
- TestSMTPRequest,
- TestSMTPResponse,
- UserResponse,
- )
- from backend.app.services.email_service import (
- create_password_reset_email_from_template,
- generate_secure_password,
- get_smtp_settings,
- save_smtp_settings,
- send_email,
- )
def _user_to_response(user: User) -> UserResponse:
    """Translate a ``User`` ORM object into the ``UserResponse`` API schema.

    Group memberships are reduced to ``GroupBrief`` entries (id and name),
    the permissions from ``user.get_permissions()`` are returned sorted, and
    ``created_at`` is rendered with ``isoformat()``.
    """
    group_briefs = []
    for group in user.groups:
        group_briefs.append(GroupBrief(id=group.id, name=group.name))

    granted = sorted(user.get_permissions())
    created = user.created_at.isoformat()

    return UserResponse(
        id=user.id,
        username=user.username,
        email=user.email,
        role=user.role,
        is_active=user.is_active,
        is_admin=user.is_admin,
        groups=group_briefs,
        permissions=granted,
        created_at=created,
    )
def _api_key_to_user_response(api_key) -> UserResponse:
    """Describe a validated API key as a synthetic administrator account.

    The response uses id 0, a username derived from ``api_key.key_prefix``,
    no email, no groups, and every permission listed in ``ALL_PERMISSIONS``
    (sorted). ``created_at`` is the key's creation time in ISO format.
    """
    synthetic_name = f"api-key:{api_key.key_prefix}"
    every_permission = sorted(ALL_PERMISSIONS)
    issued_at = api_key.created_at.isoformat()

    return UserResponse(
        id=0,
        username=synthetic_name,
        email=None,
        role="admin",
        is_active=True,
        is_admin=True,
        groups=[],
        permissions=every_permission,
        created_at=issued_at,
    )
# Router for the authentication endpoints; every route registered on it is
# served under the "/auth" prefix and tagged "authentication".
router = APIRouter(prefix="/auth", tags=["authentication"])
async def is_auth_enabled(db: AsyncSession) -> bool:
    """Report whether the ``auth_enabled`` setting is switched on.

    Returns False when the settings row does not exist; otherwise True only
    if the stored value equals "true" (compared case-insensitively).
    """
    query = select(Settings).where(Settings.key == "auth_enabled")
    row = (await db.execute(query)).scalar_one_or_none()
    return row is not None and row.value.lower() == "true"
async def is_advanced_auth_enabled(db: AsyncSession) -> bool:
    """Report whether the ``advanced_auth_enabled`` setting is switched on.

    Returns False when the settings row does not exist; otherwise True only
    if the stored value equals "true" (compared case-insensitively).
    """
    query = select(Settings).where(Settings.key == "advanced_auth_enabled")
    row = (await db.execute(query)).scalar_one_or_none()
    return row is not None and row.value.lower() == "true"
async def set_advanced_auth_enabled(db: AsyncSession, enabled: bool) -> None:
    """Write the ``advanced_auth_enabled`` flag as "true"/"false" via ``upsert_setting``.

    This function does not call ``commit`` itself.
    """
    from backend.app.core.db_dialect import upsert_setting

    flag_text = "true" if enabled else "false"
    await upsert_setting(db, Settings, "advanced_auth_enabled", flag_text)
async def set_auth_enabled(db: AsyncSession, enabled: bool) -> None:
    """Write the ``auth_enabled`` flag as "true"/"false" via ``upsert_setting``.

    No commit is issued here: the caller's route commits explicitly, or the
    ``get_db`` dependency is left to handle it.
    """
    from backend.app.core.db_dialect import upsert_setting

    flag_text = "true" if enabled else "false"
    await upsert_setting(db, Settings, "auth_enabled", flag_text)
async def is_setup_completed(db: AsyncSession) -> bool:
    """Check if setup has been completed.

    Returns:
        False when the ``setup_completed`` settings row does not exist;
        otherwise True only if its value equals "true" (case-insensitive).
        The result is always a real ``bool`` (never ``None``), matching the
        annotation and the sibling ``is_*_enabled`` helpers.
    """
    result = await db.execute(select(Settings).where(Settings.key == "setup_completed"))
    setting = result.scalar_one_or_none()
    if setting is None:
        # A bare ``setting and ...`` would leak None to callers here.
        return False
    return setting.value.lower() == "true"
async def set_setup_completed(db: AsyncSession, completed: bool) -> None:
    """Write the ``setup_completed`` flag as "true"/"false" via ``upsert_setting``.

    No commit is issued here: the caller's route commits explicitly, or the
    ``get_db`` dependency is left to handle it.
    """
    from backend.app.core.db_dialect import upsert_setting

    flag_text = "true" if completed else "false"
    await upsert_setting(db, Settings, "setup_completed", flag_text)
@router.post("/setup", response_model=SetupResponse)
async def setup_auth(request: SetupRequest, db: AsyncSession = Depends(get_db)):
    """First-time setup: enable/disable authentication and create admin user.

    Flow:
      1. Refuse with 403 when authentication is already enabled.
      2. When ``request.auth_enabled`` is true and no user with role
         "admin" exists, create the first admin from
         ``request.admin_username`` / ``request.admin_password`` and add it
         to the "Administrators" group if that group exists.
      3. Store the ``auth_enabled`` flag, mark setup as completed, commit.

    Returns:
        SetupResponse echoing ``auth_enabled`` and whether an admin was created.

    Raises:
        HTTPException: 403 if auth is already enabled; 400 if admin
            credentials are missing or the username is taken; 500 on any
            unexpected failure (the session is rolled back first).
    """
    import logging

    logger = logging.getLogger(__name__)

    try:
        # If auth is currently enabled, block unauthenticated setup changes.
        # Use the admin panel (/disable endpoint) to modify auth when it's already on.
        if await is_auth_enabled(db):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Authentication is already configured. Use the admin panel to modify auth settings.",
            )

        # Stays None unless this request creates the first admin account.
        admin_user = None

        if request.auth_enabled:
            # Check if admin users already exist
            admin_users_result = await db.execute(select(User).where(User.role == "admin"))
            existing_admin_users = list(admin_users_result.scalars().all())

            if existing_admin_users:
                # Admin users already exist, just enable auth (don't create new admin)
                logger.info(
                    "Admin users already exist (%d found), enabling authentication without creating new admin",
                    len(existing_admin_users),
                )
            else:
                # No admin users exist, require admin credentials to create first admin
                if not request.admin_username or not request.admin_password:
                    raise HTTPException(
                        status_code=status.HTTP_400_BAD_REQUEST,
                        detail="Admin username and password are required when enabling authentication (no admin users exist)",
                    )

                # A non-admin account may already own this username.
                existing_user = await get_user_by_username(db, request.admin_username)
                if existing_user:
                    raise HTTPException(
                        status_code=status.HTTP_400_BAD_REQUEST,
                        detail="User with this username already exists",
                    )

                # Create admin user FIRST (before enabling auth)
                try:
                    logger.info("Creating admin user: %s", request.admin_username)
                    new_admin = User(
                        username=request.admin_username,
                        password_hash=get_password_hash(request.admin_password),
                        role="admin",
                        is_active=True,
                    )

                    # Try to add user to Administrators group if it exists
                    admin_group_result = await db.execute(select(Group).where(Group.name == "Administrators"))
                    admin_group = admin_group_result.scalar_one_or_none()
                    if admin_group:
                        new_admin.groups.append(admin_group)
                        logger.info("Added new admin user to Administrators group")

                    db.add(new_admin)
                    logger.info("Admin user added to session: %s", request.admin_username)
                    admin_user = new_admin
                except Exception as e:
                    await db.rollback()
                    logger.error("Failed to create admin user: %s", e, exc_info=True)
                    raise HTTPException(
                        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                        detail=f"Failed to create admin user: {str(e)}",
                    ) from e

        admin_created = admin_user is not None

        # Set auth enabled and mark setup as completed
        await set_auth_enabled(db, request.auth_enabled)
        await set_setup_completed(db, True)
        await db.commit()

        if admin_user is not None:
            # Reload so the database-generated id is available for the log line.
            await db.refresh(admin_user)
            logger.info("Admin user created successfully: %s", admin_user.id)

        logger.info("Setup completed: auth_enabled=%s, admin_created=%s", request.auth_enabled, admin_created)
        return SetupResponse(auth_enabled=request.auth_enabled, admin_created=admin_created)
    except HTTPException:
        # Deliberate HTTP errors raised above pass through untouched.
        raise
    except Exception as e:
        logger.error("Setup error: %s", e, exc_info=True)
        await db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Setup failed: {str(e)}",
        ) from e
@router.get("/status")
async def get_auth_status(db: AsyncSession = Depends(get_db)):
    """Report whether auth is enabled and whether first-time setup is still pending.

    The route declares no authentication dependency.
    """
    enabled = await is_auth_enabled(db)
    completed = await is_setup_completed(db)
    # Setup is only demanded while it has never been completed.
    return {"auth_enabled": enabled, "requires_setup": not completed}
@router.post("/disable", response_model=dict)
async def disable_auth(
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db),
):
    """Turn authentication off; only callers whose account is an admin may do so.

    Raises:
        HTTPException: 403 for non-admin callers; 500 if storing the flag
            fails (the session is rolled back first).
    """
    import logging

    log = logging.getLogger(__name__)

    # Fetch the caller again with groups eagerly loaded so is_admin can be evaluated.
    caller_query = select(User).where(User.id == current_user.id).options(selectinload(User.groups))
    caller = (await db.execute(caller_query)).scalar_one()

    if not caller.is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only admins can disable authentication",
        )

    try:
        await set_auth_enabled(db, False)
        await db.commit()
        log.info("Authentication disabled by admin user: %s", caller.username)
        return {"message": "Authentication disabled successfully", "auth_enabled": False}
    except Exception as e:
        await db.rollback()
        log.error("Failed to disable authentication: %s", e, exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to disable authentication: {str(e)}",
        )
@router.post("/login", response_model=LoginResponse)
async def login(request: LoginRequest, db: AsyncSession = Depends(get_db)):
    """Authenticate a user and issue a bearer access token.

    The submitted ``username`` is first checked with ``authenticate_user``.
    If that fails and advanced auth is enabled, the same value is retried as
    an email address with ``authenticate_user_by_email``. How the lookup
    treats letter case is decided by those helpers.

    Raises:
        HTTPException: 400 when authentication is not enabled; 401 when
            neither attempt yields a user.
    """
    if not await is_auth_enabled(db):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Authentication is not enabled",
        )

    # Username login first; email login is only a fallback under advanced auth.
    user = await authenticate_user(db, request.username, request.password)
    if not user and await is_advanced_auth_enabled(db):
        user = await authenticate_user_by_email(db, request.username, request.password)

    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Fetch again with groups eagerly loaded so permissions can be computed.
    with_groups = select(User).where(User.id == user.id).options(selectinload(User.groups))
    user = (await db.execute(with_groups)).scalar_one()

    lifetime = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    token = create_access_token(data={"sub": user.username}, expires_delta=lifetime)

    return LoginResponse(
        access_token=token,
        token_type="bearer",
        user=_user_to_response(user),
    )
@router.get("/me", response_model=UserResponse)
async def get_current_user_info(
    credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(security)] = None,
    x_api_key: Annotated[str | None, Header(alias="X-API-Key")] = None,
    db: AsyncSession = Depends(get_db),
):
    """Return the identity behind the supplied credentials.

    Credentials are tried in this order:
      1. ``X-API-Key`` header: a valid key yields a synthetic admin user;
         an invalid one falls through to the bearer token.
      2. ``Authorization: Bearer bb_...``: treated as an API key; an
         invalid key is rejected with 401.
      3. Any other bearer token: decoded as a JWT whose ``sub`` claim names
         an existing, active user.

    Raises:
        HTTPException: 401 when no usable credentials are supplied or any
            of the checks above fails.
    """
    import jwt
    from jwt.exceptions import PyJWTError as JWTError

    def unauthorized(detail: str) -> HTTPException:
        # All rejections share the same status code and challenge header.
        return HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=detail,
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Dedicated API-key header takes precedence.
    if x_api_key:
        header_key = await _validate_api_key(db, x_api_key)
        if header_key:
            return _api_key_to_user_response(header_key)

    if credentials is None:
        raise unauthorized("Authentication required")

    token = credentials.credentials

    # Bearer values starting with "bb_" are API keys, not JWTs.
    if token.startswith("bb_"):
        bearer_key = await _validate_api_key(db, token)
        if bearer_key:
            return _api_key_to_user_response(bearer_key)
        raise unauthorized("Invalid API key")

    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise unauthorized("Could not validate credentials")

    username: str = claims.get("sub")
    if username is None:
        raise unauthorized("Could not validate credentials")

    account = await get_user_by_username(db, username)
    if account is None or not account.is_active:
        raise unauthorized("Could not validate credentials")

    # Fetch again with groups eagerly loaded so permissions can be computed.
    with_groups = select(User).where(User.id == account.id).options(selectinload(User.groups))
    account = (await db.execute(with_groups)).scalar_one()
    return _user_to_response(account)
@router.post("/logout")
async def logout():
    """Acknowledge a logout request.

    Nothing is changed on the server; the client is expected to drop its token.
    """
    acknowledgement = {"message": "Logged out successfully"}
    return acknowledgement
- # Advanced Authentication Endpoints
@router.post("/smtp/test", response_model=TestSMTPResponse)
async def test_smtp_connection(
    test_request: TestSMTPRequest,
    current_user: User | None = RequirePermissionIfAuthEnabled(Permission.SETTINGS_UPDATE),
    db: AsyncSession = Depends(get_db),
):
    """Send a test email through the saved SMTP settings.

    Access is guarded by ``RequirePermissionIfAuthEnabled(Permission.SETTINGS_UPDATE)``.
    Failures are never raised: every outcome is reported through the
    ``success`` / ``message`` fields of ``TestSMTPResponse``.
    """
    import logging

    log = logging.getLogger(__name__)

    try:
        saved_settings = await get_smtp_settings(db)
        if saved_settings:
            send_email(
                smtp_settings=saved_settings,
                to_email=test_request.test_recipient,
                subject="BamBuddy SMTP Test",
                body_text="This is a test email from BamBuddy. If you received this, your SMTP settings are working correctly!",
                body_html="<p>This is a test email from <strong>BamBuddy</strong>.</p><p>If you received this, your SMTP settings are working correctly!</p>",
            )
            log.info(f"Test email sent successfully to {test_request.test_recipient}")
            outcome = TestSMTPResponse(success=True, message="Test email sent successfully")
        else:
            # Nothing stored yet, so there is nothing to test against.
            outcome = TestSMTPResponse(
                success=False, message="SMTP settings not configured. Save SMTP settings first."
            )
        return outcome
    except Exception as e:
        log.error(f"Failed to send test email: {e}")
        return TestSMTPResponse(success=False, message=f"Failed to send test email: {str(e)}")
@router.get("/smtp", response_model=SMTPSettings | None)
async def get_smtp_config(
    current_user: User | None = RequirePermissionIfAuthEnabled(Permission.SETTINGS_READ),
    db: AsyncSession = Depends(get_db),
):
    """Return the stored SMTP settings with the password blanked out.

    Access is guarded by ``RequirePermissionIfAuthEnabled(Permission.SETTINGS_READ)``.
    When nothing is stored, the empty result of ``get_smtp_settings`` is
    returned unchanged.
    """
    stored = await get_smtp_settings(db)
    if not stored:
        return stored
    # The password must never be sent back to the client.
    stored.smtp_password = None
    return stored
@router.post("/smtp", response_model=dict)
async def save_smtp_config(
    smtp_settings: SMTPSettings,
    current_user: User | None = RequirePermissionIfAuthEnabled(Permission.SETTINGS_UPDATE),
    db: AsyncSession = Depends(get_db),
):
    """Persist the submitted SMTP settings and commit.

    Access is guarded by ``RequirePermissionIfAuthEnabled(Permission.SETTINGS_UPDATE)``.

    Raises:
        HTTPException: 500 if saving or committing fails (the session is
            rolled back first).
    """
    import logging

    log = logging.getLogger(__name__)

    try:
        await save_smtp_settings(db, smtp_settings)
        await db.commit()
        # current_user is None when the request was let through without a user.
        log.info(f"SMTP settings updated by admin user: {current_user.username if current_user else 'anonymous'}")
        return {"message": "SMTP settings saved successfully"}
    except Exception as e:
        await db.rollback()
        log.error(f"Failed to save SMTP settings: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to save SMTP settings: {str(e)}",
        )
@router.post("/advanced-auth/enable", response_model=dict)
async def enable_advanced_auth(
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db),
):
    """Switch advanced authentication on; only admin callers may do so.

    SMTP settings must already be stored. This endpoint only checks that
    they exist, not that they have been tested.

    Raises:
        HTTPException: 403 for non-admin callers; 400 when no SMTP settings
            are stored; 500 if storing the flag fails (session rolled back).
    """
    import logging

    log = logging.getLogger(__name__)

    # Fetch the caller again with groups eagerly loaded so is_admin can be evaluated.
    caller_query = select(User).where(User.id == current_user.id).options(selectinload(User.groups))
    caller = (await db.execute(caller_query)).scalar_one()

    if not caller.is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only admins can enable advanced authentication",
        )

    # Advanced auth relies on outgoing mail, so SMTP has to be configured first.
    if not await get_smtp_settings(db):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="SMTP settings must be configured before enabling advanced authentication",
        )

    try:
        await set_advanced_auth_enabled(db, True)
        await db.commit()
        log.info(f"Advanced authentication enabled by admin user: {caller.username}")
        return {"message": "Advanced authentication enabled successfully", "advanced_auth_enabled": True}
    except Exception as e:
        await db.rollback()
        log.error(f"Failed to enable advanced authentication: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to enable advanced authentication: {str(e)}",
        )
@router.post("/advanced-auth/disable", response_model=dict)
async def disable_advanced_auth(
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db),
):
    """Switch advanced authentication off; only admin callers may do so.

    Raises:
        HTTPException: 403 for non-admin callers; 500 if storing the flag
            fails (the session is rolled back first).
    """
    import logging

    log = logging.getLogger(__name__)

    # Fetch the caller again with groups eagerly loaded so is_admin can be evaluated.
    caller_query = select(User).where(User.id == current_user.id).options(selectinload(User.groups))
    caller = (await db.execute(caller_query)).scalar_one()

    if not caller.is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only admins can disable advanced authentication",
        )

    try:
        await set_advanced_auth_enabled(db, False)
        await db.commit()
        log.info(f"Advanced authentication disabled by admin user: {caller.username}")
        return {"message": "Advanced authentication disabled successfully", "advanced_auth_enabled": False}
    except Exception as e:
        await db.rollback()
        log.error(f"Failed to disable advanced authentication: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to disable advanced authentication: {str(e)}",
        )
@router.get("/advanced-auth/status")
async def get_advanced_auth_status(db: AsyncSession = Depends(get_db)):
    """Report whether advanced auth is enabled and whether SMTP settings are stored."""
    enabled = await is_advanced_auth_enabled(db)
    stored_smtp = await get_smtp_settings(db)
    return {
        "advanced_auth_enabled": enabled,
        "smtp_configured": stored_smtp is not None,
    }
@router.post("/forgot-password", response_model=ForgotPasswordResponse)
async def forgot_password(request: ForgotPasswordRequest, db: AsyncSession = Depends(get_db)):
    """Request password reset via email (advanced auth only).

    For an active user matching ``request.email`` a new password is
    generated and mailed. The new hash is committed only after the email
    has been handed to ``send_email``. If building or sending the mail
    fails, the change is rolled back and the previous password keeps
    working, so the user is not locked out with a password they never
    received.

    The same generic response is returned whether or not a matching user
    exists and whether or not sending succeeded, to avoid revealing which
    email addresses are registered.

    Raises:
        HTTPException: 400 when advanced auth is disabled; 500 when no SMTP
            settings are stored.
    """
    import logging

    logger = logging.getLogger(__name__)

    # Check if advanced auth is enabled
    if not await is_advanced_auth_enabled(db):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Advanced authentication is not enabled",
        )

    # Get SMTP settings
    smtp_settings = await get_smtp_settings(db)
    if not smtp_settings:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Email service is not configured",
        )

    # Find user by email
    user = await get_user_by_email(db, request.email)

    # Always return success message to prevent email enumeration
    # but only send email if user exists
    if user and user.is_active:
        try:
            # Read these now: commit/rollback may expire ORM attributes, and
            # an async session cannot lazily reload them on attribute access.
            username = user.username
            recipient = user.email

            new_password = generate_secure_password()
            user.password_hash = get_password_hash(new_password)
            # Flush so database errors surface before any mail leaves the server.
            await db.flush()

            login_url = await get_external_login_url(db)
            subject, text_body, html_body = await create_password_reset_email_from_template(
                db, username, new_password, login_url
            )
            send_email(smtp_settings, recipient, subject, text_body, html_body)

            # Make the new password permanent only once the mail went out.
            await db.commit()
            logger.info("Password reset email sent to %s", recipient)
        except Exception as e:
            # Undo the pending hash change so the old password stays valid.
            await db.rollback()
            logger.exception("Failed to send password reset email: %s", e)
            # Don't reveal error to user for security

    return ForgotPasswordResponse(
        message="If the email address is associated with an account, a password reset email has been sent."
    )
@router.post("/reset-password", response_model=ResetPasswordResponse)
async def reset_user_password(
    request: ResetPasswordRequest,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db),
):
    """Reset a user's password and send them an email (admin only, advanced auth only).

    A new password is generated for the user identified by
    ``request.user_id`` and mailed to their address. The new hash is
    committed only after the email has been handed to ``send_email``. If
    anything fails, the change is rolled back, so a 500 response really
    means the old password is still in effect.

    Raises:
        HTTPException: 403 for non-admin callers; 400 when advanced auth is
            disabled or the target user has no email; 404 when the target
            user does not exist; 500 when SMTP is not configured or the
            reset fails.
    """
    import logging

    logger = logging.getLogger(__name__)

    # Reload user with groups for proper is_admin check
    result = await db.execute(select(User).where(User.id == current_user.id).options(selectinload(User.groups)))
    admin_user = result.scalar_one()

    if not admin_user.is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only admins can reset user passwords",
        )

    # Check if advanced auth is enabled
    if not await is_advanced_auth_enabled(db):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Advanced authentication is not enabled",
        )

    # Get SMTP settings
    smtp_settings = await get_smtp_settings(db)
    if not smtp_settings:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Email service is not configured",
        )

    # Find user to reset
    result = await db.execute(select(User).where(User.id == request.user_id))
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )

    if not user.email:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="User does not have an email address configured",
        )

    # Read these now: commit/rollback may expire ORM attributes, and an async
    # session cannot lazily reload them on attribute access. The error
    # handler below needs the names after a rollback.
    admin_username = admin_user.username
    target_username = user.username
    target_email = user.email

    try:
        new_password = generate_secure_password()
        user.password_hash = get_password_hash(new_password)
        # Flush so database errors surface before any mail leaves the server.
        await db.flush()

        login_url = await get_external_login_url(db)
        subject, text_body, html_body = await create_password_reset_email_from_template(
            db, target_username, new_password, login_url
        )
        send_email(smtp_settings, target_email, subject, text_body, html_body)

        # Make the new password permanent only once the mail went out.
        await db.commit()

        logger.info("Password reset by admin %s for user %s", admin_username, target_username)
        return ResetPasswordResponse(message=f"Password reset email sent to {target_email}")
    except Exception as e:
        # Nothing has been committed on this path, so this restores the old hash.
        await db.rollback()
        logger.exception("Failed to reset password for user %s: %s", target_username, e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to reset password: {str(e)}",
        ) from e
|