"""Email service for sending authentication-related emails.""" from __future__ import annotations import html import logging import re import secrets import smtplib import string from datetime import datetime, timezone from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from typing import Any from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from backend.app.models.notification_template import NotificationTemplate from backend.app.models.settings import Settings from backend.app.schemas.auth import SMTPSettings logger = logging.getLogger(__name__) def generate_secure_password(length: int = 16) -> str: """Generate a secure random password. Args: length: Length of the password (default: 16) Returns: A secure random password containing uppercase, lowercase, digits, and special characters """ import random # Define character sets lowercase = string.ascii_lowercase uppercase = string.ascii_uppercase digits = string.digits special = "!@#$%^&*()_+-=[]{}|;:,.<>?" # Ensure at least one character from each set password_chars = [ secrets.choice(lowercase), secrets.choice(uppercase), secrets.choice(digits), secrets.choice(special), ] # Fill the rest with random characters from all sets all_chars = lowercase + uppercase + digits + special password_chars.extend(secrets.choice(all_chars) for _ in range(length - 4)) # Shuffle to avoid predictable patterns random.shuffle(password_chars) return "".join(password_chars) async def get_notification_template(db: AsyncSession, event_type: str) -> NotificationTemplate | None: """Get a notification template by event type from database. Args: db: Database session event_type: Type of event (e.g., 'user_created', 'password_reset') Returns: NotificationTemplate object or None if not found """ result = await db.execute(select(NotificationTemplate).where(NotificationTemplate.event_type == event_type)) return result.scalar_one_or_none() def render_template(template_str: str, variables: dict[str, Any]) -> str: """Render a template string with variables. Args: template_str: Template string with {variable} placeholders variables: Dictionary of variables to substitute Returns: Rendered template string """ result = template_str for key, value in variables.items(): result = result.replace("{" + key + "}", str(value) if value is not None else "") # Remove any remaining unreplaced placeholders (case-insensitive, alphanumeric + underscore) result = re.sub(r"\{[a-zA-Z0-9_]+\}", "", result) return result async def get_smtp_settings(db: AsyncSession) -> SMTPSettings | None: """Get SMTP settings from database. Args: db: Database session Returns: SMTPSettings object or None if not configured """ # Fetch all SMTP-related settings result = await db.execute( select(Settings).where( Settings.key.in_( [ "smtp_host", "smtp_port", "smtp_username", "smtp_password", "smtp_use_tls", "smtp_security", "smtp_auth_enabled", "smtp_from_email", "smtp_from_name", ] ) ) ) settings_dict = {s.key: s.value for s in result.scalars().all()} # Check if minimum required settings are present required_keys = ["smtp_host", "smtp_port", "smtp_from_email"] if not all(key in settings_dict for key in required_keys): return None # Handle migration: convert old smtp_use_tls to smtp_security if needed smtp_security = settings_dict.get("smtp_security") if not smtp_security: # Migrate from old smtp_use_tls format smtp_use_tls = settings_dict.get("smtp_use_tls", "true").lower() == "true" smtp_security = "starttls" if smtp_use_tls else "ssl" smtp_auth_enabled = settings_dict.get("smtp_auth_enabled", "true").lower() == "true" return SMTPSettings( smtp_host=settings_dict["smtp_host"], smtp_port=int(settings_dict["smtp_port"]), smtp_username=settings_dict.get("smtp_username"), smtp_password=settings_dict.get("smtp_password"), smtp_security=smtp_security, smtp_auth_enabled=smtp_auth_enabled, smtp_from_email=settings_dict["smtp_from_email"], smtp_from_name=settings_dict.get("smtp_from_name", "BamBuddy"), ) async def save_smtp_settings(db: AsyncSession, smtp_settings: SMTPSettings) -> None: """Save SMTP settings to database. Args: db: Database session smtp_settings: SMTP settings to save """ from sqlalchemy import func from sqlalchemy.dialects.sqlite import insert as sqlite_insert settings_data = { "smtp_host": smtp_settings.smtp_host, "smtp_port": str(smtp_settings.smtp_port), "smtp_security": smtp_settings.smtp_security, "smtp_auth_enabled": "true" if smtp_settings.smtp_auth_enabled else "false", "smtp_from_email": smtp_settings.smtp_from_email, "smtp_from_name": smtp_settings.smtp_from_name, } # Only save username if auth is enabled or if provided if smtp_settings.smtp_username: settings_data["smtp_username"] = smtp_settings.smtp_username # Only save password if provided if smtp_settings.smtp_password: settings_data["smtp_password"] = smtp_settings.smtp_password for key, value in settings_data.items(): stmt = sqlite_insert(Settings).values(key=key, value=value) stmt = stmt.on_conflict_do_update( index_elements=["key"], set_={"value": value, "updated_at": func.now()}, ) await db.execute(stmt) def send_email( smtp_settings: SMTPSettings, to_email: str, subject: str, body_text: str, body_html: str | None = None, ) -> None: """Send an email using SMTP. Args: smtp_settings: SMTP configuration to_email: Recipient email address subject: Email subject body_text: Plain text body body_html: Optional HTML body Raises: Exception: If email sending fails """ msg = MIMEMultipart("alternative") msg["From"] = f"{smtp_settings.smtp_from_name} <{smtp_settings.smtp_from_email}>" msg["To"] = to_email msg["Subject"] = subject # Attach plain text part msg.attach(MIMEText(body_text, "plain")) # Attach HTML part if provided if body_html: msg.attach(MIMEText(body_html, "html")) # Send email try: security = smtp_settings.smtp_security auth_enabled = smtp_settings.smtp_auth_enabled # Validate username is provided when authentication is enabled if auth_enabled and smtp_settings.smtp_password: if not smtp_settings.smtp_username: raise ValueError("SMTP username is required when authentication is enabled") if security == "ssl": # Direct SSL connection (typically port 465) with smtplib.SMTP_SSL(smtp_settings.smtp_host, smtp_settings.smtp_port, timeout=10) as server: if auth_enabled and smtp_settings.smtp_password and smtp_settings.smtp_username: server.login(smtp_settings.smtp_username, smtp_settings.smtp_password) server.send_message(msg) elif security == "starttls": # STARTTLS upgrade (typically port 587) with smtplib.SMTP(smtp_settings.smtp_host, smtp_settings.smtp_port, timeout=10) as server: server.starttls() if auth_enabled and smtp_settings.smtp_password and smtp_settings.smtp_username: server.login(smtp_settings.smtp_username, smtp_settings.smtp_password) server.send_message(msg) else: # No encryption (typically port 25) - use with caution with smtplib.SMTP(smtp_settings.smtp_host, smtp_settings.smtp_port, timeout=10) as server: if auth_enabled and smtp_settings.smtp_password and smtp_settings.smtp_username: server.login(smtp_settings.smtp_username, smtp_settings.smtp_password) server.send_message(msg) logger.info(f"Email sent successfully to {to_email}") except Exception as e: logger.error(f"Failed to send email to {to_email}: {e}") raise def create_welcome_email(username: str, password: str, login_url: str) -> tuple[str, str, str]: """Create welcome email content for new user. Args: username: Username of the new user password: Auto-generated password login_url: URL to login page Returns: Tuple of (subject, text_body, html_body) """ subject = "Welcome to BamBuddy - Your Account Details" text_body = f"""Welcome to BamBuddy! Your account has been created. Here are your login details: Username: {username} Password: {password} You can login at: {login_url} For security reasons, please change your password after your first login. Best regards, BamBuddy Team """ html_body = f"""
Your account has been created. Here are your login details:
Username: {username}
Password: {password}
Security Note: For security reasons, please change your password after your first login.
Best regards,
BamBuddy Team
Your BamBuddy password has been reset.
Username: {username}
New Password: {password}
⚠️ Security Alert: If you did not request this password reset, please contact your administrator immediately.
Security Note: For security reasons, please change your password after logging in.
Best regards,
BamBuddy Team
⚠️ Security Alert: If you did not request this password reset, please contact your administrator immediately.