| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169 |
- """Tests for the log-health scanner (backend/app/services/log_health.py)."""
- import pytest
- from backend.app.core.config import settings
- from backend.app.services.log_health import SIGNATURES, scan_logs
- def _line(level, logger, msg, ts="2026-05-22 10:00:00,000"):
- """Build one log line in the app's log format: TS LEVEL [logger] message."""
- return f"{ts} {level} [{logger}] {msg}"
- def _write_log(tmp_path, monkeypatch, lines):
- log_file = tmp_path / "bambuddy.log"
- log_file.write_text("\n".join(lines) + "\n", encoding="utf-8")
- monkeypatch.setattr(settings, "log_dir", tmp_path)
- return log_file
- FTP_LOGGER = "backend.app.services.bambu_ftp"
- MQTT_LOGGER = "backend.app.services.bambu_mqtt"
- CAM_LOGGER = "backend.app.services.camera"
- def test_clean_log_has_no_findings(tmp_path, monkeypatch):
- _write_log(
- tmp_path,
- monkeypatch,
- [
- _line("INFO", "backend.app.main", "Application startup complete"),
- _line("INFO", FTP_LOGGER, "FTP connected, logging in as bblp"),
- ],
- )
- result = scan_logs()
- assert result.findings == []
- assert result.log_available is True
- assert result.scanned_entries == 2
- assert result.summary == {"total": 0, "layer8": 0, "environment": 0, "bug": 0}
- def test_log_unavailable_when_file_missing(tmp_path, monkeypatch):
- # No log file written.
- monkeypatch.setattr(settings, "log_dir", tmp_path)
- result = scan_logs()
- assert result.log_available is False
- assert result.findings == []
- def test_ftp_auth_rejected_is_detected(tmp_path, monkeypatch):
- _write_log(
- tmp_path,
- monkeypatch,
- [_line("WARNING", FTP_LOGGER, "FTP connection permission error to 10.0.0.9: 530 Login incorrect")],
- )
- result = scan_logs()
- assert len(result.findings) == 1
- f = result.findings[0]
- assert f.signature_id == "ftp-auth-rejected"
- assert f.severity == "error"
- assert f.category == "layer8"
- assert f.count == 1
- def test_min_count_gates_low_frequency_signals(tmp_path, monkeypatch):
- # ftp-connection-timeout requires min_count=3 — two hits must not surface.
- _write_log(
- tmp_path,
- monkeypatch,
- [_line("WARNING", FTP_LOGGER, "FTP connection timed out to 10.0.0.9: timed out")] * 2,
- )
- assert scan_logs().findings == []
- _write_log(
- tmp_path,
- monkeypatch,
- [_line("WARNING", FTP_LOGGER, "FTP connection timed out to 10.0.0.9: timed out")] * 3,
- )
- findings = scan_logs().findings
- assert len(findings) == 1
- assert findings[0].signature_id == "ftp-connection-timeout"
- assert findings[0].count == 3
- def test_aggregation_tracks_count_and_seen_range(tmp_path, monkeypatch):
- _write_log(
- tmp_path,
- monkeypatch,
- [
- _line("WARNING", FTP_LOGGER, "FTP connection permission error to 10.0.0.9", ts="2026-05-22 09:00:00,000"),
- _line("WARNING", FTP_LOGGER, "FTP connection permission error to 10.0.0.9", ts="2026-05-22 10:30:00,000"),
- ],
- )
- f = scan_logs().findings[0]
- assert f.count == 2
- assert f.first_seen == "2026-05-22 09:00:00,000"
- assert f.last_seen == "2026-05-22 10:30:00,000"
- def test_logger_prefix_filters_unrelated_loggers(tmp_path, monkeypatch):
- # Same text, but logged by an unrelated logger — must not match the
- # bambu_ftp-scoped signature.
- _write_log(
- tmp_path,
- monkeypatch,
- [_line("WARNING", "backend.app.services.something_else", "FTP connection permission error to 10.0.0.9")],
- )
- assert scan_logs().findings == []
- def test_min_level_filters_below_threshold(tmp_path, monkeypatch):
- # ftp-auth-rejected requires at least WARNING — an INFO line must not match.
- _write_log(
- tmp_path,
- monkeypatch,
- [_line("INFO", FTP_LOGGER, "FTP connection permission error to 10.0.0.9")],
- )
- assert scan_logs().findings == []
- def test_sample_is_sanitized(tmp_path, monkeypatch):
- _write_log(
- tmp_path,
- monkeypatch,
- [_line("WARNING", FTP_LOGGER, "FTP connection permission error to 192.168.1.50: 530")],
- )
- f = scan_logs().findings[0]
- assert "192.168.1.50" not in f.sample
- assert "[IP]" in f.sample
- def test_database_locked_matches_inside_traceback(tmp_path, monkeypatch):
- # The signature text appears on a continuation line of a multi-line entry;
- # read_log_entries folds it into the parent message.
- _write_log(
- tmp_path,
- monkeypatch,
- [
- _line("ERROR", "backend.app.core.database", "Unhandled DB error"),
- "Traceback (most recent call last):",
- "sqlite3.OperationalError: database is locked",
- ],
- )
- findings = scan_logs().findings
- assert len(findings) == 1
- assert findings[0].signature_id == "database-locked"
- assert findings[0].category == "environment"
- def test_findings_sorted_layer8_then_environment(tmp_path, monkeypatch):
- _write_log(
- tmp_path,
- monkeypatch,
- [
- _line("ERROR", "backend.app.core.database", "x: database is locked"),
- _line("WARNING", FTP_LOGGER, "FTP connection timed out to 10.0.0.9"),
- _line("WARNING", FTP_LOGGER, "FTP connection timed out to 10.0.0.9"),
- _line("WARNING", FTP_LOGGER, "FTP connection timed out to 10.0.0.9"),
- _line("WARNING", FTP_LOGGER, "FTP connection permission error to 10.0.0.9"),
- ],
- )
- ids = [f.signature_id for f in scan_logs().findings]
- # layer8 error, then layer8 warning, then environment.
- assert ids == ["ftp-auth-rejected", "ftp-connection-timeout", "database-locked"]
- def test_every_signature_id_is_unique():
- ids = [s.id for s in SIGNATURES]
- assert len(ids) == len(set(ids))
|