|
@@ -33,6 +33,12 @@ from backend.app.models.settings import Settings
|
|
|
from backend.app.models.smart_plug import SmartPlug
|
|
from backend.app.models.smart_plug import SmartPlug
|
|
|
from backend.app.models.user import User
|
|
from backend.app.models.user import User
|
|
|
from backend.app.services.discovery import is_running_in_docker
|
|
from backend.app.services.discovery import is_running_in_docker
|
|
|
|
|
+from backend.app.services.log_reader import (
|
|
|
|
|
+ LogEntry,
|
|
|
|
|
+ collect_sensitive_strings,
|
|
|
|
|
+ read_log_entries,
|
|
|
|
|
+ sanitize_log_content,
|
|
|
|
|
+)
|
|
|
from backend.app.services.network_utils import get_network_interfaces
|
|
from backend.app.services.network_utils import get_network_interfaces
|
|
|
from backend.app.services.printer_manager import printer_manager
|
|
from backend.app.services.printer_manager import printer_manager
|
|
|
|
|
|
|
@@ -156,15 +162,6 @@ async def toggle_debug_logging(
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
-class LogEntry(BaseModel):
|
|
|
|
|
- """A single log entry."""
|
|
|
|
|
-
|
|
|
|
|
- timestamp: str
|
|
|
|
|
- level: str
|
|
|
|
|
- logger_name: str
|
|
|
|
|
- message: str
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
class LogsResponse(BaseModel):
|
|
class LogsResponse(BaseModel):
|
|
|
"""Response containing log entries."""
|
|
"""Response containing log entries."""
|
|
|
|
|
|
|
@@ -173,107 +170,6 @@ class LogsResponse(BaseModel):
|
|
|
filtered_count: int
|
|
filtered_count: int
|
|
|
|
|
|
|
|
|
|
|
|
|
-# Log line regex pattern: "2024-01-15 10:30:45,123 INFO [module.name] Message here"
|
|
|
|
|
-LOG_LINE_PATTERN = re.compile(r"^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2},\d{3})\s+(\w+)\s+\[([^\]]+)\]\s+(.*)$")
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-def _parse_log_line(line: str) -> LogEntry | None:
|
|
|
|
|
- """Parse a single log line into a LogEntry."""
|
|
|
|
|
- match = LOG_LINE_PATTERN.match(line.strip())
|
|
|
|
|
- if match:
|
|
|
|
|
- return LogEntry(
|
|
|
|
|
- timestamp=match.group(1),
|
|
|
|
|
- level=match.group(2),
|
|
|
|
|
- logger_name=match.group(3),
|
|
|
|
|
- message=match.group(4),
|
|
|
|
|
- )
|
|
|
|
|
- return None
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-def _read_log_entries(
|
|
|
|
|
- limit: int = 200,
|
|
|
|
|
- level_filter: str | None = None,
|
|
|
|
|
- search: str | None = None,
|
|
|
|
|
-) -> tuple[list[LogEntry], int]:
|
|
|
|
|
- """Read and parse log entries from file with optional filtering."""
|
|
|
|
|
- log_file = settings.log_dir / "bambuddy.log"
|
|
|
|
|
- if not log_file.exists():
|
|
|
|
|
- return [], 0
|
|
|
|
|
-
|
|
|
|
|
- entries: list[LogEntry] = []
|
|
|
|
|
- total_lines = 0
|
|
|
|
|
-
|
|
|
|
|
- try:
|
|
|
|
|
- with open(log_file, encoding="utf-8", errors="replace") as f:
|
|
|
|
|
- # Read all lines and process
|
|
|
|
|
- lines = f.readlines()
|
|
|
|
|
- total_lines = len(lines)
|
|
|
|
|
-
|
|
|
|
|
- # Parse lines in reverse order (newest first)
|
|
|
|
|
- current_entry: LogEntry | None = None
|
|
|
|
|
- multi_line_buffer: list[str] = []
|
|
|
|
|
-
|
|
|
|
|
- for line in reversed(lines):
|
|
|
|
|
- parsed = _parse_log_line(line)
|
|
|
|
|
- if parsed:
|
|
|
|
|
- # Found a new log entry start
|
|
|
|
|
- if current_entry:
|
|
|
|
|
- # Apply filters and add previous entry (without multi_line_buffer - it belongs to new entry)
|
|
|
|
|
- should_include = True
|
|
|
|
|
-
|
|
|
|
|
- # Level filter
|
|
|
|
|
- if level_filter and current_entry.level.upper() != level_filter.upper():
|
|
|
|
|
- should_include = False
|
|
|
|
|
-
|
|
|
|
|
- # Search filter (case-insensitive)
|
|
|
|
|
- if search and should_include:
|
|
|
|
|
- search_lower = search.lower()
|
|
|
|
|
- if not (
|
|
|
|
|
- search_lower in current_entry.message.lower()
|
|
|
|
|
- or search_lower in current_entry.logger_name.lower()
|
|
|
|
|
- ):
|
|
|
|
|
- should_include = False
|
|
|
|
|
-
|
|
|
|
|
- if should_include:
|
|
|
|
|
- entries.append(current_entry)
|
|
|
|
|
-
|
|
|
|
|
- if len(entries) >= limit:
|
|
|
|
|
- break
|
|
|
|
|
-
|
|
|
|
|
- # Set new entry and attach any accumulated multi-line content to it
|
|
|
|
|
- # (in reverse order, continuation lines come before their parent entry)
|
|
|
|
|
- current_entry = parsed
|
|
|
|
|
- if multi_line_buffer:
|
|
|
|
|
- current_entry.message += "\n" + "\n".join(reversed(multi_line_buffer))
|
|
|
|
|
- multi_line_buffer = []
|
|
|
|
|
- elif line.strip():
|
|
|
|
|
- # Continuation of multi-line log entry (will be attached to next parsed entry)
|
|
|
|
|
- multi_line_buffer.append(line.rstrip())
|
|
|
|
|
-
|
|
|
|
|
- # Don't forget the last (oldest) entry
|
|
|
|
|
- # Note: any remaining multi_line_buffer would be orphaned lines before the first entry
|
|
|
|
|
- if current_entry and len(entries) < limit:
|
|
|
|
|
- should_include = True
|
|
|
|
|
- if level_filter and current_entry.level.upper() != level_filter.upper():
|
|
|
|
|
- should_include = False
|
|
|
|
|
- if search and should_include:
|
|
|
|
|
- search_lower = search.lower()
|
|
|
|
|
- if not (
|
|
|
|
|
- search_lower in current_entry.message.lower()
|
|
|
|
|
- or search_lower in current_entry.logger_name.lower()
|
|
|
|
|
- ):
|
|
|
|
|
- should_include = False
|
|
|
|
|
- if should_include:
|
|
|
|
|
- entries.append(current_entry)
|
|
|
|
|
-
|
|
|
|
|
- except Exception as e:
|
|
|
|
|
- logger.error("Error reading log file: %s", e)
|
|
|
|
|
- return [], 0
|
|
|
|
|
-
|
|
|
|
|
- # Entries are already in newest-first order
|
|
|
|
|
- return entries, total_lines
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
@router.get("/logs", response_model=LogsResponse)
|
|
@router.get("/logs", response_model=LogsResponse)
|
|
|
async def get_logs(
|
|
async def get_logs(
|
|
|
limit: int = Query(200, ge=1, le=1000, description="Maximum number of entries to return"),
|
|
limit: int = Query(200, ge=1, le=1000, description="Maximum number of entries to return"),
|
|
@@ -282,7 +178,7 @@ async def get_logs(
|
|
|
_: User | None = RequirePermissionIfAuthEnabled(Permission.SETTINGS_READ),
|
|
_: User | None = RequirePermissionIfAuthEnabled(Permission.SETTINGS_READ),
|
|
|
):
|
|
):
|
|
|
"""Get recent application log entries with optional filtering."""
|
|
"""Get recent application log entries with optional filtering."""
|
|
|
- entries, total_lines = _read_log_entries(limit=limit, level_filter=level, search=search)
|
|
|
|
|
|
|
+ entries, total_lines = read_log_entries(limit=limit, level_filter=level, search=search)
|
|
|
|
|
|
|
|
return LogsResponse(
|
|
return LogsResponse(
|
|
|
entries=entries,
|
|
entries=entries,
|
|
@@ -1206,42 +1102,6 @@ async def _collect_support_info() -> dict:
|
|
|
return info
|
|
return info
|
|
|
|
|
|
|
|
|
|
|
|
|
-def _sanitize_log_content(content: str, sensitive_strings: dict[str, str] | None = None) -> str:
|
|
|
|
|
- """Remove sensitive data from log content."""
|
|
|
|
|
- # First, replace known sensitive values (database-aware exact matching)
|
|
|
|
|
- # This catches printer names, usernames, and other arbitrary user-chosen strings
|
|
|
|
|
- # that regex patterns cannot detect
|
|
|
|
|
- if sensitive_strings:
|
|
|
|
|
- # Sort by length descending to avoid partial matches (e.g. "My Printer 1" before "My Printer")
|
|
|
|
|
- for value, label in sorted(sensitive_strings.items(), key=lambda x: len(x[0]), reverse=True):
|
|
|
|
|
- if len(value) < 3:
|
|
|
|
|
- continue # Skip very short strings to prevent over-redaction
|
|
|
|
|
- content = re.sub(re.escape(value), label, content)
|
|
|
|
|
-
|
|
|
|
|
- # Replace credentials in URLs (e.g. http://user:pass@host, rtsps://bblp:code@host)
|
|
|
|
|
- content = re.sub(r"((?:https?|rtsps?)://)[^/:@\s]+:[^/@\s]+@", r"\1[CREDENTIALS]@", content)
|
|
|
|
|
-
|
|
|
|
|
- # Replace email addresses
|
|
|
|
|
- content = re.sub(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b", "[EMAIL]", content)
|
|
|
|
|
-
|
|
|
|
|
- # Replace Bambu Lab printer serial numbers (format: 00M/01D/01S/01P/03W + alphanumeric, 12-16 chars total)
|
|
|
|
|
- content = re.sub(r"\b0[0-3][A-Z0-9][A-Z0-9]{9,13}\b", "[SERIAL]", content, flags=re.IGNORECASE)
|
|
|
|
|
-
|
|
|
|
|
- # Replace IPv4 addresses (skip firmware versions like 01.09.01.00 which have leading zeros)
|
|
|
|
|
- content = re.sub(
|
|
|
|
|
- r"\b(?:(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)\.){3}(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)\b",
|
|
|
|
|
- "[IP]",
|
|
|
|
|
- content,
|
|
|
|
|
- )
|
|
|
|
|
-
|
|
|
|
|
- # Replace paths with usernames
|
|
|
|
|
- content = re.sub(r"/home/[^/\s]+/", "/home/[user]/", content)
|
|
|
|
|
- content = re.sub(r"/Users/[^/\s]+/", "/Users/[user]/", content)
|
|
|
|
|
- content = re.sub(r"/opt/[^/\s]+/", "/opt/[user]/", content)
|
|
|
|
|
-
|
|
|
|
|
- return content
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
def _get_log_content(max_bytes: int = 10 * 1024 * 1024, sensitive_strings: dict[str, str] | None = None) -> bytes:
|
|
def _get_log_content(max_bytes: int = 10 * 1024 * 1024, sensitive_strings: dict[str, str] | None = None) -> bytes:
|
|
|
"""Get log file content, limited to max_bytes from the end."""
|
|
"""Get log file content, limited to max_bytes from the end."""
|
|
|
log_file = settings.log_dir / "bambuddy.log"
|
|
log_file = settings.log_dir / "bambuddy.log"
|
|
@@ -1260,35 +1120,15 @@ def _get_log_content(max_bytes: int = 10 * 1024 * 1024, sensitive_strings: dict[
|
|
|
content = f.read().decode("utf-8", errors="replace")
|
|
content = f.read().decode("utf-8", errors="replace")
|
|
|
|
|
|
|
|
# Sanitize sensitive data
|
|
# Sanitize sensitive data
|
|
|
- content = _sanitize_log_content(content, sensitive_strings)
|
|
|
|
|
|
|
+ content = sanitize_log_content(content, sensitive_strings)
|
|
|
return content.encode("utf-8")
|
|
return content.encode("utf-8")
|
|
|
|
|
|
|
|
|
|
|
|
|
async def _get_recent_sanitized_logs(max_lines: int = 200) -> str:
|
|
async def _get_recent_sanitized_logs(max_lines: int = 200) -> str:
|
|
|
"""Get recent log lines, sanitized for inclusion in bug reports."""
|
|
"""Get recent log lines, sanitized for inclusion in bug reports."""
|
|
|
# Collect sensitive strings from DB for redaction
|
|
# Collect sensitive strings from DB for redaction
|
|
|
- sensitive_strings: dict[str, str] = {}
|
|
|
|
|
async with async_session() as db:
|
|
async with async_session() as db:
|
|
|
- result = await db.execute(select(Printer.name, Printer.serial_number, Printer.ip_address, Printer.access_code))
|
|
|
|
|
- for name, serial, ip_address, access_code in result.all():
|
|
|
|
|
- if name:
|
|
|
|
|
- sensitive_strings[name] = "[PRINTER]"
|
|
|
|
|
- if serial:
|
|
|
|
|
- sensitive_strings[serial] = "[SERIAL]"
|
|
|
|
|
- if ip_address:
|
|
|
|
|
- sensitive_strings[ip_address] = "[IP]"
|
|
|
|
|
- if access_code:
|
|
|
|
|
- sensitive_strings[access_code] = "[ACCESS_CODE]"
|
|
|
|
|
-
|
|
|
|
|
- result = await db.execute(select(User.username))
|
|
|
|
|
- for (username,) in result.all():
|
|
|
|
|
- if username:
|
|
|
|
|
- sensitive_strings[username] = "[USER]"
|
|
|
|
|
-
|
|
|
|
|
- result = await db.execute(select(Settings.value).where(Settings.key == "bambu_cloud_email"))
|
|
|
|
|
- cloud_email = result.scalar_one_or_none()
|
|
|
|
|
- if cloud_email:
|
|
|
|
|
- sensitive_strings[cloud_email] = "[EMAIL]"
|
|
|
|
|
|
|
+ sensitive_strings = await collect_sensitive_strings(db)
|
|
|
|
|
|
|
|
log_file = settings.log_dir / "bambuddy.log"
|
|
log_file = settings.log_dir / "bambuddy.log"
|
|
|
if not log_file.exists():
|
|
if not log_file.exists():
|
|
@@ -1299,7 +1139,7 @@ async def _get_recent_sanitized_logs(max_lines: int = 200) -> str:
|
|
|
content = log_file.read_text(encoding="utf-8", errors="replace")
|
|
content = log_file.read_text(encoding="utf-8", errors="replace")
|
|
|
lines = content.splitlines()
|
|
lines = content.splitlines()
|
|
|
recent = "\n".join(lines[-max_lines:])
|
|
recent = "\n".join(lines[-max_lines:])
|
|
|
- return _sanitize_log_content(recent, sensitive_strings)
|
|
|
|
|
|
|
+ return sanitize_log_content(recent, sensitive_strings)
|
|
|
except Exception:
|
|
except Exception:
|
|
|
logger.debug("Failed to read logs for bug report", exc_info=True)
|
|
logger.debug("Failed to read logs for bug report", exc_info=True)
|
|
|
return ""
|
|
return ""
|
|
@@ -1322,31 +1162,7 @@ async def generate_support_bundle(
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
# Collect known sensitive values for log redaction
|
|
# Collect known sensitive values for log redaction
|
|
|
- sensitive_strings: dict[str, str] = {}
|
|
|
|
|
-
|
|
|
|
|
- # Printer names, serial numbers, IP addresses, and access codes
|
|
|
|
|
- result = await db.execute(select(Printer.name, Printer.serial_number, Printer.ip_address, Printer.access_code))
|
|
|
|
|
- for name, serial, ip_address, access_code in result.all():
|
|
|
|
|
- if name:
|
|
|
|
|
- sensitive_strings[name] = "[PRINTER]"
|
|
|
|
|
- if serial:
|
|
|
|
|
- sensitive_strings[serial] = "[SERIAL]"
|
|
|
|
|
- if ip_address:
|
|
|
|
|
- sensitive_strings[ip_address] = "[IP]"
|
|
|
|
|
- if access_code:
|
|
|
|
|
- sensitive_strings[access_code] = "[ACCESS_CODE]"
|
|
|
|
|
-
|
|
|
|
|
- # Auth usernames
|
|
|
|
|
- result = await db.execute(select(User.username))
|
|
|
|
|
- for (username,) in result.all():
|
|
|
|
|
- if username:
|
|
|
|
|
- sensitive_strings[username] = "[USER]"
|
|
|
|
|
-
|
|
|
|
|
- # Bambu Cloud email
|
|
|
|
|
- result = await db.execute(select(Settings.value).where(Settings.key == "bambu_cloud_email"))
|
|
|
|
|
- cloud_email = result.scalar_one_or_none()
|
|
|
|
|
- if cloud_email:
|
|
|
|
|
- sensitive_strings[cloud_email] = "[EMAIL]"
|
|
|
|
|
|
|
+ sensitive_strings = await collect_sensitive_strings(db)
|
|
|
|
|
|
|
|
# Collect support info
|
|
# Collect support info
|
|
|
support_info = await _collect_support_info()
|
|
support_info = await _collect_support_info()
|