test_spoolbuddy.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472
  1. """Integration tests for SpoolBuddy API endpoints."""
  2. from datetime import datetime, timedelta, timezone
  3. from unittest.mock import AsyncMock, MagicMock, patch
  4. import pytest
  5. from httpx import AsyncClient
  6. from sqlalchemy.ext.asyncio import AsyncSession
  7. from backend.app.models.spool import Spool
  8. from backend.app.models.spoolbuddy_device import SpoolBuddyDevice
# URL prefix prepended to every SpoolBuddy route requested by the tests below.
API = "/api/v1/spoolbuddy"
  10. @pytest.fixture
  11. def device_factory(db_session: AsyncSession):
  12. """Factory to create SpoolBuddyDevice records."""
  13. _counter = [0]
  14. async def _create(**kwargs):
  15. _counter[0] += 1
  16. n = _counter[0]
  17. defaults = {
  18. "device_id": f"sb-{n:04d}",
  19. "hostname": f"spoolbuddy-{n}",
  20. "ip_address": f"10.0.0.{n}",
  21. "firmware_version": "1.0.0",
  22. "has_nfc": True,
  23. "has_scale": True,
  24. "tare_offset": 0,
  25. "calibration_factor": 1.0,
  26. "last_seen": datetime.now(timezone.utc),
  27. }
  28. defaults.update(kwargs)
  29. device = SpoolBuddyDevice(**defaults)
  30. db_session.add(device)
  31. await db_session.commit()
  32. await db_session.refresh(device)
  33. return device
  34. return _create
  35. @pytest.fixture
  36. def spool_factory(db_session: AsyncSession):
  37. """Factory to create Spool records."""
  38. _counter = [0]
  39. async def _create(**kwargs):
  40. _counter[0] += 1
  41. defaults = {
  42. "material": "PLA",
  43. "subtype": "Basic",
  44. "brand": "Polymaker",
  45. "color_name": "Red",
  46. "rgba": "FF0000FF",
  47. "label_weight": 1000,
  48. "core_weight": 250,
  49. "weight_used": 0,
  50. }
  51. defaults.update(kwargs)
  52. spool = Spool(**defaults)
  53. db_session.add(spool)
  54. await db_session.commit()
  55. await db_session.refresh(spool)
  56. return spool
  57. return _create
  58. # ============================================================================
  59. # Device endpoints
  60. # ============================================================================
  61. class TestDeviceEndpoints:
  62. @pytest.mark.asyncio
  63. @pytest.mark.integration
  64. async def test_register_new_device(self, async_client: AsyncClient):
  65. with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
  66. mock_ws.broadcast = AsyncMock()
  67. resp = await async_client.post(
  68. f"{API}/devices/register",
  69. json={
  70. "device_id": "sb-new",
  71. "hostname": "spoolbuddy-new",
  72. "ip_address": "10.0.0.99",
  73. "firmware_version": "1.2.0",
  74. },
  75. )
  76. assert resp.status_code == 200
  77. data = resp.json()
  78. assert data["device_id"] == "sb-new"
  79. assert data["hostname"] == "spoolbuddy-new"
  80. assert data["online"] is True
  81. mock_ws.broadcast.assert_called_once()
  82. msg = mock_ws.broadcast.call_args[0][0]
  83. assert msg["type"] == "spoolbuddy_online"
  84. @pytest.mark.asyncio
  85. @pytest.mark.integration
  86. async def test_re_register_existing_device(self, async_client: AsyncClient, device_factory):
  87. device = await device_factory(
  88. device_id="sb-exist",
  89. tare_offset=12345,
  90. calibration_factor=0.0042,
  91. )
  92. with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
  93. mock_ws.broadcast = AsyncMock()
  94. resp = await async_client.post(
  95. f"{API}/devices/register",
  96. json={
  97. "device_id": "sb-exist",
  98. "hostname": "updated-host",
  99. "ip_address": "10.0.0.200",
  100. "firmware_version": "2.0.0",
  101. },
  102. )
  103. assert resp.status_code == 200
  104. data = resp.json()
  105. assert data["id"] == device.id
  106. assert data["hostname"] == "updated-host"
  107. assert data["ip_address"] == "10.0.0.200"
  108. assert data["firmware_version"] == "2.0.0"
  109. # Calibration preserved on re-register
  110. assert data["tare_offset"] == 12345
  111. assert data["calibration_factor"] == pytest.approx(0.0042)
  112. @pytest.mark.asyncio
  113. @pytest.mark.integration
  114. async def test_list_devices_empty(self, async_client: AsyncClient):
  115. resp = await async_client.get(f"{API}/devices")
  116. assert resp.status_code == 200
  117. assert resp.json() == []
  118. @pytest.mark.asyncio
  119. @pytest.mark.integration
  120. async def test_list_devices(self, async_client: AsyncClient, device_factory):
  121. await device_factory(device_id="sb-a", hostname="alpha")
  122. await device_factory(device_id="sb-b", hostname="beta")
  123. resp = await async_client.get(f"{API}/devices")
  124. assert resp.status_code == 200
  125. devices = resp.json()
  126. assert len(devices) == 2
  127. hostnames = {d["hostname"] for d in devices}
  128. assert hostnames == {"alpha", "beta"}
  129. @pytest.mark.asyncio
  130. @pytest.mark.integration
  131. async def test_heartbeat_updates_status(self, async_client: AsyncClient, device_factory):
  132. device = await device_factory(device_id="sb-hb")
  133. with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
  134. mock_ws.broadcast = AsyncMock()
  135. resp = await async_client.post(
  136. f"{API}/devices/sb-hb/heartbeat",
  137. json={"nfc_ok": True, "scale_ok": True, "uptime_s": 600},
  138. )
  139. assert resp.status_code == 200
  140. data = resp.json()
  141. assert data["tare_offset"] == device.tare_offset
  142. assert data["calibration_factor"] == pytest.approx(device.calibration_factor)
  143. @pytest.mark.asyncio
  144. @pytest.mark.integration
  145. async def test_heartbeat_returns_pending_command(self, async_client: AsyncClient, device_factory):
  146. await device_factory(device_id="sb-cmd", pending_command="tare")
  147. with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
  148. mock_ws.broadcast = AsyncMock()
  149. resp = await async_client.post(
  150. f"{API}/devices/sb-cmd/heartbeat",
  151. json={"nfc_ok": True, "scale_ok": True, "uptime_s": 10},
  152. )
  153. assert resp.status_code == 200
  154. assert resp.json()["pending_command"] == "tare"
  155. # Second heartbeat should have no pending command (cleared)
  156. with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
  157. mock_ws.broadcast = AsyncMock()
  158. resp2 = await async_client.post(
  159. f"{API}/devices/sb-cmd/heartbeat",
  160. json={"nfc_ok": True, "scale_ok": True, "uptime_s": 20},
  161. )
  162. assert resp2.json()["pending_command"] is None
  163. @pytest.mark.asyncio
  164. @pytest.mark.integration
  165. async def test_heartbeat_unknown_device_404(self, async_client: AsyncClient):
  166. with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
  167. mock_ws.broadcast = AsyncMock()
  168. resp = await async_client.post(
  169. f"{API}/devices/nonexistent/heartbeat",
  170. json={"nfc_ok": False, "scale_ok": False, "uptime_s": 0},
  171. )
  172. assert resp.status_code == 404
  173. @pytest.mark.asyncio
  174. @pytest.mark.integration
  175. async def test_heartbeat_broadcasts_online_when_was_offline(self, async_client: AsyncClient, device_factory):
  176. # Create device with last_seen far in the past (offline)
  177. await device_factory(
  178. device_id="sb-offline",
  179. last_seen=datetime.now(timezone.utc) - timedelta(seconds=120),
  180. )
  181. with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
  182. mock_ws.broadcast = AsyncMock()
  183. resp = await async_client.post(
  184. f"{API}/devices/sb-offline/heartbeat",
  185. json={"nfc_ok": True, "scale_ok": True, "uptime_s": 5},
  186. )
  187. assert resp.status_code == 200
  188. # Should broadcast online since device was offline
  189. mock_ws.broadcast.assert_called_once()
  190. msg = mock_ws.broadcast.call_args[0][0]
  191. assert msg["type"] == "spoolbuddy_online"
  192. assert msg["device_id"] == "sb-offline"
  193. # ============================================================================
  194. # NFC endpoints
  195. # ============================================================================
  196. class TestNfcEndpoints:
  197. @pytest.mark.asyncio
  198. @pytest.mark.integration
  199. async def test_tag_scanned_matched(self, async_client: AsyncClient, spool_factory):
  200. spool = await spool_factory(tag_uid="AABB1122", material="PLA")
  201. mock_spool = MagicMock()
  202. mock_spool.id = spool.id
  203. mock_spool.material = spool.material
  204. mock_spool.subtype = spool.subtype
  205. mock_spool.color_name = spool.color_name
  206. mock_spool.rgba = spool.rgba
  207. mock_spool.brand = spool.brand
  208. mock_spool.label_weight = spool.label_weight
  209. mock_spool.core_weight = spool.core_weight
  210. mock_spool.weight_used = spool.weight_used
  211. with (
  212. patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws,
  213. patch("backend.app.api.routes.spoolbuddy.get_spool_by_tag", new_callable=AsyncMock) as mock_lookup,
  214. ):
  215. mock_ws.broadcast = AsyncMock()
  216. mock_lookup.return_value = mock_spool
  217. resp = await async_client.post(
  218. f"{API}/nfc/tag-scanned",
  219. json={"device_id": "sb-1", "tag_uid": "AABB1122"},
  220. )
  221. assert resp.status_code == 200
  222. data = resp.json()
  223. assert data["matched"] is True
  224. assert data["spool_id"] == spool.id
  225. msg = mock_ws.broadcast.call_args[0][0]
  226. assert msg["type"] == "spoolbuddy_tag_matched"
  227. assert msg["spool"]["id"] == spool.id
  228. @pytest.mark.asyncio
  229. @pytest.mark.integration
  230. async def test_tag_scanned_unmatched(self, async_client: AsyncClient):
  231. with (
  232. patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws,
  233. patch("backend.app.api.routes.spoolbuddy.get_spool_by_tag", new_callable=AsyncMock) as mock_lookup,
  234. ):
  235. mock_ws.broadcast = AsyncMock()
  236. mock_lookup.return_value = None
  237. resp = await async_client.post(
  238. f"{API}/nfc/tag-scanned",
  239. json={"device_id": "sb-1", "tag_uid": "DEADBEEF"},
  240. )
  241. assert resp.status_code == 200
  242. data = resp.json()
  243. assert data["matched"] is False
  244. assert data["spool_id"] is None
  245. msg = mock_ws.broadcast.call_args[0][0]
  246. assert msg["type"] == "spoolbuddy_unknown_tag"
  247. @pytest.mark.asyncio
  248. @pytest.mark.integration
  249. async def test_tag_removed(self, async_client: AsyncClient):
  250. with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
  251. mock_ws.broadcast = AsyncMock()
  252. resp = await async_client.post(
  253. f"{API}/nfc/tag-removed",
  254. json={"device_id": "sb-1", "tag_uid": "AABB1122"},
  255. )
  256. assert resp.status_code == 200
  257. msg = mock_ws.broadcast.call_args[0][0]
  258. assert msg["type"] == "spoolbuddy_tag_removed"
  259. assert msg["device_id"] == "sb-1"
  260. assert msg["tag_uid"] == "AABB1122"
  261. # ============================================================================
  262. # Scale endpoints
  263. # ============================================================================
  264. class TestScaleEndpoints:
  265. @pytest.mark.asyncio
  266. @pytest.mark.integration
  267. async def test_scale_reading_broadcast(self, async_client: AsyncClient):
  268. with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
  269. mock_ws.broadcast = AsyncMock()
  270. resp = await async_client.post(
  271. f"{API}/scale/reading",
  272. json={
  273. "device_id": "sb-1",
  274. "weight_grams": 823.5,
  275. "stable": True,
  276. "raw_adc": 456789,
  277. },
  278. )
  279. assert resp.status_code == 200
  280. msg = mock_ws.broadcast.call_args[0][0]
  281. assert msg["type"] == "spoolbuddy_weight"
  282. assert msg["device_id"] == "sb-1"
  283. assert msg["weight_grams"] == 823.5
  284. assert msg["stable"] is True
  285. assert msg["raw_adc"] == 456789
  286. @pytest.mark.asyncio
  287. @pytest.mark.integration
  288. async def test_update_spool_weight_calculates_correctly(self, async_client: AsyncClient, spool_factory):
  289. # label=1000g, core=250g, scale reads 750g
  290. # net_filament = max(0, 750 - 250) = 500
  291. # weight_used = max(0, 1000 - 500) = 500
  292. spool = await spool_factory(label_weight=1000, core_weight=250, weight_used=0)
  293. resp = await async_client.post(
  294. f"{API}/scale/update-spool-weight",
  295. json={"spool_id": spool.id, "weight_grams": 750},
  296. )
  297. assert resp.status_code == 200
  298. data = resp.json()
  299. assert data["weight_used"] == 500
  300. @pytest.mark.asyncio
  301. @pytest.mark.integration
  302. async def test_update_spool_weight_full_spool(self, async_client: AsyncClient, spool_factory):
  303. # label=1000g, core=250g, scale reads 1250g (full spool)
  304. # net_filament = max(0, 1250 - 250) = 1000
  305. # weight_used = max(0, 1000 - 1000) = 0
  306. spool = await spool_factory(label_weight=1000, core_weight=250, weight_used=200)
  307. resp = await async_client.post(
  308. f"{API}/scale/update-spool-weight",
  309. json={"spool_id": spool.id, "weight_grams": 1250},
  310. )
  311. assert resp.status_code == 200
  312. data = resp.json()
  313. assert data["weight_used"] == 0
  314. @pytest.mark.asyncio
  315. @pytest.mark.integration
  316. async def test_update_spool_weight_missing_spool_404(self, async_client: AsyncClient):
  317. resp = await async_client.post(
  318. f"{API}/scale/update-spool-weight",
  319. json={"spool_id": 99999, "weight_grams": 500},
  320. )
  321. assert resp.status_code == 404
  322. # ============================================================================
  323. # Calibration endpoints
  324. # ============================================================================
  325. class TestCalibrationEndpoints:
  326. @pytest.mark.asyncio
  327. @pytest.mark.integration
  328. async def test_tare_queues_command(self, async_client: AsyncClient, device_factory):
  329. await device_factory(device_id="sb-tare")
  330. resp = await async_client.post(f"{API}/devices/sb-tare/calibration/tare", json={})
  331. assert resp.status_code == 200
  332. assert resp.json()["status"] == "ok"
  333. # Verify pending_command via heartbeat
  334. with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
  335. mock_ws.broadcast = AsyncMock()
  336. hb = await async_client.post(
  337. f"{API}/devices/sb-tare/heartbeat",
  338. json={"nfc_ok": True, "scale_ok": True, "uptime_s": 1},
  339. )
  340. assert hb.json()["pending_command"] == "tare"
  341. @pytest.mark.asyncio
  342. @pytest.mark.integration
  343. async def test_tare_unknown_device_404(self, async_client: AsyncClient):
  344. resp = await async_client.post(f"{API}/devices/ghost/calibration/tare", json={})
  345. assert resp.status_code == 404
  346. @pytest.mark.asyncio
  347. @pytest.mark.integration
  348. async def test_set_tare_offset(self, async_client: AsyncClient, device_factory):
  349. await device_factory(device_id="sb-st", calibration_factor=0.005)
  350. resp = await async_client.post(
  351. f"{API}/devices/sb-st/calibration/set-tare",
  352. json={"tare_offset": 54321},
  353. )
  354. assert resp.status_code == 200
  355. data = resp.json()
  356. assert data["tare_offset"] == 54321
  357. assert data["calibration_factor"] == pytest.approx(0.005)
  358. @pytest.mark.asyncio
  359. @pytest.mark.integration
  360. async def test_set_calibration_factor(self, async_client: AsyncClient, device_factory):
  361. # known_weight=200g, raw_adc=50000, tare=10000 → factor=200/(50000-10000)=0.005
  362. await device_factory(device_id="sb-cf", tare_offset=10000)
  363. resp = await async_client.post(
  364. f"{API}/devices/sb-cf/calibration/set-factor",
  365. json={"known_weight_grams": 200, "raw_adc": 50000},
  366. )
  367. assert resp.status_code == 200
  368. data = resp.json()
  369. assert data["calibration_factor"] == pytest.approx(0.005)
  370. assert data["tare_offset"] == 10000
  371. @pytest.mark.asyncio
  372. @pytest.mark.integration
  373. async def test_set_calibration_factor_zero_delta_400(self, async_client: AsyncClient, device_factory):
  374. # raw_adc == tare_offset → delta is 0 → 400 error
  375. await device_factory(device_id="sb-zero", tare_offset=5000)
  376. resp = await async_client.post(
  377. f"{API}/devices/sb-zero/calibration/set-factor",
  378. json={"known_weight_grams": 100, "raw_adc": 5000},
  379. )
  380. assert resp.status_code == 400
  381. @pytest.mark.asyncio
  382. @pytest.mark.integration
  383. async def test_get_calibration(self, async_client: AsyncClient, device_factory):
  384. await device_factory(
  385. device_id="sb-gcal",
  386. tare_offset=11111,
  387. calibration_factor=0.0042,
  388. )
  389. resp = await async_client.get(f"{API}/devices/sb-gcal/calibration")
  390. assert resp.status_code == 200
  391. data = resp.json()
  392. assert data["tare_offset"] == 11111
  393. assert data["calibration_factor"] == pytest.approx(0.0042)