| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761 |
- import io
- import logging
- import zipfile
- from datetime import datetime
- from pathlib import Path
- from fastapi import APIRouter, Depends, File, UploadFile
- from fastapi.responses import JSONResponse, StreamingResponse
- from sqlalchemy import delete, select
- from sqlalchemy.ext.asyncio import AsyncSession
- from backend.app.core.auth import RequirePermissionIfAuthEnabled
- from backend.app.core.config import settings as app_settings
- from backend.app.core.database import get_db
- from backend.app.core.permissions import Permission
- from backend.app.models.settings import Settings
- from backend.app.models.user import User
- from backend.app.schemas.settings import AppSettings, AppSettingsUpdate
- logger = logging.getLogger(__name__)
- router = APIRouter(prefix="/settings", tags=["settings"])
- # Default settings
- DEFAULT_SETTINGS = AppSettings()
- async def get_setting(db: AsyncSession, key: str) -> str | None:
- """Get a single setting value by key."""
- result = await db.execute(select(Settings).where(Settings.key == key))
- setting = result.scalar_one_or_none()
- return setting.value if setting else None
- async def get_external_login_url(db: AsyncSession) -> str:
- """Get the external URL for the login page.
- Uses external_url from settings if available, otherwise falls back to APP_URL env var.
- Args:
- db: Database session
- Returns:
- Full URL to the login page
- """
- import os
- external_url = await get_setting(db, "external_url")
- if external_url:
- external_url = external_url.rstrip("/")
- else:
- external_url = os.environ.get("APP_URL", "http://localhost:5173")
- return external_url + "/login"
- async def set_setting(db: AsyncSession, key: str, value: str) -> None:
- """Set a single setting value."""
- from sqlalchemy import func
- from sqlalchemy.dialects.sqlite import insert as sqlite_insert
- # Use upsert (INSERT ... ON CONFLICT UPDATE) for reliability
- stmt = sqlite_insert(Settings).values(key=key, value=value)
- stmt = stmt.on_conflict_do_update(index_elements=["key"], set_={"value": value, "updated_at": func.now()})
- await db.execute(stmt)
- @router.get("", response_model=AppSettings)
- @router.get("/", response_model=AppSettings)
- async def get_settings(
- db: AsyncSession = Depends(get_db),
- _: User | None = RequirePermissionIfAuthEnabled(Permission.SETTINGS_READ),
- ):
- """Get all application settings."""
- settings_dict = DEFAULT_SETTINGS.model_dump()
- # Load saved settings from database
- result = await db.execute(select(Settings))
- db_settings = result.scalars().all()
- for setting in db_settings:
- if setting.key in settings_dict:
- # Parse the value based on the expected type
- if setting.key in [
- "auto_archive",
- "save_thumbnails",
- "capture_finish_photo",
- "spoolman_enabled",
- "spoolman_disable_weight_sync",
- "spoolman_report_partial_usage",
- "disable_filament_warnings",
- "check_updates",
- "check_printer_firmware",
- "include_beta_updates",
- "virtual_printer_enabled",
- "ftp_retry_enabled",
- "mqtt_enabled",
- "mqtt_use_tls",
- "ha_enabled",
- "per_printer_mapping_expanded",
- "prometheus_enabled",
- "user_notifications_enabled",
- "queue_drying_enabled",
- "queue_drying_block",
- "ambient_drying_enabled",
- ]:
- settings_dict[setting.key] = setting.value.lower() == "true"
- elif setting.key in [
- "default_filament_cost",
- "energy_cost_per_kwh",
- "ams_temp_good",
- "ams_temp_fair",
- "library_disk_warning_gb",
- "low_stock_threshold",
- ]:
- settings_dict[setting.key] = float(setting.value)
- elif setting.key in [
- "ams_humidity_good",
- "ams_humidity_fair",
- "ams_history_retention_days",
- "ftp_retry_count",
- "ftp_retry_delay",
- "ftp_timeout",
- "mqtt_port",
- ]:
- settings_dict[setting.key] = int(setting.value)
- elif setting.key == "default_printer_id":
- # Handle nullable integer
- settings_dict[setting.key] = int(setting.value) if setting.value and setting.value != "None" else None
- else:
- settings_dict[setting.key] = setting.value
- # Get Home Assistant settings (with environment variable overrides)
- ha_settings = await get_homeassistant_settings(db)
- settings_dict.update(ha_settings)
- return AppSettings(**settings_dict)
- @router.put("/", response_model=AppSettings)
- async def update_settings(
- settings_update: AppSettingsUpdate,
- db: AsyncSession = Depends(get_db),
- _: User | None = RequirePermissionIfAuthEnabled(Permission.SETTINGS_UPDATE),
- ):
- """Update application settings."""
- update_data = settings_update.model_dump(exclude_unset=True)
- # Check if any MQTT settings are being updated
- mqtt_keys = {
- "mqtt_enabled",
- "mqtt_broker",
- "mqtt_port",
- "mqtt_username",
- "mqtt_password",
- "mqtt_topic_prefix",
- "mqtt_use_tls",
- }
- mqtt_updated = bool(mqtt_keys & set(update_data.keys()))
- for key, value in update_data.items():
- # Convert value to string for storage
- if isinstance(value, bool):
- str_value = "true" if value else "false"
- elif value is None:
- str_value = "None"
- else:
- str_value = str(value)
- await set_setting(db, key, str_value)
- await db.commit()
- # Expire all objects to ensure fresh reads after commit
- db.expire_all()
- # Reconfigure MQTT relay if any MQTT settings changed
- if mqtt_updated:
- try:
- from backend.app.services.mqtt_relay import mqtt_relay
- mqtt_settings = {
- "mqtt_enabled": (await get_setting(db, "mqtt_enabled") or "false") == "true",
- "mqtt_broker": await get_setting(db, "mqtt_broker") or "",
- "mqtt_port": int(await get_setting(db, "mqtt_port") or "1883"),
- "mqtt_username": await get_setting(db, "mqtt_username") or "",
- "mqtt_password": await get_setting(db, "mqtt_password") or "",
- "mqtt_topic_prefix": await get_setting(db, "mqtt_topic_prefix") or "bambuddy",
- "mqtt_use_tls": (await get_setting(db, "mqtt_use_tls") or "false") == "true",
- }
- await mqtt_relay.configure(mqtt_settings)
- except Exception:
- pass # Don't fail the settings update if MQTT reconfiguration fails
- # Return updated settings
- return await get_settings(db)
- @router.patch("/", response_model=AppSettings)
- @router.patch("", response_model=AppSettings)
- async def patch_settings(
- settings_update: AppSettingsUpdate,
- db: AsyncSession = Depends(get_db),
- _: User | None = RequirePermissionIfAuthEnabled(Permission.SETTINGS_UPDATE),
- ):
- """Partially update application settings (same as PUT, for REST compatibility)."""
- return await update_settings(settings_update, db, _)
- @router.post("/reset", response_model=AppSettings)
- async def reset_settings(
- db: AsyncSession = Depends(get_db),
- _: User | None = RequirePermissionIfAuthEnabled(Permission.SETTINGS_UPDATE),
- ):
- """Reset all settings to defaults."""
- # Delete all settings
- result = await db.execute(select(Settings))
- for setting in result.scalars().all():
- await db.delete(setting)
- await db.commit()
- return DEFAULT_SETTINGS
- @router.get("/default-sidebar-order")
- async def get_default_sidebar_order(
- db: AsyncSession = Depends(get_db),
- ):
- """Get the admin-set default sidebar order.
- Intentionally unauthenticated: non-admin users need to read this value to apply
- the default sidebar order, but may lack SETTINGS_READ permission.
- The value is non-sensitive (sidebar item IDs only).
- """
- value = await get_setting(db, "default_sidebar_order")
- return {"default_sidebar_order": value or ""}
- @router.get("/check-ffmpeg")
- async def check_ffmpeg():
- """Check if ffmpeg is installed and available."""
- from backend.app.services.camera import get_ffmpeg_path
- ffmpeg_path = get_ffmpeg_path()
- return {
- "installed": ffmpeg_path is not None,
- "path": ffmpeg_path,
- }
- @router.get("/spoolman")
- async def get_spoolman_settings(
- db: AsyncSession = Depends(get_db),
- _: User | None = RequirePermissionIfAuthEnabled(Permission.SETTINGS_READ),
- ):
- """Get Spoolman integration settings."""
- spoolman_enabled = await get_setting(db, "spoolman_enabled") or "false"
- spoolman_url = await get_setting(db, "spoolman_url") or ""
- spoolman_sync_mode = await get_setting(db, "spoolman_sync_mode") or "auto"
- spoolman_disable_weight_sync = await get_setting(db, "spoolman_disable_weight_sync") or "false"
- spoolman_report_partial_usage = await get_setting(db, "spoolman_report_partial_usage") or "true"
- return {
- "spoolman_enabled": spoolman_enabled,
- "spoolman_url": spoolman_url,
- "spoolman_sync_mode": spoolman_sync_mode,
- "spoolman_disable_weight_sync": spoolman_disable_weight_sync,
- "spoolman_report_partial_usage": spoolman_report_partial_usage,
- }
- @router.put("/spoolman")
- async def update_spoolman_settings(
- settings: dict,
- db: AsyncSession = Depends(get_db),
- _: User | None = RequirePermissionIfAuthEnabled(Permission.SETTINGS_UPDATE),
- ):
- """Update Spoolman integration settings."""
- if "spoolman_enabled" in settings:
- old_val = await get_setting(db, "spoolman_enabled") or "false"
- new_val = settings["spoolman_enabled"]
- await set_setting(db, "spoolman_enabled", new_val)
- # Switching to Spoolman: clear built-in inventory slot assignments
- if old_val.lower() != "true" and new_val.lower() == "true":
- from backend.app.models.spool_assignment import SpoolAssignment
- result = await db.execute(delete(SpoolAssignment))
- logger.info("Cleared %d spool assignments on switch to Spoolman mode", result.rowcount)
- if "spoolman_url" in settings:
- await set_setting(db, "spoolman_url", settings["spoolman_url"])
- if "spoolman_sync_mode" in settings:
- await set_setting(db, "spoolman_sync_mode", settings["spoolman_sync_mode"])
- if "spoolman_disable_weight_sync" in settings:
- await set_setting(db, "spoolman_disable_weight_sync", settings["spoolman_disable_weight_sync"])
- if "spoolman_report_partial_usage" in settings:
- await set_setting(db, "spoolman_report_partial_usage", settings["spoolman_report_partial_usage"])
- await db.commit()
- db.expire_all()
- # Return updated settings
- return await get_spoolman_settings(db)
- async def get_homeassistant_settings(db: AsyncSession) -> dict:
- """
- Get Home Assistant integration settings.
- Environment variables (HA_URL, HA_TOKEN) take precedence over database settings.
- """
- import os
- # Check environment variables first
- ha_url_env = os.environ.get("HA_URL")
- ha_token_env = os.environ.get("HA_TOKEN")
- # Fall back to database values
- ha_url = ha_url_env or await get_setting(db, "ha_url") or ""
- ha_token = ha_token_env or await get_setting(db, "ha_token") or ""
- ha_enabled_db = await get_setting(db, "ha_enabled") or "false"
- # Track which settings come from environment
- ha_url_from_env = bool(ha_url_env)
- ha_token_from_env = bool(ha_token_env)
- ha_env_managed = ha_url_from_env and ha_token_from_env
- # Auto-enable when both env vars are set, otherwise use database value
- if ha_url_env and ha_token_env:
- ha_enabled = True
- else:
- ha_enabled = ha_enabled_db.lower() == "true"
- return {
- "ha_enabled": ha_enabled,
- "ha_url": ha_url,
- "ha_token": ha_token,
- "ha_url_from_env": ha_url_from_env,
- "ha_token_from_env": ha_token_from_env,
- "ha_env_managed": ha_env_managed,
- }
- @router.get("/backup")
- async def create_backup(
- db: AsyncSession = Depends(get_db),
- _: User | None = RequirePermissionIfAuthEnabled(Permission.SETTINGS_BACKUP),
- ):
- """Create a complete backup (database + all files) as a ZIP.
- This is a simplified backup that includes the entire SQLite database
- and all data directories. It is complete by definition and cannot miss data.
- """
- import shutil
- import tempfile
- from sqlalchemy import text
- from backend.app.core.database import engine
- try:
- base_dir = app_settings.base_dir
- db_path = Path(app_settings.database_url.replace("sqlite+aiosqlite:///", ""))
- with tempfile.TemporaryDirectory() as temp_dir:
- temp_path = Path(temp_dir)
- # 1. Checkpoint WAL to ensure all data is in main db file
- async with engine.begin() as conn:
- await conn.execute(text("PRAGMA wal_checkpoint(TRUNCATE)"))
- # 2. Copy database file
- shutil.copy2(db_path, temp_path / "bambuddy.db")
- # 3. Copy data directories (if they exist)
- dirs_to_backup = [
- ("archive", base_dir / "archive"),
- ("virtual_printer", base_dir / "virtual_printer"),
- ("plate_calibration", app_settings.plate_calibration_dir),
- ("icons", base_dir / "icons"),
- ("projects", base_dir / "projects"),
- ]
- for name, src_dir in dirs_to_backup:
- if src_dir.exists() and any(src_dir.iterdir()):
- try:
- shutil.copytree(src_dir, temp_path / name)
- except shutil.Error as e:
- # Some files may have restricted permissions (e.g., SSL keys)
- # Log the error but continue with partial backup
- logger.warning("Some files in %s could not be copied: %s", name, e)
- except PermissionError as e:
- logger.warning("Permission denied copying %s: %s", name, e)
- # 4. Create ZIP
- zip_buffer = io.BytesIO()
- with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zf:
- for file_path in temp_path.rglob("*"):
- if file_path.is_file():
- arcname = file_path.relative_to(temp_path)
- zf.write(file_path, arcname)
- zip_buffer.seek(0)
- filename = f"bambuddy-backup-{datetime.now().strftime('%Y%m%d-%H%M%S')}.zip"
- return StreamingResponse(
- zip_buffer,
- media_type="application/zip",
- headers={"Content-Disposition": f"attachment; filename={filename}"},
- )
- except Exception as e:
- logger.error("Backup failed: %s", e, exc_info=True)
- return JSONResponse(
- status_code=500,
- content={"success": False, "message": "Backup failed. Check server logs for details."},
- )
- @router.post("/restore")
- async def restore_backup(
- file: UploadFile = File(...),
- db: AsyncSession = Depends(get_db),
- _: User | None = RequirePermissionIfAuthEnabled(Permission.SETTINGS_RESTORE),
- ):
- """Restore from a complete backup ZIP.
- This is a simplified restore that replaces the database and all data directories
- from the backup ZIP. Requires a restart after restore.
- """
- import shutil
- import tempfile
- from fastapi import HTTPException
- from backend.app.core.database import close_all_connections, init_db, reinitialize_database
- from backend.app.services.virtual_printer import virtual_printer_manager
- base_dir = app_settings.base_dir
- db_path = Path(app_settings.database_url.replace("sqlite+aiosqlite:///", ""))
- with tempfile.TemporaryDirectory() as temp_dir:
- temp_path = Path(temp_dir)
- # 1. Read and extract ZIP
- content = await file.read()
- # Check if it's a valid ZIP
- if not file.filename or not file.filename.endswith(".zip"):
- raise HTTPException(400, "Invalid backup file: must be a .zip file")
- try:
- with zipfile.ZipFile(io.BytesIO(content), "r") as zf:
- zf.extractall(temp_path)
- except zipfile.BadZipFile:
- raise HTTPException(400, "Invalid backup file: not a valid ZIP")
- # 2. Validate backup (must have database)
- backup_db = temp_path / "bambuddy.db"
- if not backup_db.exists():
- raise HTTPException(400, "Invalid backup: missing bambuddy.db")
- try:
- import asyncio
- # 3. Stop virtual printer if running (releases file locks)
- try:
- if virtual_printer_manager.is_enabled:
- logger.info("Stopping virtual printer for restore...")
- await virtual_printer_manager.configure(enabled=False)
- # Give it time to fully release file handles
- await asyncio.sleep(1)
- except Exception as e:
- logger.warning("Failed to stop virtual printer: %s", e)
- # 4. Close current database connections
- logger.info("Closing database connections...")
- await close_all_connections()
- # 5. Replace database
- logger.info("Restoring database from backup...")
- shutil.copy2(backup_db, db_path)
- # 6. Replace data directories
- # For Docker compatibility: clear contents then copy (don't delete mount points)
- dirs_to_restore = [
- ("archive", base_dir / "archive"),
- ("virtual_printer", base_dir / "virtual_printer"),
- ("plate_calibration", app_settings.plate_calibration_dir),
- ("icons", base_dir / "icons"),
- ("projects", base_dir / "projects"),
- ]
- skipped_dirs = []
- for name, dest_dir in dirs_to_restore:
- src_dir = temp_path / name
- if src_dir.exists():
- logger.info("Restoring %s directory...", name)
- try:
- # Clear destination contents (not the dir itself - may be Docker mount)
- if dest_dir.exists():
- for item in dest_dir.iterdir():
- try:
- if item.is_dir():
- shutil.rmtree(item)
- else:
- item.unlink()
- except OSError as e:
- logger.warning("Could not delete %s: %s", item, e)
- else:
- dest_dir.mkdir(parents=True, exist_ok=True)
- # Copy contents from backup
- for item in src_dir.iterdir():
- dest_item = dest_dir / item.name
- if item.is_dir():
- shutil.copytree(item, dest_item)
- else:
- shutil.copy2(item, dest_item)
- except OSError as e:
- logger.warning("Could not restore %s directory: %s", name, e)
- skipped_dirs.append(name)
- # 7. Reinitialize the database engine and apply schema migrations so that
- # tables added after the backup was created (e.g. ams_labels) exist
- # immediately, without requiring a manual restart.
- await reinitialize_database()
- await init_db()
- logger.info("Restore complete - restart required")
- message = "Backup restored successfully. Please restart Bambuddy for changes to take effect."
- if skipped_dirs:
- message += f" Note: Some directories could not be restored ({', '.join(skipped_dirs)})."
- return {
- "success": True,
- "message": message,
- }
- except Exception as e:
- logger.error("Restore failed: %s", e, exc_info=True)
- return JSONResponse(
- status_code=500,
- content={"success": False, "message": "Restore failed. Check server logs for details."},
- )
- @router.get("/network-interfaces")
- async def get_network_interfaces(
- _: User | None = RequirePermissionIfAuthEnabled(Permission.SETTINGS_READ),
- ):
- """Get available network interfaces with all IPs (primary + aliases)."""
- from backend.app.services.network_utils import get_all_interface_ips
- interfaces = get_all_interface_ips()
- return {"interfaces": interfaces}
- @router.get("/virtual-printer/models")
- async def get_virtual_printer_models(
- _: User | None = RequirePermissionIfAuthEnabled(Permission.SETTINGS_READ),
- ):
- """Get available virtual printer models."""
- from backend.app.services.virtual_printer import (
- DEFAULT_VIRTUAL_PRINTER_MODEL,
- VIRTUAL_PRINTER_MODELS,
- )
- return {
- "models": VIRTUAL_PRINTER_MODELS,
- "default": DEFAULT_VIRTUAL_PRINTER_MODEL,
- }
- @router.get("/virtual-printer")
- async def get_virtual_printer_settings(
- db: AsyncSession = Depends(get_db),
- _: User | None = RequirePermissionIfAuthEnabled(Permission.SETTINGS_READ),
- ):
- """Get virtual printer settings and status."""
- from backend.app.services.virtual_printer import (
- DEFAULT_VIRTUAL_PRINTER_MODEL,
- virtual_printer_manager,
- )
- enabled = await get_setting(db, "virtual_printer_enabled")
- access_code = await get_setting(db, "virtual_printer_access_code")
- mode = await get_setting(db, "virtual_printer_mode")
- model = await get_setting(db, "virtual_printer_model")
- target_printer_id = await get_setting(db, "virtual_printer_target_printer_id")
- remote_interface_ip = await get_setting(db, "virtual_printer_remote_interface_ip")
- return {
- "enabled": enabled == "true" if enabled else False,
- "access_code_set": bool(access_code),
- "mode": mode or "immediate",
- "model": model or DEFAULT_VIRTUAL_PRINTER_MODEL,
- "target_printer_id": int(target_printer_id) if target_printer_id else None,
- "remote_interface_ip": remote_interface_ip or "",
- "status": virtual_printer_manager.get_status(),
- }
- @router.put("/virtual-printer")
- async def update_virtual_printer_settings(
- enabled: bool = None,
- access_code: str = None,
- mode: str = None,
- model: str = None,
- target_printer_id: int = None,
- remote_interface_ip: str = None,
- db: AsyncSession = Depends(get_db),
- _: User | None = RequirePermissionIfAuthEnabled(Permission.SETTINGS_UPDATE),
- ):
- """Update virtual printer settings and restart services if needed.
- For proxy mode with SSDP proxy (dual-homed setup):
- - remote_interface_ip: IP of interface on slicer's network (LAN B)
- - Local interface is auto-detected based on target printer IP
- """
- from sqlalchemy import select
- from backend.app.models.printer import Printer
- from backend.app.services.virtual_printer import (
- DEFAULT_VIRTUAL_PRINTER_MODEL,
- VIRTUAL_PRINTER_MODELS,
- virtual_printer_manager,
- )
- # Get current values
- current_enabled = await get_setting(db, "virtual_printer_enabled") == "true"
- current_access_code = await get_setting(db, "virtual_printer_access_code") or ""
- current_mode = await get_setting(db, "virtual_printer_mode") or "immediate"
- current_model = await get_setting(db, "virtual_printer_model") or DEFAULT_VIRTUAL_PRINTER_MODEL
- current_target_id_str = await get_setting(db, "virtual_printer_target_printer_id")
- current_target_id = int(current_target_id_str) if current_target_id_str else None
- current_remote_iface = await get_setting(db, "virtual_printer_remote_interface_ip") or ""
- # Apply updates
- new_enabled = enabled if enabled is not None else current_enabled
- new_access_code = access_code if access_code is not None else current_access_code
- new_mode = mode if mode is not None else current_mode
- new_model = model if model is not None else current_model
- new_target_id = target_printer_id if target_printer_id is not None else current_target_id
- new_remote_iface = remote_interface_ip if remote_interface_ip is not None else current_remote_iface
- # Validate mode
- # "review" is the new name for "queue" (pending review before archiving)
- # "print_queue" archives and adds to print queue (unassigned)
- # "proxy" is transparent TCP proxy to a real printer
- if new_mode not in ("immediate", "queue", "review", "print_queue", "proxy"):
- return JSONResponse(
- status_code=400,
- content={"detail": "Mode must be 'immediate', 'review', 'print_queue', or 'proxy'"},
- )
- # Normalize legacy "queue" to "review" for storage
- if new_mode == "queue":
- new_mode = "review"
- # Validate model
- if model is not None and model not in VIRTUAL_PRINTER_MODELS:
- return JSONResponse(
- status_code=400,
- content={"detail": f"Invalid model. Must be one of: {', '.join(VIRTUAL_PRINTER_MODELS.keys())}"},
- )
- # Mode-specific validation and printer lookup
- target_printer_ip = ""
- target_printer_serial = ""
- if new_mode == "proxy":
- # Proxy mode requires target printer when enabling
- if new_enabled and not new_target_id:
- # If just switching to proxy mode (not explicitly enabling), auto-disable
- if enabled is None:
- new_enabled = False
- else:
- return JSONResponse(
- status_code=400,
- content={"detail": "Target printer is required for proxy mode"},
- )
- # Look up printer IP and serial if we have a target
- if new_target_id:
- result = await db.execute(select(Printer).where(Printer.id == new_target_id))
- printer = result.scalar_one_or_none()
- if not printer:
- return JSONResponse(
- status_code=400,
- content={"detail": f"Printer with ID {new_target_id} not found"},
- )
- target_printer_ip = printer.ip_address
- target_printer_serial = printer.serial_number
- # Access code not required for proxy mode
- else:
- # Non-proxy modes require access code when enabling
- if new_enabled and not new_access_code:
- # If just switching modes (not explicitly enabling), auto-disable
- if enabled is None:
- new_enabled = False
- else:
- return JSONResponse(
- status_code=400,
- content={"detail": "Access code is required when enabling virtual printer"},
- )
- # Validate access code length (Bambu Studio requires exactly 8 characters)
- if access_code is not None and access_code and len(access_code) != 8:
- return JSONResponse(
- status_code=400,
- content={"detail": "Access code must be exactly 8 characters"},
- )
- # Save settings
- await set_setting(db, "virtual_printer_enabled", "true" if new_enabled else "false")
- if access_code is not None:
- await set_setting(db, "virtual_printer_access_code", access_code)
- await set_setting(db, "virtual_printer_mode", new_mode)
- if model is not None:
- await set_setting(db, "virtual_printer_model", model)
- if target_printer_id is not None:
- await set_setting(db, "virtual_printer_target_printer_id", str(target_printer_id))
- if remote_interface_ip is not None:
- await set_setting(db, "virtual_printer_remote_interface_ip", remote_interface_ip)
- await db.commit()
- db.expire_all()
- # Reconfigure virtual printer
- try:
- await virtual_printer_manager.configure(
- enabled=new_enabled,
- access_code=new_access_code,
- mode=new_mode,
- model=new_model,
- target_printer_ip=target_printer_ip,
- target_printer_serial=target_printer_serial,
- remote_interface_ip=new_remote_iface,
- )
- except ValueError as e:
- logger.warning("Virtual printer configuration validation error: %s", e)
- return JSONResponse(
- status_code=400,
- content={"detail": "Invalid virtual printer configuration. Check the provided values."},
- )
- except Exception as e:
- logger.error("Failed to configure virtual printer: %s", e, exc_info=True)
- return JSONResponse(
- status_code=500,
- content={"detail": "Failed to configure virtual printer. Check server logs for details."},
- )
- return await get_virtual_printer_settings(db)
- # =============================================================================
- # MQTT Relay Settings
- # =============================================================================
- @router.get("/mqtt/status")
- async def get_mqtt_status(
- _: User | None = RequirePermissionIfAuthEnabled(Permission.SETTINGS_READ),
- ):
- """Get MQTT relay connection status."""
- from backend.app.services.mqtt_relay import mqtt_relay
- return mqtt_relay.get_status()
|