""" Bambu Lab Cloud API Routes Handles authentication and profile management with Bambu Cloud. """ import json import logging from pathlib import Path from typing import Literal from fastapi import APIRouter, Body, Depends, Header, HTTPException, Request from fastapi.security import HTTPAuthorizationCredentials from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from backend.app.core.auth import ( RequirePermissionIfAuthEnabled, _user_from_api_key, _validate_api_key, require_permission_if_auth_enabled, security, ) from backend.app.core.database import get_db from backend.app.core.permissions import Permission from backend.app.models.api_key import APIKey from backend.app.models.settings import Settings from backend.app.models.user import User from backend.app.schemas.cloud import ( CloudAuthStatus, CloudDevice, CloudLoginRequest, CloudLoginResponse, CloudTokenRequest, CloudVerifyRequest, FirmwareUpdateInfo, FirmwareUpdatesResponse, SlicerSetting, SlicerSettingCreate, SlicerSettingDeleteResponse, SlicerSettingsResponse, SlicerSettingUpdate, ) from backend.app.services.bambu_cloud import ( BambuCloudAuthError, BambuCloudError, BambuCloudService, ) from backend.app.utils.filament_ids import filament_id_to_setting_id logger = logging.getLogger(__name__) async def _cloud_api_key_gate( request: Request, credentials: HTTPAuthorizationCredentials | None = Depends(security), x_api_key: str | None = Header(default=None, alias="X-API-Key"), db: AsyncSession = Depends(get_db), ) -> None: """Router-level dependency: enforce API-key cloud-access fences (#1182). Runs before every /cloud/* handler. JWT-authed and anonymous callers are no-ops — their access is gated by the per-route ``Permission.CLOUD_AUTH`` / ``Permission.FILAMENTS_READ`` / etc. dependency. API-keyed callers must have an owner and ``can_access_cloud=True``; legacy ownerless keys and keys without the cloud scope are rejected here. On a successful API-keyed request the owner User is stashed on ``request.state.api_key_owner`` so route handlers can resolve it via ``cloud_caller`` (the auth gate returns None for API keys to avoid a wider behaviour change in non-cloud routes — see auth.py). The dep duplicates the API-key validation done by the regular auth gate (which runs as a route-level dep, *after* router-level deps). The cost is one extra ``SELECT FROM api_keys`` per /cloud/* request — bounded and cheap (key_prefix is indexed). """ api_key_value: str | None = None if x_api_key: api_key_value = x_api_key elif credentials and credentials.credentials.startswith("bb_"): api_key_value = credentials.credentials if api_key_value is None: return # JWT or anonymous — no-op api_key = await _validate_api_key(db, api_key_value) if api_key is None: # Invalid key — let the route-level auth gate produce the 401 so the # error matches what every other route returns for a bad key. return _assert_api_key_can_access_cloud(api_key) # All fences passed. Stash the owner so cloud routes can resolve their # caller User without going through the auth gate (which intentionally # returns None for API keys to keep #1182 surface-bounded to /cloud/*). request.state.api_key_owner = await _user_from_api_key(db, api_key) def cloud_caller(*permissions: Permission): """Route-level dep factory for /cloud/* handlers. Returns a Depends that resolves to: - the JWT-authenticated User (when a JWT is present and the route's permission set is satisfied), OR - the API-key owner User stashed by the router-level gate (``request.state.api_key_owner``), OR - None when auth is disabled. Replaces the direct ``RequirePermissionIfAuthEnabled(...)`` dep on cloud routes so API-keyed callers get the *owner* in ``current_user`` rather than None — without that the route falls back to the global Settings cloud_token, which is empty in auth-enabled deployments. """ base_dep = require_permission_if_auth_enabled(*permissions) async def resolved( request: Request, base_user: User | None = Depends(base_dep), ) -> User | None: if base_user is not None: return base_user return getattr(request.state, "api_key_owner", None) return Depends(resolved) async def resolve_api_key_cloud_owner( credentials: HTTPAuthorizationCredentials | None = Depends(security), x_api_key: str | None = Header(default=None, alias="X-API-Key"), db: AsyncSession = Depends(get_db), ) -> User | None: """Route-level dep for non-/cloud/* endpoints that need to read the caller's stored Bambu Cloud token (e.g. the slice path resolving cloud presets — #1182 follow-up). Returns the API key's owner User when the caller is an API-keyed request *and* the key has ``can_access_cloud=True``; returns None for JWT, anonymous, or API keys without the cloud scope. The caller is expected to fall back to the JWT-authed ``current_user`` first and use this dep's result only when ``current_user`` is None. Unlike ``_cloud_api_key_gate`` (which 403s legacy/non-cloud keys at the router level), this dep is permissive: it returns None instead of raising, so a slice request via an API key without cloud scope still runs against local presets. The downstream cloud-token check in ``preset_resolver._resolve_cloud`` produces the right 400 if the request actually selects a cloud preset. """ api_key_value: str | None = None if x_api_key: api_key_value = x_api_key elif credentials and credentials.credentials.startswith("bb_"): api_key_value = credentials.credentials if api_key_value is None: return None api_key = await _validate_api_key(db, api_key_value) if api_key is None or api_key.user_id is None or not api_key.can_access_cloud: return None return await _user_from_api_key(db, api_key) router = APIRouter(prefix="/cloud", tags=["cloud"], dependencies=[Depends(_cloud_api_key_gate)]) # Keys for storing cloud credentials in settings CLOUD_TOKEN_KEY = "bambu_cloud_token" CLOUD_EMAIL_KEY = "bambu_cloud_email" CLOUD_REGION_KEY = "bambu_cloud_region" def _normalise_region(region: str | None) -> str: """Treat NULL/empty as 'global' for legacy rows that predate the region column.""" return region if region in ("global", "china") else "global" async def get_stored_token(db: AsyncSession, user: User | None = None) -> tuple[str | None, str | None, str]: """Get stored cloud token, email, and region. When a user is provided (auth enabled), returns that user's per-user credentials. When user is None (auth disabled), falls back to global Settings table. Region defaults to ``"global"`` when unset (including for rows that predate the ``cloud_region`` column). """ if user is not None: return user.cloud_token, user.cloud_email, _normalise_region(user.cloud_region) # Fallback: global storage (auth disabled) result = await db.execute( select(Settings).where(Settings.key.in_([CLOUD_TOKEN_KEY, CLOUD_EMAIL_KEY, CLOUD_REGION_KEY])) ) settings = {s.key: s.value for s in result.scalars().all()} return ( settings.get(CLOUD_TOKEN_KEY), settings.get(CLOUD_EMAIL_KEY), _normalise_region(settings.get(CLOUD_REGION_KEY)), ) async def store_token(db: AsyncSession, token: str, email: str, region: str, user: User | None = None) -> None: """Store cloud token, email, and region. When a user is provided (auth enabled), stores on the user record. When user is None (auth disabled), stores in global Settings table. """ region = _normalise_region(region) if user is not None: # User object is from the auth dependency's session (detached), # so use a direct UPDATE via the route's db session. from sqlalchemy import update await db.execute( update(User).where(User.id == user.id).values(cloud_token=token, cloud_email=email, cloud_region=region) ) await db.commit() return # Fallback: global storage (auth disabled) for key, value in [(CLOUD_TOKEN_KEY, token), (CLOUD_EMAIL_KEY, email), (CLOUD_REGION_KEY, region)]: result = await db.execute(select(Settings).where(Settings.key == key)) setting = result.scalar_one_or_none() if setting: setting.value = value else: db.add(Settings(key=key, value=value)) await db.commit() async def clear_token(db: AsyncSession, user: User | None = None) -> None: """Clear stored cloud token, email, and region. When a user is provided (auth enabled), clears that user's credentials. When user is None (auth disabled), clears from global Settings table. """ if user is not None: from sqlalchemy import update await db.execute( update(User).where(User.id == user.id).values(cloud_token=None, cloud_email=None, cloud_region=None) ) await db.commit() return # Fallback: global storage (auth disabled) result = await db.execute( select(Settings).where(Settings.key.in_([CLOUD_TOKEN_KEY, CLOUD_EMAIL_KEY, CLOUD_REGION_KEY])) ) for setting in result.scalars().all(): await db.delete(setting) await db.commit() def _assert_api_key_can_access_cloud(api_key: APIKey) -> None: """Reject API keys that aren't authorised to read cloud data. Three independent fences for API keys (#1182): 1. user_id IS NOT NULL — legacy keys created before per-user ownership have no owner whose cloud_token we could read; force recreate. 2. can_access_cloud=True — opt-in scope so existing automation doesn't start reading cloud data without the operator explicitly enabling it. 3. owner has stored cloud_token — enforced separately at the route level via ``build_authenticated_cloud`` returning None. """ if api_key.user_id is None: raise HTTPException( status_code=401, detail=( "This API key was created before per-user cloud access was supported. " "Recreate it from Settings → API Keys to use /cloud/* endpoints." ), ) if not api_key.can_access_cloud: raise HTTPException( status_code=403, detail=( "This API key is not authorised to access Bambu Cloud data. " "Enable 'Allow cloud access' on the key in Settings → API Keys." ), ) async def build_authenticated_cloud(db: AsyncSession, user: User | None) -> BambuCloudService | None: """Build a per-request cloud service seeded with the caller's stored token + region. Returns ``None`` when no token is stored, so callers can 401 without constructing (and then closing) a useless client. Caller is responsible for ``await cloud.close()``. """ token, _email, region = await get_stored_token(db, user) if not token: return None cloud = BambuCloudService(region=region) cloud.set_token(token) return cloud @router.get("/status", response_model=CloudAuthStatus) async def get_auth_status( db: AsyncSession = Depends(get_db), current_user: User | None = cloud_caller(Permission.CLOUD_AUTH), ): """Get current cloud authentication status. Reads the stored credentials in one DB round-trip (we used to call ``get_stored_token`` twice — once here and once inside ``build_authenticated_cloud``). ``region`` is exposed so the frontend can show "Connected (China)" after a reload without relying on local state. """ token, email, region = await get_stored_token(db, current_user) if not token: return CloudAuthStatus(is_authenticated=False, email=None, region=None) cloud = BambuCloudService(region=region) cloud.set_token(token) try: authenticated = cloud.is_authenticated return CloudAuthStatus( is_authenticated=authenticated, email=email if authenticated else None, region=region if authenticated else None, ) finally: await cloud.close() @router.post("/login", response_model=CloudLoginResponse) async def login( request: CloudLoginRequest, db: AsyncSession = Depends(get_db), current_user: User | None = cloud_caller(Permission.CLOUD_AUTH), ): """ Initiate login to Bambu Cloud. This will trigger either: - Email verification: A code is sent to the user's email - TOTP verification: User enters code from their authenticator app After receiving/generating the code, call /cloud/verify to complete the login. For TOTP, include the tfa_key from this response in the verify request. """ cloud = BambuCloudService(region=request.region) try: result = await cloud.login_request(request.email, request.password) if result.get("success") and cloud.access_token: # Direct login succeeded (rare) await store_token(db, cloud.access_token, request.email, request.region, current_user) return CloudLoginResponse( success=result.get("success", False), needs_verification=result.get("needs_verification", False), message=result.get("message", "Unknown error"), verification_type=result.get("verification_type"), tfa_key=result.get("tfa_key"), ) except BambuCloudAuthError as e: raise HTTPException(status_code=401, detail=str(e)) except BambuCloudError as e: raise HTTPException(status_code=500, detail=str(e)) finally: await cloud.close() @router.post("/verify", response_model=CloudLoginResponse) async def verify_code( request: CloudVerifyRequest, db: AsyncSession = Depends(get_db), current_user: User | None = cloud_caller(Permission.CLOUD_AUTH), ): """ Complete login with verification code (email or TOTP). For email verification: - After calling /cloud/login, the user receives an email with a 6-digit code - Submit the code with email address For TOTP verification: - The user enters the 6-digit code from their authenticator app - Include the tfa_key from the /cloud/login response ``request.region`` must match the region used in /cloud/login so that the TOTP call hits the correct TFA endpoint (bambulab.com vs bambulab.cn). """ cloud = BambuCloudService(region=request.region) try: # Use TOTP verification if tfa_key is provided if request.tfa_key: result = await cloud.verify_totp(request.tfa_key, request.code) else: result = await cloud.verify_code(request.email, request.code) if result.get("success") and cloud.access_token: await store_token(db, cloud.access_token, request.email, request.region, current_user) return CloudLoginResponse( success=result.get("success", False), needs_verification=False, message=result.get("message", "Unknown error"), ) except BambuCloudAuthError as e: raise HTTPException(status_code=401, detail=str(e)) except BambuCloudError as e: raise HTTPException(status_code=500, detail=str(e)) finally: await cloud.close() @router.post("/token", response_model=CloudAuthStatus) async def set_token( request: CloudTokenRequest, db: AsyncSession = Depends(get_db), current_user: User | None = cloud_caller(Permission.CLOUD_AUTH), ): """ Set access token directly. For users who already have a token (e.g., from Bambu Studio). The selected ``region`` is persisted alongside the token so every subsequent request hits the right Bambu API endpoint, including after a restart. """ cloud = BambuCloudService(region=request.region) cloud.set_token(request.access_token) try: # Verify token works by trying to get profile await cloud.get_user_profile() await store_token(db, request.access_token, "token-auth", request.region, current_user) return CloudAuthStatus(is_authenticated=True, email="token-auth") except BambuCloudError: raise HTTPException(status_code=401, detail="Invalid token") finally: await cloud.close() @router.post("/logout") async def logout( db: AsyncSession = Depends(get_db), current_user: User | None = cloud_caller(Permission.CLOUD_AUTH), ): """Log out of Bambu Cloud.""" await clear_token(db, current_user) return {"success": True} @router.get("/settings", response_model=SlicerSettingsResponse) async def get_slicer_settings( version: str = "02.04.00.70", db: AsyncSession = Depends(get_db), current_user: User | None = cloud_caller(Permission.CLOUD_AUTH), ): """ Get all slicer settings (filament, printer, process presets). Requires authentication. """ cloud = await build_authenticated_cloud(db, current_user) if cloud is None or not cloud.is_authenticated: raise HTTPException(status_code=401, detail="Not authenticated") try: data = await cloud.get_slicer_settings(version) result = SlicerSettingsResponse() # Map API keys to our types (API uses 'print' for process presets) type_mapping = { "filament": "filament", "printer": "printer", "print": "process", # API calls it 'print', we call it 'process' } for api_key, our_type in type_mapping.items(): type_data = data.get(api_key, {}) private_settings = type_data.get("private", []) public_settings = type_data.get("public", []) parsed = [] # Private (custom) presets first for s in private_settings: parsed.append( SlicerSetting( setting_id=s.get("setting_id", s.get("id", "")), name=s.get("name", "Unknown"), type=our_type, version=s.get("version"), user_id=s.get("user_id"), updated_time=s.get("updated_time"), is_custom=True, ) ) # Public (default) presets for s in public_settings: parsed.append( SlicerSetting( setting_id=s.get("setting_id", s.get("id", "")), name=s.get("name", "Unknown"), type=our_type, version=s.get("version"), user_id=s.get("user_id"), updated_time=s.get("updated_time"), is_custom=False, ) ) setattr(result, our_type, parsed) return result except BambuCloudAuthError: await clear_token(db, current_user) raise HTTPException(status_code=401, detail="Authentication expired") except BambuCloudError as e: raise HTTPException(status_code=500, detail=str(e)) finally: await cloud.close() @router.get("/settings/{setting_id}") async def get_setting_detail( setting_id: str, db: AsyncSession = Depends(get_db), current_user: User | None = cloud_caller(Permission.CLOUD_AUTH), ): """ Get detailed information for a specific setting/preset. Returns the full preset configuration. """ cloud = await build_authenticated_cloud(db, current_user) if cloud is None or not cloud.is_authenticated: raise HTTPException(status_code=401, detail="Not authenticated") try: data = await cloud.get_setting_detail(setting_id) return data except BambuCloudAuthError: await clear_token(db, current_user) raise HTTPException(status_code=401, detail="Authentication expired") except BambuCloudError as e: raise HTTPException(status_code=500, detail=str(e)) finally: await cloud.close() @router.get("/filaments", response_model=list[SlicerSetting]) async def get_filament_presets( version: str = "02.04.00.70", db: AsyncSession = Depends(get_db), current_user: User | None = cloud_caller(Permission.FILAMENTS_READ), ): """ Get just filament presets (convenience endpoint). Returns all filament presets with custom presets first. Uses the same cache as get_slicer_settings. """ settings = await get_slicer_settings(version=version, db=db, current_user=current_user) return settings.filament # Cache for filament preset info (setting_id -> {name, k}) _filament_cache: dict[str, dict] = {} _filament_cache_time: float = 0 FILAMENT_CACHE_TTL = 300 # 5 minutes # Built-in filament ID → name mapping (fallback when cloud API and local profiles # don't have the entry). Based on Bambu Lab's known filament catalogue. _BUILTIN_FILAMENT_NAMES: dict[str, str] = { "GFA00": "Bambu PLA Basic", "GFA01": "Bambu PLA Matte", "GFA02": "Bambu PLA Metal", "GFA05": "Bambu PLA Silk", "GFA06": "Bambu PLA Silk+", "GFA07": "Bambu PLA Marble", "GFA08": "Bambu PLA Sparkle", "GFA09": "Bambu PLA Tough", "GFA11": "Bambu PLA Aero", "GFA12": "Bambu PLA Glow", "GFA13": "Bambu PLA Dynamic", "GFA15": "Bambu PLA Galaxy", "GFA16": "Bambu PLA Wood", "GFA50": "Bambu PLA-CF", "GFB00": "Bambu ABS", "GFB01": "Bambu ASA", "GFB02": "Bambu ASA-Aero", "GFB50": "Bambu ABS-GF", "GFB51": "Bambu ASA-CF", "GFB60": "PolyLite ABS", "GFB61": "PolyLite ASA", "GFB98": "Generic ASA", "GFB99": "Generic ABS", "GFC00": "Bambu PC", "GFC01": "Bambu PC FR", "GFC99": "Generic PC", "GFG00": "Bambu PETG Basic", "GFG01": "Bambu PETG Translucent", "GFG02": "Bambu PETG HF", "GFG50": "Bambu PETG-CF", "GFG60": "PolyLite PETG", "GFG96": "Generic PETG HF", "GFG97": "Generic PCTG", "GFG98": "Generic PETG-CF", "GFG99": "Generic PETG", "GFL00": "PolyLite PLA", "GFL01": "PolyTerra PLA", "GFL03": "eSUN PLA+", "GFL04": "Overture PLA", "GFL05": "Overture Matte PLA", "GFL06": "Fiberon PETG-ESD", "GFL50": "Fiberon PA6-CF", "GFL51": "Fiberon PA6-GF", "GFL52": "Fiberon PA12-CF", "GFL53": "Fiberon PA612-CF", "GFL54": "Fiberon PET-CF", "GFL55": "Fiberon PETG-rCF", "GFL95": "Generic PLA High Speed", "GFL96": "Generic PLA Silk", "GFL98": "Generic PLA-CF", "GFL99": "Generic PLA", "GFN03": "Bambu PA-CF", "GFN04": "Bambu PAHT-CF", "GFN05": "Bambu PA6-CF", "GFN06": "Bambu PPA-CF", "GFN08": "Bambu PA6-GF", "GFN96": "Generic PPA-GF", "GFN97": "Generic PPA-CF", "GFN98": "Generic PA-CF", "GFN99": "Generic PA", "GFP95": "Generic PP-GF", "GFP96": "Generic PP-CF", "GFP97": "Generic PP", "GFP98": "Generic PE-CF", "GFP99": "Generic PE", "GFR98": "Generic PHA", "GFR99": "Generic EVA", "GFS00": "Bambu Support W", "GFS01": "Bambu Support G", "GFS02": "Bambu Support For PLA", "GFS03": "Bambu Support For PA/PET", "GFS04": "Bambu PVA", "GFS05": "Bambu Support For PLA/PETG", "GFS06": "Bambu Support for ABS", "GFS97": "Generic BVOH", "GFS98": "Generic HIPS", "GFS99": "Generic PVA", "GFT01": "Bambu PET-CF", "GFT02": "Bambu PPS-CF", "GFT97": "Generic PPS", "GFT98": "Generic PPS-CF", "GFU00": "Bambu TPU 95A HF", "GFU01": "Bambu TPU 95A", "GFU02": "Bambu TPU for AMS", "GFU98": "Generic TPU for AMS", "GFU99": "Generic TPU", } async def _enrich_from_local_presets( unresolved_ids: list[str], result: dict, db: AsyncSession, ) -> dict: """Fall back to local profiles for filament IDs not resolved by cloud. Matches by checking the setting_id field inside the local preset's resolved JSON blob (stored in the 'setting' column). """ from sqlalchemy import text from backend.app.models.local_preset import LocalPreset # Build lookup: converted setting_id -> original filament_id id_map: dict[str, str] = {} for fid in unresolved_ids: converted = _filament_id_to_setting_id(fid) id_map[converted] = fid # Also map the original in case the JSON uses that form id_map[fid] = fid try: # Query filament presets that have a setting_id matching any of our IDs from backend.app.core.db_dialect import is_sqlite if is_sqlite(): json_filter = text("json_extract(setting, '$.setting_id') IS NOT NULL") else: json_filter = text("(setting::jsonb->>'setting_id') IS NOT NULL") candidates = await db.execute( select(LocalPreset).where( LocalPreset.preset_type == "filament", json_filter, ) ) for preset in candidates.scalars().all(): try: setting_data = json.loads(preset.setting) if isinstance(preset.setting, str) else preset.setting preset_setting_id = setting_data.get("setting_id", "") if preset_setting_id in id_map: original_id = id_map[preset_setting_id] info = {"name": preset.name, "k": None} # Try to extract K value from the local preset pa = setting_data.get("pressure_advance") if pa is not None: try: k_val = float(pa[0]) if isinstance(pa, list) else float(pa) info["k"] = k_val except (ValueError, TypeError, IndexError): pass _filament_cache[original_id] = info result[original_id] = info except Exception: continue except Exception as e: logger.warning("Failed to search local presets for filament info: %s", e) # Phase 4: Fall back to built-in filament name table for any still without a name for fid in unresolved_ids: if fid not in result or not result[fid].get("name"): name = _BUILTIN_FILAMENT_NAMES.get(fid, "") if name: # Preserve K value from earlier phases if available existing_k = result.get(fid, {}).get("k") info = {"name": name, "k": existing_k} _filament_cache[fid] = info result[fid] = info # Fill remaining unresolved with empty entries for fid in unresolved_ids: if fid not in result: _filament_cache[fid] = {"name": "", "k": None} result[fid] = {"name": "", "k": None} return result # _filament_id_to_setting_id is now imported from backend.app.utils.filament_ids _filament_id_to_setting_id = filament_id_to_setting_id @router.post("/filament-info") async def get_filament_info( setting_ids: list[str] = Body(...), db: AsyncSession = Depends(get_db), current_user: User | None = cloud_caller(Permission.FILAMENTS_READ), ): """ Get filament preset info (name and K value) for multiple setting IDs. Used to enrich AMS tray and nozzle rack tooltips with preset data. Lookup order: cache → cloud → local profiles → built-in table → empty fallback. """ import time logger.info("get_filament_info called with %s IDs: %s", len(setting_ids), setting_ids) global _filament_cache, _filament_cache_time # Clear stale cache if time.time() - _filament_cache_time > FILAMENT_CACHE_TTL: _filament_cache = {} _filament_cache_time = time.time() result = {} unresolved_ids: list[str] = [] # Phase 1: Check cache for setting_id in setting_ids: if not setting_id: continue if setting_id in _filament_cache: result[setting_id] = _filament_cache[setting_id] else: unresolved_ids.append(setting_id) # Phase 2: Try cloud for uncached IDs if unresolved_ids: cloud = await build_authenticated_cloud(db, current_user) if cloud is not None and cloud.is_authenticated: try: still_unresolved: list[str] = [] for setting_id in unresolved_ids: try: api_setting_id = _filament_id_to_setting_id(setting_id) data = await cloud.get_setting_detail(api_setting_id) setting = data.get("setting", {}) name = data.get("name", "") k_value = setting.get("pressure_advance") if k_value is not None: try: k_value = float(k_value) except (ValueError, TypeError): k_value = None info = {"name": name, "k": k_value} _filament_cache[setting_id] = info result[setting_id] = info if not name: still_unresolved.append(setting_id) except Exception as e: logger.warning( f"Failed to get cloud preset {setting_id} " f"(API ID: {_filament_id_to_setting_id(setting_id)}): {e}" ) still_unresolved.append(setting_id) unresolved_ids = still_unresolved finally: await cloud.close() elif cloud is not None: await cloud.close() # Phase 3: Try local profiles for any IDs still without a name if unresolved_ids: result = await _enrich_from_local_presets(unresolved_ids, result, db) return result @router.get("/devices", response_model=list[CloudDevice]) async def get_devices( db: AsyncSession = Depends(get_db), current_user: User | None = cloud_caller(Permission.PRINTERS_READ), ): """ Get list of bound printer devices. Returns printers registered to the user's Bambu account. """ cloud = await build_authenticated_cloud(db, current_user) if cloud is None or not cloud.is_authenticated: raise HTTPException(status_code=401, detail="Not authenticated") try: data = await cloud.get_devices() devices = data.get("devices", []) return [ CloudDevice( dev_id=d.get("dev_id", ""), name=d.get("name", "Unknown"), dev_model_name=d.get("dev_model_name"), dev_product_name=d.get("dev_product_name"), online=d.get("online", False), ) for d in devices ] except BambuCloudAuthError: await clear_token(db, current_user) raise HTTPException(status_code=401, detail="Authentication expired") except BambuCloudError as e: raise HTTPException(status_code=500, detail=str(e)) finally: await cloud.close() @router.get("/firmware-updates", response_model=FirmwareUpdatesResponse) async def get_firmware_updates( db: AsyncSession = Depends(get_db), current_user: User | None = cloud_caller(Permission.FIRMWARE_READ), ): """ Check for firmware updates for all bound devices. Returns firmware version info for each device including: - Current installed version - Latest available version - Whether an update is available - Release notes for the latest version Requires cloud authentication. """ cloud = await build_authenticated_cloud(db, current_user) if cloud is None or not cloud.is_authenticated: raise HTTPException(status_code=401, detail="Not authenticated") try: # First get list of bound devices devices_data = await cloud.get_devices() devices = devices_data.get("devices", []) updates = [] updates_available = 0 # Check firmware for each device for device in devices: device_id = device.get("dev_id", "") device_name = device.get("name", "Unknown") try: firmware_info = await cloud.get_firmware_version(device_id) update_available = firmware_info.get("update_available", False) if update_available: updates_available += 1 updates.append( FirmwareUpdateInfo( device_id=device_id, device_name=device_name, current_version=firmware_info.get("current_version"), latest_version=firmware_info.get("latest_version"), update_available=update_available, release_notes=firmware_info.get("release_notes"), ) ) except BambuCloudError as e: logger.warning("Failed to get firmware info for %s: %s", device_name, e) # Still include device but with unknown firmware status updates.append( FirmwareUpdateInfo( device_id=device_id, device_name=device_name, current_version=None, latest_version=None, update_available=False, release_notes=None, ) ) return FirmwareUpdatesResponse(updates=updates, updates_available=updates_available) except BambuCloudAuthError: await clear_token(db, current_user) raise HTTPException(status_code=401, detail="Authentication expired") except BambuCloudError as e: raise HTTPException(status_code=500, detail=str(e)) finally: await cloud.close() @router.post("/settings") async def create_setting( request: SlicerSettingCreate, db: AsyncSession = Depends(get_db), current_user: User | None = cloud_caller(Permission.CLOUD_AUTH), ): """ Create a new slicer preset/setting. Creates a new preset on Bambu Cloud. The preset inherits from a base preset and only stores the delta (modified values). Type should be: 'filament', 'print', or 'printer' """ cloud = await build_authenticated_cloud(db, current_user) if cloud is None or not cloud.is_authenticated: raise HTTPException(status_code=401, detail="Not authenticated") try: data = await cloud.create_setting( preset_type=request.type, name=request.name, base_id=request.base_id, setting=request.setting, version=request.version, ) return data except BambuCloudAuthError: await clear_token(db, current_user) raise HTTPException(status_code=401, detail="Authentication expired") except BambuCloudError as e: raise HTTPException(status_code=500, detail=str(e)) finally: await cloud.close() @router.put("/settings/{setting_id}") async def update_setting( setting_id: str, request: SlicerSettingUpdate, db: AsyncSession = Depends(get_db), current_user: User | None = cloud_caller(Permission.CLOUD_AUTH), ): """ Update an existing slicer preset/setting. Updates the preset's name and/or settings on Bambu Cloud. """ cloud = await build_authenticated_cloud(db, current_user) if cloud is None or not cloud.is_authenticated: raise HTTPException(status_code=401, detail="Not authenticated") try: data = await cloud.update_setting( setting_id=setting_id, name=request.name, setting=request.setting, ) return data except BambuCloudAuthError: await clear_token(db, current_user) raise HTTPException(status_code=401, detail="Authentication expired") except BambuCloudError as e: raise HTTPException(status_code=500, detail=str(e)) finally: await cloud.close() @router.delete("/settings/{setting_id}", response_model=SlicerSettingDeleteResponse) async def delete_setting( setting_id: str, db: AsyncSession = Depends(get_db), current_user: User | None = cloud_caller(Permission.CLOUD_AUTH), ): """ Delete a slicer preset/setting. Removes the preset from Bambu Cloud. This cannot be undone. """ cloud = await build_authenticated_cloud(db, current_user) if cloud is None or not cloud.is_authenticated: raise HTTPException(status_code=401, detail="Not authenticated") try: result = await cloud.delete_setting(setting_id) return SlicerSettingDeleteResponse( success=result.get("success", True), message=result.get("message", "Setting deleted"), ) except BambuCloudAuthError: await clear_token(db, current_user) raise HTTPException(status_code=401, detail="Authentication expired") except BambuCloudError as e: raise HTTPException(status_code=500, detail=str(e)) finally: await cloud.close() # Path to field definition files FIELDS_DATA_DIR = Path(__file__).parent.parent.parent / "data" # Cache for field definitions (loaded once) _fields_cache: dict[str, dict] = {} def _load_fields(preset_type: str) -> dict: """Load field definitions from JSON file.""" if preset_type in _fields_cache: return _fields_cache[preset_type] # Map API type names to file names file_map = { "filament": "filament_fields.json", "print": "process_fields.json", "process": "process_fields.json", "printer": "printer_fields.json", } filename = file_map.get(preset_type) if not filename: raise HTTPException(status_code=400, detail=f"Unknown preset type: {preset_type}") file_path = FIELDS_DATA_DIR / filename if not file_path.exists(): raise HTTPException(status_code=404, detail=f"Field definitions not found for: {preset_type}") with open(file_path) as f: data = json.load(f) _fields_cache[preset_type] = data return data @router.get("/builtin-filaments") async def get_builtin_filaments( _: User | None = RequirePermissionIfAuthEnabled(Permission.FILAMENTS_READ), ): """ Get built-in filament names as a fallback source. Returns the static _BUILTIN_FILAMENT_NAMES table as a list of {filament_id, name} objects. Used by the frontend when cloud and local profiles are unavailable. """ return [{"filament_id": fid, "name": name} for fid, name in _BUILTIN_FILAMENT_NAMES.items()] # Cache for filament_id → name mapping (resolved from cloud preset details) _filament_id_name_cache: dict[str, str] = {} _filament_id_name_cache_time: float = 0 @router.get("/filament-id-map") async def get_filament_id_map( db: AsyncSession = Depends(get_db), current_user: User | None = cloud_caller(Permission.FILAMENTS_READ), ): """ Get filament_id → name mapping for user cloud presets. K-profiles store a filament_id (e.g., "P4d64437") which is different from the cloud preset setting_id (e.g., "PFUS9ac902733670a9"). This endpoint fetches details for all custom presets and returns the mapping. Cached for 5 minutes. """ import time global _filament_id_name_cache, _filament_id_name_cache_time if _filament_id_name_cache and time.time() - _filament_id_name_cache_time < FILAMENT_CACHE_TTL: return _filament_id_name_cache cloud = await build_authenticated_cloud(db, current_user) if cloud is None or not cloud.is_authenticated: if cloud is not None: await cloud.close() return _filament_id_name_cache or {} try: data = await cloud.get_slicer_settings() custom_presets = data.get("filament", {}).get("private", []) result: dict[str, str] = {} for preset in custom_presets: setting_id = preset.get("setting_id", "") if not setting_id: continue try: detail = await cloud.get_setting_detail(setting_id) fid = detail.get("filament_id", "") name = detail.get("name", "") if fid and name: # Strip printer/nozzle suffix: "Devil Design PLA Basic @Bambu Lab H2D 0.4 nozzle" → "Devil Design PLA Basic" clean_name = name.split(" @")[0].strip() if " @" in name else name result[fid] = clean_name except Exception: pass _filament_id_name_cache = result _filament_id_name_cache_time = time.time() return result except Exception: return _filament_id_name_cache or {} finally: await cloud.close() @router.get("/fields/{preset_type}") async def get_preset_fields( preset_type: Literal["filament", "print", "process", "printer"], _: User | None = RequirePermissionIfAuthEnabled(Permission.CLOUD_AUTH), ): """ Get field definitions for a preset type. Returns a list of field definitions including: - key: The setting key name - label: Human-readable label - type: Field type (text, number, boolean, select) - category: Grouping category - description: Field description - options: For select fields, available options - unit: Unit of measurement (if applicable) - min/max/step: For number fields, validation constraints """ data = _load_fields(preset_type) return data @router.get("/fields") async def get_all_preset_fields( _: User | None = RequirePermissionIfAuthEnabled(Permission.CLOUD_AUTH), ): """ Get all field definitions for all preset types. Returns field definitions organized by type. """ return { "filament": _load_fields("filament"), "process": _load_fields("process"), "printer": _load_fields("printer"), }