| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336 |
- """Unit tests for bug report service and route."""
- from unittest.mock import AsyncMock, MagicMock, patch
- import pytest
class TestBugReportService:
    """Tests for bug_report.submit_report().

    Each test patches ``_rate_limit_timestamps`` and ``BUG_REPORT_RELAY_URL``
    on the service module. Tests that get as far as the relay call also
    replace ``httpx.AsyncClient`` and ``async_session`` with mocks, wired up
    through the private helpers below.
    """

    @staticmethod
    def _make_response(status_code, payload=None):
        """Build a mock relay response.

        Args:
            status_code: Value exposed as ``response.status_code``.
            payload: Optional dict returned by ``response.json()``. When
                omitted, ``json()`` keeps MagicMock's default return value.

        Returns:
            A ``MagicMock`` standing in for the relay's HTTP response.
        """
        response = MagicMock()
        response.status_code = status_code
        if payload is not None:
            response.json.return_value = payload
        return response

    @staticmethod
    def _make_db():
        """Build a mock DB session: synchronous ``add``, awaitable ``commit``."""
        db = AsyncMock()
        db.add = MagicMock()
        db.commit = AsyncMock()
        return db

    @staticmethod
    def _wire_relay(mock_client_cls, mock_session, mock_db, *, response=None, error=None):
        """Configure the patched HTTP client class and session factory.

        Args:
            mock_client_cls: The mock that replaced ``httpx.AsyncClient``.
            mock_session: The mock that replaced ``async_session``.
            mock_db: Object yielded by ``async with async_session()``.
            response: Value returned by the awaited ``client.post(...)``.
            error: Exception raised by ``client.post(...)`` instead of
                returning ``response``.
        """
        client = AsyncMock()
        if error is not None:
            client.post = AsyncMock(side_effect=error)
        else:
            client.post = AsyncMock(return_value=response)
        # The client is used as an async context manager that yields itself.
        client.__aenter__ = AsyncMock(return_value=client)
        client.__aexit__ = AsyncMock(return_value=False)
        mock_client_cls.return_value = client

        mock_session.return_value.__aenter__ = AsyncMock(return_value=mock_db)
        mock_session.return_value.__aexit__ = AsyncMock(return_value=False)

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_submit_success(self):
        """Successful relay call saves report and returns issue details."""
        from backend.app.services.bug_report import submit_report

        mock_response = self._make_response(
            200,
            {
                "success": True,
                "message": "Created",
                "issue_url": "https://github.com/maziggy/bambuddy/issues/99",
                "issue_number": 99,
            },
        )
        mock_db = self._make_db()

        with (
            patch("backend.app.services.bug_report.httpx.AsyncClient") as mock_client_cls,
            patch("backend.app.services.bug_report.async_session") as mock_session,
            patch("backend.app.services.bug_report._rate_limit_timestamps", []),
            patch("backend.app.services.bug_report.BUG_REPORT_RELAY_URL", "https://example.com/api/bug-report"),
        ):
            self._wire_relay(mock_client_cls, mock_session, mock_db, response=mock_response)

            result = await submit_report(
                description="Test bug",
                reporter_email="user@test.com",
                screenshot_base64=None,
                support_info=None,
            )

        assert result["success"] is True
        assert result["issue_number"] == 99
        assert result["issue_url"] == "https://github.com/maziggy/bambuddy/issues/99"
        mock_db.add.assert_called_once()

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_submit_rate_limited(self):
        """Returns failure when rate limit exceeded."""
        import time

        from backend.app.services.bug_report import submit_report

        timestamps = [time.time()] * 5  # Already at limit

        with patch("backend.app.services.bug_report._rate_limit_timestamps", timestamps):
            result = await submit_report(
                description="Test",
                reporter_email=None,
                screenshot_base64=None,
                support_info=None,
            )

        assert result["success"] is False
        assert "Rate limit" in result["message"]

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_submit_no_relay_url(self):
        """Returns failure when relay URL is not configured."""
        from backend.app.services.bug_report import submit_report

        with (
            patch("backend.app.services.bug_report._rate_limit_timestamps", []),
            patch("backend.app.services.bug_report.BUG_REPORT_RELAY_URL", ""),
        ):
            result = await submit_report(
                description="Test",
                reporter_email=None,
                screenshot_base64=None,
                support_info=None,
            )

        assert result["success"] is False
        assert "not configured" in result["message"]

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_submit_relay_http_error(self):
        """Non-200 relay response saves failed report."""
        from backend.app.services.bug_report import submit_report

        mock_response = self._make_response(500)
        mock_db = self._make_db()

        with (
            patch("backend.app.services.bug_report.httpx.AsyncClient") as mock_client_cls,
            patch("backend.app.services.bug_report.async_session") as mock_session,
            patch("backend.app.services.bug_report._rate_limit_timestamps", []),
            patch("backend.app.services.bug_report.BUG_REPORT_RELAY_URL", "https://example.com/api/bug-report"),
        ):
            self._wire_relay(mock_client_cls, mock_session, mock_db, response=mock_response)

            result = await submit_report(
                description="Test",
                reporter_email=None,
                screenshot_base64=None,
                support_info=None,
            )

        assert result["success"] is False
        assert "not available" in result["message"]
        mock_db.add.assert_called_once()

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_submit_relay_connection_error(self):
        """Connection failure saves failed report."""
        from backend.app.services.bug_report import submit_report

        mock_db = self._make_db()

        with (
            patch("backend.app.services.bug_report.httpx.AsyncClient") as mock_client_cls,
            patch("backend.app.services.bug_report.async_session") as mock_session,
            patch("backend.app.services.bug_report._rate_limit_timestamps", []),
            patch("backend.app.services.bug_report.BUG_REPORT_RELAY_URL", "https://example.com/api/bug-report"),
        ):
            self._wire_relay(
                mock_client_cls,
                mock_session,
                mock_db,
                error=ConnectionError("Connection refused"),
            )

            result = await submit_report(
                description="Test",
                reporter_email=None,
                screenshot_base64=None,
                support_info=None,
            )

        # NOTE(review): the docstring says the failed report is saved, but
        # nothing here asserts on mock_db.add — confirm against the service
        # before adding mock_db.add.assert_called_once().
        assert result["success"] is False
        assert "Failed to submit" in result["message"]

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_submit_relay_failure_response(self):
        """Relay returns success=false in JSON body."""
        from backend.app.services.bug_report import submit_report

        mock_response = self._make_response(
            200,
            {
                "success": False,
                "message": "GitHub API error",
            },
        )
        mock_db = self._make_db()

        with (
            patch("backend.app.services.bug_report.httpx.AsyncClient") as mock_client_cls,
            patch("backend.app.services.bug_report.async_session") as mock_session,
            patch("backend.app.services.bug_report._rate_limit_timestamps", []),
            patch("backend.app.services.bug_report.BUG_REPORT_RELAY_URL", "https://example.com/api/bug-report"),
        ):
            self._wire_relay(mock_client_cls, mock_session, mock_db, response=mock_response)

            result = await submit_report(
                description="Test",
                reporter_email=None,
                screenshot_base64=None,
                support_info=None,
            )

        assert result["success"] is False
        assert "GitHub API error" in result["message"]
class TestCollectDebugLogs:
    """Tests for _collect_debug_logs().

    All collaborators of the route module (session factory, debug-setting
    accessors, log-level switch, printer manager, log reader, sleep and the
    collection interval) are patched in every test.
    """

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_enables_debug_when_not_already_enabled(self):
        """With debug reported as off, the level is applied twice: on, then off."""
        from backend.app.api.routes.bug_report import _collect_debug_logs

        applied_levels = []

        def record_level(v):
            # Stand-in for _apply_log_level: remember each requested level.
            applied_levels.append(v)

        db_session = AsyncMock()

        with (
            patch("backend.app.api.routes.bug_report.async_session") as mock_session,
            patch("backend.app.api.routes.bug_report._get_debug_setting", return_value=(False, None)),
            patch("backend.app.api.routes.bug_report._set_debug_setting", new_callable=AsyncMock) as mock_set,
            patch("backend.app.api.routes.bug_report._apply_log_level", side_effect=record_level),
            patch("backend.app.api.routes.bug_report.printer_manager") as mock_pm,
            patch("backend.app.api.routes.bug_report._get_recent_sanitized_logs", return_value="DEBUG log line"),
            patch("backend.app.api.routes.bug_report.asyncio.sleep", new_callable=AsyncMock),
            patch("backend.app.api.routes.bug_report.LOG_COLLECTION_SECONDS", 0),
        ):
            session_cm = mock_session.return_value
            session_cm.__aenter__ = AsyncMock(return_value=db_session)
            session_cm.__aexit__ = AsyncMock(return_value=False)
            mock_pm._clients = {}

            collected = await _collect_debug_logs()

        assert collected == "DEBUG log line"
        assert applied_levels == [True, False]  # enabled then restored
        assert mock_set.call_count == 2

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_skips_enable_when_already_debug(self):
        """With debug reported as on, neither the level nor the setting is touched."""
        from backend.app.api.routes.bug_report import _collect_debug_logs

        with (
            patch("backend.app.api.routes.bug_report.async_session") as mock_session,
            patch("backend.app.api.routes.bug_report._get_debug_setting", return_value=(True, None)),
            patch("backend.app.api.routes.bug_report._set_debug_setting", new_callable=AsyncMock) as mock_set,
            patch("backend.app.api.routes.bug_report._apply_log_level") as mock_apply,
            patch("backend.app.api.routes.bug_report.printer_manager") as mock_pm,
            patch("backend.app.api.routes.bug_report._get_recent_sanitized_logs", return_value="logs"),
            patch("backend.app.api.routes.bug_report.asyncio.sleep", new_callable=AsyncMock),
            patch("backend.app.api.routes.bug_report.LOG_COLLECTION_SECONDS", 0),
        ):
            mock_pm._clients = {}
            db_session = AsyncMock()
            session_cm = mock_session.return_value
            session_cm.__aenter__ = AsyncMock(return_value=db_session)
            session_cm.__aexit__ = AsyncMock(return_value=False)

            collected = await _collect_debug_logs()

        assert collected == "logs"
        mock_apply.assert_not_called()
        mock_set.assert_not_called()

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_pushes_all_connected_printers(self):
        """One status update is requested per entry in the printer manager's clients."""
        from backend.app.api.routes.bug_report import _collect_debug_logs

        with (
            patch("backend.app.api.routes.bug_report.async_session") as mock_session,
            patch("backend.app.api.routes.bug_report._get_debug_setting", return_value=(True, None)),
            patch("backend.app.api.routes.bug_report._set_debug_setting", new_callable=AsyncMock),
            patch("backend.app.api.routes.bug_report._apply_log_level"),
            patch("backend.app.api.routes.bug_report.printer_manager") as mock_pm,
            patch("backend.app.api.routes.bug_report._get_recent_sanitized_logs", return_value=""),
            patch("backend.app.api.routes.bug_report.asyncio.sleep", new_callable=AsyncMock),
            patch("backend.app.api.routes.bug_report.LOG_COLLECTION_SECONDS", 0),
        ):
            # Two connected printers, each with its own mock client.
            mock_pm._clients = {name: MagicMock() for name in ("printer1", "printer2")}
            db_session = AsyncMock()
            session_cm = mock_session.return_value
            session_cm.__aenter__ = AsyncMock(return_value=db_session)
            session_cm.__aexit__ = AsyncMock(return_value=False)

            await _collect_debug_logs()

        assert mock_pm.request_status_update.call_count == 2
class TestRateLimit:
    """Tests for rate limiting in bug report service.

    Each test swaps the service module's ``_rate_limit_timestamps`` for a
    local list and checks the boolean returned by ``_check_rate_limit()``.
    The tests carry ``@pytest.mark.unit`` like the rest of this file so that
    marker-based selection (``-m unit``) includes them.
    """

    @pytest.mark.unit
    def test_check_rate_limit_allows_first(self):
        """First request within window is allowed."""
        from backend.app.services.bug_report import _check_rate_limit

        with patch("backend.app.services.bug_report._rate_limit_timestamps", []):
            assert _check_rate_limit() is True

    @pytest.mark.unit
    def test_check_rate_limit_blocks_at_max(self):
        """Requests at max limit are blocked."""
        import time

        from backend.app.services.bug_report import _check_rate_limit

        now = time.time()
        # Five current timestamps; the test treats this as the maximum
        # (presumably matching the service's configured limit — verify there).
        timestamps = [now] * 5

        with patch("backend.app.services.bug_report._rate_limit_timestamps", timestamps):
            assert _check_rate_limit() is False

    @pytest.mark.unit
    def test_check_rate_limit_clears_old(self):
        """Old timestamps outside window are cleared."""
        import time

        from backend.app.services.bug_report import _check_rate_limit

        old_time = time.time() - 7200  # 2 hours ago
        timestamps = [old_time] * 5

        with patch("backend.app.services.bug_report._rate_limit_timestamps", timestamps):
            assert _check_rate_limit() is True
|