| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511 |
- """Service for importing and resolving OrcaSlicer profiles.
- Handles:
- - Parsing .json, .orca_filament, .bbscfg, .bbsflmt, .zip exports
- - Fetching base Bambu profiles from OrcaSlicer GitHub for inheritance resolution
- - Caching base profiles in the database with TTL
- - Extracting core fields for quick access
- """
- import io
- import json
- import logging
- import zipfile
- from datetime import datetime, timedelta, timezone
- import httpx
- from sqlalchemy import select
- from sqlalchemy.ext.asyncio import AsyncSession
- from backend.app.models.local_preset import LocalPreset
- from backend.app.models.orca_base_cache import OrcaBaseProfile
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Root URL that base-profile downloads are built from; the fetch code appends
# "/<subdir>/<name>.json" to it.
ORCA_BASE_URL = "https://raw.githubusercontent.com/SoftFever/OrcaSlicer/main/resources/profiles/BBL"
# Cached base profiles whose fetched_at is older than this many days are treated as stale.
CACHE_TTL_DAYS = 7
# Upper bound on how many "inherits" levels are followed when resolving a preset.
MAX_INHERITANCE_DEPTH = 10
async def get_cached_base_profile(name: str, db: AsyncSession) -> dict | None:
    """Return the cached base profile called ``name`` if it is still fresh.

    Args:
        name: Value matched against ``OrcaBaseProfile.name``.
        db: Session used to run the lookup query.

    Returns:
        The decoded ``setting`` JSON of the cached row, or ``None`` when no row
        exists, the row is older than ``CACHE_TTL_DAYS`` days, or the stored
        JSON cannot be decoded.
    """
    lookup = select(OrcaBaseProfile).where(OrcaBaseProfile.name == name)
    row = (await db.execute(lookup)).scalar_one_or_none()
    if not row:
        return None

    # A timestamp without tzinfo is interpreted as UTC so it can be compared
    # with the timezone-aware freshness threshold below.
    fetched_at = row.fetched_at
    if fetched_at.tzinfo is None:
        fetched_at = fetched_at.replace(tzinfo=timezone.utc)

    oldest_allowed = datetime.now(timezone.utc) - timedelta(days=CACHE_TTL_DAYS)
    if fetched_at < oldest_allowed:
        return None

    try:
        payload = json.loads(row.setting)
    except Exception:
        # An undecodable cache entry is treated the same as a cache miss.
        return None
    return payload
async def fetch_and_cache_base_profile(name: str, profile_type: str, db: AsyncSession) -> dict | None:
    """Return the base profile ``name``, downloading and caching it on a cache miss.

    The fresh cache entry is returned when one exists. Otherwise the profile is
    requested from ``ORCA_BASE_URL`` (first in the directory mapped from
    ``profile_type``, then in ``filament`` as a fallback) and the result is
    stored in an ``OrcaBaseProfile`` row. The row is added to / updated in the
    session only; this function does not commit.

    Args:
        name: Base profile name; used both as cache key and as the file name
            (without ``.json``) in the download URL.
        profile_type: ``"filament"``, ``"machine"``, ``"printer"`` or
            ``"process"``; anything else is looked up in the filament directory.
        db: Session used for the cache lookup and the cache write.

    Returns:
        The profile as a dict, or ``None`` when the name is unsafe or no URL
        yields an HTTP 200 response containing a JSON object.
    """
    # ``name`` originates from the ``inherits`` field of an uploaded preset, so
    # it is untrusted. A path separator or dot-segment would let it address
    # files outside the intended profile directory, so such names are refused.
    if not isinstance(name, str) or not name or name in (".", "..") or "/" in name or "\\" in name:
        logger.warning("Refusing to fetch base profile with unsafe name %r", name)
        return None

    # Check cache first
    cached = await get_cached_base_profile(name, db)
    if cached is not None:
        return cached

    # Map profile_type to GitHub subdirectory
    type_dirs = {
        "filament": "filament",
        "machine": "machine",
        "printer": "machine",
        "process": "process",
    }
    subdir = type_dirs.get(profile_type, "filament")

    urls_to_try = [f"{ORCA_BASE_URL}/{subdir}/{name}.json"]
    # Also try filament dir as fallback for any type
    if subdir != "filament":
        urls_to_try.append(f"{ORCA_BASE_URL}/filament/{name}.json")

    data = None
    async with httpx.AsyncClient(timeout=15.0) as client:
        for url in urls_to_try:
            try:
                resp = await client.get(url)
                payload = resp.json() if resp.status_code == 200 else None
            except Exception as e:
                # Best effort: network and decoding failures just move on to the next URL.
                logger.debug("Failed to fetch %s: %s", url, e)
                continue
            if isinstance(payload, dict):
                data = payload
                break
            # Non-200 status, or a body that is valid JSON but not an object:
            # neither is a usable profile, so it must not be cached.
            logger.debug("No usable profile at %s (HTTP %s)", url, resp.status_code)

    if data is None:
        logger.warning("Could not fetch base profile '%s' from GitHub", name)
        return None

    # Cache in DB (insert, or refresh the existing row for this name)
    setting_json = json.dumps(data)
    now = datetime.now(timezone.utc)
    result = await db.execute(select(OrcaBaseProfile).where(OrcaBaseProfile.name == name))
    existing = result.scalar_one_or_none()
    if existing:
        existing.setting = setting_json
        existing.profile_type = profile_type
        existing.fetched_at = now
    else:
        db.add(
            OrcaBaseProfile(
                name=name,
                profile_type=profile_type,
                setting=setting_json,
                fetched_at=now,
            )
        )
    return data
async def resolve_preset(preset_data: dict, profile_type: str, db: AsyncSession, depth: int = 0) -> dict:
    """Flatten a preset's ``inherits`` chain into a single dict.

    The parent named by ``preset_data["inherits"]`` is loaded, resolved
    recursively, and then overlaid with the child's own keys (shallow merge:
    a key present in the child replaces the parent's value entirely).

    Args:
        preset_data: The preset to resolve.
        profile_type: Passed through to the base-profile lookup.
        db: Session passed through to the base-profile lookup.
        depth: Current recursion level; resolution stops once it reaches
            ``MAX_INHERITANCE_DEPTH``.

    Returns:
        A new merged dict, or ``preset_data`` itself when there is nothing to
        inherit, the parent cannot be loaded, or the depth limit is hit.
    """
    if depth >= MAX_INHERITANCE_DEPTH:
        logger.warning("Inheritance depth limit reached for preset")
        return preset_data

    parent_name = preset_data.get("inherits")
    if not parent_name:
        return preset_data

    parent = await fetch_and_cache_base_profile(parent_name, profile_type, db)
    if parent is None:
        logger.warning("Cannot resolve inherits='%s' — base profile not found", parent_name)
        return preset_data

    # Flatten the parent first so the child's keys are applied last and win.
    flattened_parent = await resolve_preset(parent, profile_type, db, depth + 1)
    return {**flattened_parent, **preset_data}
def extract_core_fields(data: dict) -> dict:
    """Pull the commonly used fields out of a resolved preset.

    Only keys that could be derived are present in the result:

    * ``filament_type`` / ``filament_vendor``: first list element as ``str``,
      or the value itself when it is already a string.
    * ``nozzle_temp_min`` / ``nozzle_temp_max``: min/max of the digit-only
      entries of ``nozzle_temperature``, overridden by the first element of
      ``nozzle_temperature_range_low`` / ``_high`` when that parses as an int.
    * ``pressure_advance`` / ``default_filament_colour``: JSON text for a
      list, ``str(value)`` otherwise.
    * ``filament_cost`` / ``filament_density``: first list element, or the
      scalar itself, as ``str``. An empty list or a null value is treated as
      "not set" rather than being stored as the text ``"[]"`` / ``"None"``.
    * ``compatible_printers``: JSON text of the list.

    Args:
        data: Preset settings dict.

    Returns:
        Dict containing the subset of the fields above that were found.
    """
    fields: dict = {}

    # Often a single-element array like ["PLA"].
    for key in ("filament_type", "filament_vendor"):
        raw = data.get(key)
        if isinstance(raw, list) and raw:
            fields[key] = str(raw[0])
        elif isinstance(raw, str):
            fields[key] = raw

    nozzle_temp = data.get("nozzle_temperature")
    if isinstance(nozzle_temp, list) and nozzle_temp:
        try:
            temps = [int(t) for t in nozzle_temp if str(t).isdigit()]
        except (ValueError, TypeError):
            # str.isdigit() accepts some characters int() rejects; skip the field then.
            temps = []
        if temps:
            fields["nozzle_temp_min"] = min(temps)
            fields["nozzle_temp_max"] = max(temps)

    # Explicit range fields take precedence over the values derived above.
    range_targets = (
        ("nozzle_temperature_range_low", "nozzle_temp_min"),
        ("nozzle_temperature_range_high", "nozzle_temp_max"),
    )
    for source_key, target_key in range_targets:
        bound = data.get(source_key)
        if isinstance(bound, list) and bound:
            try:
                fields[target_key] = int(bound[0])
            except (ValueError, TypeError):
                # Unparseable override: keep whatever nozzle_temperature produced.
                pass

    for key in ("pressure_advance", "default_filament_colour"):
        raw = data.get(key)
        if raw is not None:
            fields[key] = json.dumps(raw) if isinstance(raw, list) else str(raw)

    for key in ("filament_cost", "filament_density"):
        raw = data.get(key)
        if isinstance(raw, list):
            # Empty list means "no value"; never stringify the list itself.
            raw = raw[0] if raw else None
        if raw is not None:
            fields[key] = str(raw)

    compat = data.get("compatible_printers")
    if isinstance(compat, list):
        fields["compatible_printers"] = json.dumps(compat)

    return fields
- MATERIAL_TYPES = [
- "PLA",
- "ABS",
- "ASA",
- "PETG",
- "TPU",
- "PA",
- "PC",
- "PVA",
- "HIPS",
- "PET",
- "PP",
- "PEI",
- "PEEK",
- "PCTG",
- "PPA",
- "POM",
- ]
- def _parse_material_from_name(name: str) -> str | None:
- """Extract filament material type from preset name, e.g. 'Overture PLA Matte' -> 'PLA'."""
- import re
- upper = name.upper()
- for mat in MATERIAL_TYPES:
- if re.search(rf"\b{mat}\b", upper):
- return mat
- return None
- def _parse_vendor_from_name(name: str) -> str | None:
- """Extract vendor from preset name, e.g. 'Overture PLA Matte @BBL X1C' -> 'Overture'."""
- import re
- # Strip @printer suffix
- clean = re.sub(r"@.+$", "", name).strip()
- upper = clean.upper()
- for mat in MATERIAL_TYPES:
- idx = upper.find(mat)
- if idx > 0:
- vendor = clean[:idx].strip()
- if vendor and len(vendor) > 1:
- return vendor
- return None
- def _type_from_path(zip_entry: str) -> str | None:
- """Infer profile type from the ZIP directory path."""
- parts = zip_entry.lower().replace("\\", "/").split("/")
- for part in parts:
- if part in ("filament",):
- return "filament"
- if part in ("machine", "printer"):
- return "printer"
- if part in ("process", "print"):
- return "process"
- return None
- def _guess_profile_type(data: dict, path_hint: str | None = None) -> str:
- """Determine the profile type from JSON data and optional ZIP path hint."""
- import re
- # 1. Explicit "type" field set by OrcaSlicer
- explicit = data.get("type", "").lower()
- if explicit in ("filament",):
- return "filament"
- if explicit in ("machine", "printer"):
- return "printer"
- if explicit in ("process", "print"):
- return "process"
- # 2. ZIP directory path hint (e.g. "filament/MyPreset.json")
- if path_hint:
- from_path = _type_from_path(path_hint)
- if from_path:
- return from_path
- # 3. Strong ID-based heuristics — *_settings_id is definitive
- if "print_settings_id" in data:
- return "process"
- if "filament_settings_id" in data:
- return "filament"
- if "printer_settings_id" in data:
- return "printer"
- # 4. Content-based heuristics — check process BEFORE filament because
- # resolved process presets can inherit filament_type from their base
- process_keys = {
- "layer_height",
- "first_layer_height",
- "wall_loops",
- "prime_tower_width",
- "prime_tower_max_speed",
- "prime_tower_rib_wall",
- "outer_wall_speed",
- "inner_wall_speed",
- "interlocking_depth",
- "bottom_shell_layers",
- "top_shell_layers",
- "sparse_infill_density",
- }
- if process_keys & data.keys():
- return "process"
- if "machine_max_speed_x" in data or "printer_model" in data or "bed_shape" in data:
- return "printer"
- if "filament_type" in data or "filament_vendor" in data:
- return "filament"
- # 5. Name-based heuristics as last resort
- name = data.get("name", "")
- if re.search(r"\d+\.\d+mm\s", name):
- return "process"
- if name.lower().endswith("process"):
- return "process"
- return "filament"
async def import_orca_file(filename: str, content: bytes, db: AsyncSession) -> dict:
    """Import presets from a file (.json, .orca_filament, .bbscfg, .bbsflmt, .zip).

    A ``.json`` file is treated as one preset. The other extensions are opened
    as ZIP archives and every ``.json`` entry (extension matched
    case-insensitively) whose path does not contain ``bundle_structure`` is
    imported. Malformed input never raises from here: undecodable bytes,
    invalid JSON, and payloads that are not JSON objects are reported in
    ``errors``. In the archive branch any other per-entry exception is also
    reported and the remaining entries are still processed.

    Args:
        filename: Original file name; only its extension is inspected, and it
            is passed on as the type hint for a single ``.json`` preset.
        content: Raw file bytes.
        db: Session passed through to the per-preset import.

    Returns:
        Dict with keys ``success``, ``imported``, ``skipped`` and ``errors``.
        ``success`` is true when at least one preset was imported, or when
        presets were skipped and no error occurred.
    """
    imported = 0
    skipped = 0
    errors: list[str] = []
    not_an_object = "Invalid preset: expected a JSON object"

    def record(result: str, label: str = "") -> None:
        # Fold one per-preset outcome into the counters / error list.
        nonlocal imported, skipped
        if result == "imported":
            imported += 1
        elif result == "skipped":
            skipped += 1
        else:
            errors.append(f"{label}{result}")

    lower_name = filename.lower()
    if lower_name.endswith(".json"):
        # Single JSON preset
        try:
            data = json.loads(content)
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            # json.loads() decodes bytes itself, so bad encodings surface here too.
            errors.append(f"Invalid JSON: {e}")
        else:
            if isinstance(data, dict):
                record(await _import_single_preset(data, db, path_hint=filename))
            else:
                errors.append(not_an_object)
    elif lower_name.endswith((".orca_filament", ".zip", ".bbscfg", ".bbsflmt")):
        # ZIP archive — extract and parse each JSON
        try:
            with zipfile.ZipFile(io.BytesIO(content)) as zf:
                for entry in zf.namelist():
                    if not entry.lower().endswith(".json") or "bundle_structure" in entry:
                        continue
                    try:
                        data = json.loads(zf.read(entry))
                        if not isinstance(data, dict):
                            errors.append(f"{entry}: {not_an_object}")
                            continue
                        result = await _import_single_preset(data, db, path_hint=entry)
                    except json.JSONDecodeError:
                        errors.append(f"{entry}: Invalid JSON")
                    except Exception as e:
                        # One bad entry must not abort the rest of the archive.
                        errors.append(f"{entry}: {e}")
                    else:
                        record(result, f"{entry}: ")
        except zipfile.BadZipFile:
            errors.append("Invalid ZIP/orca_filament archive")
    else:
        errors.append(f"Unsupported file type: {filename}")

    return {
        "success": imported > 0 or (skipped > 0 and not errors),
        "imported": imported,
        "skipped": skipped,
        "errors": errors,
    }
async def _import_single_preset(data: dict, db: AsyncSession, path_hint: str | None = None) -> str:
    """Store one preset dict as a ``LocalPreset`` row.

    The preset is skipped when a row with the same name already exists.
    Otherwise its type is guessed, its inheritance chain is resolved (falling
    back to the unresolved data if that fails), the core fields are extracted,
    and a new row is added to the session. Nothing is committed here.

    Args:
        data: Parsed preset JSON.
        db: Session used for the duplicate check and the insert.
        path_hint: File name or archive path, forwarded to the type guess.

    Returns:
        ``"imported"``, ``"skipped"``, or an error message.
    """
    preset_name = data.get("name")
    if not preset_name:
        return "Preset has no name"

    # Duplicates are detected by name only.
    duplicate_query = select(LocalPreset).where(LocalPreset.name == preset_name)
    if (await db.execute(duplicate_query)).scalar_one_or_none():
        return "skipped"

    kind = _guess_profile_type(data, path_hint)
    parent_name = data.get("inherits")

    try:
        flattened = await resolve_preset(data, kind, db)
    except Exception as exc:
        logger.warning("Failed to resolve inheritance for '%s': %s", preset_name, exc)
        flattened = data

    core = extract_core_fields(flattened)
    # When the settings carry no material/vendor, fall back to parsing the name.
    material = core.get("filament_type") or _parse_material_from_name(preset_name)
    vendor = core.get("filament_vendor") or _parse_vendor_from_name(preset_name)

    passthrough = (
        "nozzle_temp_min",
        "nozzle_temp_max",
        "pressure_advance",
        "default_filament_colour",
        "filament_cost",
        "filament_density",
        "compatible_printers",
    )
    row = LocalPreset(
        name=preset_name,
        preset_type=kind,
        source="orcaslicer",
        filament_type=material,
        filament_vendor=vendor,
        setting=json.dumps(flattened),
        inherits=parent_name,
        version=data.get("version"),
        **{column: core.get(column) for column in passthrough},
    )
    db.add(row)
    return "imported"
async def refresh_base_cache(db: AsyncSession) -> dict:
    """Re-download every base profile currently present in the cache.

    Each cached row has its ``fetched_at`` reset to ``datetime.min`` so the
    freshness check fails, then the profile is fetched again. An exception or
    an empty/missing result counts as a failure for that profile.

    Returns:
        Dict with the counts ``refreshed``, ``failed`` and ``total``.
    """
    cached_rows = (await db.execute(select(OrcaBaseProfile))).scalars().all()

    refreshed = 0
    failed = 0
    for row in cached_rows:
        try:
            # Make the entry look expired so the fetch bypasses the cache.
            row.fetched_at = datetime.min
            payload = await fetch_and_cache_base_profile(row.name, row.profile_type, db)
        except Exception:
            failed += 1
            continue
        if payload:
            refreshed += 1
        else:
            failed += 1

    return {"refreshed": refreshed, "failed": failed, "total": len(cached_rows)}
async def get_cache_status(db: AsyncSession) -> dict:
    """Summarise how many cached base profiles are fresh versus stale.

    A profile is fresh when its ``fetched_at`` (a timestamp without tzinfo is
    read as UTC) is no older than ``CACHE_TTL_DAYS`` days.

    Returns:
        Dict with ``total``, ``fresh``, ``stale`` and ``ttl_days``.
    """
    rows = (await db.execute(select(OrcaBaseProfile))).scalars().all()
    threshold = datetime.now(timezone.utc) - timedelta(days=CACHE_TTL_DAYS)

    def is_fresh(stamp: datetime) -> bool:
        if stamp.tzinfo is None:
            stamp = stamp.replace(tzinfo=timezone.utc)
        return stamp >= threshold

    fresh_count = sum(1 for row in rows if is_fresh(row.fetched_at))
    return {
        "total": len(rows),
        "fresh": fresh_count,
        "stale": len(rows) - fresh_count,
        "ttl_days": CACHE_TTL_DAYS,
    }
async def reclassify_presets(db: AsyncSession) -> dict:
    """Recompute ``preset_type`` for every stored local preset.

    Each preset's ``setting`` JSON is decoded and run through the type
    heuristic (without a path hint). Rows whose guessed type differs are
    updated on the session and logged; rows whose JSON cannot be decoded are
    left untouched. Nothing is committed here.

    Returns:
        Dict with ``total`` (rows examined) and ``reclassified`` (rows changed).
    """
    stored = (await db.execute(select(LocalPreset))).scalars().all()

    changed = 0
    for row in stored:
        try:
            settings = json.loads(row.setting)
        except Exception:
            continue

        guessed = _guess_profile_type(settings)
        if guessed == row.preset_type:
            continue

        logger.info(
            "Reclassifying '%s' from '%s' to '%s'",
            row.name,
            row.preset_type,
            guessed,
        )
        row.preset_type = guessed
        changed += 1

    return {"total": len(stored), "reclassified": changed}
|