| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166 |
- """Integration tests for SpoolBuddy API endpoints."""
- from datetime import datetime, timedelta, timezone
- from unittest.mock import AsyncMock, MagicMock, patch
- import pytest
- from httpx import AsyncClient
- from sqlalchemy.ext.asyncio import AsyncSession
- from backend.app.api.routes import spoolbuddy as spoolbuddy_routes
- from backend.app.models.spool import Spool
- from backend.app.models.spoolbuddy_device import SpoolBuddyDevice
- API = "/api/v1/spoolbuddy"
@pytest.fixture
def device_factory(db_session: AsyncSession):
    """Provide an async factory that persists SpoolBuddyDevice rows.

    Every call increments a per-fixture sequence number that feeds the default
    ``device_id``, ``hostname`` and ``ip_address``. Keyword arguments override
    the defaults. The device is added to ``db_session``, committed, refreshed
    and returned.
    """
    created = 0

    async def _create(**kwargs):
        nonlocal created
        created += 1
        seq = created
        fields = {
            "device_id": f"sb-{seq:04d}",
            "hostname": f"spoolbuddy-{seq}",
            "ip_address": f"10.0.0.{seq}",
            "firmware_version": "1.0.0",
            "has_nfc": True,
            "has_scale": True,
            "tare_offset": 0,
            "calibration_factor": 1.0,
            "last_seen": datetime.now(timezone.utc),
        }
        # Caller-supplied values win over the defaults.
        device = SpoolBuddyDevice(**{**fields, **kwargs})
        db_session.add(device)
        await db_session.commit()
        await db_session.refresh(device)
        return device

    return _create
@pytest.fixture
def spool_factory(db_session: AsyncSession):
    """Provide an async factory that persists Spool rows.

    The returned coroutine builds a spool from fixed defaults (a 1000 g red
    Polymaker PLA Basic spool with a 250 g core and nothing used), applies any
    keyword overrides, commits it through ``db_session`` and returns the
    refreshed instance.
    """

    async def _create(**kwargs):
        # None of the defaults depend on a sequence number, so no call counter
        # is kept here.
        defaults = {
            "material": "PLA",
            "subtype": "Basic",
            "brand": "Polymaker",
            "color_name": "Red",
            "rgba": "FF0000FF",
            "label_weight": 1000,
            "core_weight": 250,
            "weight_used": 0,
        }
        defaults.update(kwargs)
        spool = Spool(**defaults)
        db_session.add(spool)
        await db_session.commit()
        await db_session.refresh(spool)
        return spool

    return _create
- # ============================================================================
- # Device endpoints
- # ============================================================================
class TestDeviceEndpoints:
    """Registration, listing and heartbeat tests for the device routes."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_register_new_device(self, async_client: AsyncClient):
        """Registering an unknown device reports it online and broadcasts once."""
        payload = {
            "device_id": "sb-new",
            "hostname": "spoolbuddy-new",
            "ip_address": "10.0.0.99",
            "firmware_version": "1.2.0",
        }
        with patch("backend.app.api.routes.spoolbuddy.ws_manager") as ws:
            ws.broadcast = AsyncMock()
            response = await async_client.post(f"{API}/devices/register", json=payload)

        assert response.status_code == 200
        body = response.json()
        assert body["device_id"] == "sb-new"
        assert body["hostname"] == "spoolbuddy-new"
        assert body["online"] is True
        ws.broadcast.assert_called_once()
        event = ws.broadcast.call_args.args[0]
        assert event["type"] == "spoolbuddy_online"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_re_register_existing_device(self, async_client: AsyncClient, device_factory):
        """Re-registering updates host details but keeps the stored calibration."""
        existing = await device_factory(
            device_id="sb-exist",
            tare_offset=12345,
            calibration_factor=0.0042,
        )
        payload = {
            "device_id": "sb-exist",
            "hostname": "updated-host",
            "ip_address": "10.0.0.200",
            "firmware_version": "2.0.0",
        }
        with patch("backend.app.api.routes.spoolbuddy.ws_manager") as ws:
            ws.broadcast = AsyncMock()
            response = await async_client.post(f"{API}/devices/register", json=payload)

        assert response.status_code == 200
        body = response.json()
        assert body["id"] == existing.id
        assert body["hostname"] == "updated-host"
        assert body["ip_address"] == "10.0.0.200"
        assert body["firmware_version"] == "2.0.0"
        # Calibration preserved on re-register
        assert body["tare_offset"] == 12345
        assert body["calibration_factor"] == pytest.approx(0.0042)

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_list_devices_empty(self, async_client: AsyncClient):
        """Listing devices with none created returns an empty list."""
        response = await async_client.get(f"{API}/devices")
        assert response.status_code == 200
        assert response.json() == []

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_list_devices(self, async_client: AsyncClient, device_factory):
        """Listing returns both created devices."""
        await device_factory(device_id="sb-a", hostname="alpha")
        await device_factory(device_id="sb-b", hostname="beta")

        response = await async_client.get(f"{API}/devices")

        assert response.status_code == 200
        listed = response.json()
        assert len(listed) == 2
        assert {entry["hostname"] for entry in listed} == {"alpha", "beta"}

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_heartbeat_updates_status(self, async_client: AsyncClient, device_factory):
        """A heartbeat echoes the stored calibration and broadcasts an online event."""
        device = await device_factory(device_id="sb-hb")
        spoolbuddy_routes._spoolbuddy_online_last_broadcast.clear()
        status = {"nfc_ok": True, "scale_ok": True, "uptime_s": 600}
        with patch("backend.app.api.routes.spoolbuddy.ws_manager") as ws:
            ws.broadcast = AsyncMock()
            response = await async_client.post(f"{API}/devices/sb-hb/heartbeat", json=status)

        assert response.status_code == 200
        body = response.json()
        assert body["tare_offset"] == device.tare_offset
        assert body["calibration_factor"] == pytest.approx(device.calibration_factor)
        ws.broadcast.assert_called_once()
        event = ws.broadcast.call_args.args[0]
        assert event["type"] == "spoolbuddy_online"
        assert event["device_id"] == "sb-hb"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_heartbeat_returns_pending_command(self, async_client: AsyncClient, device_factory):
        """A pending ``tare`` command is delivered once, then cleared."""
        await device_factory(device_id="sb-cmd", pending_command="tare")
        heartbeat_url = f"{API}/devices/sb-cmd/heartbeat"

        with patch("backend.app.api.routes.spoolbuddy.ws_manager") as ws:
            ws.broadcast = AsyncMock()
            first = await async_client.post(
                heartbeat_url,
                json={"nfc_ok": True, "scale_ok": True, "uptime_s": 10},
            )
        assert first.status_code == 200
        assert first.json()["pending_command"] == "tare"

        # Second heartbeat should have no pending command (cleared)
        with patch("backend.app.api.routes.spoolbuddy.ws_manager") as ws:
            ws.broadcast = AsyncMock()
            second = await async_client.post(
                heartbeat_url,
                json={"nfc_ok": True, "scale_ok": True, "uptime_s": 20},
            )
        assert second.json()["pending_command"] is None

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_heartbeat_unknown_device_404(self, async_client: AsyncClient):
        """A heartbeat for an unregistered device id returns 404."""
        status = {"nfc_ok": False, "scale_ok": False, "uptime_s": 0}
        with patch("backend.app.api.routes.spoolbuddy.ws_manager") as ws:
            ws.broadcast = AsyncMock()
            response = await async_client.post(f"{API}/devices/nonexistent/heartbeat", json=status)

        assert response.status_code == 404

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_heartbeat_broadcasts_online_when_was_offline(self, async_client: AsyncClient, device_factory):
        """A heartbeat from a device last seen 120 seconds ago broadcasts online."""
        # Create device with last_seen far in the past (offline)
        spoolbuddy_routes._spoolbuddy_online_last_broadcast.clear()
        stale = datetime.now(timezone.utc) - timedelta(seconds=120)
        await device_factory(device_id="sb-offline", last_seen=stale)
        status = {"nfc_ok": True, "scale_ok": True, "uptime_s": 5}
        with patch("backend.app.api.routes.spoolbuddy.ws_manager") as ws:
            ws.broadcast = AsyncMock()
            response = await async_client.post(f"{API}/devices/sb-offline/heartbeat", json=status)

        assert response.status_code == 200
        # Should broadcast online since device was offline
        ws.broadcast.assert_called_once()
        event = ws.broadcast.call_args.args[0]
        assert event["type"] == "spoolbuddy_online"
        assert event["device_id"] == "sb-offline"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_heartbeat_broadcasts_online_when_already_online(self, async_client: AsyncClient, device_factory):
        """With the throttle map cleared, a just-seen device still broadcasts online."""
        spoolbuddy_routes._spoolbuddy_online_last_broadcast.clear()
        await device_factory(
            device_id="sb-already-online",
            last_seen=datetime.now(timezone.utc),
        )
        status = {"nfc_ok": True, "scale_ok": True, "uptime_s": 42}
        with patch("backend.app.api.routes.spoolbuddy.ws_manager") as ws:
            ws.broadcast = AsyncMock()
            response = await async_client.post(
                f"{API}/devices/sb-already-online/heartbeat",
                json=status,
            )

        assert response.status_code == 200
        ws.broadcast.assert_called_once()
        event = ws.broadcast.call_args.args[0]
        assert event["type"] == "spoolbuddy_online"
        assert event["device_id"] == "sb-already-online"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_heartbeat_online_broadcast_is_throttled(self, async_client: AsyncClient, device_factory):
        """Two back-to-back heartbeats result in a single online broadcast."""
        spoolbuddy_routes._spoolbuddy_online_last_broadcast.clear()
        await device_factory(
            device_id="sb-throttle",
            last_seen=datetime.now(timezone.utc),
        )
        heartbeat_url = f"{API}/devices/sb-throttle/heartbeat"
        with patch("backend.app.api.routes.spoolbuddy.ws_manager") as ws:
            ws.broadcast = AsyncMock()
            first = await async_client.post(
                heartbeat_url,
                json={"nfc_ok": True, "scale_ok": True, "uptime_s": 10},
            )
            second = await async_client.post(
                heartbeat_url,
                json={"nfc_ok": True, "scale_ok": True, "uptime_s": 11},
            )

        assert first.status_code == 200
        assert second.status_code == 200
        ws.broadcast.assert_called_once()
        event = ws.broadcast.call_args.args[0]
        assert event["type"] == "spoolbuddy_online"
        assert event["device_id"] == "sb-throttle"
- # ============================================================================
- # NFC endpoints
- # ============================================================================
class TestNfcEndpoints:
    """Tests for the NFC tag-scanned and tag-removed routes."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_tag_scanned_matched(self, async_client: AsyncClient, spool_factory):
        """A scan whose lookup returns a spool reports a match and broadcasts it."""
        spool = await spool_factory(tag_uid="AABB1122", material="PLA")

        # Stand-in returned by the patched lookup, mirroring the stored spool.
        found = MagicMock()
        for attr in (
            "id",
            "material",
            "subtype",
            "color_name",
            "rgba",
            "brand",
            "label_weight",
            "core_weight",
            "weight_used",
        ):
            setattr(found, attr, getattr(spool, attr))

        with (
            patch("backend.app.api.routes.spoolbuddy.ws_manager") as ws,
            patch("backend.app.api.routes.spoolbuddy.get_spool_by_tag", new_callable=AsyncMock) as lookup,
        ):
            ws.broadcast = AsyncMock()
            lookup.return_value = found
            response = await async_client.post(
                f"{API}/nfc/tag-scanned",
                json={"device_id": "sb-1", "tag_uid": "AABB1122"},
            )

        assert response.status_code == 200
        body = response.json()
        assert body["matched"] is True
        assert body["spool_id"] == spool.id
        event = ws.broadcast.call_args.args[0]
        assert event["type"] == "spoolbuddy_tag_matched"
        assert event["spool"]["id"] == spool.id

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_tag_scanned_unmatched(self, async_client: AsyncClient):
        """A scan whose lookup returns ``None`` reports no match and an unknown-tag event."""
        with (
            patch("backend.app.api.routes.spoolbuddy.ws_manager") as ws,
            patch("backend.app.api.routes.spoolbuddy.get_spool_by_tag", new_callable=AsyncMock) as lookup,
        ):
            ws.broadcast = AsyncMock()
            lookup.return_value = None
            response = await async_client.post(
                f"{API}/nfc/tag-scanned",
                json={"device_id": "sb-1", "tag_uid": "DEADBEEF"},
            )

        assert response.status_code == 200
        body = response.json()
        assert body["matched"] is False
        assert body["spool_id"] is None
        event = ws.broadcast.call_args.args[0]
        assert event["type"] == "spoolbuddy_unknown_tag"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_tag_removed(self, async_client: AsyncClient):
        """Reporting a removed tag broadcasts a tag-removed event with its ids."""
        payload = {"device_id": "sb-1", "tag_uid": "AABB1122"}
        with patch("backend.app.api.routes.spoolbuddy.ws_manager") as ws:
            ws.broadcast = AsyncMock()
            response = await async_client.post(f"{API}/nfc/tag-removed", json=payload)

        assert response.status_code == 200
        event = ws.broadcast.call_args.args[0]
        assert event["type"] == "spoolbuddy_tag_removed"
        assert event["device_id"] == "sb-1"
        assert event["tag_uid"] == "AABB1122"
- # ============================================================================
- # NFC write-tag endpoints
- # ============================================================================
class TestWriteTagEndpoints:
    """Tests for queuing, reporting and cancelling NFC tag writes.

    Setup requests (queueing a write, posting a write result, fetching the
    spool from the inventory API) are asserted to succeed, so a broken setup
    step fails at the point it happens instead of letting a later assertion
    fail confusingly or pass vacuously.
    """

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_write_tag_queues_command(self, async_client: AsyncClient, device_factory, spool_factory):
        """Queuing a write makes the next heartbeat return ``write_tag`` with a payload."""
        device = await device_factory(device_id="sb-wt")
        spool = await spool_factory(material="PLA", brand="Polymaker", color_name="Red", rgba="FF0000FF")

        resp = await async_client.post(
            f"{API}/nfc/write-tag",
            json={"device_id": device.device_id, "spool_id": spool.id},
        )
        assert resp.status_code == 200
        assert resp.json()["status"] == "queued"

        # Verify heartbeat returns write_tag command with payload
        with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
            mock_ws.broadcast = AsyncMock()
            hb = await async_client.post(
                f"{API}/devices/{device.device_id}/heartbeat",
                json={"nfc_ok": True, "scale_ok": True, "uptime_s": 10},
            )

        hb_data = hb.json()
        assert hb_data["pending_command"] == "write_tag"
        assert hb_data["pending_write_payload"] is not None
        assert hb_data["pending_write_payload"]["spool_id"] == spool.id
        assert "ndef_data_hex" in hb_data["pending_write_payload"]

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_write_tag_heartbeat_not_cleared(self, async_client: AsyncClient, device_factory, spool_factory):
        """write_tag command persists across heartbeats until write-result clears it."""
        device = await device_factory(device_id="sb-wt-persist")
        spool = await spool_factory(material="PETG")

        queued = await async_client.post(
            f"{API}/nfc/write-tag",
            json={"device_id": device.device_id, "spool_id": spool.id},
        )
        # Fail here, not on the heartbeat, if the write was never queued.
        assert queued.status_code == 200

        # First heartbeat — command present
        with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
            mock_ws.broadcast = AsyncMock()
            hb1 = await async_client.post(
                f"{API}/devices/{device.device_id}/heartbeat",
                json={"nfc_ok": True, "scale_ok": True, "uptime_s": 10},
            )
        assert hb1.json()["pending_command"] == "write_tag"

        # Second heartbeat — should still be present (not cleared like tare)
        with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
            mock_ws.broadcast = AsyncMock()
            hb2 = await async_client.post(
                f"{API}/devices/{device.device_id}/heartbeat",
                json={"nfc_ok": True, "scale_ok": True, "uptime_s": 20},
            )
        assert hb2.json()["pending_command"] == "write_tag"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_write_tag_missing_spool_404(self, async_client: AsyncClient, device_factory):
        """Queuing a write for a spool id that does not exist returns 404."""
        device = await device_factory(device_id="sb-wt-nospool")
        resp = await async_client.post(
            f"{API}/nfc/write-tag",
            json={"device_id": device.device_id, "spool_id": 99999},
        )
        assert resp.status_code == 404

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_write_tag_missing_device_404(self, async_client: AsyncClient, spool_factory):
        """Queuing a write for an unregistered device id returns 404."""
        spool = await spool_factory()
        resp = await async_client.post(
            f"{API}/nfc/write-tag",
            json={"device_id": "nonexistent", "spool_id": spool.id},
        )
        assert resp.status_code == 404

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_write_result_success_links_tag(self, async_client: AsyncClient, device_factory, spool_factory):
        """A successful write result broadcasts tag-written and links the tag to the spool."""
        device = await device_factory(device_id="sb-wr", pending_command="write_tag")
        spool = await spool_factory(material="PLA", tag_uid=None)

        with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
            mock_ws.broadcast = AsyncMock()
            resp = await async_client.post(
                f"{API}/nfc/write-result",
                json={
                    "device_id": device.device_id,
                    "spool_id": spool.id,
                    "tag_uid": "04AABB11223344",
                    "success": True,
                },
            )

        assert resp.status_code == 200
        msg = mock_ws.broadcast.call_args[0][0]
        assert msg["type"] == "spoolbuddy_tag_written"
        assert msg["spool_id"] == spool.id
        assert msg["tag_uid"] == "04AABB11223344"

        # Verify spool got tag linked
        spool_resp = await async_client.get(f"/api/v1/inventory/spools/{spool.id}")
        assert spool_resp.status_code == 200
        spool_data = spool_resp.json()
        assert spool_data["tag_uid"] == "04AABB11223344"
        assert spool_data["tag_type"] == "ntag"
        assert spool_data["data_origin"] == "opentag3d"
        assert spool_data["encode_time"] is not None

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_write_result_failure_broadcasts_error(
        self, async_client: AsyncClient, device_factory, spool_factory
    ):
        """A failed write result broadcasts the failure message and leaves the spool unlinked."""
        device = await device_factory(device_id="sb-wr-fail", pending_command="write_tag")
        spool = await spool_factory(material="PLA", tag_uid=None)

        with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
            mock_ws.broadcast = AsyncMock()
            resp = await async_client.post(
                f"{API}/nfc/write-result",
                json={
                    "device_id": device.device_id,
                    "spool_id": spool.id,
                    "tag_uid": "04AABB",
                    "success": False,
                    "message": "Write or verification failed",
                },
            )

        assert resp.status_code == 200
        msg = mock_ws.broadcast.call_args[0][0]
        assert msg["type"] == "spoolbuddy_tag_write_failed"
        assert msg["message"] == "Write or verification failed"

        # Verify spool NOT linked
        spool_resp = await async_client.get(f"/api/v1/inventory/spools/{spool.id}")
        assert spool_resp.status_code == 200
        assert spool_resp.json()["tag_uid"] is None

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_write_result_clears_pending_command(self, async_client: AsyncClient, device_factory, spool_factory):
        """After a successful write result the heartbeat carries no command or payload."""
        device = await device_factory(
            device_id="sb-wr-clear",
            pending_command="write_tag",
            pending_write_payload='{"spool_id": 1, "ndef_data_hex": "E110120003"}',
        )
        spool = await spool_factory()

        with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
            mock_ws.broadcast = AsyncMock()
            result = await async_client.post(
                f"{API}/nfc/write-result",
                json={
                    "device_id": device.device_id,
                    "spool_id": spool.id,
                    "tag_uid": "AABB",
                    "success": True,
                },
            )
        # The clearing behaviour is only meaningful if the result was accepted.
        assert result.status_code == 200

        # Heartbeat should have no pending command
        with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
            mock_ws.broadcast = AsyncMock()
            hb = await async_client.post(
                f"{API}/devices/{device.device_id}/heartbeat",
                json={"nfc_ok": True, "scale_ok": True, "uptime_s": 30},
            )
        hb_data = hb.json()
        assert hb_data["pending_command"] is None
        assert hb_data["pending_write_payload"] is None

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_cancel_write(self, async_client: AsyncClient, device_factory, spool_factory):
        """Cancelling a queued write leaves no pending command on the next heartbeat."""
        device = await device_factory(device_id="sb-cancel")
        spool = await spool_factory()

        # Queue a write
        queued = await async_client.post(
            f"{API}/nfc/write-tag",
            json={"device_id": device.device_id, "spool_id": spool.id},
        )
        # Without this check the test would pass vacuously if nothing was
        # queued: an idle device also reports no pending command.
        assert queued.status_code == 200
        assert queued.json()["status"] == "queued"

        # Cancel it
        resp = await async_client.post(f"{API}/devices/{device.device_id}/cancel-write", json={})
        assert resp.status_code == 200

        # Heartbeat should have no pending command
        with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
            mock_ws.broadcast = AsyncMock()
            hb = await async_client.post(
                f"{API}/devices/{device.device_id}/heartbeat",
                json={"nfc_ok": True, "scale_ok": True, "uptime_s": 10},
            )
        assert hb.json()["pending_command"] is None

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_cancel_write_unknown_device_404(self, async_client: AsyncClient):
        """Cancelling a write on an unregistered device id returns 404."""
        resp = await async_client.post(f"{API}/devices/ghost/cancel-write", json={})
        assert resp.status_code == 404

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_write_tag_ndef_data_is_valid(self, async_client: AsyncClient, device_factory, spool_factory):
        """Verify the NDEF data in the heartbeat is a valid OpenTag3D message."""
        device = await device_factory(device_id="sb-wt-ndef")
        spool = await spool_factory(
            material="PLA",
            brand="Polymaker",
            color_name="White",
            rgba="FFFFFFFF",
            label_weight=1000,
        )

        queued = await async_client.post(
            f"{API}/nfc/write-tag",
            json={"device_id": device.device_id, "spool_id": spool.id},
        )
        # A failed queue would otherwise surface as an error on the payload lookup.
        assert queued.status_code == 200

        with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
            mock_ws.broadcast = AsyncMock()
            hb = await async_client.post(
                f"{API}/devices/{device.device_id}/heartbeat",
                json={"nfc_ok": True, "scale_ok": True, "uptime_s": 10},
            )

        payload = hb.json()["pending_write_payload"]
        ndef_bytes = bytes.fromhex(payload["ndef_data_hex"])
        # CC bytes
        assert ndef_bytes[:4] == bytes([0xE1, 0x10, 0x12, 0x00])
        # TLV type
        assert ndef_bytes[4] == 0x03
        # NDEF record: TNF=MIME, type=application/opentag3d
        assert ndef_bytes[6] == 0xD2
        assert ndef_bytes[9:30] == b"application/opentag3d"
        # Terminator
        assert ndef_bytes[-1] == 0xFE
        # Total size fits NTAG213
        assert len(ndef_bytes) <= 144
- # ============================================================================
- # Scale endpoints
- # ============================================================================
class TestScaleEndpoints:
    """Tests for the scale reading and spool-weight update routes."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_scale_reading_broadcast(self, async_client: AsyncClient):
        """A posted scale reading is relayed as a ``spoolbuddy_weight`` broadcast."""
        reading = {
            "device_id": "sb-1",
            "weight_grams": 823.5,
            "stable": True,
            "raw_adc": 456789,
        }
        with patch("backend.app.api.routes.spoolbuddy.ws_manager") as ws:
            ws.broadcast = AsyncMock()
            response = await async_client.post(f"{API}/scale/reading", json=reading)

        assert response.status_code == 200
        event = ws.broadcast.call_args.args[0]
        assert event["type"] == "spoolbuddy_weight"
        assert event["device_id"] == "sb-1"
        assert event["weight_grams"] == 823.5
        assert event["stable"] is True
        assert event["raw_adc"] == 456789

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_update_spool_weight_calculates_correctly(self, async_client: AsyncClient, spool_factory):
        """A 750 g reading on a 1000 g / 250 g-core spool yields 500 g used."""
        # label=1000g, core=250g, scale reads 750g
        # net_filament = max(0, 750 - 250) = 500
        # weight_used = max(0, 1000 - 500) = 500
        spool = await spool_factory(label_weight=1000, core_weight=250, weight_used=0)
        payload = {"spool_id": spool.id, "weight_grams": 750}

        response = await async_client.post(f"{API}/scale/update-spool-weight", json=payload)

        assert response.status_code == 200
        assert response.json()["weight_used"] == 500

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_update_spool_weight_full_spool(self, async_client: AsyncClient, spool_factory):
        """A 1250 g reading on a 1000 g / 250 g-core spool resets used weight to 0."""
        # label=1000g, core=250g, scale reads 1250g (full spool)
        # net_filament = max(0, 1250 - 250) = 1000
        # weight_used = max(0, 1000 - 1000) = 0
        spool = await spool_factory(label_weight=1000, core_weight=250, weight_used=200)
        payload = {"spool_id": spool.id, "weight_grams": 1250}

        response = await async_client.post(f"{API}/scale/update-spool-weight", json=payload)

        assert response.status_code == 200
        assert response.json()["weight_used"] == 0

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_update_spool_weight_stores_scale_reading(self, async_client: AsyncClient, spool_factory):
        """Verify last_scale_weight and last_weighed_at are stored after weight sync."""
        spool = await spool_factory(label_weight=1000, core_weight=250, weight_used=0)
        payload = {"spool_id": spool.id, "weight_grams": 750}

        response = await async_client.post(f"{API}/scale/update-spool-weight", json=payload)
        assert response.status_code == 200

        # Fetch the spool via inventory API to verify stored fields
        stored = await async_client.get(f"/api/v1/inventory/spools/{spool.id}")
        assert stored.status_code == 200
        stored_body = stored.json()
        assert stored_body["last_scale_weight"] == 750
        assert stored_body["last_weighed_at"] is not None

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_update_spool_weight_missing_spool_404(self, async_client: AsyncClient):
        """Updating the weight of a spool id that does not exist returns 404."""
        payload = {"spool_id": 99999, "weight_grams": 500}
        response = await async_client.post(f"{API}/scale/update-spool-weight", json=payload)
        assert response.status_code == 404
- # ============================================================================
- # Calibration endpoints
- # ============================================================================
class TestCalibrationEndpoints:
    """Tests for the tare, set-tare, set-factor and calibration read routes."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_tare_queues_command(self, async_client: AsyncClient, device_factory):
        """Requesting a tare makes the next heartbeat return a ``tare`` command."""
        await device_factory(device_id="sb-tare")

        response = await async_client.post(f"{API}/devices/sb-tare/calibration/tare", json={})
        assert response.status_code == 200
        assert response.json()["status"] == "ok"

        # Verify pending_command via heartbeat
        status = {"nfc_ok": True, "scale_ok": True, "uptime_s": 1}
        with patch("backend.app.api.routes.spoolbuddy.ws_manager") as ws:
            ws.broadcast = AsyncMock()
            heartbeat = await async_client.post(f"{API}/devices/sb-tare/heartbeat", json=status)
        assert heartbeat.json()["pending_command"] == "tare"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_tare_unknown_device_404(self, async_client: AsyncClient):
        """Requesting a tare on an unregistered device id returns 404."""
        response = await async_client.post(f"{API}/devices/ghost/calibration/tare", json={})
        assert response.status_code == 404

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_set_tare_offset(self, async_client: AsyncClient, device_factory):
        """Setting the tare offset stores it and leaves the calibration factor alone."""
        await device_factory(device_id="sb-st", calibration_factor=0.005)

        response = await async_client.post(
            f"{API}/devices/sb-st/calibration/set-tare",
            json={"tare_offset": 54321},
        )

        assert response.status_code == 200
        body = response.json()
        assert body["tare_offset"] == 54321
        assert body["calibration_factor"] == pytest.approx(0.005)

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_set_calibration_factor(self, async_client: AsyncClient, device_factory):
        """The factor is derived from the known weight and the tare-corrected ADC value."""
        # known_weight=200g, raw_adc=50000, tare=10000 → factor=200/(50000-10000)=0.005
        await device_factory(device_id="sb-cf", tare_offset=10000)

        response = await async_client.post(
            f"{API}/devices/sb-cf/calibration/set-factor",
            json={"known_weight_grams": 200, "raw_adc": 50000},
        )

        assert response.status_code == 200
        body = response.json()
        assert body["calibration_factor"] == pytest.approx(0.005)
        assert body["tare_offset"] == 10000

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_set_calibration_factor_zero_delta_400(self, async_client: AsyncClient, device_factory):
        """A raw ADC value equal to the tare offset is rejected with 400."""
        # raw_adc == tare_offset → delta is 0 → 400 error
        await device_factory(device_id="sb-zero", tare_offset=5000)

        response = await async_client.post(
            f"{API}/devices/sb-zero/calibration/set-factor",
            json={"known_weight_grams": 100, "raw_adc": 5000},
        )

        assert response.status_code == 400

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_get_calibration(self, async_client: AsyncClient, device_factory):
        """Reading calibration returns the stored tare offset and factor."""
        await device_factory(
            device_id="sb-gcal",
            tare_offset=11111,
            calibration_factor=0.0042,
        )

        response = await async_client.get(f"{API}/devices/sb-gcal/calibration")

        assert response.status_code == 200
        body = response.json()
        assert body["tare_offset"] == 11111
        assert body["calibration_factor"] == pytest.approx(0.0042)
- # ============================================================================
- # Display endpoints
- # ============================================================================
class TestDisplayEndpoints:
    """Tests for the per-device display settings route."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_update_display_settings(self, async_client: AsyncClient, device_factory):
        """Updating brightness and blank timeout echoes the new values."""
        await device_factory(device_id="sb-disp", display_brightness=100, display_blank_timeout=0)
        settings = {"brightness": 75, "blank_timeout": 300}

        response = await async_client.put(f"{API}/devices/sb-disp/display", json=settings)

        assert response.status_code == 200
        body = response.json()
        assert body["brightness"] == 75
        assert body["blank_timeout"] == 300

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_update_display_persists_via_heartbeat(self, async_client: AsyncClient, device_factory):
        """Updated display settings are returned by the next heartbeat."""
        await device_factory(device_id="sb-disp-hb")
        await async_client.put(
            f"{API}/devices/sb-disp-hb/display",
            json={"brightness": 50, "blank_timeout": 600},
        )

        status = {"nfc_ok": True, "scale_ok": True, "uptime_s": 10}
        with patch("backend.app.api.routes.spoolbuddy.ws_manager") as ws:
            ws.broadcast = AsyncMock()
            heartbeat = await async_client.post(f"{API}/devices/sb-disp-hb/heartbeat", json=status)

        reported = heartbeat.json()
        assert reported["display_brightness"] == 50
        assert reported["display_blank_timeout"] == 600

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_update_display_unknown_device_404(self, async_client: AsyncClient):
        """Updating display settings on an unregistered device id returns 404."""
        settings = {"brightness": 50, "blank_timeout": 60}
        response = await async_client.put(f"{API}/devices/ghost/display", json=settings)
        assert response.status_code == 404

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_update_display_validates_brightness(self, async_client: AsyncClient, device_factory):
        """A brightness of 150 is rejected with 422."""
        await device_factory(device_id="sb-disp-val")
        settings = {"brightness": 150, "blank_timeout": 0}

        response = await async_client.put(f"{API}/devices/sb-disp-val/display", json=settings)

        # Validation error: brightness > 100
        assert response.status_code == 422
- # ============================================================================
- # Update endpoints
- # ============================================================================
class TestUpdateEndpoints:
    """Integration tests for the device update endpoints.

    Exercises ``POST .../update``, ``POST .../update-status`` and
    ``GET .../update-check`` under ``{API}/devices/{device_id}``, plus the
    update fields exposed by ``GET {API}/devices``.
    """

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_trigger_update_starts_ssh_update(self, async_client: AsyncClient, device_factory):
        """Triggering an update on a known device returns 200 with status ``ok``."""
        await device_factory(device_id="sb-upd")

        # Both the websocket manager and the SSH update routine are stubbed out.
        with (
            patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws,
            patch("backend.app.services.spoolbuddy_ssh.perform_ssh_update", new_callable=AsyncMock),
        ):
            mock_ws.broadcast = AsyncMock()
            resp = await async_client.post(f"{API}/devices/sb-upd/update")

        assert resp.status_code == 200
        assert resp.json()["status"] == "ok"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_trigger_update_offline_device_409(self, async_client: AsyncClient, device_factory):
        """A device last seen 120 seconds ago is refused with 409."""
        await device_factory(
            device_id="sb-upd-off",
            last_seen=datetime.now(timezone.utc) - timedelta(seconds=120),
        )

        resp = await async_client.post(f"{API}/devices/sb-upd-off/update")

        assert resp.status_code == 409

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_trigger_update_unknown_device_404(self, async_client: AsyncClient):
        """Triggering an update on a device that was never created yields 404."""
        resp = await async_client.post(f"{API}/devices/ghost/update")

        assert resp.status_code == 404

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_trigger_update_already_updating(self, async_client: AsyncClient, device_factory):
        """A device whose update_status is ``updating`` reports ``already_updating``."""
        await device_factory(device_id="sb-upd-dup", update_status="updating")

        with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
            mock_ws.broadcast = AsyncMock()
            resp = await async_client.post(f"{API}/devices/sb-upd-dup/update")

        assert resp.status_code == 200
        assert resp.json()["status"] == "already_updating"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_report_update_status_updating(self, async_client: AsyncClient, device_factory):
        """Reporting ``updating`` broadcasts exactly one ``spoolbuddy_update`` message."""
        await device_factory(device_id="sb-upd-st", pending_command="update", update_status="pending")

        with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
            mock_ws.broadcast = AsyncMock()
            resp = await async_client.post(
                f"{API}/devices/sb-upd-st/update-status",
                json={"status": "updating", "message": "Fetching latest code..."},
            )

        assert resp.status_code == 200
        mock_ws.broadcast.assert_called_once()
        msg = mock_ws.broadcast.call_args[0][0]
        assert msg["type"] == "spoolbuddy_update"
        assert msg["update_status"] == "updating"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_report_update_status_complete_clears_command(self, async_client: AsyncClient, device_factory):
        """After ``complete`` is reported, the next heartbeat carries no pending command."""
        await device_factory(device_id="sb-upd-done", pending_command="update", update_status="updating")

        with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
            mock_ws.broadcast = AsyncMock()
            report = await async_client.post(
                f"{API}/devices/sb-upd-done/update-status",
                json={"status": "complete", "message": "Update complete, restarting..."},
            )
        # Without this check a rejected report would only surface as a
        # misleading "pending_command is not None" failure below.
        assert report.status_code == 200

        # Heartbeat should have no pending command
        with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
            mock_ws.broadcast = AsyncMock()
            hb = await async_client.post(
                f"{API}/devices/sb-upd-done/heartbeat",
                json={"nfc_ok": True, "scale_ok": True, "uptime_s": 10},
            )

        assert hb.status_code == 200
        assert hb.json()["pending_command"] is None

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_report_update_status_error(self, async_client: AsyncClient, device_factory):
        """Reporting ``error`` broadcasts the error status together with its message."""
        await device_factory(device_id="sb-upd-err", pending_command="update", update_status="updating")

        with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
            mock_ws.broadcast = AsyncMock()
            resp = await async_client.post(
                f"{API}/devices/sb-upd-err/update-status",
                json={"status": "error", "message": "git fetch failed: network unreachable"},
            )

        assert resp.status_code == 200
        # call_args is None when nothing was broadcast; assert first so the
        # failure reads as "not called" instead of a TypeError on None.
        mock_ws.broadcast.assert_called()
        msg = mock_ws.broadcast.call_args[0][0]
        assert msg["update_status"] == "error"
        assert "git fetch failed" in msg["update_message"]

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_report_update_status_unknown_device_404(self, async_client: AsyncClient):
        """Reporting update status for a device that was never created yields 404."""
        resp = await async_client.post(
            f"{API}/devices/ghost/update-status",
            json={"status": "updating", "message": "test"},
        )

        assert resp.status_code == 404

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_device_response_includes_update_fields(self, async_client: AsyncClient, device_factory):
        """The device listing exposes ``update_status`` and ``update_message``."""
        await device_factory(device_id="sb-upd-resp", update_status="complete", update_message="Done!")

        resp = await async_client.get(f"{API}/devices")

        assert resp.status_code == 200
        # A bare next() would raise StopIteration on a missing device, which a
        # coroutine turns into an opaque RuntimeError; assert explicitly instead.
        matches = [d for d in resp.json() if d["device_id"] == "sb-upd-resp"]
        assert matches, "sb-upd-resp missing from device list"
        device = matches[0]
        assert device["update_status"] == "complete"
        assert device["update_message"] == "Done!"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_update_check_returns_version_info(self, async_client: AsyncClient, device_factory):
        """GET /devices/{id}/update-check compares device version against APP_VERSION."""
        await device_factory(device_id="sb-uc", firmware_version="0.1.0")

        resp = await async_client.get(f"{API}/devices/sb-uc/update-check")

        assert resp.status_code == 200
        data = resp.json()
        assert data["current_version"] == "0.1.0"
        assert data["latest_version"] is not None
        # NOTE(review): relies on APP_VERSION differing from "0.1.0" — confirm.
        assert data["update_available"] is True

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_update_check_up_to_date(self, async_client: AsyncClient, device_factory):
        """A device whose firmware equals APP_VERSION has no update available."""
        from backend.app.core.config import APP_VERSION

        await device_factory(device_id="sb-uc2", firmware_version=APP_VERSION)

        resp = await async_client.get(f"{API}/devices/sb-uc2/update-check")

        assert resp.status_code == 200
        assert resp.json()["update_available"] is False

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_update_check_unknown_device_404(self, async_client: AsyncClient):
        """An update check for a device that was never created yields 404."""
        resp = await async_client.get(f"{API}/devices/ghost/update-check")

        assert resp.status_code == 404

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_trigger_update_broadcasts_websocket(self, async_client: AsyncClient, device_factory):
        """Triggering an update broadcasts one ``spoolbuddy_update`` message with status ``pending``."""
        await device_factory(device_id="sb-upd-ws")

        with (
            patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws,
            patch("backend.app.services.spoolbuddy_ssh.perform_ssh_update", new_callable=AsyncMock),
        ):
            mock_ws.broadcast = AsyncMock()
            resp = await async_client.post(f"{API}/devices/sb-upd-ws/update")

        # Make a rejected trigger fail on its status, not on the missing broadcast.
        assert resp.status_code == 200
        mock_ws.broadcast.assert_called_once()
        msg = mock_ws.broadcast.call_args[0][0]
        assert msg["type"] == "spoolbuddy_update"
        assert msg["device_id"] == "sb-upd-ws"
        assert msg["update_status"] == "pending"
- # ============================================================================
- # System command endpoints
- # ============================================================================
class TestSystemCommandEndpoints:
    """Integration tests for ``POST {API}/devices/{device_id}/system/command``."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_queue_reboot(self, async_client: AsyncClient, device_factory):
        """``reboot`` is accepted and reported back as queued."""
        await device_factory(device_id="sb-reboot")

        resp = await async_client.post(
            f"{API}/devices/sb-reboot/system/command",
            json={"command": "reboot"},
        )

        assert resp.status_code == 200
        data = resp.json()
        assert data["status"] == "queued"
        assert data["command"] == "reboot"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_queue_shutdown(self, async_client: AsyncClient, device_factory):
        """``shutdown`` is accepted and echoed in the response."""
        await device_factory(device_id="sb-shutdown")

        resp = await async_client.post(
            f"{API}/devices/sb-shutdown/system/command",
            json={"command": "shutdown"},
        )

        assert resp.status_code == 200
        assert resp.json()["command"] == "shutdown"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_queue_restart_daemon(self, async_client: AsyncClient, device_factory):
        """``restart_daemon`` is accepted and echoed in the response."""
        await device_factory(device_id="sb-rd")

        resp = await async_client.post(
            f"{API}/devices/sb-rd/system/command",
            json={"command": "restart_daemon"},
        )

        assert resp.status_code == 200
        assert resp.json()["command"] == "restart_daemon"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_queue_restart_browser(self, async_client: AsyncClient, device_factory):
        """``restart_browser`` is accepted and echoed in the response."""
        await device_factory(device_id="sb-rb")

        resp = await async_client.post(
            f"{API}/devices/sb-rb/system/command",
            json={"command": "restart_browser"},
        )

        assert resp.status_code == 200
        assert resp.json()["command"] == "restart_browser"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_invalid_command_rejected(self, async_client: AsyncClient, device_factory):
        """A command outside the supported set is refused with 400."""
        await device_factory(device_id="sb-invalid")

        resp = await async_client.post(
            f"{API}/devices/sb-invalid/system/command",
            json={"command": "format_disk"},
        )

        assert resp.status_code == 400
        assert "Invalid command" in resp.json()["detail"]

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_command_unknown_device_404(self, async_client: AsyncClient):
        """Queuing a command for a device that was never created yields 404."""
        resp = await async_client.post(
            f"{API}/devices/ghost/system/command",
            json={"command": "reboot"},
        )

        assert resp.status_code == 404

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_command_offline_device_409(self, async_client: AsyncClient, device_factory):
        """A device last seen 120 seconds ago is refused with 409 and an "offline" detail."""
        await device_factory(
            device_id="sb-offline-cmd",
            last_seen=datetime.now(timezone.utc) - timedelta(seconds=120),
        )

        resp = await async_client.post(
            f"{API}/devices/sb-offline-cmd/system/command",
            json={"command": "reboot"},
        )

        assert resp.status_code == 409
        assert "offline" in resp.json()["detail"].lower()

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_command_sets_pending_command(self, async_client: AsyncClient, device_factory, db_session):
        """Queuing a command stores it in the device row's ``pending_command``."""
        device = await device_factory(device_id="sb-pending")

        resp = await async_client.post(
            f"{API}/devices/sb-pending/system/command",
            json={"command": "restart_daemon"},
        )
        # Distinguish "request rejected" from "accepted but not persisted".
        assert resp.status_code == 200

        # Reload the row so the assertion sees what the endpoint wrote.
        await db_session.refresh(device)
        assert device.pending_command == "restart_daemon"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_heartbeat_clears_system_command(self, async_client: AsyncClient, device_factory):
        """System commands (reboot/shutdown/restart_*) are fire-and-forget — heartbeat clears them."""
        await device_factory(device_id="sb-hb-clear")
        heartbeat_url = f"{API}/devices/sb-hb-clear/heartbeat"
        heartbeat_body = {"nfc_ok": True, "scale_ok": True, "uptime_s": 100}

        # Queue a command
        queued = await async_client.post(
            f"{API}/devices/sb-hb-clear/system/command",
            json={"command": "restart_browser"},
        )
        assert queued.status_code == 200

        # Heartbeat should return the command and clear it
        resp = await async_client.post(heartbeat_url, json=heartbeat_body)
        assert resp.status_code == 200
        data = resp.json()
        assert data["pending_command"] == "restart_browser"

        # A follow-up heartbeat verifies the "clear" half of the contract:
        # the command must be delivered once and not repeated.
        followup = await async_client.post(heartbeat_url, json=heartbeat_body)
        assert followup.status_code == 200
        assert followup.json()["pending_command"] is None
|