log_reader.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. """Shared primitives for reading, parsing, and sanitizing the Bambuddy app log.
  2. Extracted from ``routes/support.py`` so service-layer code (e.g. the log-health
  3. scanner in ``log_health.py``) can reuse log reading and redaction without
  4. importing from the API layer. ``support.py`` re-imports these helpers and keeps
  5. its own route handlers.
  6. """
  7. import logging
  8. import re
  9. from pydantic import BaseModel
  10. from sqlalchemy import select
  11. from sqlalchemy.ext.asyncio import AsyncSession
  12. from backend.app.core.config import settings
  13. from backend.app.models.printer import Printer
  14. from backend.app.models.settings import Settings
  15. from backend.app.models.user import User
  16. logger = logging.getLogger(__name__)
# Log line format: "2024-01-15 10:30:45,123 INFO [module.name] [trace_id] Message"
#
# Capture groups:
#   1. timestamp   - "YYYY-MM-DD HH:MM:SS,mmm"
#   2. level       - a single run of word characters (e.g. INFO)
#   3. logger name - the text inside the first pair of square brackets
#   4. message     - everything after the logger name, to end of line
#
# The trace_id is left as part of the message group — callers that need it can
# parse it out; the log-health scanner does not.
LOG_LINE_PATTERN = re.compile(r"^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2},\d{3})\s+(\w+)\s+\[([^\]]+)\]\s+(.*)$")
class LogEntry(BaseModel):
    """A single parsed log entry.

    Instances built by ``parse_log_line`` hold the four capture groups of
    ``LOG_LINE_PATTERN``; ``read_log_entries`` may append continuation lines
    (tracebacks etc.) to ``message``.
    """

    # Timestamp text exactly as it appeared in the log line (not parsed to a datetime).
    timestamp: str
    # Log level token as written in the line (e.g. "INFO").
    level: str
    # Logger/module name taken from the first bracketed field.
    logger_name: str
    # Remainder of the line; may span multiple lines once continuations are folded in.
    message: str
  27. def parse_log_line(line: str) -> LogEntry | None:
  28. """Parse a single log line into a LogEntry, or None if it is not a line start."""
  29. match = LOG_LINE_PATTERN.match(line.strip())
  30. if match:
  31. return LogEntry(
  32. timestamp=match.group(1),
  33. level=match.group(2),
  34. logger_name=match.group(3),
  35. message=match.group(4),
  36. )
  37. return None
  38. def read_log_entries(
  39. limit: int = 200,
  40. level_filter: str | None = None,
  41. search: str | None = None,
  42. ) -> tuple[list[LogEntry], int]:
  43. """Read and parse log entries from ``bambuddy.log``, newest first.
  44. Continuation lines (tracebacks etc.) are folded into the message of the
  45. entry they belong to. Returns ``(entries, total_lines_in_file)``.
  46. """
  47. log_file = settings.log_dir / "bambuddy.log"
  48. if not log_file.exists():
  49. return [], 0
  50. entries: list[LogEntry] = []
  51. total_lines = 0
  52. try:
  53. with open(log_file, encoding="utf-8", errors="replace") as f:
  54. lines = f.readlines()
  55. total_lines = len(lines)
  56. # Parse lines in reverse order (newest first)
  57. current_entry: LogEntry | None = None
  58. multi_line_buffer: list[str] = []
  59. for line in reversed(lines):
  60. parsed = parse_log_line(line)
  61. if parsed:
  62. # Found a new log entry start
  63. if current_entry:
  64. # Apply filters and add previous entry (without multi_line_buffer - it belongs to new entry)
  65. should_include = True
  66. # Level filter
  67. if level_filter and current_entry.level.upper() != level_filter.upper():
  68. should_include = False
  69. # Search filter (case-insensitive)
  70. if search and should_include:
  71. search_lower = search.lower()
  72. if not (
  73. search_lower in current_entry.message.lower()
  74. or search_lower in current_entry.logger_name.lower()
  75. ):
  76. should_include = False
  77. if should_include:
  78. entries.append(current_entry)
  79. if len(entries) >= limit:
  80. break
  81. # Set new entry and attach any accumulated multi-line content to it
  82. # (in reverse order, continuation lines come before their parent entry)
  83. current_entry = parsed
  84. if multi_line_buffer:
  85. current_entry.message += "\n" + "\n".join(reversed(multi_line_buffer))
  86. multi_line_buffer = []
  87. elif line.strip():
  88. # Continuation of multi-line log entry (will be attached to next parsed entry)
  89. multi_line_buffer.append(line.rstrip())
  90. # Don't forget the last (oldest) entry
  91. # Note: any remaining multi_line_buffer would be orphaned lines before the first entry
  92. if current_entry and len(entries) < limit:
  93. should_include = True
  94. if level_filter and current_entry.level.upper() != level_filter.upper():
  95. should_include = False
  96. if search and should_include:
  97. search_lower = search.lower()
  98. if not (
  99. search_lower in current_entry.message.lower()
  100. or search_lower in current_entry.logger_name.lower()
  101. ):
  102. should_include = False
  103. if should_include:
  104. entries.append(current_entry)
  105. except Exception as e:
  106. logger.error("Error reading log file: %s", e)
  107. return [], 0
  108. # Entries are already in newest-first order
  109. return entries, total_lines
  110. def sanitize_log_content(content: str, sensitive_strings: dict[str, str] | None = None) -> str:
  111. """Remove sensitive data from log content.
  112. ``sensitive_strings`` maps known exact values (printer names, serials, etc.)
  113. to replacement labels; pass the result of :func:`collect_sensitive_strings`.
  114. Regex passes additionally redact credentials in URLs, emails, serials, and
  115. IP addresses that were not captured by exact matching.
  116. """
  117. # First, replace known sensitive values (database-aware exact matching)
  118. # This catches printer names, usernames, and other arbitrary user-chosen strings
  119. # that regex patterns cannot detect
  120. if sensitive_strings:
  121. # Sort by length descending to avoid partial matches (e.g. "My Printer 1" before "My Printer")
  122. for value, label in sorted(sensitive_strings.items(), key=lambda x: len(x[0]), reverse=True):
  123. if len(value) < 3:
  124. continue # Skip very short strings to prevent over-redaction
  125. content = re.sub(re.escape(value), label, content)
  126. # Replace credentials in URLs (e.g. http://user:pass@host, rtsps://bblp:code@host)
  127. content = re.sub(r"((?:https?|rtsps?)://)[^/:@\s]+:[^/@\s]+@", r"\1[CREDENTIALS]@", content)
  128. # Replace email addresses
  129. content = re.sub(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b", "[EMAIL]", content)
  130. # Replace Bambu Lab printer serial numbers (format: 00M/01D/01S/01P/03W + alphanumeric, 12-16 chars total)
  131. content = re.sub(r"\b0[0-3][A-Z0-9][A-Z0-9]{9,13}\b", "[SERIAL]", content, flags=re.IGNORECASE)
  132. # Replace IPv4 addresses (skip firmware versions like 01.09.01.00 which have leading zeros)
  133. content = re.sub(
  134. r"\b(?:(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)\.){3}(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)\b",
  135. "[IP]",
  136. content,
  137. )
  138. # Replace paths with usernames
  139. content = re.sub(r"/home/[^/\s]+/", "/home/[user]/", content)
  140. content = re.sub(r"/Users/[^/\s]+/", "/Users/[user]/", content)
  141. content = re.sub(r"/opt/[^/\s]+/", "/opt/[user]/", content)
  142. return content
  143. async def collect_sensitive_strings(db: AsyncSession) -> dict[str, str]:
  144. """Collect known sensitive values from the database for log redaction.
  145. Covers printer names, serial numbers, IP addresses, access codes, auth
  146. usernames, and the Bambu Cloud email. Pass the result to
  147. :func:`sanitize_log_content`.
  148. """
  149. sensitive_strings: dict[str, str] = {}
  150. # Printer names, serial numbers, IP addresses, and access codes
  151. result = await db.execute(select(Printer.name, Printer.serial_number, Printer.ip_address, Printer.access_code))
  152. for name, serial, ip_address, access_code in result.all():
  153. if name:
  154. sensitive_strings[name] = "[PRINTER]"
  155. if serial:
  156. sensitive_strings[serial] = "[SERIAL]"
  157. if ip_address:
  158. sensitive_strings[ip_address] = "[IP]"
  159. if access_code:
  160. sensitive_strings[access_code] = "[ACCESS_CODE]"
  161. # Auth usernames
  162. result = await db.execute(select(User.username))
  163. for (username,) in result.all():
  164. if username:
  165. sensitive_strings[username] = "[USER]"
  166. # Bambu Cloud email
  167. result = await db.execute(select(Settings.value).where(Settings.key == "bambu_cloud_email"))
  168. cloud_email = result.scalar_one_or_none()
  169. if cloud_email:
  170. sensitive_strings[cloud_email] = "[EMAIL]"
  171. return sensitive_strings