test_log_health.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. """Tests for the log-health scanner (backend/app/services/log_health.py)."""
  2. import pytest
  3. from backend.app.core.config import settings
  4. from backend.app.services.log_health import SIGNATURES, scan_logs
  5. def _line(level, logger, msg, ts="2026-05-22 10:00:00,000"):
  6. """Build one log line in the app's log format: TS LEVEL [logger] message."""
  7. return f"{ts} {level} [{logger}] {msg}"
  8. def _write_log(tmp_path, monkeypatch, lines):
  9. log_file = tmp_path / "bambuddy.log"
  10. log_file.write_text("\n".join(lines) + "\n", encoding="utf-8")
  11. monkeypatch.setattr(settings, "log_dir", tmp_path)
  12. return log_file
  13. FTP_LOGGER = "backend.app.services.bambu_ftp"
  14. MQTT_LOGGER = "backend.app.services.bambu_mqtt"
  15. CAM_LOGGER = "backend.app.services.camera"
  16. def test_clean_log_has_no_findings(tmp_path, monkeypatch):
  17. _write_log(
  18. tmp_path,
  19. monkeypatch,
  20. [
  21. _line("INFO", "backend.app.main", "Application startup complete"),
  22. _line("INFO", FTP_LOGGER, "FTP connected, logging in as bblp"),
  23. ],
  24. )
  25. result = scan_logs()
  26. assert result.findings == []
  27. assert result.log_available is True
  28. assert result.scanned_entries == 2
  29. assert result.summary == {"total": 0, "layer8": 0, "environment": 0, "bug": 0}
  30. def test_log_unavailable_when_file_missing(tmp_path, monkeypatch):
  31. # No log file written.
  32. monkeypatch.setattr(settings, "log_dir", tmp_path)
  33. result = scan_logs()
  34. assert result.log_available is False
  35. assert result.findings == []
  36. def test_ftp_auth_rejected_is_detected(tmp_path, monkeypatch):
  37. _write_log(
  38. tmp_path,
  39. monkeypatch,
  40. [_line("WARNING", FTP_LOGGER, "FTP connection permission error to 10.0.0.9: 530 Login incorrect")],
  41. )
  42. result = scan_logs()
  43. assert len(result.findings) == 1
  44. f = result.findings[0]
  45. assert f.signature_id == "ftp-auth-rejected"
  46. assert f.severity == "error"
  47. assert f.category == "layer8"
  48. assert f.count == 1
  49. def test_min_count_gates_low_frequency_signals(tmp_path, monkeypatch):
  50. # ftp-connection-timeout requires min_count=3 — two hits must not surface.
  51. _write_log(
  52. tmp_path,
  53. monkeypatch,
  54. [_line("WARNING", FTP_LOGGER, "FTP connection timed out to 10.0.0.9: timed out")] * 2,
  55. )
  56. assert scan_logs().findings == []
  57. _write_log(
  58. tmp_path,
  59. monkeypatch,
  60. [_line("WARNING", FTP_LOGGER, "FTP connection timed out to 10.0.0.9: timed out")] * 3,
  61. )
  62. findings = scan_logs().findings
  63. assert len(findings) == 1
  64. assert findings[0].signature_id == "ftp-connection-timeout"
  65. assert findings[0].count == 3
  66. def test_aggregation_tracks_count_and_seen_range(tmp_path, monkeypatch):
  67. _write_log(
  68. tmp_path,
  69. monkeypatch,
  70. [
  71. _line("WARNING", FTP_LOGGER, "FTP connection permission error to 10.0.0.9", ts="2026-05-22 09:00:00,000"),
  72. _line("WARNING", FTP_LOGGER, "FTP connection permission error to 10.0.0.9", ts="2026-05-22 10:30:00,000"),
  73. ],
  74. )
  75. f = scan_logs().findings[0]
  76. assert f.count == 2
  77. assert f.first_seen == "2026-05-22 09:00:00,000"
  78. assert f.last_seen == "2026-05-22 10:30:00,000"
  79. def test_logger_prefix_filters_unrelated_loggers(tmp_path, monkeypatch):
  80. # Same text, but logged by an unrelated logger — must not match the
  81. # bambu_ftp-scoped signature.
  82. _write_log(
  83. tmp_path,
  84. monkeypatch,
  85. [_line("WARNING", "backend.app.services.something_else", "FTP connection permission error to 10.0.0.9")],
  86. )
  87. assert scan_logs().findings == []
  88. def test_min_level_filters_below_threshold(tmp_path, monkeypatch):
  89. # ftp-auth-rejected requires at least WARNING — an INFO line must not match.
  90. _write_log(
  91. tmp_path,
  92. monkeypatch,
  93. [_line("INFO", FTP_LOGGER, "FTP connection permission error to 10.0.0.9")],
  94. )
  95. assert scan_logs().findings == []
  96. def test_sample_is_sanitized(tmp_path, monkeypatch):
  97. _write_log(
  98. tmp_path,
  99. monkeypatch,
  100. [_line("WARNING", FTP_LOGGER, "FTP connection permission error to 192.168.1.50: 530")],
  101. )
  102. f = scan_logs().findings[0]
  103. assert "192.168.1.50" not in f.sample
  104. assert "[IP]" in f.sample
  105. def test_database_locked_matches_inside_traceback(tmp_path, monkeypatch):
  106. # The signature text appears on a continuation line of a multi-line entry;
  107. # read_log_entries folds it into the parent message.
  108. _write_log(
  109. tmp_path,
  110. monkeypatch,
  111. [
  112. _line("ERROR", "backend.app.core.database", "Unhandled DB error"),
  113. "Traceback (most recent call last):",
  114. "sqlite3.OperationalError: database is locked",
  115. ],
  116. )
  117. findings = scan_logs().findings
  118. assert len(findings) == 1
  119. assert findings[0].signature_id == "database-locked"
  120. assert findings[0].category == "environment"
  121. def test_findings_sorted_layer8_then_environment(tmp_path, monkeypatch):
  122. _write_log(
  123. tmp_path,
  124. monkeypatch,
  125. [
  126. _line("ERROR", "backend.app.core.database", "x: database is locked"),
  127. _line("WARNING", FTP_LOGGER, "FTP connection timed out to 10.0.0.9"),
  128. _line("WARNING", FTP_LOGGER, "FTP connection timed out to 10.0.0.9"),
  129. _line("WARNING", FTP_LOGGER, "FTP connection timed out to 10.0.0.9"),
  130. _line("WARNING", FTP_LOGGER, "FTP connection permission error to 10.0.0.9"),
  131. ],
  132. )
  133. ids = [f.signature_id for f in scan_logs().findings]
  134. # layer8 error, then layer8 warning, then environment.
  135. assert ids == ["ftp-auth-rejected", "ftp-connection-timeout", "database-locked"]
  136. def test_every_signature_id_is_unique():
  137. ids = [s.id for s in SIGNATURES]
  138. assert len(ids) == len(set(ids))