"""Unit tests for support module helper functions. Tests _anonymize_mqtt_broker, _check_port, _get_container_memory_limit, _format_bytes, and _collect_support_info diagnostic sections. """ import asyncio import tempfile from pathlib import Path from unittest.mock import AsyncMock, MagicMock, patch import pytest class TestApplyLogLevel: """Tests for _apply_log_level() debug noise suppression.""" def test_debug_mode_suppresses_sqlalchemy_to_warning(self): """Verify sqlalchemy.engine is set to WARNING (not INFO) in debug mode.""" import logging from backend.app.api.routes.support import _apply_log_level _apply_log_level(True) assert logging.getLogger("sqlalchemy.engine").level == logging.WARNING def test_debug_mode_suppresses_aiosqlite(self): """Verify aiosqlite is set to WARNING in debug mode to prevent cursor noise.""" import logging from backend.app.api.routes.support import _apply_log_level _apply_log_level(True) assert logging.getLogger("aiosqlite").level == logging.WARNING def test_debug_mode_enables_httpcore_debug(self): """Verify httpcore stays at DEBUG in debug mode.""" import logging from backend.app.api.routes.support import _apply_log_level _apply_log_level(True) assert logging.getLogger("httpcore").level == logging.DEBUG def test_non_debug_mode_suppresses_all_noisy_loggers(self): """Verify all noisy loggers are set to WARNING in non-debug mode.""" import logging from backend.app.api.routes.support import _apply_log_level _apply_log_level(False) assert logging.getLogger("sqlalchemy.engine").level == logging.WARNING assert logging.getLogger("httpcore").level == logging.WARNING assert logging.getLogger("httpx").level == logging.WARNING assert logging.getLogger("paho.mqtt").level == logging.WARNING class TestAnonymizeMqttBroker: """Tests for _anonymize_mqtt_broker().""" def test_empty_string(self): from backend.app.api.routes.support import _anonymize_mqtt_broker assert _anonymize_mqtt_broker("") == "" def test_ipv4_address(self): from backend.app.api.routes.support import _anonymize_mqtt_broker assert _anonymize_mqtt_broker("192.168.1.100") == "[IP]" def test_ipv6_address(self): from backend.app.api.routes.support import _anonymize_mqtt_broker assert _anonymize_mqtt_broker("::1") == "[IP]" def test_hostname_with_domain(self): from backend.app.api.routes.support import _anonymize_mqtt_broker assert _anonymize_mqtt_broker("mqtt.example.com") == "*.example.com" def test_hostname_with_subdomain(self): from backend.app.api.routes.support import _anonymize_mqtt_broker assert _anonymize_mqtt_broker("broker.mqtt.example.com") == "*.example.com" def test_single_part_hostname(self): from backend.app.api.routes.support import _anonymize_mqtt_broker assert _anonymize_mqtt_broker("localhost") == "localhost" class TestCheckPort: """Tests for _check_port().""" @pytest.mark.asyncio @pytest.mark.unit async def test_reachable_port(self): from backend.app.api.routes.support import _check_port # Mock a successful connection mock_writer = AsyncMock() mock_writer.close = MagicMock() mock_writer.wait_closed = AsyncMock() with patch("backend.app.api.routes.support.asyncio.open_connection", return_value=(AsyncMock(), mock_writer)): result = await _check_port("192.168.1.1", 8883, timeout=1.0) assert result is True @pytest.mark.asyncio @pytest.mark.unit async def test_unreachable_port(self): from backend.app.api.routes.support import _check_port with ( patch( "backend.app.api.routes.support.asyncio.open_connection", side_effect=ConnectionRefusedError, ), patch( "backend.app.api.routes.support.asyncio.wait_for", side_effect=ConnectionRefusedError, ), ): result = await _check_port("192.168.1.1", 8883, timeout=1.0) assert result is False @pytest.mark.asyncio @pytest.mark.unit async def test_timeout(self): from backend.app.api.routes.support import _check_port with patch( "backend.app.api.routes.support.asyncio.wait_for", side_effect=asyncio.TimeoutError, ): result = await _check_port("192.168.1.1", 8883, timeout=0.1) assert result is False class TestGetContainerMemoryLimit: """Tests for _get_container_memory_limit().""" def test_cgroup_v2_with_limit(self): from backend.app.api.routes.support import _get_container_memory_limit with tempfile.TemporaryDirectory() as tmpdir: v2_path = Path(tmpdir) / "memory.max" v2_path.write_text("1073741824\n") with patch("backend.app.api.routes.support.Path") as mock_path: # v2 path exists with value v2_mock = MagicMock() v2_mock.exists.return_value = True v2_mock.read_text.return_value = "1073741824\n" v1_mock = MagicMock() v1_mock.exists.return_value = False mock_path.side_effect = lambda p: v2_mock if "memory.max" in p else v1_mock result = _get_container_memory_limit() assert result == 1073741824 def test_cgroup_v2_unlimited(self): from backend.app.api.routes.support import _get_container_memory_limit with patch("backend.app.api.routes.support.Path") as mock_path: v2_mock = MagicMock() v2_mock.exists.return_value = True v2_mock.read_text.return_value = "max\n" v1_mock = MagicMock() v1_mock.exists.return_value = False mock_path.side_effect = lambda p: v2_mock if "memory.max" in p else v1_mock result = _get_container_memory_limit() assert result is None def test_no_cgroup_files(self): from backend.app.api.routes.support import _get_container_memory_limit with patch("backend.app.api.routes.support.Path") as mock_path: mock_instance = MagicMock() mock_instance.exists.return_value = False mock_path.return_value = mock_instance result = _get_container_memory_limit() assert result is None class TestFormatBytes: """Tests for _format_bytes().""" def test_bytes(self): from backend.app.api.routes.support import _format_bytes assert _format_bytes(500) == "500 B" def test_kilobytes(self): from backend.app.api.routes.support import _format_bytes assert _format_bytes(2048) == "2.0 KB" def test_megabytes(self): from backend.app.api.routes.support import _format_bytes assert _format_bytes(10 * 1024 * 1024) == "10.0 MB" def test_gigabytes(self): from backend.app.api.routes.support import _format_bytes assert _format_bytes(2 * 1024 * 1024 * 1024) == "2.00 GB" def test_zero(self): from backend.app.api.routes.support import _format_bytes assert _format_bytes(0) == "0 B" class TestSanitizeLogContent: """Tests for _sanitize_log_content() redaction.""" def test_ipv4_addresses_redacted(self): """IPv4 addresses in log lines are replaced with [IP].""" from backend.app.api.routes.support import _sanitize_log_content content = "2024-01-15 Connected to printer at 192.168.1.100 on port 8883" result = _sanitize_log_content(content) assert "192.168.1.100" not in result assert "[IP]" in result assert "on port 8883" in result def test_multiple_ipv4_addresses_redacted(self): """Multiple different IPs in the same line are all redacted.""" from backend.app.api.routes.support import _sanitize_log_content content = "Proxy 10.0.0.1 -> 192.168.1.50" result = _sanitize_log_content(content) assert result == "Proxy [IP] -> [IP]" def test_firmware_versions_with_leading_zeros_preserved(self): """Firmware versions like 01.09.01.00 have leading zeros and should NOT be redacted.""" from backend.app.api.routes.support import _sanitize_log_content content = "Firmware version: 01.09.01.00" result = _sanitize_log_content(content) assert "01.09.01.00" in result def test_firmware_version_mixed_with_ip(self): """Firmware versions preserved while real IPs are redacted in the same line.""" from backend.app.api.routes.support import _sanitize_log_content content = "Printer at 192.168.1.5 running firmware 01.07.02.00" result = _sanitize_log_content(content) assert "192.168.1.5" not in result assert "01.07.02.00" in result assert "[IP] running firmware 01.07.02.00" in result def test_printer_ip_from_sensitive_strings(self): """Printer IPs in sensitive_strings are replaced before regex pass.""" from backend.app.api.routes.support import _sanitize_log_content content = "Connecting to 192.168.1.100" result = _sanitize_log_content(content, sensitive_strings={"192.168.1.100": "[IP]"}) assert result == "Connecting to [IP]" def test_edge_case_zero_ip(self): """0.0.0.0 is a valid IP and should be redacted.""" from backend.app.api.routes.support import _sanitize_log_content content = "Binding to 0.0.0.0" result = _sanitize_log_content(content) assert result == "Binding to [IP]" def test_edge_case_broadcast_ip(self): """255.255.255.255 is a valid IP and should be redacted.""" from backend.app.api.routes.support import _sanitize_log_content content = "Broadcast to 255.255.255.255" result = _sanitize_log_content(content) assert result == "Broadcast to [IP]" def test_invalid_octet_not_redacted(self): """Octets >255 are not valid IPs and should not be redacted.""" from backend.app.api.routes.support import _sanitize_log_content content = "Value 999.999.999.999" result = _sanitize_log_content(content) assert "999.999.999.999" in result def test_existing_serial_redaction_still_works(self): """Serial number redaction still functions alongside IP redaction.""" from backend.app.api.routes.support import _sanitize_log_content content = "Printer 01SABCDEF1234 at 10.0.0.5" result = _sanitize_log_content(content) assert "[SERIAL]" in result assert "[IP]" in result assert "01SABCDEF1234" not in result assert "10.0.0.5" not in result def test_existing_email_redaction_still_works(self): """Email redaction still functions alongside IP redaction.""" from backend.app.api.routes.support import _sanitize_log_content content = "User user@example.com from 172.16.0.1" result = _sanitize_log_content(content) assert "[EMAIL]" in result assert "[IP]" in result def test_existing_path_redaction_still_works(self): """Path redaction still functions alongside IP redaction.""" from backend.app.api.routes.support import _sanitize_log_content content = "Config at /home/john/config.yaml from 192.168.0.1" result = _sanitize_log_content(content) assert "/home/[user]/" in result assert "[IP]" in result class TestCollectSupportInfo: """Tests for _collect_support_info() new diagnostic sections.""" @pytest.mark.asyncio @pytest.mark.unit async def test_environment_has_timezone(self): """Verify environment section includes timezone.""" from backend.app.api.routes.support import _collect_support_info with ( patch("backend.app.api.routes.support.is_running_in_docker", return_value=False), patch("backend.app.api.routes.support.async_session") as mock_session_ctx, patch("backend.app.api.routes.support.printer_manager") as mock_pm, patch("backend.app.api.routes.support.get_network_interfaces", return_value=[]), patch("backend.app.api.routes.support.ws_manager") as mock_ws, patch.dict("os.environ", {"TZ": "America/New_York"}), ): mock_pm.get_all_statuses.return_value = {} mock_ws.active_connections = [] mock_db = AsyncMock() mock_result = MagicMock() mock_result.scalar.return_value = 0 mock_result.scalar_one_or_none.return_value = None mock_result.scalars.return_value.all.return_value = [] mock_db.execute = AsyncMock(return_value=mock_result) mock_session_ctx.return_value.__aenter__ = AsyncMock(return_value=mock_db) mock_session_ctx.return_value.__aexit__ = AsyncMock(return_value=False) info = await _collect_support_info() assert info["environment"]["timezone"] == "America/New_York" assert info["environment"]["docker"] is False @pytest.mark.asyncio @pytest.mark.unit async def test_docker_section_present_when_in_docker(self): """Verify docker section is added when running in Docker.""" from backend.app.api.routes.support import _collect_support_info with ( patch("backend.app.api.routes.support.is_running_in_docker", return_value=True), patch("backend.app.api.routes.support._get_container_memory_limit", return_value=1073741824), patch("backend.app.api.routes.support._detect_docker_network_mode", return_value="bridge"), patch("backend.app.api.routes.support.async_session") as mock_session_ctx, patch("backend.app.api.routes.support.printer_manager") as mock_pm, patch( "backend.app.api.routes.support.get_network_interfaces", return_value=[{"name": "eth0", "subnet": "172.17.0.0/16"}], ), patch("backend.app.api.routes.support.ws_manager") as mock_ws, ): mock_pm.get_all_statuses.return_value = {} mock_ws.active_connections = [] mock_db = AsyncMock() mock_result = MagicMock() mock_result.scalar.return_value = 0 mock_result.scalar_one_or_none.return_value = None mock_result.scalars.return_value.all.return_value = [] mock_db.execute = AsyncMock(return_value=mock_result) mock_session_ctx.return_value.__aenter__ = AsyncMock(return_value=mock_db) mock_session_ctx.return_value.__aexit__ = AsyncMock(return_value=False) info = await _collect_support_info() assert "docker" in info assert info["docker"]["container_memory_limit_bytes"] == 1073741824 assert info["docker"]["container_memory_limit_formatted"] == "1.00 GB" assert info["docker"]["network_mode_hint"] == "bridge" @pytest.mark.asyncio @pytest.mark.unit async def test_docker_section_absent_when_not_docker(self): """Verify docker section is absent when not in Docker.""" from backend.app.api.routes.support import _collect_support_info with ( patch("backend.app.api.routes.support.is_running_in_docker", return_value=False), patch("backend.app.api.routes.support.async_session") as mock_session_ctx, patch("backend.app.api.routes.support.printer_manager") as mock_pm, patch("backend.app.api.routes.support.get_network_interfaces", return_value=[]), patch("backend.app.api.routes.support.ws_manager") as mock_ws, ): mock_pm.get_all_statuses.return_value = {} mock_ws.active_connections = [] mock_db = AsyncMock() mock_result = MagicMock() mock_result.scalar.return_value = 0 mock_result.scalar_one_or_none.return_value = None mock_result.scalars.return_value.all.return_value = [] mock_db.execute = AsyncMock(return_value=mock_result) mock_session_ctx.return_value.__aenter__ = AsyncMock(return_value=mock_db) mock_session_ctx.return_value.__aexit__ = AsyncMock(return_value=False) info = await _collect_support_info() assert "docker" not in info @pytest.mark.asyncio @pytest.mark.unit async def test_dependencies_section(self): """Verify dependencies section lists package versions.""" from backend.app.api.routes.support import _collect_support_info with ( patch("backend.app.api.routes.support.is_running_in_docker", return_value=False), patch("backend.app.api.routes.support.async_session") as mock_session_ctx, patch("backend.app.api.routes.support.printer_manager") as mock_pm, patch("backend.app.api.routes.support.get_network_interfaces", return_value=[]), patch("backend.app.api.routes.support.ws_manager") as mock_ws, ): mock_pm.get_all_statuses.return_value = {} mock_ws.active_connections = [] mock_db = AsyncMock() mock_result = MagicMock() mock_result.scalar.return_value = 0 mock_result.scalar_one_or_none.return_value = None mock_result.scalars.return_value.all.return_value = [] mock_db.execute = AsyncMock(return_value=mock_result) mock_session_ctx.return_value.__aenter__ = AsyncMock(return_value=mock_db) mock_session_ctx.return_value.__aexit__ = AsyncMock(return_value=False) info = await _collect_support_info() assert "dependencies" in info # fastapi should be installed in test environment assert "fastapi" in info["dependencies"] assert info["dependencies"]["fastapi"] is not None @pytest.mark.asyncio @pytest.mark.unit async def test_websockets_section(self): """Verify websockets section shows connection count.""" from backend.app.api.routes.support import _collect_support_info with ( patch("backend.app.api.routes.support.is_running_in_docker", return_value=False), patch("backend.app.api.routes.support.async_session") as mock_session_ctx, patch("backend.app.api.routes.support.printer_manager") as mock_pm, patch("backend.app.api.routes.support.get_network_interfaces", return_value=[]), patch("backend.app.api.routes.support.ws_manager") as mock_ws, ): mock_pm.get_all_statuses.return_value = {} mock_ws.active_connections = ["conn1", "conn2"] mock_db = AsyncMock() mock_result = MagicMock() mock_result.scalar.return_value = 0 mock_result.scalar_one_or_none.return_value = None mock_result.scalars.return_value.all.return_value = [] mock_db.execute = AsyncMock(return_value=mock_result) mock_session_ctx.return_value.__aenter__ = AsyncMock(return_value=mock_db) mock_session_ctx.return_value.__aexit__ = AsyncMock(return_value=False) info = await _collect_support_info() assert info["websockets"]["active_connections"] == 2 @pytest.mark.asyncio @pytest.mark.unit async def test_network_section(self): """Verify network section shows interface subnets.""" from backend.app.api.routes.support import _collect_support_info mock_interfaces = [ {"name": "eth0", "ip": "192.168.1.100", "netmask": "255.255.255.0", "subnet": "192.168.1.0/24"}, {"name": "wlan0", "ip": "10.0.0.50", "netmask": "255.255.255.0", "subnet": "10.0.0.0/24"}, ] with ( patch("backend.app.api.routes.support.is_running_in_docker", return_value=False), patch("backend.app.api.routes.support.async_session") as mock_session_ctx, patch("backend.app.api.routes.support.printer_manager") as mock_pm, patch("backend.app.api.routes.support.get_network_interfaces", return_value=mock_interfaces), patch("backend.app.api.routes.support.ws_manager") as mock_ws, ): mock_pm.get_all_statuses.return_value = {} mock_ws.active_connections = [] mock_db = AsyncMock() mock_result = MagicMock() mock_result.scalar.return_value = 0 mock_result.scalar_one_or_none.return_value = None mock_result.scalars.return_value.all.return_value = [] mock_db.execute = AsyncMock(return_value=mock_result) mock_session_ctx.return_value.__aenter__ = AsyncMock(return_value=mock_db) mock_session_ctx.return_value.__aexit__ = AsyncMock(return_value=False) info = await _collect_support_info() assert info["network"]["interface_count"] == 2 assert info["network"]["interfaces"][0]["name"] == "eth0" assert info["network"]["interfaces"][0]["subnet"] == "x.x.1.0/24" # Verify IP addresses are NOT included (first two octets masked) for iface in info["network"]["interfaces"]: assert "ip" not in iface assert iface["subnet"].startswith("x.x.") @pytest.mark.asyncio @pytest.mark.unit async def test_log_file_section(self): """Verify log file section shows size info.""" from backend.app.api.routes.support import _collect_support_info with tempfile.TemporaryDirectory() as tmpdir: log_dir = Path(tmpdir) log_file = log_dir / "bambuddy.log" log_file.write_text("some log content\n" * 100) with ( patch("backend.app.api.routes.support.is_running_in_docker", return_value=False), patch("backend.app.api.routes.support.async_session") as mock_session_ctx, patch("backend.app.api.routes.support.printer_manager") as mock_pm, patch("backend.app.api.routes.support.get_network_interfaces", return_value=[]), patch("backend.app.api.routes.support.ws_manager") as mock_ws, patch("backend.app.api.routes.support.settings") as mock_settings, ): mock_settings.base_dir = Path(tmpdir) mock_settings.log_dir = log_dir mock_settings.debug = False mock_pm.get_all_statuses.return_value = {} mock_ws.active_connections = [] mock_db = AsyncMock() mock_result = MagicMock() mock_result.scalar.return_value = 0 mock_result.scalar_one_or_none.return_value = None mock_result.scalars.return_value.all.return_value = [] mock_db.execute = AsyncMock(return_value=mock_result) mock_session_ctx.return_value.__aenter__ = AsyncMock(return_value=mock_db) mock_session_ctx.return_value.__aexit__ = AsyncMock(return_value=False) info = await _collect_support_info() assert "log_file" in info assert info["log_file"]["size_bytes"] > 0 assert "B" in info["log_file"]["size_formatted"] or "KB" in info["log_file"]["size_formatted"]