| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313 |
- import io
- import logging
- import os
- import zipfile
- from datetime import datetime
- from pathlib import Path
- from fastapi import APIRouter, Depends, File, UploadFile
- from fastapi.responses import FileResponse, JSONResponse
- from pydantic import BaseModel, Field
- from sqlalchemy import delete, select
- from sqlalchemy.ext.asyncio import AsyncSession
- from backend.app.core.auth import RequirePermissionIfAuthEnabled, caller_is_api_key, require_energy_cost_update
- from backend.app.core.config import settings as app_settings
- from backend.app.core.database import get_db
- from backend.app.core.permissions import Permission
- from backend.app.models.settings import Settings
- from backend.app.models.user import User
- from backend.app.schemas.settings import AppSettings, AppSettingsUpdate
- logger = logging.getLogger(__name__)
- router = APIRouter(prefix="/settings", tags=["settings"])
- DEFAULT_SETTINGS = AppSettings()
- # Sensitive credential fields blanked for API-key callers
- _SENSITIVE_FIELDS_FOR_API_KEY = (
- "mqtt_password",
- "ha_token",
- "prometheus_token",
- "virtual_printer_access_code",
- "ldap_bind_password",
- )
- def _sqlalchemy_type_to_sqlite_type(type_repr: str) -> str:
- """Map a SQLAlchemy column type's ``str()`` to a SQLite-native column type.
- Used by ``create_backup_zip`` to reconstruct a portable SQLite database
- file from PostgreSQL data. Falling through to TEXT for binary columns
- corrupts non-UTF8 bytes — the BLOB branch is the #1333 regression guard
- for OIDC icon BLOBs.
- Extracted as a pure helper so it can be unit-tested without spinning up
- the full FastAPI app + backup pipeline.
- """
- type_str = type_repr.upper()
- if "INT" in type_str:
- return "INTEGER"
- if "FLOAT" in type_str or "REAL" in type_str or "NUMERIC" in type_str:
- return "REAL"
- if "BOOL" in type_str:
- return "BOOLEAN"
- if "BLOB" in type_str or "BYTEA" in type_str or "BINARY" in type_str:
- # OIDC icon BLOB column (#1333) — without this branch the column
- # was created as TEXT and non-UTF8 bytes were corrupted during the
- # PG→SQLite-ZIP backup round trip.
- return "BLOB"
- return "TEXT"
- async def get_setting(db: AsyncSession, key: str) -> str | None:
- """Get a single setting value by key."""
- result = await db.execute(select(Settings).where(Settings.key == key))
- setting = result.scalar_one_or_none()
- return setting.value if setting else None
- async def get_external_login_url(db: AsyncSession) -> str:
- """Get the external URL for the login page.
- Uses external_url from settings if available, otherwise falls back to APP_URL env var.
- Args:
- db: Database session
- Returns:
- Full URL to the login page
- """
- import os
- external_url = await get_setting(db, "external_url")
- if external_url:
- external_url = external_url.rstrip("/")
- else:
- external_url = os.environ.get("APP_URL", "http://localhost:5173")
- return external_url + "/login"
- async def set_setting(db: AsyncSession, key: str, value: str) -> None:
- """Set a single setting value."""
- from backend.app.core.db_dialect import upsert_setting
- await upsert_setting(db, Settings, key, value)
- async def _build_settings_response(db: AsyncSession, is_api_key: bool = False) -> AppSettings:
- """Build the full settings response, scrubbing secrets for API-key callers."""
- settings_dict = DEFAULT_SETTINGS.model_dump()
- result = await db.execute(select(Settings))
- for setting in result.scalars().all():
- if setting.key not in settings_dict:
- continue
- if setting.key in [
- "auto_archive",
- "save_thumbnails",
- "capture_finish_photo",
- "spoolman_enabled",
- "spoolman_disable_weight_sync",
- "spoolman_report_partial_usage",
- "disable_filament_warnings",
- "prefer_lowest_filament",
- "check_updates",
- "check_printer_firmware",
- "include_beta_updates",
- "virtual_printer_enabled",
- "ftp_retry_enabled",
- "mqtt_enabled",
- "mqtt_use_tls",
- "ha_enabled",
- "per_printer_mapping_expanded",
- "prometheus_enabled",
- "user_notifications_enabled",
- "queue_drying_enabled",
- "queue_drying_block",
- "ambient_drying_enabled",
- "require_plate_clear",
- "queue_shortest_first",
- "default_bed_levelling",
- "default_flow_cali",
- "default_vibration_cali",
- "default_layer_inspect",
- "default_timelapse",
- "ldap_enabled",
- "ldap_auto_provision",
- ]:
- settings_dict[setting.key] = setting.value.lower() == "true"
- elif setting.key in [
- "default_filament_cost",
- "energy_cost_per_kwh",
- "ams_temp_good",
- "ams_temp_fair",
- "library_disk_warning_gb",
- "low_stock_threshold",
- ]:
- settings_dict[setting.key] = float(setting.value)
- elif setting.key in [
- "ams_humidity_good",
- "ams_humidity_fair",
- "ams_history_retention_days",
- "ftp_retry_count",
- "ftp_retry_delay",
- "ftp_timeout",
- "mqtt_port",
- "stagger_group_size",
- "stagger_interval_minutes",
- "forecast_global_lead_time_days",
- ]:
- settings_dict[setting.key] = int(setting.value)
- elif setting.key == "default_printer_id":
- settings_dict[setting.key] = int(setting.value) if setting.value and setting.value != "None" else None
- else:
- settings_dict[setting.key] = setting.value
- ha_settings = await get_homeassistant_settings(db)
- settings_dict.update(ha_settings)
- # ldap_bind_password is never returned to any caller
- settings_dict["ldap_bind_password"] = ""
- if is_api_key:
- for field in _SENSITIVE_FIELDS_FOR_API_KEY:
- if field in settings_dict:
- settings_dict[field] = ""
- return AppSettings(**settings_dict)
- @router.get("", response_model=AppSettings)
- @router.get("/", response_model=AppSettings)
- async def get_settings(
- db: AsyncSession = Depends(get_db),
- _: User | None = RequirePermissionIfAuthEnabled(Permission.SETTINGS_READ),
- _is_api_key: bool = Depends(caller_is_api_key),
- ):
- """Get all application settings."""
- return await _build_settings_response(db, is_api_key=_is_api_key)
- @router.put("/", response_model=AppSettings)
- async def update_settings(
- settings_update: AppSettingsUpdate,
- db: AsyncSession = Depends(get_db),
- _: User | None = RequirePermissionIfAuthEnabled(Permission.SETTINGS_UPDATE),
- ):
- """Update application settings."""
- update_data = settings_update.model_dump(exclude_unset=True)
- # Check if any MQTT settings are being updated
- mqtt_keys = {
- "mqtt_enabled",
- "mqtt_broker",
- "mqtt_port",
- "mqtt_username",
- "mqtt_password",
- "mqtt_topic_prefix",
- "mqtt_use_tls",
- }
- mqtt_updated = bool(mqtt_keys & set(update_data.keys()))
- for key, value in update_data.items():
- # Convert value to string for storage
- if isinstance(value, bool):
- str_value = "true" if value else "false"
- elif value is None:
- str_value = "None"
- else:
- str_value = str(value)
- await set_setting(db, key, str_value)
- await db.commit()
- # Expire all objects to ensure fresh reads after commit
- db.expire_all()
- # Reconfigure MQTT relay if any MQTT settings changed
- if mqtt_updated:
- try:
- from backend.app.services.mqtt_relay import mqtt_relay
- mqtt_settings = {
- "mqtt_enabled": (await get_setting(db, "mqtt_enabled") or "false") == "true",
- "mqtt_broker": await get_setting(db, "mqtt_broker") or "",
- "mqtt_port": int(await get_setting(db, "mqtt_port") or "1883"),
- "mqtt_username": await get_setting(db, "mqtt_username") or "",
- "mqtt_password": await get_setting(db, "mqtt_password") or "",
- "mqtt_topic_prefix": await get_setting(db, "mqtt_topic_prefix") or "bambuddy",
- "mqtt_use_tls": (await get_setting(db, "mqtt_use_tls") or "false") == "true",
- }
- await mqtt_relay.configure(mqtt_settings)
- except Exception:
- pass # Don't fail the settings update if MQTT reconfiguration fails
- # Return updated settings (never scrub secrets on PUT — caller has SETTINGS_UPDATE permission)
- return await _build_settings_response(db, is_api_key=False)
- @router.patch("/", response_model=AppSettings)
- @router.patch("", response_model=AppSettings)
- async def patch_settings(
- settings_update: AppSettingsUpdate,
- db: AsyncSession = Depends(get_db),
- _: User | None = RequirePermissionIfAuthEnabled(Permission.SETTINGS_UPDATE),
- ):
- """Partially update application settings (same as PUT, for REST compatibility)."""
- return await update_settings(settings_update, db, _)
- class ElectricityPriceUpdate(BaseModel):
- """Payload for ``POST /settings/electricity-price`` (#1356).
- Mirrors the field name documented in ``wiki/features/energy.md`` so the
- Home Assistant ``rest_command`` example needs only a URL change, not a
- payload change. Plain non-negative float; tariffs can go as low as 0.0 in
- some markets (e.g. free hours).
- """
- energy_cost_per_kwh: float = Field(ge=0)
- @router.post("/electricity-price", response_model=AppSettings)
- async def update_electricity_price(
- payload: ElectricityPriceUpdate,
- db: AsyncSession = Depends(get_db),
- _: User | None = Depends(require_energy_cost_update()),
- _is_api_key: bool = Depends(caller_is_api_key),
- ):
- """Update the per-kWh electricity cost used by the energy-tracking pipeline.
- This is the only settings field writable via API key, gated by the
- ``can_update_energy_cost`` toggle on the key. JWT users still need the
- standard ``SETTINGS_UPDATE`` permission. See #1356 for the rationale —
- the general ``PATCH /settings`` route remains denied for API keys because
- it can rewrite SMTP/LDAP/MQTT credentials, which is a much wider surface
- than the documented dynamic-tariff use case requires.
- """
- await set_setting(db, "energy_cost_per_kwh", str(payload.energy_cost_per_kwh))
- await db.commit()
- db.expire_all()
- return await _build_settings_response(db, is_api_key=_is_api_key)
- @router.post("/reset", response_model=AppSettings)
- async def reset_settings(
- db: AsyncSession = Depends(get_db),
- _: User | None = RequirePermissionIfAuthEnabled(Permission.SETTINGS_UPDATE),
- ):
- """Reset all settings to defaults."""
- # Delete all settings
- result = await db.execute(select(Settings))
- for setting in result.scalars().all():
- await db.delete(setting)
- await db.commit()
- return DEFAULT_SETTINGS
- @router.get("/default-sidebar-order")
- async def get_default_sidebar_order(
- db: AsyncSession = Depends(get_db),
- ):
- """Get the admin-set default sidebar order.
- Intentionally unauthenticated: non-admin users need to read this value to apply
- the default sidebar order, but may lack SETTINGS_READ permission.
- The value is non-sensitive (sidebar item IDs only).
- """
- value = await get_setting(db, "default_sidebar_order")
- return {"default_sidebar_order": value or ""}
- # Fields exposed via /ui-preferences without SETTINGS_READ. Each entry MUST be
- # non-sensitive (no credentials, no PII, no secret tokens) — granting SETTINGS_READ
- # also grants visibility of SMTP/LDAP/MQTT passwords and similar, so the goal of
- # this endpoint is exactly to NOT require that permission for UI rendering hints.
- # When adding a field here, confirm it doesn't carry anything sensitive.
- _UI_PREFERENCE_FIELDS: tuple[str, ...] = (
- "require_plate_clear",
- "check_printer_firmware",
- "camera_view_mode",
- "time_format",
- "date_format",
- "drying_presets",
- "ams_humidity_good",
- "ams_humidity_fair",
- "ams_temp_good",
- "ams_temp_fair",
- "bed_cooled_threshold",
- )
- @router.get("/ui-preferences")
- async def get_ui_preferences(db: AsyncSession = Depends(get_db)):
- """Get the curated subset of settings that any page needs to render correctly.
- Intentionally not gated on SETTINGS_READ — every authenticated user (and
- every page that loads for them) needs these fields, but granting SETTINGS_READ
- would also grant visibility of secrets (SMTP/LDAP/MQTT credentials, etc.).
- Same pattern as /default-sidebar-order (#1293).
- Reuses _build_settings_response so the typed values match what /settings
- returns for fields with the same name — bool/int/float/str types stay in
- sync without a separate type-coercion path.
- """
- full = await _build_settings_response(db, is_api_key=False)
- dumped = full.model_dump()
- return {key: dumped[key] for key in _UI_PREFERENCE_FIELDS if key in dumped}
- @router.get("/check-ffmpeg")
- async def check_ffmpeg():
- """Check if ffmpeg is installed and available."""
- from backend.app.services.camera import get_ffmpeg_path
- ffmpeg_path = get_ffmpeg_path()
- return {
- "installed": ffmpeg_path is not None,
- "path": ffmpeg_path,
- }
- @router.get("/spoolman")
- async def get_spoolman_settings(
- db: AsyncSession = Depends(get_db),
- _: User | None = RequirePermissionIfAuthEnabled(Permission.SETTINGS_READ),
- ):
- """Get Spoolman integration settings."""
- spoolman_enabled = await get_setting(db, "spoolman_enabled") or "false"
- spoolman_url = await get_setting(db, "spoolman_url") or ""
- spoolman_sync_mode = await get_setting(db, "spoolman_sync_mode") or "auto"
- spoolman_disable_weight_sync = await get_setting(db, "spoolman_disable_weight_sync") or "false"
- spoolman_report_partial_usage = await get_setting(db, "spoolman_report_partial_usage") or "true"
- return {
- "spoolman_enabled": spoolman_enabled,
- "spoolman_url": spoolman_url,
- "spoolman_sync_mode": spoolman_sync_mode,
- "spoolman_disable_weight_sync": spoolman_disable_weight_sync,
- "spoolman_report_partial_usage": spoolman_report_partial_usage,
- }
- @router.put("/spoolman")
- async def update_spoolman_settings(
- settings: dict,
- db: AsyncSession = Depends(get_db),
- _: User | None = RequirePermissionIfAuthEnabled(Permission.SETTINGS_UPDATE),
- ):
- """Update Spoolman integration settings."""
- if "spoolman_enabled" in settings:
- old_val = await get_setting(db, "spoolman_enabled") or "false"
- new_val = settings["spoolman_enabled"]
- await set_setting(db, "spoolman_enabled", new_val)
- # Switching to Spoolman: clear built-in inventory slot assignments
- if old_val.lower() != "true" and new_val.lower() == "true":
- from backend.app.models.spool_assignment import SpoolAssignment
- result = await db.execute(delete(SpoolAssignment))
- logger.info("Cleared %d spool assignments on switch to Spoolman mode", result.rowcount)
- # Switching back to internal mode: clear Spoolman slot assignments — the
- # symmetric counterpart of the clear above. Without this, stale
- # spoolman_slot_assignments rows linger and would wrongly count as
- # "assigned" in any mode-agnostic check (e.g. the missing-spool-
- # assignment notification, which unions both tables — #1473).
- elif old_val.lower() == "true" and new_val.lower() != "true":
- from backend.app.models.spoolman_slot_assignment import SpoolmanSlotAssignment
- result = await db.execute(delete(SpoolmanSlotAssignment))
- logger.info("Cleared %d Spoolman slot assignments on switch to internal mode", result.rowcount)
- if "spoolman_url" in settings:
- await set_setting(db, "spoolman_url", settings["spoolman_url"])
- if "spoolman_sync_mode" in settings:
- await set_setting(db, "spoolman_sync_mode", settings["spoolman_sync_mode"])
- if "spoolman_disable_weight_sync" in settings:
- await set_setting(db, "spoolman_disable_weight_sync", settings["spoolman_disable_weight_sync"])
- if "spoolman_report_partial_usage" in settings:
- await set_setting(db, "spoolman_report_partial_usage", settings["spoolman_report_partial_usage"])
- await db.commit()
- db.expire_all()
- # Return updated settings
- return await get_spoolman_settings(db)
- async def get_homeassistant_settings(db: AsyncSession) -> dict:
- """
- Get Home Assistant integration settings.
- Environment variables (HA_URL, HA_TOKEN) take precedence over database settings.
- """
- import os
- # Check environment variables first
- ha_url_env = os.environ.get("HA_URL")
- ha_token_env = os.environ.get("HA_TOKEN")
- # Fall back to database values
- ha_url = ha_url_env or await get_setting(db, "ha_url") or ""
- ha_token = ha_token_env or await get_setting(db, "ha_token") or ""
- ha_enabled_db = await get_setting(db, "ha_enabled") or "false"
- # Track which settings come from environment
- ha_url_from_env = bool(ha_url_env)
- ha_token_from_env = bool(ha_token_env)
- ha_env_managed = ha_url_from_env and ha_token_from_env
- # Auto-enable when both env vars are set, otherwise use database value
- if ha_url_env and ha_token_env:
- ha_enabled = True
- else:
- ha_enabled = ha_enabled_db.lower() == "true"
- return {
- "ha_enabled": ha_enabled,
- "ha_url": ha_url,
- "ha_token": ha_token,
- "ha_url_from_env": ha_url_from_env,
- "ha_token_from_env": ha_token_from_env,
- "ha_env_managed": ha_env_managed,
- }
- async def create_backup_zip(output_path: Path | None = None) -> tuple[Path, str]:
- """Create a complete backup ZIP (database + all data directories).
- If output_path is given, the ZIP is written there.
- Otherwise a temporary file is created (caller must clean up).
- Returns (zip_path, filename).
- """
- import shutil
- import tempfile
- from backend.app.core.db_dialect import is_sqlite
- base_dir = app_settings.base_dir
- filename = f"bambuddy-backup-{datetime.now().strftime('%Y%m%d-%H%M%S')}.zip"
- with tempfile.TemporaryDirectory() as temp_dir:
- temp_path = Path(temp_dir)
- if is_sqlite():
- from sqlalchemy import text
- from backend.app.core.database import engine
- db_path = Path(app_settings.database_url.replace("sqlite+aiosqlite:///", ""))
- # Checkpoint WAL to ensure all data is in main db file
- async with engine.begin() as conn:
- await conn.execute(text("PRAGMA wal_checkpoint(TRUNCATE)"))
- # Copy database file
- shutil.copy2(db_path, temp_path / "bambuddy.db")
- else:
- # PostgreSQL: export to a portable SQLite file via SQLAlchemy.
- # This makes backups restorable on both SQLite and Postgres installs.
- import json
- import sqlite3
- from backend.app.core.database import Base, engine
- backup_db_path = temp_path / "bambuddy.db"
- dst = sqlite3.connect(str(backup_db_path))
- metadata = Base.metadata
- # Create tables in SQLite backup (simplified — just column names and types)
- for table in metadata.sorted_tables:
- cols = []
- pk_cols = [col.name for col in table.columns if col.primary_key]
- for col in table.columns:
- col_type = _sqlalchemy_type_to_sqlite_type(str(col.type))
- # Only inline PRIMARY KEY for single-column PKs
- pk = " PRIMARY KEY" if col.primary_key and len(pk_cols) == 1 else ""
- cols.append(f"{col.name} {col_type}{pk}")
- # Add composite primary key constraint if needed
- if len(pk_cols) > 1:
- cols.append(f"PRIMARY KEY ({', '.join(pk_cols)})")
- dst.execute(f"CREATE TABLE IF NOT EXISTS {table.name} ({', '.join(cols)})") # noqa: S608
- # Export data from Postgres to SQLite
- async with engine.connect() as conn:
- for table in metadata.sorted_tables:
- result = await conn.execute(table.select())
- rows = result.fetchall()
- if not rows:
- continue
- columns = list(result.keys())
- placeholders = ", ".join(["?"] * len(columns))
- col_list = ", ".join(columns)
- insert_sql = f"INSERT INTO {table.name} ({col_list}) VALUES ({placeholders})" # noqa: S608 # nosec B608 — table/column names from ORM metadata, not user input
- def _serialize_row(row):
- return tuple(json.dumps(v) if isinstance(v, (list, dict)) else v for v in row)
- dst.executemany(insert_sql, [_serialize_row(row) for row in rows])
- dst.commit()
- dst.close()
- logger.info("PostgreSQL backup exported to portable SQLite format")
- # Copy data directories (if they exist)
- dirs_to_backup = [
- ("archive", base_dir / "archive"),
- ("virtual_printer", base_dir / "virtual_printer"),
- ("plate_calibration", app_settings.plate_calibration_dir),
- ("icons", base_dir / "icons"),
- ("projects", base_dir / "projects"),
- ]
- for name, src_dir in dirs_to_backup:
- if src_dir.exists() and any(src_dir.iterdir()):
- try:
- shutil.copytree(src_dir, temp_path / name)
- except shutil.Error as e:
- logger.warning("Some files in %s could not be copied: %s", name, e)
- except PermissionError as e:
- logger.warning("Permission denied copying %s: %s", name, e)
- # Include the MFA encryption key as a ZIP top-level entry alongside
- # bambuddy.db. Without it, encrypted client_secret / TOTP secret rows
- # would be unrecoverable after restore on a host without MFA_ENCRYPTION_KEY set.
- from backend.app.core.paths import resolve_data_dir
- mfa_key_src = resolve_data_dir() / ".mfa_encryption_key"
- if mfa_key_src.exists() and mfa_key_src.is_file():
- try:
- shutil.copy2(mfa_key_src, temp_path / ".mfa_encryption_key")
- except OSError as exc:
- logger.error(
- "Could not include MFA encryption key in backup (%s). "
- "The backup ZIP will not contain the key — restore on a "
- "keyless host will fail for encrypted secrets.",
- exc,
- )
- raise
- # Create ZIP
- if output_path is not None:
- zip_file = output_path / filename
- else:
- fd, tmp = tempfile.mkstemp(suffix=".zip")
- os.close(fd)
- zip_file = Path(tmp)
- with zipfile.ZipFile(zip_file, "w", zipfile.ZIP_DEFLATED) as zf:
- for file_path in temp_path.rglob("*"):
- if file_path.is_file():
- arcname = file_path.relative_to(temp_path)
- zf.write(file_path, arcname)
- return zip_file, filename
- @router.get("/backup")
- async def create_backup(
- db: AsyncSession = Depends(get_db),
- _: User | None = RequirePermissionIfAuthEnabled(Permission.SETTINGS_BACKUP),
- ):
- """Create a complete backup (database + all files) as a ZIP download."""
- from starlette.background import BackgroundTask
- try:
- zip_file, filename = await create_backup_zip()
- return FileResponse(
- path=zip_file,
- filename=filename,
- media_type="application/zip",
- background=BackgroundTask(lambda: zip_file.unlink(missing_ok=True)),
- )
- except Exception as e:
- logger.error("Backup failed: %s", e, exc_info=True)
- return JSONResponse(
- status_code=500,
- content={"success": False, "message": "Backup failed. Check server logs for details."},
- )
- async def _import_sqlite_to_postgres(sqlite_path: Path, postgres_url: str):
- """Import data from a SQLite database file into the current PostgreSQL database.
- Used for cross-database restore (SQLite backup → PostgreSQL).
- Reads all tables from the SQLite file and bulk-inserts into Postgres.
- """
- import sqlite3
- from sqlalchemy import text
- from backend.app.core.database import Base, _create_engine
- # Create a temporary engine for the import (current engine was disposed)
- pg_engine = _create_engine()
- try:
- # Open SQLite file directly (sync — it's a local file read)
- src = sqlite3.connect(str(sqlite_path))
- src.row_factory = sqlite3.Row
- # Get list of tables from SQLite (skip internal/FTS tables)
- cursor = src.execute(
- "SELECT name FROM sqlite_master WHERE type='table' "
- "AND name NOT LIKE 'sqlite_%' AND name NOT LIKE 'archive_fts%'"
- )
- src_tables = {row["name"] for row in cursor.fetchall()}
- # Get Postgres tables from our ORM models
- metadata = Base.metadata
- pg_tables = set(metadata.tables.keys())
- # Only import tables that exist in both source and destination
- tables_to_import = src_tables & pg_tables
- sorted_tables = [t.name for t in metadata.sorted_tables if t.name in tables_to_import]
- # Phase 1: Drop all tables and recreate WITHOUT foreign keys.
- # This avoids all FK ordering/orphan issues during import.
- saved_fks = {}
- for table in metadata.sorted_tables:
- fks = list(table.foreign_key_constraints)
- if fks:
- saved_fks[table.name] = fks
- for fk in fks:
- table.constraints.discard(fk)
- async with pg_engine.begin() as conn:
- # Drop every existing table in the public schema with CASCADE
- # rather than `metadata.drop_all`. Two reasons:
- # 1. The user's live DB may carry orphan tables from removed
- # features (e.g. the legacy `spoolman_slot_assignments`,
- # `spoolman_k_profile`) that hold FK constraints back to
- # ORM tables. `drop_all` doesn't know they exist and emits
- # `DROP TABLE printers` without CASCADE — Postgres refuses
- # and the whole restore aborts (#XXXX).
- # 2. Even within the metadata, `drop_all` is FK-ordered and
- # breaks if a future schema rename leaves old constraints
- # around. CASCADE is the right tool for a destructive
- # restore: the user is intentionally wiping state.
- await conn.execute(
- text(
- "DO $$ DECLARE r RECORD; BEGIN "
- "FOR r IN (SELECT tablename FROM pg_tables WHERE schemaname = 'public') LOOP "
- "EXECUTE 'DROP TABLE IF EXISTS public.' || quote_ident(r.tablename) || ' CASCADE'; "
- "END LOOP; END $$;"
- )
- )
- await conn.run_sync(metadata.create_all)
- # Restore FK definitions in metadata (needed for re-adding later)
- for table_name, fks in saved_fks.items():
- table_obj = metadata.tables[table_name]
- for fk in fks:
- table_obj.constraints.add(fk)
- # Phase 2: Import data (no FKs to worry about)
- async with pg_engine.begin() as conn:
- # Import each table in dependency order (parents before children)
- for table_name in sorted_tables:
- rows = src.execute(f"SELECT * FROM {table_name}").fetchall() # noqa: S608 # nosec B608
- if not rows:
- continue
- # Filter to columns that exist in the Postgres table
- src_columns = rows[0].keys()
- pg_table = metadata.tables.get(table_name)
- pg_columns = {c.name for c in pg_table.columns} if pg_table is not None else set()
- columns = [c for c in src_columns if c in pg_columns]
- if not columns:
- continue
- col_list = ", ".join(columns)
- param_list = ", ".join(f":{c}" for c in columns)
- # ON CONFLICT DO NOTHING handles duplicate rows from SQLite (which doesn't enforce unique constraints)
- insert_sql = text(f"INSERT INTO {table_name} ({col_list}) VALUES ({param_list}) ON CONFLICT DO NOTHING") # noqa: S608 # nosec B608
- # Identify columns that need type conversion (SQLite stores booleans
- # as int and datetimes as str — asyncpg requires native Python types)
- from datetime import datetime as dt
- bool_columns = set()
- datetime_columns = set()
- not_null_defaults = {} # col_name -> default value for NOT NULL columns
- if pg_table is not None:
- for col in pg_table.columns:
- if col.name not in columns:
- continue
- col_type = str(col.type)
- if col_type == "BOOLEAN":
- bool_columns.add(col.name)
- elif col_type in ("DATETIME", "TIMESTAMP WITHOUT TIME ZONE", "TIMESTAMP WITH TIME ZONE"):
- datetime_columns.add(col.name)
- # Track NOT NULL columns with defaults — older backups may have NULL
- # for columns added after the backup was created
- if not col.nullable:
- if col.default is not None:
- default = col.default.arg
- if callable(default):
- default = default(None)
- not_null_defaults[col.name] = default
- elif col.server_default is not None:
- # server_default=func.now() → use current timestamp
- if col.name in datetime_columns:
- not_null_defaults[col.name] = "__now__"
- else:
- # Try to extract literal server default
- sd = str(col.server_default.arg) if hasattr(col.server_default, "arg") else None
- if sd is not None:
- not_null_defaults[col.name] = sd
- now = dt.now()
- def _convert_row(
- row, cols=columns, bools=bool_columns, dts=datetime_columns, nn_defaults=not_null_defaults, _now=now
- ):
- result = {}
- for c in cols:
- val = row[c]
- if val is None and c in nn_defaults:
- val = _now if nn_defaults[c] == "__now__" else nn_defaults[c]
- if val is not None:
- if c in bools:
- val = bool(val)
- elif c in dts and isinstance(val, str):
- try:
- val = dt.fromisoformat(val)
- except ValueError:
- pass
- result[c] = val
- return result
- batch = [_convert_row(row) for row in rows]
- await conn.execute(insert_sql, batch)
- logger.info("Imported %d rows into %s", len(batch), table_name)
- # Reset sequences to max(id) + 1 for each table with an id column
- for table_name in sorted_tables:
- try:
- async with conn.begin_nested():
- result = await conn.execute(text(f"SELECT MAX(id) FROM {table_name}")) # noqa: S608 # nosec B608
- max_id = result.scalar()
- if max_id is not None:
- seq_name = f"{table_name}_id_seq"
- await conn.execute(text(f"SELECT setval('{seq_name}', {max_id})")) # noqa: S608
- except Exception:
- pass # Table may not have an id column or sequence
- src.close()
- logger.info("Cross-database import complete: %d tables imported", len(tables_to_import))
- # Recreate FK constraints from ORM metadata (not from saved definitions).
- # Use individual transactions so orphaned SQLite data doesn't block valid FKs.
- from sqlalchemy.schema import AddConstraint
- failed_fks = []
- for table in metadata.sorted_tables:
- for fk in table.foreign_key_constraints:
- try:
- async with pg_engine.begin() as fk_conn:
- await fk_conn.execute(AddConstraint(fk))
- except Exception:
- failed_fks.append(f"{table.name}.{fk.name}")
- if failed_fks:
- logger.warning(
- "Could not restore %d FK constraints (orphaned data in SQLite): %s",
- len(failed_fks),
- ", ".join(failed_fks),
- )
- finally:
- await pg_engine.dispose()
- @router.post("/restore")
- async def restore_backup(
- file: UploadFile = File(...),
- db: AsyncSession = Depends(get_db),
- _: User | None = RequirePermissionIfAuthEnabled(Permission.SETTINGS_RESTORE),
- ):
- """Restore from a complete backup ZIP.
- Replaces the database and all data directories from the backup ZIP.
- Requires a restart after restore.
- """
- import shutil
- import tempfile
- from fastapi import HTTPException
- from backend.app.core.database import close_all_connections, init_db, reinitialize_database
- from backend.app.core.db_dialect import is_sqlite
- from backend.app.services.virtual_printer import virtual_printer_manager
- base_dir = app_settings.base_dir
- with tempfile.TemporaryDirectory() as temp_dir:
- temp_path = Path(temp_dir)
- # 1. Read and extract ZIP
- content = await file.read()
- # Check if it's a valid ZIP
- if not file.filename or not file.filename.endswith(".zip"):
- raise HTTPException(400, "Invalid backup file: must be a .zip file")
- try:
- with zipfile.ZipFile(io.BytesIO(content), "r") as zf:
- for name in zf.namelist():
- # Reject path-traversal payloads: any entry whose resolved
- # path escapes temp_path would allow writing arbitrary files
- # on the host (ZipSlip / CVE-2006-5456).
- dest = (temp_path / name).resolve()
- # is_relative_to (Python 3.9+) covers both relative
- # path-traversal (../etc/passwd) and absolute-path overrides
- # (/etc/passwd) — str.startswith was vulnerable to
- # prefix-collision attacks (e.g. /tmp/abc_evil/file passing
- # a /tmp/abc prefix check).
- if not dest.is_relative_to(temp_path.resolve()):
- raise HTTPException(400, f"Invalid backup: unsafe path in ZIP: {name!r}")
- zf.extractall(temp_path)
- except zipfile.BadZipFile:
- raise HTTPException(400, "Invalid backup file: not a valid ZIP")
- # 2. Validate backup
- backup_db = temp_path / "bambuddy.db"
- if not backup_db.exists():
- raise HTTPException(400, "Invalid backup: missing bambuddy.db")
- try:
- import asyncio
- # 3. Stop virtual printer if running (releases file locks)
- try:
- if virtual_printer_manager.is_enabled:
- logger.info("Stopping virtual printer for restore...")
- await virtual_printer_manager.configure(enabled=False)
- await asyncio.sleep(1)
- except Exception as e:
- logger.warning("Failed to stop virtual printer: %s", e)
- # 4. Close current database connections
- logger.info("Closing database connections...")
- await close_all_connections()
- # B1: Restore the MFA encryption key file BEFORE the database swap.
- # If the key write fails (OSError, RO disk, full disk, EACCES) we
- # can still abort while the live DB is intact. Doing this AFTER the
- # DB swap would leave the database with rows encrypted under the
- # backup's key but the running install holding only the old key —
- # every encrypted secret becomes unrecoverable.
- from backend.app.core.paths import resolve_data_dir
- mfa_key_src = temp_path / ".mfa_encryption_key"
- if mfa_key_src.exists() and mfa_key_src.is_file():
- dst_key = resolve_data_dir() / ".mfa_encryption_key"
- tmp_key = dst_key.parent / ".mfa_encryption_key.restore-tmp"
- try:
- dst_key.parent.mkdir(parents=True, exist_ok=True)
- # S1: atomic write with restrictive mode from creation.
- # O_TRUNC because a stale tmp may exist from a prior
- # failed restore attempt — we want to overwrite it.
- fd = os.open(str(tmp_key), os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
- try:
- os.write(fd, mfa_key_src.read_bytes())
- finally:
- os.close(fd)
- # POSIX rename(2) — atomic when source/dest are on the
- # same filesystem (we're staying inside dst_key.parent).
- os.replace(str(tmp_key), str(dst_key))
- # S9: warn if the FS doesn't enforce 0o600
- actual_mode = dst_key.stat().st_mode & 0o777
- if actual_mode != 0o600:
- logger.warning(
- "Restored MFA key file %s: filesystem did not enforce 0o600 "
- "(actual: 0o%o). Key may be world-readable on Windows / SMB / FUSE.",
- dst_key,
- actual_mode,
- )
- logger.info("Restored .mfa_encryption_key from backup")
- except OSError as e:
- logger.error(
- "Could not write restored MFA key file to %s: %s — "
- "aborting BEFORE database swap (DB unchanged).",
- dst_key,
- e,
- exc_info=True,
- )
- raise HTTPException(
- status_code=500,
- detail=("Restore aborted: MFA key write failed. Database is unchanged. Check server logs."),
- ) from e
- # 5. Replace database
- logger.info("Restoring database from backup...")
- if is_sqlite():
- db_path = Path(app_settings.database_url.replace("sqlite+aiosqlite:///", ""))
- # Use SQLite's online backup API instead of shutil.copy2.
- # The pragma at database.py:19 runs the live DB in WAL mode,
- # which means a naive file copy is unsafe: anything written
- # to the live DB before this call that hasn't been
- # checkpointed yet (seed_default_groups + init_db on first
- # start, plus whatever background heartbeats wrote during
- # the request window) sits in bambuddy.db-wal with valid
- # checksums. The route handler's own `db: Depends(get_db)`
- # session also keeps a connection checked out across
- # engine.dispose(), holding fds to the WAL inode. With
- # `shutil.copy2` SQLite finds the stale WAL on the next
- # open and silently re-applies those page-level writes on
- # top of the restored DB, partially clobbering it with
- # fresh-install state — the user sees a "successful"
- # restore where most rows and settings have reverted to
- # defaults (#1211 / #668). The page-by-page backup API
- # opens both DBs as real SQLite connections, takes the
- # right locks, and routes new pages through the live DB's
- # own WAL — so concurrent open sessions see their own
- # snapshot until they close (transaction isolation) but
- # can't corrupt the restored state.
- import sqlite3
- src_conn = sqlite3.connect(str(backup_db))
- try:
- dst_conn = sqlite3.connect(str(db_path))
- try:
- src_conn.backup(dst_conn)
- finally:
- dst_conn.close()
- finally:
- src_conn.close()
- else:
- # Import SQLite backup into PostgreSQL
- logger.info("Importing SQLite backup into PostgreSQL...")
- await _import_sqlite_to_postgres(backup_db, app_settings.database_url)
- # 6. Replace data directories
- # For Docker compatibility: clear contents then copy (don't delete mount points)
- dirs_to_restore = [
- ("archive", base_dir / "archive"),
- ("virtual_printer", base_dir / "virtual_printer"),
- ("plate_calibration", app_settings.plate_calibration_dir),
- ("icons", base_dir / "icons"),
- ("projects", base_dir / "projects"),
- ]
- skipped_dirs = []
- for name, dest_dir in dirs_to_restore:
- src_dir = temp_path / name
- if src_dir.exists():
- logger.info("Restoring %s directory...", name)
- try:
- # Clear destination contents (not the dir itself - may be Docker mount)
- if dest_dir.exists():
- for item in dest_dir.iterdir():
- try:
- if item.is_dir():
- shutil.rmtree(item)
- else:
- item.unlink()
- except OSError as e:
- logger.warning("Could not delete %s: %s", item, e)
- else:
- dest_dir.mkdir(parents=True, exist_ok=True)
- # Copy contents from backup
- for item in src_dir.iterdir():
- dest_item = dest_dir / item.name
- if item.is_dir():
- shutil.copytree(item, dest_item)
- else:
- shutil.copy2(item, dest_item)
- except OSError as e:
- logger.warning("Could not restore %s directory: %s", name, e)
- skipped_dirs.append(name)
- # 7. Reset the encryption singleton so the migration that runs
- # inside init_db() picks up the restored key file (if a new one
- # was written above). Without this reset, _get_fernet would
- # return the cached Fernet instance built from the previous key.
- import backend.app.core.encryption as _enc_mod
- _enc_mod._fernet_instance = None
- _enc_mod._key_source = None
- _enc_mod._warn_shown = False
- # 8. Reinitialize the database engine and apply schema migrations so that
- # tables added after the backup was created (e.g. ams_labels) exist
- # immediately, without requiring a manual restart.
- await reinitialize_database()
- await init_db()
- logger.info("Restore complete - restart required")
- message = "Backup restored successfully. Please restart Bambuddy for changes to take effect."
- if skipped_dirs:
- message += f" Note: Some directories could not be restored ({', '.join(skipped_dirs)})."
- return {
- "success": True,
- "message": message,
- }
- except HTTPException:
- # Preserve specific HTTP error responses raised inside the restore
- # body (e.g. the key-write OSError → 500). The blanket
- # except Exception below would otherwise swallow them and replace
- # the operator-facing detail with a generic message.
- raise
- except Exception as e:
- logger.error("Restore failed: %s", e, exc_info=True)
- return JSONResponse(
- status_code=500,
- content={"success": False, "message": "Restore failed. Check server logs for details."},
- )
- @router.get("/network-interfaces")
- async def get_network_interfaces(
- _: User | None = RequirePermissionIfAuthEnabled(Permission.SETTINGS_READ),
- ):
- """Get available network interfaces with all IPs (primary + aliases)."""
- from backend.app.services.network_utils import get_all_interface_ips
- interfaces = get_all_interface_ips()
- return {"interfaces": interfaces}
- @router.get("/virtual-printer/models")
- async def get_virtual_printer_models(
- _: User | None = RequirePermissionIfAuthEnabled(Permission.SETTINGS_READ),
- ):
- """Get available virtual printer models."""
- from backend.app.services.virtual_printer import (
- DEFAULT_VIRTUAL_PRINTER_MODEL,
- VIRTUAL_PRINTER_MODELS,
- )
- return {
- "models": VIRTUAL_PRINTER_MODELS,
- "default": DEFAULT_VIRTUAL_PRINTER_MODEL,
- }
- @router.get("/virtual-printer")
- async def get_virtual_printer_settings(
- db: AsyncSession = Depends(get_db),
- _: User | None = RequirePermissionIfAuthEnabled(Permission.SETTINGS_READ),
- ):
- """Get virtual printer settings and status."""
- from backend.app.services.virtual_printer import (
- DEFAULT_VIRTUAL_PRINTER_MODEL,
- virtual_printer_manager,
- )
- enabled = await get_setting(db, "virtual_printer_enabled")
- access_code = await get_setting(db, "virtual_printer_access_code")
- mode = await get_setting(db, "virtual_printer_mode")
- model = await get_setting(db, "virtual_printer_model")
- target_printer_id = await get_setting(db, "virtual_printer_target_printer_id")
- remote_interface_ip = await get_setting(db, "virtual_printer_remote_interface_ip")
- tailscale_disabled_raw = await get_setting(db, "virtual_printer_tailscale_disabled")
- archive_name_source = await get_setting(db, "virtual_printer_archive_name_source")
- return {
- "enabled": enabled == "true" if enabled else False,
- "access_code_set": bool(access_code),
- "mode": mode or "immediate",
- "model": model or DEFAULT_VIRTUAL_PRINTER_MODEL,
- "target_printer_id": int(target_printer_id) if target_printer_id else None,
- "remote_interface_ip": remote_interface_ip or "",
- "tailscale_disabled": tailscale_disabled_raw == "true" if tailscale_disabled_raw else True,
- "archive_name_source": archive_name_source if archive_name_source in ("metadata", "filename") else "metadata",
- "status": virtual_printer_manager.get_status(),
- }
- @router.put("/virtual-printer")
- async def update_virtual_printer_settings(
- enabled: bool = None,
- access_code: str = None,
- mode: str = None,
- model: str = None,
- target_printer_id: int = None,
- remote_interface_ip: str = None,
- tailscale_disabled: bool = None,
- archive_name_source: str = None,
- db: AsyncSession = Depends(get_db),
- _: User | None = RequirePermissionIfAuthEnabled(Permission.SETTINGS_UPDATE),
- ):
- """Update virtual printer settings and restart services if needed.
- For proxy mode with SSDP proxy (dual-homed setup):
- - remote_interface_ip: IP of interface on slicer's network (LAN B)
- - Local interface is auto-detected based on target printer IP
- """
- from sqlalchemy import select
- from backend.app.models.printer import Printer
- from backend.app.services.virtual_printer import (
- DEFAULT_VIRTUAL_PRINTER_MODEL,
- VIRTUAL_PRINTER_MODELS,
- virtual_printer_manager,
- )
- # Get current values
- current_enabled = await get_setting(db, "virtual_printer_enabled") == "true"
- current_access_code = await get_setting(db, "virtual_printer_access_code") or ""
- current_mode = await get_setting(db, "virtual_printer_mode") or "immediate"
- current_model = await get_setting(db, "virtual_printer_model") or DEFAULT_VIRTUAL_PRINTER_MODEL
- current_target_id_str = await get_setting(db, "virtual_printer_target_printer_id")
- current_target_id = int(current_target_id_str) if current_target_id_str else None
- current_remote_iface = await get_setting(db, "virtual_printer_remote_interface_ip") or ""
- current_ts_disabled_raw = await get_setting(db, "virtual_printer_tailscale_disabled")
- # Default True (opt-in) when the setting has never been saved — matches the model default.
- current_ts_disabled = current_ts_disabled_raw == "true" if current_ts_disabled_raw else True
- # Apply updates
- new_enabled = enabled if enabled is not None else current_enabled
- new_access_code = access_code if access_code is not None else current_access_code
- new_mode = mode if mode is not None else current_mode
- new_model = model if model is not None else current_model
- new_target_id = target_printer_id if target_printer_id is not None else current_target_id
- new_remote_iface = remote_interface_ip if remote_interface_ip is not None else current_remote_iface
- new_ts_disabled = tailscale_disabled if tailscale_disabled is not None else current_ts_disabled
- # Validate mode
- # "review" is the new name for "queue" (pending review before archiving)
- # "print_queue" archives and adds to print queue (unassigned)
- # "proxy" is transparent TCP proxy to a real printer
- if new_mode not in ("immediate", "queue", "review", "print_queue", "proxy"):
- return JSONResponse(
- status_code=400,
- content={"detail": "Mode must be 'immediate', 'review', 'print_queue', or 'proxy'"},
- )
- # Validate archive_name_source
- if archive_name_source is not None and archive_name_source not in ("metadata", "filename"):
- return JSONResponse(
- status_code=400,
- content={"detail": "archive_name_source must be 'metadata' or 'filename'"},
- )
- # Normalize legacy "queue" to "review" for storage
- if new_mode == "queue":
- new_mode = "review"
- # Validate model
- if model is not None and model not in VIRTUAL_PRINTER_MODELS:
- return JSONResponse(
- status_code=400,
- content={"detail": f"Invalid model. Must be one of: {', '.join(VIRTUAL_PRINTER_MODELS.keys())}"},
- )
- # Mode-specific validation and printer lookup
- target_printer_ip = ""
- target_printer_serial = ""
- if new_mode == "proxy":
- # Proxy mode requires target printer when enabling
- if new_enabled and not new_target_id:
- # If just switching to proxy mode (not explicitly enabling), auto-disable
- if enabled is None:
- new_enabled = False
- else:
- return JSONResponse(
- status_code=400,
- content={"detail": "Target printer is required for proxy mode"},
- )
- # Look up printer IP and serial if we have a target
- if new_target_id:
- result = await db.execute(select(Printer).where(Printer.id == new_target_id))
- printer = result.scalar_one_or_none()
- if not printer:
- return JSONResponse(
- status_code=400,
- content={"detail": f"Printer with ID {new_target_id} not found"},
- )
- target_printer_ip = printer.ip_address
- target_printer_serial = printer.serial_number
- # Access code not required for proxy mode
- else:
- # Non-proxy modes require access code when enabling
- if new_enabled and not new_access_code:
- # If just switching modes (not explicitly enabling), auto-disable
- if enabled is None:
- new_enabled = False
- else:
- return JSONResponse(
- status_code=400,
- content={"detail": "Access code is required when enabling virtual printer"},
- )
- # Validate access code length (Bambu Studio requires exactly 8 characters)
- if access_code is not None and access_code and len(access_code) != 8:
- return JSONResponse(
- status_code=400,
- content={"detail": "Access code must be exactly 8 characters"},
- )
- # Save settings
- await set_setting(db, "virtual_printer_enabled", "true" if new_enabled else "false")
- if access_code is not None:
- await set_setting(db, "virtual_printer_access_code", access_code)
- await set_setting(db, "virtual_printer_mode", new_mode)
- if model is not None:
- await set_setting(db, "virtual_printer_model", model)
- if target_printer_id is not None:
- await set_setting(db, "virtual_printer_target_printer_id", str(target_printer_id))
- if remote_interface_ip is not None:
- await set_setting(db, "virtual_printer_remote_interface_ip", remote_interface_ip)
- if tailscale_disabled is not None:
- await set_setting(db, "virtual_printer_tailscale_disabled", "true" if tailscale_disabled else "false")
- if archive_name_source is not None:
- await set_setting(db, "virtual_printer_archive_name_source", archive_name_source)
- # Propagate tailscale_disabled to the first VirtualPrinter row so sync_from_db() picks it up
- if tailscale_disabled is not None:
- from backend.app.models.virtual_printer import VirtualPrinter as VPModel
- vp_result = await db.execute(select(VPModel).order_by(VPModel.position).limit(1))
- first_vp = vp_result.scalar_one_or_none()
- if first_vp is not None:
- first_vp.tailscale_disabled = new_ts_disabled
- await db.commit()
- db.expire_all()
- # Reconfigure virtual printer
- try:
- await virtual_printer_manager.configure(
- enabled=new_enabled,
- access_code=new_access_code,
- mode=new_mode,
- model=new_model,
- target_printer_ip=target_printer_ip,
- target_printer_serial=target_printer_serial,
- remote_interface_ip=new_remote_iface,
- )
- except ValueError as e:
- logger.warning("Virtual printer configuration validation error: %s", e)
- return JSONResponse(
- status_code=400,
- content={"detail": "Invalid virtual printer configuration. Check the provided values."},
- )
- except Exception as e:
- logger.error("Failed to configure virtual printer: %s", e, exc_info=True)
- return JSONResponse(
- status_code=500,
- content={"detail": "Failed to configure virtual printer. Check server logs for details."},
- )
- return await get_virtual_printer_settings(db)
- # =============================================================================
- # MQTT Relay Settings
- # =============================================================================
- @router.get("/mqtt/status")
- async def get_mqtt_status(
- _: User | None = RequirePermissionIfAuthEnabled(Permission.SETTINGS_READ),
- ):
- """Get MQTT relay connection status."""
- from backend.app.services.mqtt_relay import mqtt_relay
- return mqtt_relay.get_status()
|