"""Integration tests for SpoolBuddy API endpoints.""" from datetime import datetime, timedelta, timezone from unittest.mock import AsyncMock, MagicMock, patch import pytest from httpx import AsyncClient from sqlalchemy.ext.asyncio import AsyncSession from backend.app.models.spool import Spool from backend.app.models.spoolbuddy_device import SpoolBuddyDevice API = "/api/v1/spoolbuddy" @pytest.fixture def device_factory(db_session: AsyncSession): """Factory to create SpoolBuddyDevice records.""" _counter = [0] async def _create(**kwargs): _counter[0] += 1 n = _counter[0] defaults = { "device_id": f"sb-{n:04d}", "hostname": f"spoolbuddy-{n}", "ip_address": f"10.0.0.{n}", "firmware_version": "1.0.0", "has_nfc": True, "has_scale": True, "tare_offset": 0, "calibration_factor": 1.0, "last_seen": datetime.now(timezone.utc), } defaults.update(kwargs) device = SpoolBuddyDevice(**defaults) db_session.add(device) await db_session.commit() await db_session.refresh(device) return device return _create @pytest.fixture def spool_factory(db_session: AsyncSession): """Factory to create Spool records.""" _counter = [0] async def _create(**kwargs): _counter[0] += 1 defaults = { "material": "PLA", "subtype": "Basic", "brand": "Polymaker", "color_name": "Red", "rgba": "FF0000FF", "label_weight": 1000, "core_weight": 250, "weight_used": 0, } defaults.update(kwargs) spool = Spool(**defaults) db_session.add(spool) await db_session.commit() await db_session.refresh(spool) return spool return _create # ============================================================================ # Device endpoints # ============================================================================ class TestDeviceEndpoints: @pytest.mark.asyncio @pytest.mark.integration async def test_register_new_device(self, async_client: AsyncClient): with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws: mock_ws.broadcast = AsyncMock() resp = await async_client.post( f"{API}/devices/register", json={ "device_id": "sb-new", "hostname": "spoolbuddy-new", "ip_address": "10.0.0.99", "firmware_version": "1.2.0", }, ) assert resp.status_code == 200 data = resp.json() assert data["device_id"] == "sb-new" assert data["hostname"] == "spoolbuddy-new" assert data["online"] is True mock_ws.broadcast.assert_called_once() msg = mock_ws.broadcast.call_args[0][0] assert msg["type"] == "spoolbuddy_online" @pytest.mark.asyncio @pytest.mark.integration async def test_re_register_existing_device(self, async_client: AsyncClient, device_factory): device = await device_factory( device_id="sb-exist", tare_offset=12345, calibration_factor=0.0042, ) with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws: mock_ws.broadcast = AsyncMock() resp = await async_client.post( f"{API}/devices/register", json={ "device_id": "sb-exist", "hostname": "updated-host", "ip_address": "10.0.0.200", "firmware_version": "2.0.0", }, ) assert resp.status_code == 200 data = resp.json() assert data["id"] == device.id assert data["hostname"] == "updated-host" assert data["ip_address"] == "10.0.0.200" assert data["firmware_version"] == "2.0.0" # Calibration preserved on re-register assert data["tare_offset"] == 12345 assert data["calibration_factor"] == pytest.approx(0.0042) @pytest.mark.asyncio @pytest.mark.integration async def test_list_devices_empty(self, async_client: AsyncClient): resp = await async_client.get(f"{API}/devices") assert resp.status_code == 200 assert resp.json() == [] @pytest.mark.asyncio @pytest.mark.integration async def test_list_devices(self, async_client: AsyncClient, device_factory): await device_factory(device_id="sb-a", hostname="alpha") await device_factory(device_id="sb-b", hostname="beta") resp = await async_client.get(f"{API}/devices") assert resp.status_code == 200 devices = resp.json() assert len(devices) == 2 hostnames = {d["hostname"] for d in devices} assert hostnames == {"alpha", "beta"} @pytest.mark.asyncio @pytest.mark.integration async def test_heartbeat_updates_status(self, async_client: AsyncClient, device_factory): device = await device_factory(device_id="sb-hb") with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws: mock_ws.broadcast = AsyncMock() resp = await async_client.post( f"{API}/devices/sb-hb/heartbeat", json={"nfc_ok": True, "scale_ok": True, "uptime_s": 600}, ) assert resp.status_code == 200 data = resp.json() assert data["tare_offset"] == device.tare_offset assert data["calibration_factor"] == pytest.approx(device.calibration_factor) @pytest.mark.asyncio @pytest.mark.integration async def test_heartbeat_returns_pending_command(self, async_client: AsyncClient, device_factory): await device_factory(device_id="sb-cmd", pending_command="tare") with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws: mock_ws.broadcast = AsyncMock() resp = await async_client.post( f"{API}/devices/sb-cmd/heartbeat", json={"nfc_ok": True, "scale_ok": True, "uptime_s": 10}, ) assert resp.status_code == 200 assert resp.json()["pending_command"] == "tare" # Second heartbeat should have no pending command (cleared) with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws: mock_ws.broadcast = AsyncMock() resp2 = await async_client.post( f"{API}/devices/sb-cmd/heartbeat", json={"nfc_ok": True, "scale_ok": True, "uptime_s": 20}, ) assert resp2.json()["pending_command"] is None @pytest.mark.asyncio @pytest.mark.integration async def test_heartbeat_unknown_device_404(self, async_client: AsyncClient): with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws: mock_ws.broadcast = AsyncMock() resp = await async_client.post( f"{API}/devices/nonexistent/heartbeat", json={"nfc_ok": False, "scale_ok": False, "uptime_s": 0}, ) assert resp.status_code == 404 @pytest.mark.asyncio @pytest.mark.integration async def test_heartbeat_broadcasts_online_when_was_offline(self, async_client: AsyncClient, device_factory): # Create device with last_seen far in the past (offline) await device_factory( device_id="sb-offline", last_seen=datetime.now(timezone.utc) - timedelta(seconds=120), ) with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws: mock_ws.broadcast = AsyncMock() resp = await async_client.post( f"{API}/devices/sb-offline/heartbeat", json={"nfc_ok": True, "scale_ok": True, "uptime_s": 5}, ) assert resp.status_code == 200 # Should broadcast online since device was offline mock_ws.broadcast.assert_called_once() msg = mock_ws.broadcast.call_args[0][0] assert msg["type"] == "spoolbuddy_online" assert msg["device_id"] == "sb-offline" # ============================================================================ # NFC endpoints # ============================================================================ class TestNfcEndpoints: @pytest.mark.asyncio @pytest.mark.integration async def test_tag_scanned_matched(self, async_client: AsyncClient, spool_factory): spool = await spool_factory(tag_uid="AABB1122", material="PLA") mock_spool = MagicMock() mock_spool.id = spool.id mock_spool.material = spool.material mock_spool.subtype = spool.subtype mock_spool.color_name = spool.color_name mock_spool.rgba = spool.rgba mock_spool.brand = spool.brand mock_spool.label_weight = spool.label_weight mock_spool.core_weight = spool.core_weight mock_spool.weight_used = spool.weight_used with ( patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws, patch("backend.app.api.routes.spoolbuddy.get_spool_by_tag", new_callable=AsyncMock) as mock_lookup, ): mock_ws.broadcast = AsyncMock() mock_lookup.return_value = mock_spool resp = await async_client.post( f"{API}/nfc/tag-scanned", json={"device_id": "sb-1", "tag_uid": "AABB1122"}, ) assert resp.status_code == 200 data = resp.json() assert data["matched"] is True assert data["spool_id"] == spool.id msg = mock_ws.broadcast.call_args[0][0] assert msg["type"] == "spoolbuddy_tag_matched" assert msg["spool"]["id"] == spool.id @pytest.mark.asyncio @pytest.mark.integration async def test_tag_scanned_unmatched(self, async_client: AsyncClient): with ( patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws, patch("backend.app.api.routes.spoolbuddy.get_spool_by_tag", new_callable=AsyncMock) as mock_lookup, ): mock_ws.broadcast = AsyncMock() mock_lookup.return_value = None resp = await async_client.post( f"{API}/nfc/tag-scanned", json={"device_id": "sb-1", "tag_uid": "DEADBEEF"}, ) assert resp.status_code == 200 data = resp.json() assert data["matched"] is False assert data["spool_id"] is None msg = mock_ws.broadcast.call_args[0][0] assert msg["type"] == "spoolbuddy_unknown_tag" @pytest.mark.asyncio @pytest.mark.integration async def test_tag_removed(self, async_client: AsyncClient): with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws: mock_ws.broadcast = AsyncMock() resp = await async_client.post( f"{API}/nfc/tag-removed", json={"device_id": "sb-1", "tag_uid": "AABB1122"}, ) assert resp.status_code == 200 msg = mock_ws.broadcast.call_args[0][0] assert msg["type"] == "spoolbuddy_tag_removed" assert msg["device_id"] == "sb-1" assert msg["tag_uid"] == "AABB1122" # ============================================================================ # Scale endpoints # ============================================================================ class TestScaleEndpoints: @pytest.mark.asyncio @pytest.mark.integration async def test_scale_reading_broadcast(self, async_client: AsyncClient): with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws: mock_ws.broadcast = AsyncMock() resp = await async_client.post( f"{API}/scale/reading", json={ "device_id": "sb-1", "weight_grams": 823.5, "stable": True, "raw_adc": 456789, }, ) assert resp.status_code == 200 msg = mock_ws.broadcast.call_args[0][0] assert msg["type"] == "spoolbuddy_weight" assert msg["device_id"] == "sb-1" assert msg["weight_grams"] == 823.5 assert msg["stable"] is True assert msg["raw_adc"] == 456789 @pytest.mark.asyncio @pytest.mark.integration async def test_update_spool_weight_calculates_correctly(self, async_client: AsyncClient, spool_factory): # label=1000g, core=250g, scale reads 750g # net_filament = max(0, 750 - 250) = 500 # weight_used = max(0, 1000 - 500) = 500 spool = await spool_factory(label_weight=1000, core_weight=250, weight_used=0) resp = await async_client.post( f"{API}/scale/update-spool-weight", json={"spool_id": spool.id, "weight_grams": 750}, ) assert resp.status_code == 200 data = resp.json() assert data["weight_used"] == 500 @pytest.mark.asyncio @pytest.mark.integration async def test_update_spool_weight_full_spool(self, async_client: AsyncClient, spool_factory): # label=1000g, core=250g, scale reads 1250g (full spool) # net_filament = max(0, 1250 - 250) = 1000 # weight_used = max(0, 1000 - 1000) = 0 spool = await spool_factory(label_weight=1000, core_weight=250, weight_used=200) resp = await async_client.post( f"{API}/scale/update-spool-weight", json={"spool_id": spool.id, "weight_grams": 1250}, ) assert resp.status_code == 200 data = resp.json() assert data["weight_used"] == 0 @pytest.mark.asyncio @pytest.mark.integration async def test_update_spool_weight_missing_spool_404(self, async_client: AsyncClient): resp = await async_client.post( f"{API}/scale/update-spool-weight", json={"spool_id": 99999, "weight_grams": 500}, ) assert resp.status_code == 404 # ============================================================================ # Calibration endpoints # ============================================================================ class TestCalibrationEndpoints: @pytest.mark.asyncio @pytest.mark.integration async def test_tare_queues_command(self, async_client: AsyncClient, device_factory): await device_factory(device_id="sb-tare") resp = await async_client.post(f"{API}/devices/sb-tare/calibration/tare", json={}) assert resp.status_code == 200 assert resp.json()["status"] == "ok" # Verify pending_command via heartbeat with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws: mock_ws.broadcast = AsyncMock() hb = await async_client.post( f"{API}/devices/sb-tare/heartbeat", json={"nfc_ok": True, "scale_ok": True, "uptime_s": 1}, ) assert hb.json()["pending_command"] == "tare" @pytest.mark.asyncio @pytest.mark.integration async def test_tare_unknown_device_404(self, async_client: AsyncClient): resp = await async_client.post(f"{API}/devices/ghost/calibration/tare", json={}) assert resp.status_code == 404 @pytest.mark.asyncio @pytest.mark.integration async def test_set_tare_offset(self, async_client: AsyncClient, device_factory): await device_factory(device_id="sb-st", calibration_factor=0.005) resp = await async_client.post( f"{API}/devices/sb-st/calibration/set-tare", json={"tare_offset": 54321}, ) assert resp.status_code == 200 data = resp.json() assert data["tare_offset"] == 54321 assert data["calibration_factor"] == pytest.approx(0.005) @pytest.mark.asyncio @pytest.mark.integration async def test_set_calibration_factor(self, async_client: AsyncClient, device_factory): # known_weight=200g, raw_adc=50000, tare=10000 → factor=200/(50000-10000)=0.005 await device_factory(device_id="sb-cf", tare_offset=10000) resp = await async_client.post( f"{API}/devices/sb-cf/calibration/set-factor", json={"known_weight_grams": 200, "raw_adc": 50000}, ) assert resp.status_code == 200 data = resp.json() assert data["calibration_factor"] == pytest.approx(0.005) assert data["tare_offset"] == 10000 @pytest.mark.asyncio @pytest.mark.integration async def test_set_calibration_factor_zero_delta_400(self, async_client: AsyncClient, device_factory): # raw_adc == tare_offset → delta is 0 → 400 error await device_factory(device_id="sb-zero", tare_offset=5000) resp = await async_client.post( f"{API}/devices/sb-zero/calibration/set-factor", json={"known_weight_grams": 100, "raw_adc": 5000}, ) assert resp.status_code == 400 @pytest.mark.asyncio @pytest.mark.integration async def test_get_calibration(self, async_client: AsyncClient, device_factory): await device_factory( device_id="sb-gcal", tare_offset=11111, calibration_factor=0.0042, ) resp = await async_client.get(f"{API}/devices/sb-gcal/calibration") assert resp.status_code == 200 data = resp.json() assert data["tare_offset"] == 11111 assert data["calibration_factor"] == pytest.approx(0.0042)