"""Unit tests for NotificationService. Tests event-based notifications and toggle behavior. """ import json from unittest.mock import AsyncMock, MagicMock, patch import pytest from backend.app.services.notification_service import NotificationService class TestNotificationService: """Tests for NotificationService class.""" @pytest.fixture def service(self): """Create a fresh NotificationService instance.""" return NotificationService() @pytest.fixture def mock_provider(self): """Create a mock notification provider.""" provider = MagicMock() provider.id = 1 provider.name = "Test Provider" provider.provider_type = "webhook" provider.enabled = True provider.config = json.dumps({"webhook_url": "http://test.local/webhook"}) provider.on_print_start = True provider.on_print_complete = True provider.on_print_failed = True provider.on_print_stopped = False provider.on_print_progress = False provider.on_printer_offline = False provider.on_printer_error = False provider.on_filament_low = False provider.on_maintenance_due = False provider.on_ams_humidity_high = False provider.on_ams_temperature_high = False provider.quiet_hours_enabled = False provider.quiet_hours_start = None provider.quiet_hours_end = None provider.daily_digest_enabled = False provider.daily_digest_time = None provider.printer_id = None return provider @pytest.fixture def mock_db(self): """Create a mock database session.""" db = AsyncMock() db.commit = AsyncMock() return db # ======================================================================== # Tests for on_print_start # ======================================================================== @pytest.mark.asyncio async def test_on_print_start_sends_notification(self, service, mock_provider, mock_db): """Verify notification is sent when print starts.""" with ( patch.object(service, "_get_providers_for_event", new_callable=AsyncMock) as mock_get, patch.object(service, "_send_to_providers", new_callable=AsyncMock) as mock_send, patch.object(service, "_build_message_from_template", new_callable=AsyncMock) as mock_build, ): mock_get.return_value = [mock_provider] mock_build.return_value = ("Print Started", "Test Printer: test.3mf") await service.on_print_start( printer_id=1, printer_name="Test Printer", data={"filename": "test.3mf", "subtask_name": "test"}, db=mock_db, ) mock_get.assert_called_once() mock_send.assert_called_once() @pytest.mark.asyncio async def test_on_print_start_skipped_when_no_providers(self, service, mock_db): """Verify no error when no providers are configured for event.""" with ( patch.object(service, "_get_providers_for_event", new_callable=AsyncMock) as mock_get, patch.object(service, "_send_to_providers", new_callable=AsyncMock) as mock_send, ): mock_get.return_value = [] await service.on_print_start( printer_id=1, printer_name="Test Printer", data={}, db=mock_db, ) mock_send.assert_not_called() # ======================================================================== # Tests for on_print_complete (status routing) # ======================================================================== @pytest.mark.asyncio async def test_on_print_complete_routes_completed_status(self, service, mock_provider, mock_db): """Verify completed status uses on_print_complete field.""" with ( patch.object(service, "_get_providers_for_event", new_callable=AsyncMock) as mock_get, patch.object(service, "_send_to_providers", new_callable=AsyncMock), patch.object(service, "_build_message_from_template", new_callable=AsyncMock) as mock_build, ): mock_get.return_value = [mock_provider] mock_build.return_value = ("Test", "Test") await service.on_print_complete( printer_id=1, printer_name="Test", status="completed", data={}, db=mock_db, ) # Verify the correct event field was queried call_args = mock_get.call_args assert call_args[0][1] == "on_print_complete" @pytest.mark.asyncio async def test_on_print_complete_routes_failed_status(self, service, mock_provider, mock_db): """Verify failed status uses on_print_failed field.""" with ( patch.object(service, "_get_providers_for_event", new_callable=AsyncMock) as mock_get, patch.object(service, "_send_to_providers", new_callable=AsyncMock), patch.object(service, "_build_message_from_template", new_callable=AsyncMock) as mock_build, ): mock_get.return_value = [mock_provider] mock_build.return_value = ("Test", "Test") await service.on_print_complete( printer_id=1, printer_name="Test", status="failed", data={}, db=mock_db, ) call_args = mock_get.call_args assert call_args[0][1] == "on_print_failed" @pytest.mark.asyncio async def test_on_print_complete_routes_stopped_status(self, service, mock_provider, mock_db): """Verify stopped status uses on_print_stopped field.""" with ( patch.object(service, "_get_providers_for_event", new_callable=AsyncMock) as mock_get, patch.object(service, "_send_to_providers", new_callable=AsyncMock), patch.object(service, "_build_message_from_template", new_callable=AsyncMock) as mock_build, ): mock_get.return_value = [mock_provider] mock_build.return_value = ("Test", "Test") await service.on_print_complete( printer_id=1, printer_name="Test", status="stopped", data={}, db=mock_db, ) call_args = mock_get.call_args assert call_args[0][1] == "on_print_stopped" @pytest.mark.asyncio async def test_on_print_complete_routes_aborted_status(self, service, mock_provider, mock_db): """Verify aborted status uses on_print_stopped field.""" with ( patch.object(service, "_get_providers_for_event", new_callable=AsyncMock) as mock_get, patch.object(service, "_send_to_providers", new_callable=AsyncMock), patch.object(service, "_build_message_from_template", new_callable=AsyncMock) as mock_build, ): mock_get.return_value = [mock_provider] mock_build.return_value = ("Test", "Test") await service.on_print_complete( printer_id=1, printer_name="Test", status="aborted", data={}, db=mock_db, ) call_args = mock_get.call_args assert call_args[0][1] == "on_print_stopped" # ======================================================================== # Tests for provider filtering # ======================================================================== @pytest.mark.asyncio async def test_disabled_provider_not_returned(self, service, mock_provider, mock_db): """CRITICAL: Verify disabled providers don't receive notifications.""" mock_provider.enabled = False # The actual filtering happens in _get_providers_for_event # which queries only enabled providers with patch.object(service, "_get_providers_for_event", new_callable=AsyncMock) as mock_get: # Simulate the query filtering out disabled providers mock_get.return_value = [] result = await service._get_providers_for_event(mock_db, "on_print_start", printer_id=1) assert len(result) == 0 @pytest.mark.asyncio async def test_provider_filtered_by_printer_id(self, service, mock_provider, mock_db): """Verify providers can be filtered by specific printer.""" mock_provider.printer_id = 2 # Linked to printer 2 with patch.object(service, "_get_providers_for_event", new_callable=AsyncMock) as mock_get: # When querying for printer 1, provider linked to printer 2 is excluded mock_get.return_value = [] result = await service._get_providers_for_event(mock_db, "on_print_start", printer_id=1) assert len(result) == 0 # ======================================================================== # Tests for quiet hours # ======================================================================== def test_is_in_quiet_hours_during_quiet_period(self, service, mock_provider): """Verify notifications are blocked during quiet hours.""" mock_provider.quiet_hours_enabled = True mock_provider.quiet_hours_start = "22:00" mock_provider.quiet_hours_end = "07:00" with patch("backend.app.services.notification_service.datetime") as mock_datetime: # Test during quiet hours (23:00) mock_now = MagicMock() mock_now.hour = 23 mock_now.minute = 0 mock_datetime.now.return_value = mock_now result = service._is_in_quiet_hours(mock_provider) assert result is True def test_is_in_quiet_hours_outside_quiet_period(self, service, mock_provider): """Verify notifications are allowed outside quiet hours.""" mock_provider.quiet_hours_enabled = True mock_provider.quiet_hours_start = "22:00" mock_provider.quiet_hours_end = "07:00" with patch("backend.app.services.notification_service.datetime") as mock_datetime: # Test outside quiet hours (12:00) mock_now = MagicMock() mock_now.hour = 12 mock_now.minute = 0 mock_datetime.now.return_value = mock_now result = service._is_in_quiet_hours(mock_provider) assert result is False def test_is_in_quiet_hours_disabled(self, service, mock_provider): """Verify quiet hours check returns False when disabled.""" mock_provider.quiet_hours_enabled = False result = service._is_in_quiet_hours(mock_provider) assert result is False def test_is_in_quiet_hours_early_morning(self, service, mock_provider): """Verify quiet hours work across midnight (early morning).""" mock_provider.quiet_hours_enabled = True mock_provider.quiet_hours_start = "22:00" mock_provider.quiet_hours_end = "07:00" with patch("backend.app.services.notification_service.datetime") as mock_datetime: # Test early morning (03:00) - should be in quiet hours mock_now = MagicMock() mock_now.hour = 3 mock_now.minute = 0 mock_datetime.now.return_value = mock_now result = service._is_in_quiet_hours(mock_provider) assert result is True # ======================================================================== # Tests for AMS alarms # ======================================================================== @pytest.mark.asyncio async def test_on_ams_humidity_high_sends_notification(self, service, mock_provider, mock_db): """Verify AMS humidity alarm sends notification.""" mock_provider.on_ams_humidity_high = True with ( patch.object(service, "_get_providers_for_event", new_callable=AsyncMock) as mock_get, patch.object(service, "_send_to_providers", new_callable=AsyncMock) as mock_send, patch.object(service, "_build_message_from_template", new_callable=AsyncMock) as mock_build, ): mock_get.return_value = [mock_provider] mock_build.return_value = ("AMS Humidity Alert", "High humidity detected") await service.on_ams_humidity_high( printer_id=1, printer_name="Test Printer", ams_label="AMS-A", humidity=75.0, threshold=60.0, db=mock_db, ) mock_send.assert_called_once() # Verify force_immediate is True for alarms call_kwargs = mock_send.call_args[1] assert call_kwargs.get("force_immediate") is True @pytest.mark.asyncio async def test_on_ams_temperature_high_sends_notification(self, service, mock_provider, mock_db): """Verify AMS temperature alarm sends notification.""" mock_provider.on_ams_temperature_high = True with ( patch.object(service, "_get_providers_for_event", new_callable=AsyncMock) as mock_get, patch.object(service, "_send_to_providers", new_callable=AsyncMock) as mock_send, patch.object(service, "_build_message_from_template", new_callable=AsyncMock) as mock_build, ): mock_get.return_value = [mock_provider] mock_build.return_value = ("AMS Temperature Alert", "High temp detected") await service.on_ams_temperature_high( printer_id=1, printer_name="Test Printer", ams_label="AMS-A", temperature=40.0, threshold=35.0, db=mock_db, ) mock_send.assert_called_once() # Verify force_immediate is True for alarms call_kwargs = mock_send.call_args[1] assert call_kwargs.get("force_immediate") is True @pytest.mark.asyncio async def test_ams_alarm_skipped_when_toggle_disabled(self, service, mock_provider, mock_db): """CRITICAL: Verify AMS alarms respect toggle setting.""" mock_provider.on_ams_humidity_high = False with ( patch.object(service, "_get_providers_for_event", new_callable=AsyncMock) as mock_get, patch.object(service, "_send_to_providers", new_callable=AsyncMock) as mock_send, ): # Provider with toggle disabled won't be returned mock_get.return_value = [] await service.on_ams_humidity_high( printer_id=1, printer_name="Test", ams_label="AMS-A", humidity=75.0, threshold=60.0, db=mock_db, ) mock_send.assert_not_called() # ======================================================================== # Tests for daily digest # ======================================================================== @pytest.mark.asyncio async def test_daily_digest_queues_notification(self, service, mock_provider, mock_db): """Verify notifications are queued when digest mode is enabled.""" mock_provider.daily_digest_enabled = True mock_provider.daily_digest_time = "09:00" with ( patch.object(service, "_get_providers_for_event", new_callable=AsyncMock) as mock_get, patch.object(service, "_send_to_providers", new_callable=AsyncMock) as mock_send, patch.object(service, "_build_message_from_template", new_callable=AsyncMock) as mock_build, ): mock_get.return_value = [mock_provider] mock_build.return_value = ("Test", "Test") await service.on_print_complete( printer_id=1, printer_name="Test", status="completed", data={}, db=mock_db, ) # When digest is enabled, _send_to_providers should still be called # but internally it will queue instead of send immediately mock_send.assert_called_once() @pytest.mark.asyncio async def test_force_immediate_bypasses_digest(self, service, mock_provider, mock_db): """Verify force_immediate=True bypasses digest mode.""" mock_provider.daily_digest_enabled = True mock_provider.on_ams_humidity_high = True with ( patch.object(service, "_get_providers_for_event", new_callable=AsyncMock) as mock_get, patch.object(service, "_send_to_providers", new_callable=AsyncMock) as mock_send, patch.object(service, "_build_message_from_template", new_callable=AsyncMock) as mock_build, ): mock_get.return_value = [mock_provider] mock_build.return_value = ("Alert", "Alert message") await service.on_ams_humidity_high( printer_id=1, printer_name="Test", ams_label="AMS-A", humidity=75.0, threshold=60.0, db=mock_db, ) # Verify force_immediate is passed call_kwargs = mock_send.call_args[1] assert call_kwargs.get("force_immediate") is True class TestDigestModeAlwaysSendsImmediately: """CRITICAL: Tests that notifications always send immediately regardless of digest setting.""" @pytest.fixture def service(self): return NotificationService() @pytest.mark.asyncio async def test_notification_sends_immediately_even_with_digest_enabled(self, service): """CRITICAL: All notifications must be sent immediately, digest is just a summary.""" # Create a mock provider with digest enabled mock_provider = MagicMock() mock_provider.id = 1 mock_provider.name = "Test Provider" mock_provider.provider_type = "ntfy" mock_provider.enabled = True mock_provider.daily_digest_enabled = True # Digest enabled mock_provider.daily_digest_time = "23:59" mock_provider.config = '{"server": "https://ntfy.sh", "topic": "test"}' mock_db = AsyncMock() # Mock the _send_to_provider method with ( patch.object(service, "_send_to_provider", new_callable=AsyncMock) as mock_send, patch.object(service, "_queue_for_digest", new_callable=AsyncMock) as mock_queue, patch.object(service, "_update_provider_status", new_callable=AsyncMock), patch.object(service, "_log_notification", new_callable=AsyncMock), ): mock_send.return_value = (True, None) await service._send_to_providers( providers=[mock_provider], title="Print Started", message="Your print has started", db=mock_db, event_type="print_start", ) # CRITICAL: _send_to_provider MUST be called (immediate send) mock_send.assert_called_once() # Digest queue should also be called (for daily summary) mock_queue.assert_called_once() @pytest.mark.asyncio async def test_notification_sends_without_digest_queue_when_disabled(self, service): """When digest is disabled, notification sends but no digest queue.""" mock_provider = MagicMock() mock_provider.id = 1 mock_provider.name = "Test Provider" mock_provider.provider_type = "ntfy" mock_provider.enabled = True mock_provider.daily_digest_enabled = False # Digest disabled mock_provider.daily_digest_time = None mock_provider.config = '{"server": "https://ntfy.sh", "topic": "test"}' mock_db = AsyncMock() with ( patch.object(service, "_send_to_provider", new_callable=AsyncMock) as mock_send, patch.object(service, "_queue_for_digest", new_callable=AsyncMock) as mock_queue, patch.object(service, "_update_provider_status", new_callable=AsyncMock), patch.object(service, "_log_notification", new_callable=AsyncMock), ): mock_send.return_value = (True, None) await service._send_to_providers( providers=[mock_provider], title="Print Started", message="Your print has started", db=mock_db, event_type="print_start", ) # Notification must still be sent immediately mock_send.assert_called_once() # Digest queue should NOT be called when digest is disabled mock_queue.assert_not_called() class TestNotificationProviderTypes: """Tests for different notification provider types.""" @pytest.fixture def service(self): return NotificationService() @pytest.mark.asyncio async def test_webhook_provider_sends_request(self, service): """Verify webhook provider sends HTTP request.""" config = { "webhook_url": "http://test.local/webhook", "field_title": "title", "field_message": "message", } # Create a mock response mock_response = MagicMock() mock_response.status_code = 200 # Mock the _get_client method mock_client = AsyncMock() mock_client.post = AsyncMock(return_value=mock_response) with patch.object(service, "_get_client", new_callable=AsyncMock) as mock_get_client: mock_get_client.return_value = mock_client success, message = await service._send_webhook(config, "Test Title", "Test Message") assert success is True mock_client.post.assert_called_once() @pytest.mark.asyncio async def test_webhook_handles_failure(self, service): """Verify webhook gracefully handles HTTP errors.""" config = { "webhook_url": "http://test.local/webhook", } with patch("httpx.AsyncClient") as mock_client_class: mock_instance = AsyncMock() mock_instance.post.side_effect = Exception("Connection failed") mock_client_class.return_value.__aenter__ = AsyncMock(return_value=mock_instance) mock_client_class.return_value.__aexit__ = AsyncMock() success, message = await service._send_webhook(config, "Test", "Test") assert success is False assert "Connection failed" in message or "error" in message.lower() @pytest.mark.asyncio async def test_webhook_slack_format_sends_text_only(self, service): """Verify Slack/Mattermost format sends only text field.""" config = { "webhook_url": "http://mattermost.local/hooks/abc123", "payload_format": "slack", } mock_response = MagicMock() mock_response.status_code = 200 mock_client = AsyncMock() mock_client.post = AsyncMock(return_value=mock_response) with patch.object(service, "_get_client", new_callable=AsyncMock) as mock_get_client: mock_get_client.return_value = mock_client success, message = await service._send_webhook(config, "Test Title", "Test Message") assert success is True mock_client.post.assert_called_once() # Verify payload format is Slack-compatible call_args = mock_client.post.call_args payload = call_args.kwargs.get("json") or call_args[1].get("json") assert "text" in payload assert "*Test Title*" in payload["text"] assert "Test Message" in payload["text"] # Should NOT have generic fields assert "timestamp" not in payload assert "source" not in payload @pytest.mark.asyncio async def test_webhook_generic_format_includes_image(self, service): """Verify generic webhook includes base64-encoded image when provided.""" config = { "webhook_url": "http://test.local/webhook", "field_title": "title", "field_message": "message", } mock_response = MagicMock() mock_response.status_code = 200 mock_client = AsyncMock() mock_client.post = AsyncMock(return_value=mock_response) with patch.object(service, "_get_client", new_callable=AsyncMock) as mock_get_client: mock_get_client.return_value = mock_client image_bytes = b"\xff\xd8\xff\xe0fake-jpeg-data" success, message = await service._send_webhook(config, "Test Title", "Test Message", image_data=image_bytes) assert success is True call_args = mock_client.post.call_args payload = call_args.kwargs.get("json") or call_args[1].get("json") assert "image" in payload import base64 assert payload["image"] == base64.b64encode(image_bytes).decode("ascii") @pytest.mark.asyncio async def test_webhook_generic_format_no_image_when_none(self, service): """Verify generic webhook omits image field when no image_data provided.""" config = { "webhook_url": "http://test.local/webhook", "field_title": "title", "field_message": "message", } mock_response = MagicMock() mock_response.status_code = 200 mock_client = AsyncMock() mock_client.post = AsyncMock(return_value=mock_response) with patch.object(service, "_get_client", new_callable=AsyncMock) as mock_get_client: mock_get_client.return_value = mock_client success, message = await service._send_webhook(config, "Test Title", "Test Message") assert success is True call_args = mock_client.post.call_args payload = call_args.kwargs.get("json") or call_args[1].get("json") assert "image" not in payload @pytest.mark.asyncio async def test_webhook_slack_format_excludes_image(self, service): """Verify Slack format does not include image even when provided.""" config = { "webhook_url": "http://mattermost.local/hooks/abc123", "payload_format": "slack", } mock_response = MagicMock() mock_response.status_code = 200 mock_client = AsyncMock() mock_client.post = AsyncMock(return_value=mock_response) with patch.object(service, "_get_client", new_callable=AsyncMock) as mock_get_client: mock_get_client.return_value = mock_client success, message = await service._send_webhook( config, "Test Title", "Test Message", image_data=b"fake-image" ) assert success is True call_args = mock_client.post.call_args payload = call_args.kwargs.get("json") or call_args[1].get("json") assert "image" not in payload class TestNtfyPriority: """Per-event ntfy Priority header (#990).""" @pytest.fixture def service(self): return NotificationService() @staticmethod def _mock_client(service): """Patch _get_client and return the mock client + 200 response.""" mock_response = MagicMock() mock_response.status_code = 200 mock_client = AsyncMock() mock_client.post = AsyncMock(return_value=mock_response) mock_client.put = AsyncMock(return_value=mock_response) return mock_client @pytest.mark.asyncio async def test_priority_header_set_for_mapped_event(self, service): """Mapped event → ntfy Priority header carries the configured value.""" config = { "topic": "bambuddy", "event_priorities": {"on_print_failed": 5, "on_print_complete": 2}, } mock_client = self._mock_client(service) with patch.object(service, "_get_client", new_callable=AsyncMock) as mock_get: mock_get.return_value = mock_client success, _ = await service._send_ntfy(config, "Title", "Body", event_type="on_print_failed") assert success is True headers = mock_client.post.call_args.kwargs["headers"] assert headers.get("Priority") == "5" @pytest.mark.asyncio async def test_priority_header_omitted_for_unmapped_event(self, service): """Unmapped event → no Priority header so ntfy uses its server default.""" config = { "topic": "bambuddy", "event_priorities": {"on_print_failed": 5}, } mock_client = self._mock_client(service) with patch.object(service, "_get_client", new_callable=AsyncMock) as mock_get: mock_get.return_value = mock_client await service._send_ntfy(config, "Title", "Body", event_type="on_print_complete") headers = mock_client.post.call_args.kwargs["headers"] assert "Priority" not in headers @pytest.mark.asyncio async def test_priority_header_omitted_when_no_priorities_set(self, service): """Existing setups (no event_priorities key) keep current behaviour.""" config = {"topic": "bambuddy"} mock_client = self._mock_client(service) with patch.object(service, "_get_client", new_callable=AsyncMock) as mock_get: mock_get.return_value = mock_client await service._send_ntfy(config, "Title", "Body", event_type="on_print_failed") headers = mock_client.post.call_args.kwargs["headers"] assert "Priority" not in headers @pytest.mark.asyncio async def test_priority_header_omitted_when_event_type_missing(self, service): """Test sends (no event_type) must not emit a Priority header.""" config = { "topic": "bambuddy", "event_priorities": {"on_print_failed": 5}, } mock_client = self._mock_client(service) with patch.object(service, "_get_client", new_callable=AsyncMock) as mock_get: mock_get.return_value = mock_client await service._send_ntfy(config, "Title", "Body") headers = mock_client.post.call_args.kwargs["headers"] assert "Priority" not in headers @pytest.mark.asyncio async def test_priority_out_of_range_is_ignored(self, service): """Values outside 1-5 (or non-numeric) are dropped, not clamped.""" for bad in (0, 6, 99, -1, "not-a-number", None): config = { "topic": "bambuddy", "event_priorities": {"on_print_failed": bad}, } mock_client = self._mock_client(service) with patch.object(service, "_get_client", new_callable=AsyncMock) as mock_get: mock_get.return_value = mock_client await service._send_ntfy(config, "Title", "Body", event_type="on_print_failed") headers = mock_client.post.call_args.kwargs["headers"] assert "Priority" not in headers, f"unexpected header for bad value {bad!r}" @pytest.mark.asyncio async def test_priority_header_set_on_attachment_path(self, service): """Image-attachment path (PUT) must also carry the Priority header.""" config = { "topic": "bambuddy", "event_priorities": {"on_first_layer_complete": 4}, } mock_client = self._mock_client(service) with patch.object(service, "_get_client", new_callable=AsyncMock) as mock_get: mock_get.return_value = mock_client await service._send_ntfy( config, "Title", "Body", image_data=b"\xff\xd8\xff\xe0fake-jpeg", event_type="on_first_layer_complete", ) headers = mock_client.put.call_args.kwargs["headers"] assert headers.get("Priority") == "4" class TestHomeAssistantProvider: """Tests for Home Assistant notification provider.""" @pytest.fixture def service(self): return NotificationService() @pytest.mark.asyncio async def test_send_homeassistant_success(self, service): """Verify HA provider sends persistent notification to correct endpoint.""" mock_response = MagicMock() mock_response.status_code = 200 mock_client = AsyncMock() mock_client.post = AsyncMock(return_value=mock_response) mock_db = AsyncMock() with ( patch.object(service, "_get_client", new_callable=AsyncMock) as mock_get_client, patch( "backend.app.api.routes.settings.get_homeassistant_settings", new_callable=AsyncMock, ) as mock_ha_settings, ): mock_get_client.return_value = mock_client mock_ha_settings.return_value = { "ha_url": "http://ha.local:8123", "ha_token": "test-token-123", "ha_enabled": True, } success, message = await service._send_homeassistant({}, "Test Title", "Test Message", db=mock_db) assert success is True mock_client.post.assert_called_once() call_args = mock_client.post.call_args assert call_args[0][0] == "http://ha.local:8123/api/services/persistent_notification/create" payload = call_args.kwargs.get("json") or call_args[1].get("json") assert payload["title"] == "Test Title" assert payload["message"] == "Test Message" @pytest.mark.asyncio async def test_send_homeassistant_no_db_no_env(self, service): """Verify HA provider fails gracefully without DB or env vars.""" with patch.dict("os.environ", {}, clear=True): success, message = await service._send_homeassistant({}, "Test", "Test", db=None) assert success is False assert "not configured" in message.lower() @pytest.mark.asyncio async def test_send_homeassistant_auth_failure(self, service): """Verify HA provider reports auth failure.""" mock_response = MagicMock() mock_response.status_code = 401 mock_client = AsyncMock() mock_client.post = AsyncMock(return_value=mock_response) mock_db = AsyncMock() with ( patch.object(service, "_get_client", new_callable=AsyncMock) as mock_get_client, patch( "backend.app.api.routes.settings.get_homeassistant_settings", new_callable=AsyncMock, ) as mock_ha_settings, ): mock_get_client.return_value = mock_client mock_ha_settings.return_value = { "ha_url": "http://ha.local:8123", "ha_token": "bad-token", "ha_enabled": True, } success, message = await service._send_homeassistant({}, "Test", "Test", db=mock_db) assert success is False assert "authentication" in message.lower() @pytest.mark.asyncio async def test_send_homeassistant_env_fallback(self, service): """Verify HA provider falls back to env vars when no DB session.""" mock_response = MagicMock() mock_response.status_code = 200 mock_client = AsyncMock() mock_client.post = AsyncMock(return_value=mock_response) with ( patch.object(service, "_get_client", new_callable=AsyncMock) as mock_get_client, patch.dict("os.environ", {"HA_URL": "http://env-ha:8123", "HA_TOKEN": "env-token"}), ): mock_get_client.return_value = mock_client success, message = await service._send_homeassistant({}, "Test", "Test", db=None) assert success is True call_args = mock_client.post.call_args assert "env-ha:8123" in call_args[0][0] @pytest.mark.asyncio async def test_send_homeassistant_empty_config_accepted(self, service): """Verify HA provider works with empty config dict (no fields needed).""" mock_response = MagicMock() mock_response.status_code = 200 mock_client = AsyncMock() mock_client.post = AsyncMock(return_value=mock_response) mock_db = AsyncMock() with ( patch.object(service, "_get_client", new_callable=AsyncMock) as mock_get_client, patch( "backend.app.api.routes.settings.get_homeassistant_settings", new_callable=AsyncMock, ) as mock_ha_settings, ): mock_get_client.return_value = mock_client mock_ha_settings.return_value = { "ha_url": "http://ha.local:8123", "ha_token": "token", "ha_enabled": True, } success, _ = await service._send_homeassistant({}, "Title", "Body", db=mock_db) assert success is True @pytest.mark.asyncio async def test_send_to_provider_dispatches_homeassistant(self, service): """Verify _send_to_provider dispatches to _send_homeassistant.""" provider = MagicMock() provider.provider_type = "homeassistant" provider.config = "{}" provider.quiet_hours_enabled = False with patch.object(service, "_send_homeassistant", new_callable=AsyncMock) as mock_send: mock_send.return_value = (True, "OK") success, _ = await service._send_to_provider(provider, "Title", "Message", db=AsyncMock()) assert success is True mock_send.assert_called_once() class TestNotificationVariableFallbacks: """Tests for notification variable fallback values.""" @pytest.fixture def service(self): return NotificationService() def test_format_duration_with_valid_seconds(self, service): """Verify duration formats correctly with valid input.""" result = service._format_duration(3661) # 1h 1m 1s assert "1h" in result def test_format_duration_with_none_returns_unknown(self, service): """CRITICAL: Verify None duration returns 'Unknown' fallback.""" result = service._format_duration(None) assert result == "Unknown" def test_format_duration_with_zero(self, service): """Verify zero duration formats correctly.""" result = service._format_duration(0) # Should return some valid string, not "Unknown" assert result is not None assert isinstance(result, str) def test_format_duration_hours_and_minutes(self, service): """Verify duration formats hours and minutes.""" result = service._format_duration(5400) # 1h 30m assert "1h" in result assert "30m" in result def test_format_duration_minutes_only(self, service): """Verify duration formats minutes only when < 1 hour.""" result = service._format_duration(1800) # 30m assert "30m" in result or "30" in result @pytest.mark.asyncio async def test_print_complete_fallback_values(self, service): """CRITICAL: Verify fallback values when archive_data is missing.""" mock_db = AsyncMock() with ( patch.object(service, "_get_providers_for_event", new_callable=AsyncMock) as mock_get, patch.object(service, "_send_to_providers", new_callable=AsyncMock), patch.object(service, "_build_message_from_template", new_callable=AsyncMock) as mock_build, ): mock_get.return_value = [] # No providers, just testing variable setup mock_build.return_value = ("Test", "Test") await service.on_print_complete( printer_id=1, printer_name="Test", status="completed", data={"subtask_name": "test_print"}, db=mock_db, archive_data=None, # No archive data - should use fallbacks ) # Test passes if no exception is raised with missing archive_data @pytest.mark.asyncio async def test_print_complete_with_archive_data(self, service): """Verify archive data values are used when provided.""" mock_db = AsyncMock() captured_variables = {} async def capture_build(db, event_type, variables): captured_variables.update(variables) return ("Test", "Test") with ( patch.object(service, "_get_providers_for_event", new_callable=AsyncMock) as mock_get, patch.object(service, "_send_to_providers", new_callable=AsyncMock), patch.object(service, "_build_message_from_template", side_effect=capture_build), ): mock_get.return_value = [] await service.on_print_complete( printer_id=1, printer_name="Test", status="completed", data={"subtask_name": "test_print"}, db=mock_db, archive_data={ "print_time_seconds": 3600, "actual_filament_grams": 50.5, }, ) # When archive data is provided, duration should not be "Unknown" if captured_variables.get("duration"): assert captured_variables["duration"] != "Unknown" @pytest.mark.asyncio async def test_print_complete_with_finish_photo_url(self, service): """Verify finish_photo_url is passed through from archive_data.""" mock_db = AsyncMock() mock_provider = MagicMock() mock_provider.id = 1 captured_variables = {} async def capture_build(db, event_type, variables): captured_variables.update(variables) return ("Test", "Test") with ( patch.object(service, "_get_providers_for_event", new_callable=AsyncMock) as mock_get, patch.object(service, "_send_to_providers", new_callable=AsyncMock), patch.object(service, "_build_message_from_template", side_effect=capture_build), ): mock_get.return_value = [mock_provider] await service.on_print_complete( printer_id=1, printer_name="Test", status="completed", data={"subtask_name": "test_print"}, db=mock_db, archive_data={ "print_time_seconds": 3600, "actual_filament_grams": 50.5, "finish_photo_url": "http://localhost:8000/api/v1/archives/1/photos/finish_test.jpg", }, ) # finish_photo_url should be passed through to template variables assert ( captured_variables.get("finish_photo_url") == "http://localhost:8000/api/v1/archives/1/photos/finish_test.jpg" ) @pytest.mark.asyncio async def test_print_start_estimated_time_fallback(self, service): """Verify estimated time shows 'Unknown' when not available.""" mock_db = AsyncMock() mock_provider = MagicMock() mock_provider.id = 1 captured_variables = {} async def capture_build(db, event_type, variables): captured_variables.update(variables) return ("Test", "Test") with ( patch.object(service, "_get_providers_for_event", new_callable=AsyncMock) as mock_get, patch.object(service, "_send_to_providers", new_callable=AsyncMock), patch.object(service, "_build_message_from_template", side_effect=capture_build), patch("backend.app.api.routes.settings.get_setting", new_callable=AsyncMock, return_value=None), ): # Need at least one provider to trigger message building mock_get.return_value = [mock_provider] await service.on_print_start( printer_id=1, printer_name="Test", data={ "subtask_name": "test", # No estimated_time or mc_remaining_time }, db=mock_db, ) # When no time data, should show "Unknown" assert captured_variables.get("estimated_time") == "Unknown" @pytest.mark.asyncio async def test_print_progress_remaining_time_fallback(self, service): """Verify remaining time shows 'Unknown' when not available.""" mock_db = AsyncMock() mock_provider = MagicMock() mock_provider.id = 1 captured_variables = {} async def capture_build(db, event_type, variables): captured_variables.update(variables) return ("Test", "Test") with ( patch.object(service, "_get_providers_for_event", new_callable=AsyncMock) as mock_get, patch.object(service, "_send_to_providers", new_callable=AsyncMock), patch.object(service, "_build_message_from_template", side_effect=capture_build), patch("backend.app.api.routes.settings.get_setting", new_callable=AsyncMock, return_value=None), ): # Need at least one provider to trigger message building mock_get.return_value = [mock_provider] await service.on_print_progress( printer_id=1, printer_name="Test", progress=50, remaining_time=None, # No remaining time filename="test.3mf", db=mock_db, ) # When no remaining time, should show "Unknown" assert captured_variables.get("remaining_time") == "Unknown" @pytest.mark.asyncio async def test_filename_fallback_to_unknown(self, service): """Verify filename defaults to 'Unknown' when not provided.""" mock_db = AsyncMock() mock_provider = MagicMock() mock_provider.id = 1 captured_variables = {} async def capture_build(db, event_type, variables): captured_variables.update(variables) return ("Test", "Test") with ( patch.object(service, "_get_providers_for_event", new_callable=AsyncMock) as mock_get, patch.object(service, "_send_to_providers", new_callable=AsyncMock), patch.object(service, "_build_message_from_template", side_effect=capture_build), ): # Need at least one provider to trigger message building mock_get.return_value = [mock_provider] await service.on_print_complete( printer_id=1, printer_name="Test", status="completed", data={}, # No subtask_name or filename db=mock_db, ) # Filename should default to something (either "Unknown" or cleaned empty) assert "filename" in captured_variables @pytest.mark.asyncio async def test_print_start_uses_archive_print_time_seconds(self, service): """Verify print_time_seconds from archive_data is used for estimated_time.""" mock_db = AsyncMock() mock_provider = MagicMock() mock_provider.id = 1 captured_variables = {} async def capture_build(db, event_type, variables): captured_variables.update(variables) return ("Test", "Test") with ( patch.object(service, "_get_providers_for_event", new_callable=AsyncMock) as mock_get, patch.object(service, "_send_to_providers", new_callable=AsyncMock), patch.object(service, "_build_message_from_template", side_effect=capture_build), patch("backend.app.api.routes.settings.get_setting", new_callable=AsyncMock, return_value=None), ): mock_get.return_value = [mock_provider] # Pass archive_data with print_time_seconds (7200 seconds = 2 hours) await service.on_print_start( printer_id=1, printer_name="Test", data={"subtask_name": "test"}, db=mock_db, archive_data={"print_time_seconds": 7200}, ) # Should use archive's print_time_seconds: 7200 seconds = 2h 0m assert captured_variables.get("estimated_time") == "2h 0m" @pytest.mark.asyncio async def test_print_start_archive_data_overrides_mqtt(self, service): """Verify archive_data takes priority over MQTT remaining_time.""" mock_db = AsyncMock() mock_provider = MagicMock() mock_provider.id = 1 captured_variables = {} async def capture_build(db, event_type, variables): captured_variables.update(variables) return ("Test", "Test") with ( patch.object(service, "_get_providers_for_event", new_callable=AsyncMock) as mock_get, patch.object(service, "_send_to_providers", new_callable=AsyncMock), patch.object(service, "_build_message_from_template", side_effect=capture_build), patch("backend.app.api.routes.settings.get_setting", new_callable=AsyncMock, return_value=None), ): mock_get.return_value = [mock_provider] # Both archive_data and MQTT remaining_time provided # Archive says 2 hours, MQTT says 30 minutes (wrong at start) await service.on_print_start( printer_id=1, printer_name="Test", data={ "subtask_name": "test", "remaining_time": 1800, # 30 minutes from MQTT }, db=mock_db, archive_data={"print_time_seconds": 7200}, # 2 hours from 3MF ) # Should use archive's print_time_seconds (more reliable) assert captured_variables.get("estimated_time") == "2h 0m" @pytest.mark.asyncio async def test_print_start_falls_back_to_mqtt_when_no_archive(self, service): """Verify MQTT remaining_time is used when archive_data not provided.""" mock_db = AsyncMock() mock_provider = MagicMock() mock_provider.id = 1 captured_variables = {} async def capture_build(db, event_type, variables): captured_variables.update(variables) return ("Test", "Test") with ( patch.object(service, "_get_providers_for_event", new_callable=AsyncMock) as mock_get, patch.object(service, "_send_to_providers", new_callable=AsyncMock), patch.object(service, "_build_message_from_template", side_effect=capture_build), patch("backend.app.api.routes.settings.get_setting", new_callable=AsyncMock, return_value=None), ): mock_get.return_value = [mock_provider] # Only MQTT remaining_time provided (1800 seconds = 30 minutes) await service.on_print_start( printer_id=1, printer_name="Test", data={ "subtask_name": "test", "remaining_time": 1800, }, db=mock_db, # No archive_data ) # Should use MQTT remaining_time assert captured_variables.get("estimated_time") == "30m" @pytest.mark.asyncio async def test_print_start_eta_calculated_from_estimated_time(self, service): """Verify ETA is calculated as wall-clock time from estimated_time.""" mock_db = AsyncMock() mock_provider = MagicMock() mock_provider.id = 1 captured_variables = {} async def capture_build(db, event_type, variables): captured_variables.update(variables) return ("Test", "Test") with ( patch.object(service, "_get_providers_for_event", new_callable=AsyncMock) as mock_get, patch.object(service, "_send_to_providers", new_callable=AsyncMock), patch.object(service, "_build_message_from_template", side_effect=capture_build), patch("backend.app.api.routes.settings.get_setting", new_callable=AsyncMock, return_value=None), ): mock_get.return_value = [mock_provider] await service.on_print_start( printer_id=1, printer_name="Test", data={"subtask_name": "test"}, db=mock_db, archive_data={"print_time_seconds": 3600}, # 1 hour ) # ETA should be a time string in HH:MM format eta = captured_variables.get("eta") assert eta is not None assert eta != "Unknown" assert ":" in eta # HH:MM format @pytest.mark.asyncio async def test_print_start_eta_unknown_when_no_time(self, service): """Verify ETA shows 'Unknown' when no time data available.""" mock_db = AsyncMock() mock_provider = MagicMock() mock_provider.id = 1 captured_variables = {} async def capture_build(db, event_type, variables): captured_variables.update(variables) return ("Test", "Test") with ( patch.object(service, "_get_providers_for_event", new_callable=AsyncMock) as mock_get, patch.object(service, "_send_to_providers", new_callable=AsyncMock), patch.object(service, "_build_message_from_template", side_effect=capture_build), patch("backend.app.api.routes.settings.get_setting", new_callable=AsyncMock, return_value=None), ): mock_get.return_value = [mock_provider] await service.on_print_start( printer_id=1, printer_name="Test", data={"subtask_name": "test"}, db=mock_db, ) assert captured_variables.get("eta") == "Unknown" @pytest.mark.asyncio async def test_print_start_eta_respects_12h_format(self, service): """Verify ETA uses 12-hour format when time_format is '12h'.""" mock_db = AsyncMock() mock_provider = MagicMock() mock_provider.id = 1 captured_variables = {} async def capture_build(db, event_type, variables): captured_variables.update(variables) return ("Test", "Test") with ( patch.object(service, "_get_providers_for_event", new_callable=AsyncMock) as mock_get, patch.object(service, "_send_to_providers", new_callable=AsyncMock), patch.object(service, "_build_message_from_template", side_effect=capture_build), patch("backend.app.api.routes.settings.get_setting", new_callable=AsyncMock, return_value="12h"), ): mock_get.return_value = [mock_provider] await service.on_print_start( printer_id=1, printer_name="Test", data={"subtask_name": "test"}, db=mock_db, archive_data={"print_time_seconds": 3600}, ) eta = captured_variables.get("eta") assert eta is not None # 12h format should contain AM or PM assert "AM" in eta or "PM" in eta class TestNotificationTemplates: """Tests for notification message template rendering.""" @pytest.fixture def service(self): return NotificationService() @pytest.mark.asyncio async def test_template_renders_variables(self, service): """Verify template variables are replaced correctly.""" template_title = "Print {progress}% Complete" template_body = "{printer}: {filename}\nRemaining: {remaining_time}" variables = { "printer": "Test Printer", "filename": "test.3mf", "progress": "50", "remaining_time": "1h 30m", } title = template_title.format(**variables) body = template_body.format(**variables) assert title == "Print 50% Complete" assert "Test Printer" in body assert "test.3mf" in body assert "1h 30m" in body @pytest.mark.asyncio async def test_template_handles_missing_variables(self, service): """Verify missing template variables don't cause crashes.""" template = "{printer}: {unknown_var}" variables = {"printer": "Test"} # Should handle gracefully - either leave placeholder or skip try: result = template.format_map({**variables, "unknown_var": "{unknown_var}"}) assert "Test" in result except KeyError: pytest.fail("Template should handle missing variables gracefully") class TestPrinterErrorNotifications: """Tests for HMS error (printer error) notifications.""" @pytest.fixture def service(self): return NotificationService() @pytest.fixture def mock_provider(self): """Create a mock notification provider with error notifications enabled.""" provider = MagicMock() provider.id = 1 provider.name = "Test Provider" provider.provider_type = "webhook" provider.enabled = True provider.config = json.dumps({"webhook_url": "http://test.local/webhook"}) provider.on_printer_error = True # Enable error notifications provider.quiet_hours_enabled = False provider.daily_digest_enabled = False provider.printer_id = None return provider @pytest.fixture def mock_db(self): """Create a mock database session.""" db = AsyncMock() db.commit = AsyncMock() return db @pytest.mark.asyncio async def test_on_printer_error_sends_notification(self, service, mock_provider, mock_db): """Verify HMS error notification is sent when triggered.""" with ( patch.object(service, "_get_providers_for_event", new_callable=AsyncMock) as mock_get, patch.object(service, "_send_to_providers", new_callable=AsyncMock) as mock_send, patch.object(service, "_build_message_from_template", new_callable=AsyncMock) as mock_build, ): mock_get.return_value = [mock_provider] mock_build.return_value = ("Printer Error", "AMS/Filament Error: 0700_8010") await service.on_printer_error( printer_id=1, printer_name="Test Printer", error_type="AMS/Filament Error", db=mock_db, error_detail="Error code: 0700_8010", ) mock_get.assert_called_once() mock_send.assert_called_once() @pytest.mark.asyncio async def test_on_printer_error_skipped_when_disabled(self, service, mock_provider, mock_db): """CRITICAL: Verify error notifications respect toggle setting.""" mock_provider.on_printer_error = False with ( patch.object(service, "_get_providers_for_event", new_callable=AsyncMock) as mock_get, patch.object(service, "_send_to_providers", new_callable=AsyncMock) as mock_send, ): # Provider with toggle disabled won't be returned mock_get.return_value = [] await service.on_printer_error( printer_id=1, printer_name="Test", error_type="AMS Error", db=mock_db, error_detail="Test error", ) mock_send.assert_not_called() @pytest.mark.asyncio async def test_on_printer_error_includes_error_detail(self, service, mock_provider, mock_db): """Verify error details are passed to template variables.""" captured_variables = {} async def capture_build(db, event_type, variables): captured_variables.update(variables) return ("Test", "Test") with ( patch.object(service, "_get_providers_for_event", new_callable=AsyncMock) as mock_get, patch.object(service, "_send_to_providers", new_callable=AsyncMock), patch.object(service, "_build_message_from_template", side_effect=capture_build), ): mock_get.return_value = [mock_provider] await service.on_printer_error( printer_id=1, printer_name="X1 Carbon", error_type="AMS/Filament Error", db=mock_db, error_detail="Error code: 0700_8010", ) assert captured_variables["printer"] == "X1 Carbon" assert captured_variables["error_type"] == "AMS/Filament Error" assert captured_variables["error_detail"] == "Error code: 0700_8010" @pytest.mark.asyncio async def test_on_printer_error_fallback_when_no_detail(self, service, mock_provider, mock_db): """Verify fallback message when error_detail is None.""" captured_variables = {} async def capture_build(db, event_type, variables): captured_variables.update(variables) return ("Test", "Test") with ( patch.object(service, "_get_providers_for_event", new_callable=AsyncMock) as mock_get, patch.object(service, "_send_to_providers", new_callable=AsyncMock), patch.object(service, "_build_message_from_template", side_effect=capture_build), ): mock_get.return_value = [mock_provider] await service.on_printer_error( printer_id=1, printer_name="Test Printer", error_type="Unknown Error", db=mock_db, error_detail=None, # No detail provided ) assert captured_variables["error_detail"] == "No details available" class TestPlateNotEmptyNotifications: """Tests for plate not empty (build plate detection) notifications.""" @pytest.fixture def service(self): return NotificationService() @pytest.fixture def mock_provider(self): """Create a mock notification provider with plate detection enabled.""" provider = MagicMock() provider.id = 1 provider.name = "Test Provider" provider.provider_type = "webhook" provider.enabled = True provider.config = json.dumps({"webhook_url": "http://test.local/webhook"}) provider.on_plate_not_empty = True provider.quiet_hours_enabled = False provider.daily_digest_enabled = False provider.printer_id = None return provider @pytest.fixture def mock_db(self): """Create a mock database session.""" db = AsyncMock() db.commit = AsyncMock() return db @pytest.mark.asyncio async def test_on_plate_not_empty_sends_notification(self, service, mock_provider, mock_db): """Verify plate not empty notification is sent when triggered.""" with ( patch.object(service, "_get_providers_for_event", new_callable=AsyncMock) as mock_get, patch.object(service, "_send_to_providers", new_callable=AsyncMock) as mock_send, patch.object(service, "_build_message_from_template", new_callable=AsyncMock) as mock_build, ): mock_get.return_value = [mock_provider] mock_build.return_value = ("Plate Not Empty", "Objects detected on build plate") await service.on_plate_not_empty( printer_id=1, printer_name="Test Printer", db=mock_db, difference_percent=5.2, ) mock_get.assert_called_once() mock_send.assert_called_once() # Verify force_immediate is True (critical alert) call_kwargs = mock_send.call_args[1] assert call_kwargs.get("force_immediate") is True @pytest.mark.asyncio async def test_on_plate_not_empty_skipped_when_disabled(self, service, mock_provider, mock_db): """Verify notification is skipped when toggle is disabled.""" mock_provider.on_plate_not_empty = False with ( patch.object(service, "_get_providers_for_event", new_callable=AsyncMock) as mock_get, patch.object(service, "_send_to_providers", new_callable=AsyncMock) as mock_send, ): mock_get.return_value = [] await service.on_plate_not_empty( printer_id=1, printer_name="Test", db=mock_db, ) mock_send.assert_not_called() @pytest.mark.asyncio async def test_on_plate_not_empty_includes_difference_percent(self, service, mock_provider, mock_db): """Verify difference percentage is passed to template variables.""" captured_variables = {} async def capture_build(db, event_type, variables): captured_variables.update(variables) return ("Test", "Test") with ( patch.object(service, "_get_providers_for_event", new_callable=AsyncMock) as mock_get, patch.object(service, "_send_to_providers", new_callable=AsyncMock), patch.object(service, "_build_message_from_template", side_effect=capture_build), ): mock_get.return_value = [mock_provider] await service.on_plate_not_empty( printer_id=1, printer_name="X1 Carbon", db=mock_db, difference_percent=3.5, ) assert captured_variables["printer"] == "X1 Carbon" assert captured_variables["difference_percent"] == "3.5" class TestBedCooledNotifications: """Tests for bed cooled (after print) notifications.""" @pytest.fixture def service(self): return NotificationService() @pytest.fixture def mock_provider(self): """Create a mock notification provider with bed cooled enabled.""" provider = MagicMock() provider.id = 1 provider.name = "Test Provider" provider.provider_type = "webhook" provider.enabled = True provider.config = json.dumps({"webhook_url": "http://test.local/webhook"}) provider.on_bed_cooled = True provider.quiet_hours_enabled = False provider.daily_digest_enabled = False provider.printer_id = None return provider @pytest.fixture def mock_db(self): """Create a mock database session.""" db = AsyncMock() db.commit = AsyncMock() return db @pytest.mark.asyncio async def test_on_bed_cooled_sends_notification(self, service, mock_provider, mock_db): """Verify bed cooled notification is sent when triggered.""" with ( patch.object(service, "_get_providers_for_event", new_callable=AsyncMock) as mock_get, patch.object(service, "_send_to_providers", new_callable=AsyncMock) as mock_send, patch.object(service, "_build_message_from_template", new_callable=AsyncMock) as mock_build, ): mock_get.return_value = [mock_provider] mock_build.return_value = ("Bed Cooled", "Test Printer: Bed cooled to 30°C") await service.on_bed_cooled( printer_id=1, printer_name="Test Printer", bed_temp=30.0, threshold=35.0, filename="benchy.3mf", db=mock_db, ) mock_get.assert_called_once() mock_send.assert_called_once() @pytest.mark.asyncio async def test_on_bed_cooled_skipped_when_no_providers(self, service, mock_db): """Verify notification is skipped when no providers have bed cooled enabled.""" with ( patch.object(service, "_get_providers_for_event", new_callable=AsyncMock) as mock_get, patch.object(service, "_send_to_providers", new_callable=AsyncMock) as mock_send, ): mock_get.return_value = [] await service.on_bed_cooled( printer_id=1, printer_name="Test Printer", bed_temp=30.0, threshold=35.0, filename="benchy.3mf", db=mock_db, ) mock_send.assert_not_called() @pytest.mark.asyncio async def test_on_bed_cooled_includes_correct_variables(self, service, mock_provider, mock_db): """Verify bed temp, threshold, and filename are passed to template variables.""" captured_variables = {} async def capture_build(db, event_type, variables): captured_variables.update(variables) return ("Test", "Test") with ( patch.object(service, "_get_providers_for_event", new_callable=AsyncMock) as mock_get, patch.object(service, "_send_to_providers", new_callable=AsyncMock), patch.object(service, "_build_message_from_template", side_effect=capture_build), ): mock_get.return_value = [mock_provider] await service.on_bed_cooled( printer_id=1, printer_name="X1 Carbon", bed_temp=28.7, threshold=35.0, filename="benchy.gcode.3mf", db=mock_db, ) assert captured_variables["printer"] == "X1 Carbon" assert captured_variables["bed_temp"] == "29" assert captured_variables["threshold"] == "35" assert captured_variables["filename"] == "benchy" @pytest.mark.asyncio async def test_on_bed_cooled_handles_none_filename(self, service, mock_provider, mock_db): """Verify None filename is handled gracefully.""" captured_variables = {} async def capture_build(db, event_type, variables): captured_variables.update(variables) return ("Test", "Test") with ( patch.object(service, "_get_providers_for_event", new_callable=AsyncMock) as mock_get, patch.object(service, "_send_to_providers", new_callable=AsyncMock), patch.object(service, "_build_message_from_template", side_effect=capture_build), ): mock_get.return_value = [mock_provider] await service.on_bed_cooled( printer_id=1, printer_name="Test Printer", bed_temp=30.0, threshold=35.0, filename=None, db=mock_db, ) assert captured_variables["filename"] == "Unknown" class TestFirstLayerCompleteNotifications: """Tests for first layer complete notifications.""" @pytest.fixture def service(self): return NotificationService() @pytest.fixture def mock_provider(self): """Create a mock notification provider with first layer complete enabled.""" provider = MagicMock() provider.id = 1 provider.name = "Test Provider" provider.provider_type = "webhook" provider.enabled = True provider.config = json.dumps({"webhook_url": "http://test.local/webhook"}) provider.on_first_layer_complete = True provider.quiet_hours_enabled = False provider.daily_digest_enabled = False provider.printer_id = None return provider @pytest.fixture def mock_db(self): """Create a mock database session.""" db = AsyncMock() db.commit = AsyncMock() return db @pytest.mark.asyncio async def test_on_first_layer_complete_sends_notification(self, service, mock_provider, mock_db): """Verify first layer complete notification is sent when triggered.""" with ( patch.object(service, "_get_providers_for_event", new_callable=AsyncMock) as mock_get, patch.object(service, "_send_to_providers", new_callable=AsyncMock) as mock_send, patch.object(service, "_build_message_from_template", new_callable=AsyncMock) as mock_build, ): mock_get.return_value = [mock_provider] mock_build.return_value = ("First Layer Complete", "Test Printer: benchy.3mf") await service.on_first_layer_complete( printer_id=1, printer_name="Test Printer", filename="benchy.3mf", total_layers=50, db=mock_db, ) mock_get.assert_called_once() mock_send.assert_called_once() @pytest.mark.asyncio async def test_on_first_layer_complete_skipped_when_no_providers(self, service, mock_db): """Verify notification is skipped when no providers have first layer complete enabled.""" with ( patch.object(service, "_get_providers_for_event", new_callable=AsyncMock) as mock_get, patch.object(service, "_send_to_providers", new_callable=AsyncMock) as mock_send, ): mock_get.return_value = [] await service.on_first_layer_complete( printer_id=1, printer_name="Test Printer", filename="benchy.3mf", total_layers=50, db=mock_db, ) mock_send.assert_not_called() @pytest.mark.asyncio async def test_on_first_layer_complete_includes_correct_variables(self, service, mock_provider, mock_db): """Verify printer name, filename, and total_layers are passed to template variables.""" captured_variables = {} async def capture_build(db, event_type, variables): captured_variables.update(variables) return ("Test", "Test") with ( patch.object(service, "_get_providers_for_event", new_callable=AsyncMock) as mock_get, patch.object(service, "_send_to_providers", new_callable=AsyncMock), patch.object(service, "_build_message_from_template", side_effect=capture_build), ): mock_get.return_value = [mock_provider] await service.on_first_layer_complete( printer_id=1, printer_name="X1 Carbon", filename="benchy.gcode.3mf", total_layers=120, db=mock_db, ) assert captured_variables["printer"] == "X1 Carbon" assert captured_variables["filename"] == "benchy" assert captured_variables["total_layers"] == "120" @pytest.mark.asyncio async def test_on_first_layer_complete_passes_image_data(self, service, mock_provider, mock_db): """Verify image_data is passed through to _send_to_providers.""" with ( patch.object(service, "_get_providers_for_event", new_callable=AsyncMock) as mock_get, patch.object(service, "_send_to_providers", new_callable=AsyncMock) as mock_send, patch.object(service, "_build_message_from_template", new_callable=AsyncMock) as mock_build, ): mock_get.return_value = [mock_provider] mock_build.return_value = ("First Layer Complete", "Test message") fake_image = b"\x89PNG\r\n\x1a\nfakeimage" await service.on_first_layer_complete( printer_id=1, printer_name="Test Printer", filename="benchy.3mf", total_layers=50, db=mock_db, image_data=fake_image, ) mock_send.assert_called_once() call_kwargs = mock_send.call_args assert call_kwargs.kwargs.get("image_data") == fake_image