test_spoolbuddy.py 96 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441
  1. """Integration tests for SpoolBuddy API endpoints."""
  2. from datetime import datetime, timedelta, timezone
  3. from unittest.mock import AsyncMock, MagicMock, patch
  4. import pytest
  5. from httpx import AsyncClient
  6. from sqlalchemy.ext.asyncio import AsyncSession
  7. import backend.app.services.spoolbuddy_ssh # noqa: F401 — ensures patch() can resolve the dotted path
  8. from backend.app.api.routes import spoolbuddy as spoolbuddy_routes
  9. from backend.app.models.spool import Spool
  10. from backend.app.models.spoolbuddy_device import SpoolBuddyDevice
  11. from backend.app.services.spoolman import SpoolmanNotFoundError, SpoolmanUnavailableError
  12. API = "/api/v1/spoolbuddy"
  13. @pytest.fixture
  14. def device_factory(db_session: AsyncSession):
  15. """Factory to create SpoolBuddyDevice records."""
  16. _counter = [0]
  17. async def _create(**kwargs):
  18. _counter[0] += 1
  19. n = _counter[0]
  20. defaults = {
  21. "device_id": f"sb-{n:04d}",
  22. "hostname": f"spoolbuddy-{n}",
  23. "ip_address": f"10.0.0.{n}",
  24. "firmware_version": "1.0.0",
  25. "has_nfc": True,
  26. "has_scale": True,
  27. "tare_offset": 0,
  28. "calibration_factor": 1.0,
  29. "last_seen": datetime.now(timezone.utc),
  30. }
  31. defaults.update(kwargs)
  32. device = SpoolBuddyDevice(**defaults)
  33. db_session.add(device)
  34. await db_session.commit()
  35. await db_session.refresh(device)
  36. return device
  37. return _create
  38. @pytest.fixture
  39. def spool_factory(db_session: AsyncSession):
  40. """Factory to create Spool records."""
  41. _counter = [0]
  42. async def _create(**kwargs):
  43. _counter[0] += 1
  44. defaults = {
  45. "material": "PLA",
  46. "subtype": "Basic",
  47. "brand": "Polymaker",
  48. "color_name": "Red",
  49. "rgba": "FF0000FF",
  50. "label_weight": 1000,
  51. "core_weight": 250,
  52. "weight_used": 0,
  53. }
  54. defaults.update(kwargs)
  55. spool = Spool(**defaults)
  56. db_session.add(spool)
  57. await db_session.commit()
  58. await db_session.refresh(spool)
  59. return spool
  60. return _create
  61. # ============================================================================
  62. # Device endpoints
  63. # ============================================================================
  64. class TestDeviceEndpoints:
  65. @pytest.mark.asyncio
  66. @pytest.mark.integration
  67. async def test_register_new_device(self, async_client: AsyncClient):
  68. with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
  69. mock_ws.broadcast = AsyncMock()
  70. resp = await async_client.post(
  71. f"{API}/devices/register",
  72. json={
  73. "device_id": "sb-new",
  74. "hostname": "spoolbuddy-new",
  75. "ip_address": "10.0.0.99",
  76. "firmware_version": "1.2.0",
  77. },
  78. )
  79. assert resp.status_code == 200
  80. data = resp.json()
  81. assert data["device_id"] == "sb-new"
  82. assert data["hostname"] == "spoolbuddy-new"
  83. assert data["online"] is True
  84. mock_ws.broadcast.assert_called_once()
  85. msg = mock_ws.broadcast.call_args[0][0]
  86. assert msg["type"] == "spoolbuddy_online"
  87. @pytest.mark.asyncio
  88. @pytest.mark.integration
  89. async def test_re_register_existing_device(self, async_client: AsyncClient, device_factory):
  90. device = await device_factory(
  91. device_id="sb-exist",
  92. tare_offset=12345,
  93. calibration_factor=0.0042,
  94. )
  95. with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
  96. mock_ws.broadcast = AsyncMock()
  97. resp = await async_client.post(
  98. f"{API}/devices/register",
  99. json={
  100. "device_id": "sb-exist",
  101. "hostname": "updated-host",
  102. "ip_address": "10.0.0.200",
  103. "firmware_version": "2.0.0",
  104. },
  105. )
  106. assert resp.status_code == 200
  107. data = resp.json()
  108. assert data["id"] == device.id
  109. assert data["hostname"] == "updated-host"
  110. assert data["ip_address"] == "10.0.0.200"
  111. assert data["firmware_version"] == "2.0.0"
  112. # Calibration preserved on re-register
  113. assert data["tare_offset"] == 12345
  114. assert data["calibration_factor"] == pytest.approx(0.0042)
  115. @pytest.mark.asyncio
  116. @pytest.mark.integration
  117. async def test_list_devices_empty(self, async_client: AsyncClient):
  118. resp = await async_client.get(f"{API}/devices")
  119. assert resp.status_code == 200
  120. assert resp.json() == []
  121. @pytest.mark.asyncio
  122. @pytest.mark.integration
  123. async def test_list_devices(self, async_client: AsyncClient, device_factory):
  124. await device_factory(device_id="sb-a", hostname="alpha")
  125. await device_factory(device_id="sb-b", hostname="beta")
  126. resp = await async_client.get(f"{API}/devices")
  127. assert resp.status_code == 200
  128. devices = resp.json()
  129. assert len(devices) == 2
  130. hostnames = {d["hostname"] for d in devices}
  131. assert hostnames == {"alpha", "beta"}
  132. @pytest.mark.asyncio
  133. @pytest.mark.integration
  134. async def test_unregister_device(self, async_client: AsyncClient, device_factory, db_session):
  135. await device_factory(device_id="sb-keep", hostname="keep")
  136. await device_factory(device_id="sb-drop", hostname="drop")
  137. spoolbuddy_routes._spoolbuddy_online_last_broadcast["sb-drop"] = 123.0
  138. with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
  139. mock_ws.broadcast = AsyncMock()
  140. resp = await async_client.delete(f"{API}/devices/sb-drop")
  141. assert resp.status_code == 200
  142. assert resp.json() == {"status": "deleted", "device_id": "sb-drop"}
  143. assert "sb-drop" not in spoolbuddy_routes._spoolbuddy_online_last_broadcast
  144. mock_ws.broadcast.assert_called_once()
  145. msg = mock_ws.broadcast.call_args[0][0]
  146. assert msg["type"] == "spoolbuddy_unregistered"
  147. assert msg["device_id"] == "sb-drop"
  148. # Other device still present
  149. resp = await async_client.get(f"{API}/devices")
  150. remaining = {d["device_id"] for d in resp.json()}
  151. assert remaining == {"sb-keep"}
  152. @pytest.mark.asyncio
  153. @pytest.mark.integration
  154. async def test_unregister_device_not_found(self, async_client: AsyncClient):
  155. resp = await async_client.delete(f"{API}/devices/sb-ghost")
  156. assert resp.status_code == 404
  157. @pytest.mark.asyncio
  158. @pytest.mark.integration
  159. async def test_heartbeat_updates_status(self, async_client: AsyncClient, device_factory):
  160. device = await device_factory(device_id="sb-hb")
  161. spoolbuddy_routes._spoolbuddy_online_last_broadcast.clear()
  162. with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
  163. mock_ws.broadcast = AsyncMock()
  164. resp = await async_client.post(
  165. f"{API}/devices/sb-hb/heartbeat",
  166. json={"nfc_ok": True, "scale_ok": True, "uptime_s": 600},
  167. )
  168. assert resp.status_code == 200
  169. data = resp.json()
  170. assert data["tare_offset"] == device.tare_offset
  171. assert data["calibration_factor"] == pytest.approx(device.calibration_factor)
  172. mock_ws.broadcast.assert_called_once()
  173. msg = mock_ws.broadcast.call_args[0][0]
  174. assert msg["type"] == "spoolbuddy_online"
  175. assert msg["device_id"] == "sb-hb"
  176. @pytest.mark.asyncio
  177. @pytest.mark.integration
  178. async def test_heartbeat_returns_ssh_public_key(self, async_client: AsyncClient, device_factory):
  179. """Heartbeat response carries the current SSH public key so the daemon
  180. can re-deploy it whenever Bambuddy's keypair rotates without waiting
  181. for a service restart."""
  182. await device_factory(device_id="sb-ssh-hb")
  183. fake_key = "ssh-ed25519 AAAATESTKEY bambuddy-spoolbuddy"
  184. with (
  185. patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws,
  186. patch(
  187. "backend.app.services.spoolbuddy_ssh.get_public_key",
  188. AsyncMock(return_value=fake_key),
  189. ),
  190. ):
  191. mock_ws.broadcast = AsyncMock()
  192. resp = await async_client.post(
  193. f"{API}/devices/sb-ssh-hb/heartbeat",
  194. json={"nfc_ok": True, "scale_ok": True, "uptime_s": 5},
  195. )
  196. assert resp.status_code == 200
  197. assert resp.json()["ssh_public_key"] == fake_key
  198. @pytest.mark.asyncio
  199. @pytest.mark.integration
  200. async def test_heartbeat_ssh_key_failure_does_not_break_heartbeat(self, async_client: AsyncClient, device_factory):
  201. """If the backend can't read its own SSH key, the heartbeat must still
  202. succeed — telemetry/commands are far more critical than key sync."""
  203. await device_factory(device_id="sb-ssh-fail")
  204. with (
  205. patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws,
  206. patch(
  207. "backend.app.services.spoolbuddy_ssh.get_public_key",
  208. AsyncMock(side_effect=OSError("disk full")),
  209. ),
  210. ):
  211. mock_ws.broadcast = AsyncMock()
  212. resp = await async_client.post(
  213. f"{API}/devices/sb-ssh-fail/heartbeat",
  214. json={"nfc_ok": True, "scale_ok": True, "uptime_s": 5},
  215. )
  216. assert resp.status_code == 200
  217. assert resp.json()["ssh_public_key"] is None
  218. @pytest.mark.asyncio
  219. @pytest.mark.integration
  220. async def test_heartbeat_returns_pending_command(self, async_client: AsyncClient, device_factory):
  221. await device_factory(device_id="sb-cmd", pending_command="tare")
  222. with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
  223. mock_ws.broadcast = AsyncMock()
  224. resp = await async_client.post(
  225. f"{API}/devices/sb-cmd/heartbeat",
  226. json={"nfc_ok": True, "scale_ok": True, "uptime_s": 10},
  227. )
  228. assert resp.status_code == 200
  229. assert resp.json()["pending_command"] == "tare"
  230. # Second heartbeat should have no pending command (cleared)
  231. with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
  232. mock_ws.broadcast = AsyncMock()
  233. resp2 = await async_client.post(
  234. f"{API}/devices/sb-cmd/heartbeat",
  235. json={"nfc_ok": True, "scale_ok": True, "uptime_s": 20},
  236. )
  237. assert resp2.json()["pending_command"] is None
  238. @pytest.mark.asyncio
  239. @pytest.mark.integration
  240. async def test_heartbeat_unknown_device_404(self, async_client: AsyncClient):
  241. with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
  242. mock_ws.broadcast = AsyncMock()
  243. resp = await async_client.post(
  244. f"{API}/devices/nonexistent/heartbeat",
  245. json={"nfc_ok": False, "scale_ok": False, "uptime_s": 0},
  246. )
  247. assert resp.status_code == 404
  248. @pytest.mark.asyncio
  249. @pytest.mark.integration
  250. async def test_heartbeat_broadcasts_online_when_was_offline(self, async_client: AsyncClient, device_factory):
  251. # Create device with last_seen far in the past (offline)
  252. spoolbuddy_routes._spoolbuddy_online_last_broadcast.clear()
  253. await device_factory(
  254. device_id="sb-offline",
  255. last_seen=datetime.now(timezone.utc) - timedelta(seconds=120),
  256. )
  257. with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
  258. mock_ws.broadcast = AsyncMock()
  259. resp = await async_client.post(
  260. f"{API}/devices/sb-offline/heartbeat",
  261. json={"nfc_ok": True, "scale_ok": True, "uptime_s": 5},
  262. )
  263. assert resp.status_code == 200
  264. # Should broadcast online since device was offline
  265. mock_ws.broadcast.assert_called_once()
  266. msg = mock_ws.broadcast.call_args[0][0]
  267. assert msg["type"] == "spoolbuddy_online"
  268. assert msg["device_id"] == "sb-offline"
  269. @pytest.mark.asyncio
  270. @pytest.mark.integration
  271. async def test_heartbeat_broadcasts_online_when_already_online(self, async_client: AsyncClient, device_factory):
  272. spoolbuddy_routes._spoolbuddy_online_last_broadcast.clear()
  273. await device_factory(
  274. device_id="sb-already-online",
  275. last_seen=datetime.now(timezone.utc),
  276. )
  277. with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
  278. mock_ws.broadcast = AsyncMock()
  279. resp = await async_client.post(
  280. f"{API}/devices/sb-already-online/heartbeat",
  281. json={"nfc_ok": True, "scale_ok": True, "uptime_s": 42},
  282. )
  283. assert resp.status_code == 200
  284. mock_ws.broadcast.assert_called_once()
  285. msg = mock_ws.broadcast.call_args[0][0]
  286. assert msg["type"] == "spoolbuddy_online"
  287. assert msg["device_id"] == "sb-already-online"
  288. @pytest.mark.asyncio
  289. @pytest.mark.integration
  290. async def test_heartbeat_online_broadcast_is_throttled(self, async_client: AsyncClient, device_factory):
  291. spoolbuddy_routes._spoolbuddy_online_last_broadcast.clear()
  292. await device_factory(
  293. device_id="sb-throttle",
  294. last_seen=datetime.now(timezone.utc),
  295. )
  296. with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
  297. mock_ws.broadcast = AsyncMock()
  298. resp1 = await async_client.post(
  299. f"{API}/devices/sb-throttle/heartbeat",
  300. json={"nfc_ok": True, "scale_ok": True, "uptime_s": 10},
  301. )
  302. resp2 = await async_client.post(
  303. f"{API}/devices/sb-throttle/heartbeat",
  304. json={"nfc_ok": True, "scale_ok": True, "uptime_s": 11},
  305. )
  306. assert resp1.status_code == 200
  307. assert resp2.status_code == 200
  308. mock_ws.broadcast.assert_called_once()
  309. msg = mock_ws.broadcast.call_args[0][0]
  310. assert msg["type"] == "spoolbuddy_online"
  311. assert msg["device_id"] == "sb-throttle"
  312. # ============================================================================
  313. # NFC endpoints
  314. # ============================================================================
  315. class TestNfcEndpoints:
  316. @pytest.mark.asyncio
  317. @pytest.mark.integration
  318. async def test_tag_scanned_matched(self, async_client: AsyncClient, spool_factory):
  319. spool = await spool_factory(tag_uid="AABB1122", material="PLA")
  320. mock_spool = MagicMock()
  321. mock_spool.id = spool.id
  322. mock_spool.material = spool.material
  323. mock_spool.subtype = spool.subtype
  324. mock_spool.color_name = spool.color_name
  325. mock_spool.rgba = spool.rgba
  326. mock_spool.brand = spool.brand
  327. mock_spool.label_weight = spool.label_weight
  328. mock_spool.core_weight = spool.core_weight
  329. mock_spool.weight_used = spool.weight_used
  330. with (
  331. patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws,
  332. patch("backend.app.api.routes.spoolbuddy.get_spool_by_tag", new_callable=AsyncMock) as mock_lookup,
  333. ):
  334. mock_ws.broadcast = AsyncMock()
  335. mock_lookup.return_value = mock_spool
  336. resp = await async_client.post(
  337. f"{API}/nfc/tag-scanned",
  338. json={"device_id": "sb-1", "tag_uid": "AABB1122"},
  339. )
  340. assert resp.status_code == 200
  341. data = resp.json()
  342. assert data["matched"] is True
  343. assert data["spool_id"] == spool.id
  344. msg = mock_ws.broadcast.call_args[0][0]
  345. assert msg["type"] == "spoolbuddy_tag_matched"
  346. assert msg["spool"]["id"] == spool.id
  347. @pytest.mark.asyncio
  348. @pytest.mark.integration
  349. async def test_tag_scanned_unmatched(self, async_client: AsyncClient):
  350. with (
  351. patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws,
  352. patch("backend.app.api.routes.spoolbuddy.get_spool_by_tag", new_callable=AsyncMock) as mock_lookup,
  353. ):
  354. mock_ws.broadcast = AsyncMock()
  355. mock_lookup.return_value = None
  356. resp = await async_client.post(
  357. f"{API}/nfc/tag-scanned",
  358. json={"device_id": "sb-1", "tag_uid": "DEADBEEF"},
  359. )
  360. assert resp.status_code == 200
  361. data = resp.json()
  362. assert data["matched"] is False
  363. assert data["spool_id"] is None
  364. msg = mock_ws.broadcast.call_args[0][0]
  365. assert msg["type"] == "spoolbuddy_unknown_tag"
  366. @pytest.mark.asyncio
  367. @pytest.mark.integration
  368. async def test_tag_removed(self, async_client: AsyncClient):
  369. with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
  370. mock_ws.broadcast = AsyncMock()
  371. resp = await async_client.post(
  372. f"{API}/nfc/tag-removed",
  373. json={"device_id": "sb-1", "tag_uid": "AABB1122"},
  374. )
  375. assert resp.status_code == 200
  376. msg = mock_ws.broadcast.call_args[0][0]
  377. assert msg["type"] == "spoolbuddy_tag_removed"
  378. assert msg["device_id"] == "sb-1"
  379. assert msg["tag_uid"] == "AABB1122"
  380. # ============================================================================
  381. # NFC write-tag endpoints
  382. # ============================================================================
  383. class TestWriteTagEndpoints:
  384. @pytest.mark.asyncio
  385. @pytest.mark.integration
  386. async def test_write_tag_queues_command(self, async_client: AsyncClient, device_factory, spool_factory):
  387. device = await device_factory(device_id="sb-wt")
  388. spool = await spool_factory(material="PLA", brand="Polymaker", color_name="Red", rgba="FF0000FF")
  389. resp = await async_client.post(
  390. f"{API}/nfc/write-tag",
  391. json={"device_id": device.device_id, "spool_id": spool.id},
  392. )
  393. assert resp.status_code == 200
  394. assert resp.json()["status"] == "queued"
  395. # Verify heartbeat returns write_tag command with payload
  396. with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
  397. mock_ws.broadcast = AsyncMock()
  398. hb = await async_client.post(
  399. f"{API}/devices/{device.device_id}/heartbeat",
  400. json={"nfc_ok": True, "scale_ok": True, "uptime_s": 10},
  401. )
  402. hb_data = hb.json()
  403. assert hb_data["pending_command"] == "write_tag"
  404. assert hb_data["pending_write_payload"] is not None
  405. assert hb_data["pending_write_payload"]["spool_id"] == spool.id
  406. assert "ndef_data_hex" in hb_data["pending_write_payload"]
  407. @pytest.mark.asyncio
  408. @pytest.mark.integration
  409. async def test_write_tag_heartbeat_not_cleared(self, async_client: AsyncClient, device_factory, spool_factory):
  410. """write_tag command persists across heartbeats until write-result clears it."""
  411. device = await device_factory(device_id="sb-wt-persist")
  412. spool = await spool_factory(material="PETG")
  413. await async_client.post(
  414. f"{API}/nfc/write-tag",
  415. json={"device_id": device.device_id, "spool_id": spool.id},
  416. )
  417. # First heartbeat — command present
  418. with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
  419. mock_ws.broadcast = AsyncMock()
  420. hb1 = await async_client.post(
  421. f"{API}/devices/{device.device_id}/heartbeat",
  422. json={"nfc_ok": True, "scale_ok": True, "uptime_s": 10},
  423. )
  424. assert hb1.json()["pending_command"] == "write_tag"
  425. # Second heartbeat — should still be present (not cleared like tare)
  426. with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
  427. mock_ws.broadcast = AsyncMock()
  428. hb2 = await async_client.post(
  429. f"{API}/devices/{device.device_id}/heartbeat",
  430. json={"nfc_ok": True, "scale_ok": True, "uptime_s": 20},
  431. )
  432. assert hb2.json()["pending_command"] == "write_tag"
  433. @pytest.mark.asyncio
  434. @pytest.mark.integration
  435. async def test_write_tag_missing_spool_404(self, async_client: AsyncClient, device_factory):
  436. device = await device_factory(device_id="sb-wt-nospool")
  437. resp = await async_client.post(
  438. f"{API}/nfc/write-tag",
  439. json={"device_id": device.device_id, "spool_id": 99999},
  440. )
  441. assert resp.status_code == 404
  442. @pytest.mark.asyncio
  443. @pytest.mark.integration
  444. async def test_write_tag_missing_device_404(self, async_client: AsyncClient, spool_factory):
  445. spool = await spool_factory()
  446. resp = await async_client.post(
  447. f"{API}/nfc/write-tag",
  448. json={"device_id": "nonexistent", "spool_id": spool.id},
  449. )
  450. assert resp.status_code == 404
  451. @pytest.mark.asyncio
  452. @pytest.mark.integration
  453. async def test_write_result_success_links_tag(self, async_client: AsyncClient, device_factory, spool_factory):
  454. device = await device_factory(device_id="sb-wr", pending_command="write_tag")
  455. spool = await spool_factory(material="PLA", tag_uid=None)
  456. with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
  457. mock_ws.broadcast = AsyncMock()
  458. resp = await async_client.post(
  459. f"{API}/nfc/write-result",
  460. json={
  461. "device_id": device.device_id,
  462. "spool_id": spool.id,
  463. "tag_uid": "04AABB11223344",
  464. "success": True,
  465. },
  466. )
  467. assert resp.status_code == 200
  468. msg = mock_ws.broadcast.call_args[0][0]
  469. assert msg["type"] == "spoolbuddy_tag_written"
  470. assert msg["spool_id"] == spool.id
  471. assert msg["tag_uid"] == "04AABB11223344"
  472. # Verify spool got tag linked
  473. spool_resp = await async_client.get(f"/api/v1/inventory/spools/{spool.id}")
  474. spool_data = spool_resp.json()
  475. assert spool_data["tag_uid"] == "04AABB11223344"
  476. assert spool_data["tag_type"] == "ntag"
  477. assert spool_data["data_origin"] == "opentag3d"
  478. assert spool_data["encode_time"] is not None
  479. @pytest.mark.asyncio
  480. @pytest.mark.integration
  481. async def test_write_result_failure_broadcasts_error(
  482. self, async_client: AsyncClient, device_factory, spool_factory
  483. ):
  484. device = await device_factory(device_id="sb-wr-fail", pending_command="write_tag")
  485. spool = await spool_factory(material="PLA", tag_uid=None)
  486. with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
  487. mock_ws.broadcast = AsyncMock()
  488. resp = await async_client.post(
  489. f"{API}/nfc/write-result",
  490. json={
  491. "device_id": device.device_id,
  492. "spool_id": spool.id,
  493. "tag_uid": "04AABBCC",
  494. "success": False,
  495. "message": "Write or verification failed",
  496. },
  497. )
  498. assert resp.status_code == 200
  499. msg = mock_ws.broadcast.call_args[0][0]
  500. assert msg["type"] == "spoolbuddy_tag_write_failed"
  501. assert msg["message"] == "Write or verification failed"
  502. # Verify spool NOT linked
  503. spool_resp = await async_client.get(f"/api/v1/inventory/spools/{spool.id}")
  504. assert spool_resp.json()["tag_uid"] is None
  505. @pytest.mark.asyncio
  506. @pytest.mark.integration
  507. async def test_write_result_clears_pending_command(self, async_client: AsyncClient, device_factory, spool_factory):
  508. device = await device_factory(
  509. device_id="sb-wr-clear",
  510. pending_command="write_tag",
  511. pending_write_payload='{"spool_id": 1, "ndef_data_hex": "E110120003"}',
  512. )
  513. spool = await spool_factory()
  514. with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
  515. mock_ws.broadcast = AsyncMock()
  516. await async_client.post(
  517. f"{API}/nfc/write-result",
  518. json={
  519. "device_id": device.device_id,
  520. "spool_id": spool.id,
  521. "tag_uid": "AABBCCDD",
  522. "success": True,
  523. },
  524. )
  525. # Heartbeat should have no pending command
  526. with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
  527. mock_ws.broadcast = AsyncMock()
  528. hb = await async_client.post(
  529. f"{API}/devices/{device.device_id}/heartbeat",
  530. json={"nfc_ok": True, "scale_ok": True, "uptime_s": 30},
  531. )
  532. assert hb.json()["pending_command"] is None
  533. assert hb.json()["pending_write_payload"] is None
  534. @pytest.mark.asyncio
  535. @pytest.mark.integration
  536. async def test_cancel_write(self, async_client: AsyncClient, device_factory, spool_factory):
  537. device = await device_factory(device_id="sb-cancel")
  538. spool = await spool_factory()
  539. # Queue a write
  540. await async_client.post(
  541. f"{API}/nfc/write-tag",
  542. json={"device_id": device.device_id, "spool_id": spool.id},
  543. )
  544. # Cancel it
  545. resp = await async_client.post(f"{API}/devices/{device.device_id}/cancel-write", json={})
  546. assert resp.status_code == 200
  547. # Heartbeat should have no pending command
  548. with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
  549. mock_ws.broadcast = AsyncMock()
  550. hb = await async_client.post(
  551. f"{API}/devices/{device.device_id}/heartbeat",
  552. json={"nfc_ok": True, "scale_ok": True, "uptime_s": 10},
  553. )
  554. assert hb.json()["pending_command"] is None
  555. @pytest.mark.asyncio
  556. @pytest.mark.integration
  557. async def test_cancel_write_unknown_device_404(self, async_client: AsyncClient):
  558. resp = await async_client.post(f"{API}/devices/ghost/cancel-write", json={})
  559. assert resp.status_code == 404
  560. @pytest.mark.asyncio
  561. @pytest.mark.integration
  562. async def test_write_tag_ndef_data_is_valid(self, async_client: AsyncClient, device_factory, spool_factory):
  563. """Verify the NDEF data in the heartbeat is a valid OpenTag3D message."""
  564. device = await device_factory(device_id="sb-wt-ndef")
  565. spool = await spool_factory(
  566. material="PLA",
  567. brand="Polymaker",
  568. color_name="White",
  569. rgba="FFFFFFFF",
  570. label_weight=1000,
  571. )
  572. await async_client.post(
  573. f"{API}/nfc/write-tag",
  574. json={"device_id": device.device_id, "spool_id": spool.id},
  575. )
  576. with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
  577. mock_ws.broadcast = AsyncMock()
  578. hb = await async_client.post(
  579. f"{API}/devices/{device.device_id}/heartbeat",
  580. json={"nfc_ok": True, "scale_ok": True, "uptime_s": 10},
  581. )
  582. payload = hb.json()["pending_write_payload"]
  583. ndef_bytes = bytes.fromhex(payload["ndef_data_hex"])
  584. # CC bytes
  585. assert ndef_bytes[:4] == bytes([0xE1, 0x10, 0x12, 0x00])
  586. # TLV type
  587. assert ndef_bytes[4] == 0x03
  588. # NDEF record: TNF=MIME, type=application/opentag3d
  589. assert ndef_bytes[6] == 0xD2
  590. assert ndef_bytes[9:30] == b"application/opentag3d"
  591. # Terminator
  592. assert ndef_bytes[-1] == 0xFE
  593. # Total size fits NTAG213
  594. assert len(ndef_bytes) <= 144
  595. # ============================================================================
  596. # Scale endpoints
  597. # ============================================================================
  598. class TestScaleEndpoints:
  599. @pytest.mark.asyncio
  600. @pytest.mark.integration
  601. async def test_scale_reading_broadcast(self, async_client: AsyncClient):
  602. with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
  603. mock_ws.broadcast = AsyncMock()
  604. resp = await async_client.post(
  605. f"{API}/scale/reading",
  606. json={
  607. "device_id": "sb-1",
  608. "weight_grams": 823.5,
  609. "stable": True,
  610. "raw_adc": 456789,
  611. },
  612. )
  613. assert resp.status_code == 200
  614. msg = mock_ws.broadcast.call_args[0][0]
  615. assert msg["type"] == "spoolbuddy_weight"
  616. assert msg["device_id"] == "sb-1"
  617. assert msg["weight_grams"] == 823.5
  618. assert msg["stable"] is True
  619. assert msg["raw_adc"] == 456789
  620. @pytest.mark.asyncio
  621. @pytest.mark.integration
  622. async def test_update_spool_weight_calculates_correctly(self, async_client: AsyncClient, spool_factory):
  623. # label=1000g, core=250g, scale reads 750g
  624. # net_filament = max(0, 750 - 250) = 500
  625. # weight_used = max(0, 1000 - 500) = 500
  626. spool = await spool_factory(label_weight=1000, core_weight=250, weight_used=0)
  627. resp = await async_client.post(
  628. f"{API}/scale/update-spool-weight",
  629. json={"spool_id": spool.id, "weight_grams": 750},
  630. )
  631. assert resp.status_code == 200
  632. data = resp.json()
  633. assert data["weight_used"] == 500
  634. @pytest.mark.asyncio
  635. @pytest.mark.integration
  636. async def test_update_spool_weight_full_spool(self, async_client: AsyncClient, spool_factory):
  637. # label=1000g, core=250g, scale reads 1250g (full spool)
  638. # net_filament = max(0, 1250 - 250) = 1000
  639. # weight_used = max(0, 1000 - 1000) = 0
  640. spool = await spool_factory(label_weight=1000, core_weight=250, weight_used=200)
  641. resp = await async_client.post(
  642. f"{API}/scale/update-spool-weight",
  643. json={"spool_id": spool.id, "weight_grams": 1250},
  644. )
  645. assert resp.status_code == 200
  646. data = resp.json()
  647. assert data["weight_used"] == 0
  648. @pytest.mark.asyncio
  649. @pytest.mark.integration
  650. async def test_update_spool_weight_stores_scale_reading(self, async_client: AsyncClient, spool_factory):
  651. """Verify last_scale_weight and last_weighed_at are stored after weight sync."""
  652. spool = await spool_factory(label_weight=1000, core_weight=250, weight_used=0)
  653. resp = await async_client.post(
  654. f"{API}/scale/update-spool-weight",
  655. json={"spool_id": spool.id, "weight_grams": 750},
  656. )
  657. assert resp.status_code == 200
  658. # Fetch the spool via inventory API to verify stored fields
  659. spool_resp = await async_client.get(f"/api/v1/inventory/spools/{spool.id}")
  660. assert spool_resp.status_code == 200
  661. spool_data = spool_resp.json()
  662. assert spool_data["last_scale_weight"] == 750
  663. assert spool_data["last_weighed_at"] is not None
  664. @pytest.mark.asyncio
  665. @pytest.mark.integration
  666. async def test_update_spool_weight_missing_spool_404(self, async_client: AsyncClient):
  667. resp = await async_client.post(
  668. f"{API}/scale/update-spool-weight",
  669. json={"spool_id": 99999, "weight_grams": 500},
  670. )
  671. assert resp.status_code == 404
  672. # ============================================================================
  673. # Calibration endpoints
  674. # ============================================================================
  675. class TestCalibrationEndpoints:
  676. @pytest.mark.asyncio
  677. @pytest.mark.integration
  678. async def test_tare_queues_command(self, async_client: AsyncClient, device_factory):
  679. await device_factory(device_id="sb-tare")
  680. resp = await async_client.post(f"{API}/devices/sb-tare/calibration/tare", json={})
  681. assert resp.status_code == 200
  682. assert resp.json()["status"] == "ok"
  683. # Verify pending_command via heartbeat
  684. with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
  685. mock_ws.broadcast = AsyncMock()
  686. hb = await async_client.post(
  687. f"{API}/devices/sb-tare/heartbeat",
  688. json={"nfc_ok": True, "scale_ok": True, "uptime_s": 1},
  689. )
  690. assert hb.json()["pending_command"] == "tare"
  691. @pytest.mark.asyncio
  692. @pytest.mark.integration
  693. async def test_tare_unknown_device_404(self, async_client: AsyncClient):
  694. resp = await async_client.post(f"{API}/devices/ghost/calibration/tare", json={})
  695. assert resp.status_code == 404
  696. @pytest.mark.asyncio
  697. @pytest.mark.integration
  698. async def test_set_tare_offset(self, async_client: AsyncClient, device_factory):
  699. await device_factory(device_id="sb-st", calibration_factor=0.005)
  700. resp = await async_client.post(
  701. f"{API}/devices/sb-st/calibration/set-tare",
  702. json={"tare_offset": 54321},
  703. )
  704. assert resp.status_code == 200
  705. data = resp.json()
  706. assert data["tare_offset"] == 54321
  707. assert data["calibration_factor"] == pytest.approx(0.005)
  708. @pytest.mark.asyncio
  709. @pytest.mark.integration
  710. async def test_set_calibration_factor(self, async_client: AsyncClient, device_factory):
  711. # known_weight=200g, raw_adc=50000, tare=10000 → factor=200/(50000-10000)=0.005
  712. await device_factory(device_id="sb-cf", tare_offset=10000)
  713. resp = await async_client.post(
  714. f"{API}/devices/sb-cf/calibration/set-factor",
  715. json={"known_weight_grams": 200, "raw_adc": 50000},
  716. )
  717. assert resp.status_code == 200
  718. data = resp.json()
  719. assert data["calibration_factor"] == pytest.approx(0.005)
  720. assert data["tare_offset"] == 10000
  721. @pytest.mark.asyncio
  722. @pytest.mark.integration
  723. async def test_set_calibration_factor_zero_delta_400(self, async_client: AsyncClient, device_factory):
  724. # raw_adc == tare_offset → delta is 0 → 400 error
  725. await device_factory(device_id="sb-zero", tare_offset=5000)
  726. resp = await async_client.post(
  727. f"{API}/devices/sb-zero/calibration/set-factor",
  728. json={"known_weight_grams": 100, "raw_adc": 5000},
  729. )
  730. assert resp.status_code == 400
  731. @pytest.mark.asyncio
  732. @pytest.mark.integration
  733. async def test_get_calibration(self, async_client: AsyncClient, device_factory):
  734. await device_factory(
  735. device_id="sb-gcal",
  736. tare_offset=11111,
  737. calibration_factor=0.0042,
  738. )
  739. resp = await async_client.get(f"{API}/devices/sb-gcal/calibration")
  740. assert resp.status_code == 200
  741. data = resp.json()
  742. assert data["tare_offset"] == 11111
  743. assert data["calibration_factor"] == pytest.approx(0.0042)
  744. # ============================================================================
  745. # Display endpoints
  746. # ============================================================================
  747. class TestDisplayEndpoints:
  748. @pytest.mark.asyncio
  749. @pytest.mark.integration
  750. async def test_update_display_settings(self, async_client: AsyncClient, device_factory):
  751. await device_factory(device_id="sb-disp", display_brightness=100, display_blank_timeout=0)
  752. resp = await async_client.put(
  753. f"{API}/devices/sb-disp/display",
  754. json={"brightness": 75, "blank_timeout": 300},
  755. )
  756. assert resp.status_code == 200
  757. data = resp.json()
  758. assert data["brightness"] == 75
  759. assert data["blank_timeout"] == 300
  760. @pytest.mark.asyncio
  761. @pytest.mark.integration
  762. async def test_update_display_persists_via_heartbeat(self, async_client: AsyncClient, device_factory):
  763. await device_factory(device_id="sb-disp-hb")
  764. await async_client.put(
  765. f"{API}/devices/sb-disp-hb/display",
  766. json={"brightness": 50, "blank_timeout": 600},
  767. )
  768. with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
  769. mock_ws.broadcast = AsyncMock()
  770. hb = await async_client.post(
  771. f"{API}/devices/sb-disp-hb/heartbeat",
  772. json={"nfc_ok": True, "scale_ok": True, "uptime_s": 10},
  773. )
  774. assert hb.json()["display_brightness"] == 50
  775. assert hb.json()["display_blank_timeout"] == 600
  776. @pytest.mark.asyncio
  777. @pytest.mark.integration
  778. async def test_update_display_unknown_device_404(self, async_client: AsyncClient):
  779. resp = await async_client.put(
  780. f"{API}/devices/ghost/display",
  781. json={"brightness": 50, "blank_timeout": 60},
  782. )
  783. assert resp.status_code == 404
  784. @pytest.mark.asyncio
  785. @pytest.mark.integration
  786. async def test_update_display_validates_brightness(self, async_client: AsyncClient, device_factory):
  787. await device_factory(device_id="sb-disp-val")
  788. resp = await async_client.put(
  789. f"{API}/devices/sb-disp-val/display",
  790. json={"brightness": 150, "blank_timeout": 0},
  791. )
  792. assert resp.status_code == 422 # Validation error: brightness > 100
  793. @pytest.mark.asyncio
  794. @pytest.mark.integration
  795. async def test_get_display_settings(self, async_client: AsyncClient, device_factory):
  796. """The kiosk idle watchdog (install/spoolbuddy-idle.sh) reads this
  797. endpoint on autostart to configure swayidle with the user-selected
  798. blank timeout before launching. See issue #937."""
  799. await device_factory(device_id="sb-disp-get", display_brightness=60, display_blank_timeout=450)
  800. resp = await async_client.get(f"{API}/devices/sb-disp-get/display")
  801. assert resp.status_code == 200
  802. data = resp.json()
  803. assert data["brightness"] == 60
  804. assert data["blank_timeout"] == 450
  805. @pytest.mark.asyncio
  806. @pytest.mark.integration
  807. async def test_get_display_unknown_device_404(self, async_client: AsyncClient):
  808. resp = await async_client.get(f"{API}/devices/ghost/display")
  809. assert resp.status_code == 404
  810. # ============================================================================
  811. # Update endpoints
  812. # ============================================================================
  813. class TestUpdateEndpoints:
  814. @pytest.mark.asyncio
  815. @pytest.mark.integration
  816. async def test_trigger_update_starts_ssh_update(self, async_client: AsyncClient, device_factory):
  817. await device_factory(device_id="sb-upd")
  818. with (
  819. patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws,
  820. patch("backend.app.services.spoolbuddy_ssh.perform_ssh_update", new_callable=AsyncMock),
  821. ):
  822. mock_ws.broadcast = AsyncMock()
  823. resp = await async_client.post(f"{API}/devices/sb-upd/update")
  824. assert resp.status_code == 200
  825. assert resp.json()["status"] == "ok"
  826. @pytest.mark.asyncio
  827. @pytest.mark.integration
  828. async def test_trigger_update_offline_device_409(self, async_client: AsyncClient, device_factory):
  829. await device_factory(
  830. device_id="sb-upd-off",
  831. last_seen=datetime.now(timezone.utc) - timedelta(seconds=120),
  832. )
  833. resp = await async_client.post(f"{API}/devices/sb-upd-off/update")
  834. assert resp.status_code == 409
  835. @pytest.mark.asyncio
  836. @pytest.mark.integration
  837. async def test_trigger_update_unknown_device_404(self, async_client: AsyncClient):
  838. resp = await async_client.post(f"{API}/devices/ghost/update")
  839. assert resp.status_code == 404
  840. @pytest.mark.asyncio
  841. @pytest.mark.integration
  842. async def test_trigger_update_already_updating(self, async_client: AsyncClient, device_factory):
  843. await device_factory(device_id="sb-upd-dup", update_status="updating")
  844. with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
  845. mock_ws.broadcast = AsyncMock()
  846. resp = await async_client.post(f"{API}/devices/sb-upd-dup/update")
  847. assert resp.status_code == 200
  848. assert resp.json()["status"] == "already_updating"
  849. @pytest.mark.asyncio
  850. @pytest.mark.integration
  851. async def test_report_update_status_updating(self, async_client: AsyncClient, device_factory):
  852. await device_factory(device_id="sb-upd-st", pending_command="update", update_status="pending")
  853. with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
  854. mock_ws.broadcast = AsyncMock()
  855. resp = await async_client.post(
  856. f"{API}/devices/sb-upd-st/update-status",
  857. json={"status": "updating", "message": "Fetching latest code..."},
  858. )
  859. assert resp.status_code == 200
  860. mock_ws.broadcast.assert_called_once()
  861. msg = mock_ws.broadcast.call_args[0][0]
  862. assert msg["type"] == "spoolbuddy_update"
  863. assert msg["update_status"] == "updating"
  864. @pytest.mark.asyncio
  865. @pytest.mark.integration
  866. async def test_report_update_status_complete_clears_command(self, async_client: AsyncClient, device_factory):
  867. await device_factory(device_id="sb-upd-done", pending_command="update", update_status="updating")
  868. with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
  869. mock_ws.broadcast = AsyncMock()
  870. await async_client.post(
  871. f"{API}/devices/sb-upd-done/update-status",
  872. json={"status": "complete", "message": "Update complete, restarting..."},
  873. )
  874. # Heartbeat should have no pending command
  875. with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
  876. mock_ws.broadcast = AsyncMock()
  877. hb = await async_client.post(
  878. f"{API}/devices/sb-upd-done/heartbeat",
  879. json={"nfc_ok": True, "scale_ok": True, "uptime_s": 10},
  880. )
  881. assert hb.json()["pending_command"] is None
  882. @pytest.mark.asyncio
  883. @pytest.mark.integration
  884. async def test_report_update_status_error(self, async_client: AsyncClient, device_factory):
  885. await device_factory(device_id="sb-upd-err", pending_command="update", update_status="updating")
  886. with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
  887. mock_ws.broadcast = AsyncMock()
  888. resp = await async_client.post(
  889. f"{API}/devices/sb-upd-err/update-status",
  890. json={"status": "error", "message": "git fetch failed: network unreachable"},
  891. )
  892. assert resp.status_code == 200
  893. msg = mock_ws.broadcast.call_args[0][0]
  894. assert msg["update_status"] == "error"
  895. assert "git fetch failed" in msg["update_message"]
  896. @pytest.mark.asyncio
  897. @pytest.mark.integration
  898. async def test_report_update_status_unknown_device_404(self, async_client: AsyncClient):
  899. resp = await async_client.post(
  900. f"{API}/devices/ghost/update-status",
  901. json={"status": "updating", "message": "test"},
  902. )
  903. assert resp.status_code == 404
  904. @pytest.mark.asyncio
  905. @pytest.mark.integration
  906. async def test_report_update_status_invalid_status_422(self, async_client: AsyncClient, device_factory):
  907. """Arbitrary status strings must be rejected with 422 (H2: UpdateStatusRequest validation)."""
  908. await device_factory(device_id="sb-upd-inv")
  909. resp = await async_client.post(
  910. f"{API}/devices/sb-upd-inv/update-status",
  911. json={"status": "hacked", "message": "injected"},
  912. )
  913. assert resp.status_code == 422
  914. @pytest.mark.asyncio
  915. @pytest.mark.integration
  916. async def test_report_update_status_oversized_message_422(self, async_client: AsyncClient, device_factory):
  917. """Message exceeding 255 chars must be rejected with 422 (H2/M4)."""
  918. await device_factory(device_id="sb-upd-big")
  919. resp = await async_client.post(
  920. f"{API}/devices/sb-upd-big/update-status",
  921. json={"status": "updating", "message": "x" * 256},
  922. )
  923. assert resp.status_code == 422
  924. @pytest.mark.asyncio
  925. @pytest.mark.integration
  926. async def test_ssh_public_key_error_does_not_leak_exception_text(self, async_client: AsyncClient):
  927. """SSH public-key 500 must not expose raw exception details (M3)."""
  928. from backend.app.services.spoolbuddy_ssh import get_public_key
  929. with patch(
  930. "backend.app.services.spoolbuddy_ssh.get_public_key",
  931. AsyncMock(side_effect=RuntimeError("REDACT_ME internal path /data/keys/id_ed25519")),
  932. ):
  933. resp = await async_client.get(f"{API}/ssh/public-key")
  934. assert resp.status_code == 500
  935. body = resp.json()["detail"]
  936. assert "REDACT_ME" not in body
  937. assert "/data/keys" not in body
  938. assert "id_ed25519" not in body
  939. @pytest.mark.asyncio
  940. @pytest.mark.integration
  941. async def test_device_response_includes_update_fields(self, async_client: AsyncClient, device_factory):
  942. await device_factory(device_id="sb-upd-resp", update_status="complete", update_message="Done!")
  943. resp = await async_client.get(f"{API}/devices")
  944. assert resp.status_code == 200
  945. device = next(d for d in resp.json() if d["device_id"] == "sb-upd-resp")
  946. assert device["update_status"] == "complete"
  947. assert device["update_message"] == "Done!"
  948. @pytest.mark.asyncio
  949. @pytest.mark.integration
  950. async def test_update_check_returns_version_info(self, async_client: AsyncClient, device_factory):
  951. """GET /devices/{id}/update-check compares device version against APP_VERSION."""
  952. await device_factory(device_id="sb-uc", firmware_version="0.1.0")
  953. resp = await async_client.get(f"{API}/devices/sb-uc/update-check")
  954. assert resp.status_code == 200
  955. data = resp.json()
  956. assert data["current_version"] == "0.1.0"
  957. assert data["latest_version"] is not None
  958. assert data["update_available"] is True
  959. @pytest.mark.asyncio
  960. @pytest.mark.integration
  961. async def test_update_check_up_to_date(self, async_client: AsyncClient, device_factory):
  962. from backend.app.core.config import APP_VERSION
  963. await device_factory(device_id="sb-uc2", firmware_version=APP_VERSION)
  964. resp = await async_client.get(f"{API}/devices/sb-uc2/update-check")
  965. assert resp.status_code == 200
  966. assert resp.json()["update_available"] is False
  967. @pytest.mark.asyncio
  968. @pytest.mark.integration
  969. async def test_update_check_unknown_device_404(self, async_client: AsyncClient):
  970. resp = await async_client.get(f"{API}/devices/ghost/update-check")
  971. assert resp.status_code == 404
  972. @pytest.mark.asyncio
  973. @pytest.mark.integration
  974. async def test_trigger_update_broadcasts_websocket(self, async_client: AsyncClient, device_factory):
  975. await device_factory(device_id="sb-upd-ws")
  976. with (
  977. patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws,
  978. patch("backend.app.services.spoolbuddy_ssh.perform_ssh_update", new_callable=AsyncMock),
  979. ):
  980. mock_ws.broadcast = AsyncMock()
  981. await async_client.post(f"{API}/devices/sb-upd-ws/update")
  982. mock_ws.broadcast.assert_called_once()
  983. msg = mock_ws.broadcast.call_args[0][0]
  984. assert msg["type"] == "spoolbuddy_update"
  985. assert msg["device_id"] == "sb-upd-ws"
  986. assert msg["update_status"] == "pending"
  987. # ============================================================================
  988. # System command endpoints
  989. # ============================================================================
  990. class TestSystemCommandEndpoints:
  991. @pytest.mark.asyncio
  992. @pytest.mark.integration
  993. async def test_queue_reboot(self, async_client: AsyncClient, device_factory):
  994. await device_factory(device_id="sb-reboot")
  995. resp = await async_client.post(
  996. f"{API}/devices/sb-reboot/system/command",
  997. json={"command": "reboot"},
  998. )
  999. assert resp.status_code == 200
  1000. data = resp.json()
  1001. assert data["status"] == "queued"
  1002. assert data["command"] == "reboot"
  1003. @pytest.mark.asyncio
  1004. @pytest.mark.integration
  1005. async def test_queue_shutdown(self, async_client: AsyncClient, device_factory):
  1006. await device_factory(device_id="sb-shutdown")
  1007. resp = await async_client.post(
  1008. f"{API}/devices/sb-shutdown/system/command",
  1009. json={"command": "shutdown"},
  1010. )
  1011. assert resp.status_code == 200
  1012. assert resp.json()["command"] == "shutdown"
  1013. @pytest.mark.asyncio
  1014. @pytest.mark.integration
  1015. async def test_queue_restart_daemon(self, async_client: AsyncClient, device_factory):
  1016. await device_factory(device_id="sb-rd")
  1017. resp = await async_client.post(
  1018. f"{API}/devices/sb-rd/system/command",
  1019. json={"command": "restart_daemon"},
  1020. )
  1021. assert resp.status_code == 200
  1022. assert resp.json()["command"] == "restart_daemon"
  1023. @pytest.mark.asyncio
  1024. @pytest.mark.integration
  1025. async def test_queue_restart_browser(self, async_client: AsyncClient, device_factory):
  1026. await device_factory(device_id="sb-rb")
  1027. resp = await async_client.post(
  1028. f"{API}/devices/sb-rb/system/command",
  1029. json={"command": "restart_browser"},
  1030. )
  1031. assert resp.status_code == 200
  1032. assert resp.json()["command"] == "restart_browser"
  1033. @pytest.mark.asyncio
  1034. @pytest.mark.integration
  1035. async def test_invalid_command_rejected(self, async_client: AsyncClient, device_factory):
  1036. await device_factory(device_id="sb-invalid")
  1037. resp = await async_client.post(
  1038. f"{API}/devices/sb-invalid/system/command",
  1039. json={"command": "format_disk"},
  1040. )
  1041. assert resp.status_code == 400
  1042. assert "Invalid command" in resp.json()["detail"]
  1043. @pytest.mark.asyncio
  1044. @pytest.mark.integration
  1045. async def test_command_unknown_device_404(self, async_client: AsyncClient):
  1046. resp = await async_client.post(
  1047. f"{API}/devices/ghost/system/command",
  1048. json={"command": "reboot"},
  1049. )
  1050. assert resp.status_code == 404
  1051. @pytest.mark.asyncio
  1052. @pytest.mark.integration
  1053. async def test_command_offline_device_409(self, async_client: AsyncClient, device_factory):
  1054. await device_factory(
  1055. device_id="sb-offline-cmd",
  1056. last_seen=datetime.now(timezone.utc) - timedelta(seconds=120),
  1057. )
  1058. resp = await async_client.post(
  1059. f"{API}/devices/sb-offline-cmd/system/command",
  1060. json={"command": "reboot"},
  1061. )
  1062. assert resp.status_code == 409
  1063. assert "offline" in resp.json()["detail"].lower()
  1064. @pytest.mark.asyncio
  1065. @pytest.mark.integration
  1066. async def test_command_sets_pending_command(self, async_client: AsyncClient, device_factory, db_session):
  1067. device = await device_factory(device_id="sb-pending")
  1068. await async_client.post(
  1069. f"{API}/devices/sb-pending/system/command",
  1070. json={"command": "restart_daemon"},
  1071. )
  1072. await db_session.refresh(device)
  1073. assert device.pending_command == "restart_daemon"
  1074. @pytest.mark.asyncio
  1075. @pytest.mark.integration
  1076. async def test_heartbeat_clears_system_command(self, async_client: AsyncClient, device_factory):
  1077. """System commands (reboot/shutdown/restart_*) are fire-and-forget — heartbeat clears them."""
  1078. await device_factory(device_id="sb-hb-clear")
  1079. # Queue a command
  1080. await async_client.post(
  1081. f"{API}/devices/sb-hb-clear/system/command",
  1082. json={"command": "restart_browser"},
  1083. )
  1084. # Heartbeat should return the command and clear it
  1085. resp = await async_client.post(
  1086. f"{API}/devices/sb-hb-clear/heartbeat",
  1087. json={"nfc_ok": True, "scale_ok": True, "uptime_s": 100},
  1088. )
  1089. assert resp.status_code == 200
  1090. data = resp.json()
  1091. assert data["pending_command"] == "restart_browser"
  1092. # ============================================================================
  1093. # Spoolman-aware SpoolBuddy endpoints
  1094. # ============================================================================
  1095. @pytest.fixture
  1096. async def spoolman_settings(db_session: AsyncSession):
  1097. """Create Spoolman settings in the database (enabled with URL)."""
  1098. from backend.app.models.settings import Settings
  1099. settings = [
  1100. Settings(key="spoolman_enabled", value="true"),
  1101. Settings(key="spoolman_url", value="http://spoolman.local:7912"),
  1102. ]
  1103. for s in settings:
  1104. db_session.add(s)
  1105. await db_session.commit()
  1106. return settings
  1107. def _mock_spoolman_client(base_url: str = "http://spoolman.local:7912") -> MagicMock:
  1108. client = MagicMock()
  1109. client.base_url = base_url
  1110. client.get_spools = AsyncMock(return_value=[])
  1111. client.get_spool = AsyncMock(return_value={})
  1112. client.find_spool_by_tag = AsyncMock(return_value=None)
  1113. client.update_spool = AsyncMock(return_value=None)
  1114. client.merge_spool_extra = AsyncMock(return_value={"id": 0})
  1115. return client
  1116. def _spoolman_spool_fixture(
  1117. spool_id: int,
  1118. spool_weight: float = 196.0,
  1119. filament_weight: float = 1000.0,
  1120. spool_level_spool_weight=None,
  1121. ) -> dict:
  1122. """Build a minimal Spoolman spool dict with realistic core weight from filament.spool_weight."""
  1123. raw = {
  1124. "id": spool_id,
  1125. "filament": {"weight": filament_weight, "spool_weight": spool_weight},
  1126. "used_weight": 0.0,
  1127. }
  1128. if spool_level_spool_weight is not None:
  1129. raw["spool_weight"] = spool_level_spool_weight
  1130. return raw
  1131. class TestUpdateSpoolWeightSpoolman:
  1132. """update-spool-weight routes to Spoolman when Spoolman mode is active."""
  1133. @pytest.mark.asyncio
  1134. @pytest.mark.integration
  1135. async def test_spoolman_mode_uses_filament_spool_weight(self, async_client: AsyncClient, spoolman_settings):
  1136. """core_weight comes from filament.spool_weight, not a hardcoded constant."""
  1137. sm_spool = _spoolman_spool_fixture(42, spool_weight=196.0, filament_weight=1000.0)
  1138. mock_client = _mock_spoolman_client()
  1139. mock_client.get_spool = AsyncMock(return_value=sm_spool)
  1140. mock_client.update_spool = AsyncMock(return_value=sm_spool)
  1141. with (
  1142. patch(
  1143. "backend.app.services.spoolman.get_spoolman_client",
  1144. AsyncMock(return_value=mock_client),
  1145. ),
  1146. patch(
  1147. "backend.app.services.spoolman.init_spoolman_client",
  1148. AsyncMock(return_value=mock_client),
  1149. ),
  1150. ):
  1151. resp = await async_client.post(
  1152. f"{API}/scale/update-spool-weight",
  1153. json={"spool_id": 42, "weight_grams": 750},
  1154. )
  1155. assert resp.status_code == 200
  1156. data = resp.json()
  1157. assert data["status"] == "ok"
  1158. # remaining = max(0, 750 - 196) = 554 → weight_used = 1000 - 554 = 446
  1159. assert data["weight_used"] == pytest.approx(446.0)
  1160. mock_client.update_spool.assert_called_once_with(spool_id=42, remaining_weight=pytest.approx(554.0))
  1161. @pytest.mark.asyncio
  1162. @pytest.mark.integration
  1163. async def test_spoolman_mode_clamps_remaining_to_zero(self, async_client: AsyncClient, spoolman_settings):
  1164. """Scale weight below core weight → remaining_weight = 0."""
  1165. sm_spool = _spoolman_spool_fixture(7, spool_weight=196.0, filament_weight=1000.0)
  1166. mock_client = _mock_spoolman_client()
  1167. mock_client.get_spool = AsyncMock(return_value=sm_spool)
  1168. mock_client.update_spool = AsyncMock(return_value=sm_spool)
  1169. with (
  1170. patch(
  1171. "backend.app.services.spoolman.get_spoolman_client",
  1172. AsyncMock(return_value=mock_client),
  1173. ),
  1174. patch(
  1175. "backend.app.services.spoolman.init_spoolman_client",
  1176. AsyncMock(return_value=mock_client),
  1177. ),
  1178. ):
  1179. resp = await async_client.post(
  1180. f"{API}/scale/update-spool-weight",
  1181. json={"spool_id": 7, "weight_grams": 100},
  1182. )
  1183. assert resp.status_code == 200
  1184. mock_client.update_spool.assert_called_once_with(spool_id=7, remaining_weight=0.0)
  1185. @pytest.mark.asyncio
  1186. @pytest.mark.integration
  1187. async def test_spoolman_mode_404_when_spool_not_found(self, async_client: AsyncClient, spoolman_settings):
  1188. """404 when Spoolman doesn't know the spool."""
  1189. mock_client = _mock_spoolman_client()
  1190. mock_client.get_spool = AsyncMock(side_effect=SpoolmanNotFoundError("Spool 9999 not found"))
  1191. with (
  1192. patch(
  1193. "backend.app.services.spoolman.get_spoolman_client",
  1194. AsyncMock(return_value=mock_client),
  1195. ),
  1196. patch(
  1197. "backend.app.services.spoolman.init_spoolman_client",
  1198. AsyncMock(return_value=mock_client),
  1199. ),
  1200. ):
  1201. resp = await async_client.post(
  1202. f"{API}/scale/update-spool-weight",
  1203. json={"spool_id": 9999, "weight_grams": 500},
  1204. )
  1205. assert resp.status_code == 404
  1206. @pytest.mark.asyncio
  1207. @pytest.mark.integration
  1208. async def test_spoolman_mode_503_on_client_failure(self, async_client: AsyncClient, spoolman_settings):
  1209. """503 is returned when Spoolman is unreachable during weight update."""
  1210. sm_spool = _spoolman_spool_fixture(99)
  1211. mock_client = _mock_spoolman_client()
  1212. mock_client.get_spool = AsyncMock(return_value=sm_spool)
  1213. mock_client.update_spool = AsyncMock(side_effect=SpoolmanUnavailableError("Spoolman down"))
  1214. with (
  1215. patch(
  1216. "backend.app.services.spoolman.get_spoolman_client",
  1217. AsyncMock(return_value=mock_client),
  1218. ),
  1219. patch(
  1220. "backend.app.services.spoolman.init_spoolman_client",
  1221. AsyncMock(return_value=mock_client),
  1222. ),
  1223. ):
  1224. resp = await async_client.post(
  1225. f"{API}/scale/update-spool-weight",
  1226. json={"spool_id": 99, "weight_grams": 500},
  1227. )
  1228. assert resp.status_code == 503
  1229. @pytest.mark.asyncio
  1230. @pytest.mark.integration
  1231. async def test_local_mode_unchanged(self, async_client: AsyncClient, spool_factory):
  1232. """When Spoolman is NOT enabled, local DB update still works."""
  1233. spool = await spool_factory(label_weight=1000, core_weight=250, weight_used=0)
  1234. resp = await async_client.post(
  1235. f"{API}/scale/update-spool-weight",
  1236. json={"spool_id": spool.id, "weight_grams": 750},
  1237. )
  1238. assert resp.status_code == 200
  1239. assert resp.json()["weight_used"] == 500
  1240. @pytest.mark.asyncio
  1241. @pytest.mark.integration
  1242. async def test_spool_level_spool_weight_takes_priority(self, async_client: AsyncClient, spoolman_settings):
  1243. """spool.spool_weight overrides filament.spool_weight for tare calculation."""
  1244. sm_spool = _spoolman_spool_fixture(42, spool_weight=196.0, filament_weight=1000.0, spool_level_spool_weight=300)
  1245. mock_client = _mock_spoolman_client()
  1246. mock_client.get_spool = AsyncMock(return_value=sm_spool)
  1247. mock_client.update_spool = AsyncMock(return_value=sm_spool)
  1248. with (
  1249. patch("backend.app.services.spoolman.get_spoolman_client", AsyncMock(return_value=mock_client)),
  1250. patch("backend.app.services.spoolman.init_spoolman_client", AsyncMock(return_value=mock_client)),
  1251. ):
  1252. resp = await async_client.post(
  1253. f"{API}/scale/update-spool-weight",
  1254. json={"spool_id": 42, "weight_grams": 750},
  1255. )
  1256. assert resp.status_code == 200
  1257. # remaining = 750 - 300 = 450; weight_used = 1000 - 450 = 550
  1258. assert resp.json()["weight_used"] == pytest.approx(550.0)
  1259. mock_client.update_spool.assert_called_once_with(spool_id=42, remaining_weight=pytest.approx(450.0))
  1260. @pytest.mark.asyncio
  1261. @pytest.mark.integration
  1262. async def test_spool_level_zero_spool_weight_not_treated_as_missing(
  1263. self, async_client: AsyncClient, spoolman_settings
  1264. ):
  1265. """spool.spool_weight=0 is valid (0g tare), not treated as missing/fallback."""
  1266. sm_spool = _spoolman_spool_fixture(42, spool_weight=196.0, filament_weight=1000.0, spool_level_spool_weight=0)
  1267. mock_client = _mock_spoolman_client()
  1268. mock_client.get_spool = AsyncMock(return_value=sm_spool)
  1269. mock_client.update_spool = AsyncMock(return_value=sm_spool)
  1270. with (
  1271. patch("backend.app.services.spoolman.get_spoolman_client", AsyncMock(return_value=mock_client)),
  1272. patch("backend.app.services.spoolman.init_spoolman_client", AsyncMock(return_value=mock_client)),
  1273. ):
  1274. resp = await async_client.post(
  1275. f"{API}/scale/update-spool-weight",
  1276. json={"spool_id": 42, "weight_grams": 750},
  1277. )
  1278. assert resp.status_code == 200
  1279. # remaining = 750 - 0 = 750; weight_used = 1000 - 750 = 250
  1280. assert resp.json()["weight_used"] == pytest.approx(250.0)
  1281. mock_client.update_spool.assert_called_once_with(spool_id=42, remaining_weight=pytest.approx(750.0))
  1282. @pytest.mark.asyncio
  1283. @pytest.mark.integration
  1284. async def test_both_levels_none_uses_250g_fallback_and_warns(self, async_client: AsyncClient, spoolman_settings):
  1285. """When both spool_weight and filament.spool_weight are None, 250g fallback is used with a warning."""
  1286. sm_spool = {"id": 42, "filament": {"weight": 1000.0, "spool_weight": None}, "used_weight": 0.0}
  1287. mock_client = _mock_spoolman_client()
  1288. mock_client.get_spool = AsyncMock(return_value=sm_spool)
  1289. mock_client.update_spool = AsyncMock(return_value=sm_spool)
  1290. with (
  1291. patch("backend.app.services.spoolman.get_spoolman_client", AsyncMock(return_value=mock_client)),
  1292. patch("backend.app.services.spoolman.init_spoolman_client", AsyncMock(return_value=mock_client)),
  1293. ):
  1294. resp = await async_client.post(
  1295. f"{API}/scale/update-spool-weight",
  1296. json={"spool_id": 42, "weight_grams": 750},
  1297. )
  1298. assert resp.status_code == 200
  1299. # remaining = 750 - 250 = 500; weight_used = 1000 - 500 = 500
  1300. assert resp.json()["weight_used"] == pytest.approx(500.0)
  1301. assert resp.json().get("warnings")
  1302. class TestTagScannedSpoolmanFallback:
  1303. """nfc/tag-scanned falls back to Spoolman when local DB has no match."""
  1304. @pytest.mark.asyncio
  1305. @pytest.mark.integration
  1306. async def test_spoolman_fallback_on_local_miss(self, async_client: AsyncClient, spoolman_settings):
  1307. raw_spool = {
  1308. "id": 5,
  1309. "filament": {
  1310. "material": "PETG",
  1311. "name": "PETG Basic",
  1312. "color_hex": "00FF00",
  1313. "weight": 1000,
  1314. "vendor": {"name": "Polymaker"},
  1315. },
  1316. "used_weight": 100.0,
  1317. "archived": False,
  1318. "registered": "2024-01-01T00:00:00+00:00",
  1319. "extra": {"tag": '"DEADBEEF12345678"'},
  1320. }
  1321. mock_client = _mock_spoolman_client()
  1322. mock_client.get_spools = AsyncMock(return_value=[raw_spool])
  1323. mock_client.find_spool_by_tag = AsyncMock(return_value=raw_spool)
  1324. with (
  1325. patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws,
  1326. patch(
  1327. "backend.app.api.routes.spoolbuddy.get_spool_by_tag",
  1328. new_callable=AsyncMock,
  1329. return_value=None,
  1330. ),
  1331. patch(
  1332. "backend.app.services.spoolman.get_spoolman_client",
  1333. AsyncMock(return_value=mock_client),
  1334. ),
  1335. patch(
  1336. "backend.app.services.spoolman.init_spoolman_client",
  1337. AsyncMock(return_value=mock_client),
  1338. ),
  1339. ):
  1340. mock_ws.broadcast = AsyncMock()
  1341. resp = await async_client.post(
  1342. f"{API}/nfc/tag-scanned",
  1343. json={"device_id": "sb-1", "tag_uid": "DEADBEEF12345678"},
  1344. )
  1345. assert resp.status_code == 200
  1346. data = resp.json()
  1347. assert data["matched"] is True
  1348. assert data["spool_id"] == 5
  1349. mock_ws.broadcast.assert_called_once()
  1350. msg = mock_ws.broadcast.call_args[0][0]
  1351. assert msg["type"] == "spoolbuddy_tag_matched"
  1352. assert msg["spool"]["id"] == 5
  1353. assert msg["spool"]["material"] == "PETG"
  1354. @pytest.mark.asyncio
  1355. @pytest.mark.integration
  1356. async def test_spoolman_fallback_unknown_when_no_spoolman_match(self, async_client: AsyncClient, spoolman_settings):
  1357. """Unknown tag broadcast when both local DB and Spoolman miss."""
  1358. mock_client = _mock_spoolman_client()
  1359. mock_client.get_spools = AsyncMock(return_value=[])
  1360. mock_client.find_spool_by_tag = AsyncMock(return_value=None)
  1361. with (
  1362. patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws,
  1363. patch(
  1364. "backend.app.api.routes.spoolbuddy.get_spool_by_tag",
  1365. new_callable=AsyncMock,
  1366. return_value=None,
  1367. ),
  1368. patch(
  1369. "backend.app.services.spoolman.get_spoolman_client",
  1370. AsyncMock(return_value=mock_client),
  1371. ),
  1372. patch(
  1373. "backend.app.services.spoolman.init_spoolman_client",
  1374. AsyncMock(return_value=mock_client),
  1375. ),
  1376. ):
  1377. mock_ws.broadcast = AsyncMock()
  1378. resp = await async_client.post(
  1379. f"{API}/nfc/tag-scanned",
  1380. json={"device_id": "sb-1", "tag_uid": "UNKNOWN0000000FF"},
  1381. )
  1382. assert resp.status_code == 200
  1383. data = resp.json()
  1384. assert data["matched"] is False
  1385. assert data["spool_id"] is None
  1386. mock_ws.broadcast.assert_called_once()
  1387. msg = mock_ws.broadcast.call_args[0][0]
  1388. assert msg["type"] == "spoolbuddy_unknown_tag"
  1389. @pytest.mark.asyncio
  1390. @pytest.mark.integration
  1391. async def test_malformed_spoolman_data_degrades_gracefully(self, async_client: AsyncClient, spoolman_settings):
  1392. """ValueError from _map_spoolman_spool (e.g. spool_id=0) must return matched=False without broadcasting unknown_tag."""
  1393. bad_spool = {
  1394. "id": 0, # _map_spoolman_spool raises ValueError for id <= 0
  1395. "filament": {"material": "PLA", "name": "PLA Basic", "color_hex": "FF0000", "weight": 1000},
  1396. "used_weight": 0.0,
  1397. "archived": False,
  1398. "registered": "2024-01-01T00:00:00Z",
  1399. "extra": {"tag": '"DEADBEEF12345678"'},
  1400. }
  1401. mock_client = _mock_spoolman_client()
  1402. mock_client.find_spool_by_tag = AsyncMock(return_value=bad_spool)
  1403. with (
  1404. patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws,
  1405. patch(
  1406. "backend.app.api.routes.spoolbuddy.get_spool_by_tag",
  1407. new_callable=AsyncMock,
  1408. return_value=None,
  1409. ),
  1410. patch(
  1411. "backend.app.services.spoolman.get_spoolman_client",
  1412. AsyncMock(return_value=mock_client),
  1413. ),
  1414. patch(
  1415. "backend.app.services.spoolman.init_spoolman_client",
  1416. AsyncMock(return_value=mock_client),
  1417. ),
  1418. ):
  1419. mock_ws.broadcast = AsyncMock()
  1420. resp = await async_client.post(
  1421. f"{API}/nfc/tag-scanned",
  1422. json={"device_id": "sb-1", "tag_uid": "DEADBEEF12345678"},
  1423. )
  1424. assert resp.status_code == 200
  1425. data = resp.json()
  1426. assert data["matched"] is False
  1427. assert data["spool_id"] is None
  1428. # No broadcast: UI must not get a spurious unknown_tag event on Spoolman data errors
  1429. mock_ws.broadcast.assert_not_called()
  1430. @pytest.mark.asyncio
  1431. @pytest.mark.integration
  1432. async def test_local_match_skips_spoolman(self, async_client: AsyncClient, spool_factory):
  1433. """When local DB matches, Spoolman is never queried."""
  1434. spool = await spool_factory(tag_uid="AABB1122", material="PLA")
  1435. mock_spool = MagicMock()
  1436. mock_spool.id = spool.id
  1437. mock_spool.material = spool.material
  1438. mock_spool.subtype = spool.subtype
  1439. mock_spool.color_name = spool.color_name
  1440. mock_spool.rgba = spool.rgba
  1441. mock_spool.brand = spool.brand
  1442. mock_spool.label_weight = spool.label_weight
  1443. mock_spool.core_weight = spool.core_weight
  1444. mock_spool.weight_used = spool.weight_used
  1445. with (
  1446. patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws,
  1447. patch(
  1448. "backend.app.api.routes.spoolbuddy.get_spool_by_tag",
  1449. new_callable=AsyncMock,
  1450. return_value=mock_spool,
  1451. ),
  1452. ):
  1453. mock_ws.broadcast = AsyncMock()
  1454. resp = await async_client.post(
  1455. f"{API}/nfc/tag-scanned",
  1456. json={"device_id": "sb-1", "tag_uid": "AABB1122"},
  1457. )
  1458. assert resp.status_code == 200
  1459. data = resp.json()
  1460. assert data["matched"] is True
  1461. assert data["spool_id"] == spool.id
  1462. # ============================================================================
  1463. # NFC write-tag / write-result — Spoolman-aware
  1464. # ============================================================================
  1465. def _full_spoolman_spool(spool_id: int) -> dict:
  1466. """Complete Spoolman spool dict sufficient for NDEF encoding."""
  1467. return {
  1468. "id": spool_id,
  1469. "filament": {
  1470. "material": "PLA",
  1471. "name": "PLA Basic",
  1472. "color_hex": "FF0000",
  1473. "weight": 1000.0,
  1474. "spool_weight": 196.0,
  1475. "vendor": {"name": "Bambu Lab"},
  1476. },
  1477. "used_weight": 0.0,
  1478. "archived": False,
  1479. "registered": "2024-01-01T00:00:00Z",
  1480. }
  1481. class TestNfcWriteTagSpoolman:
  1482. """nfc/write-tag falls back to Spoolman when local DB has no matching spool."""
  1483. @pytest.mark.asyncio
  1484. @pytest.mark.integration
  1485. async def test_spoolman_spool_queued_when_local_miss(
  1486. self, async_client: AsyncClient, device_factory, spoolman_settings
  1487. ):
  1488. """write-tag encodes NDEF from Spoolman data when spool not in local DB."""
  1489. await device_factory(device_id="sb-write-sm")
  1490. sm_spool = _full_spoolman_spool(77)
  1491. mock_client = _mock_spoolman_client()
  1492. mock_client.get_spool = AsyncMock(return_value=sm_spool)
  1493. with (
  1494. patch(
  1495. "backend.app.services.spoolman.get_spoolman_client",
  1496. AsyncMock(return_value=mock_client),
  1497. ),
  1498. patch(
  1499. "backend.app.services.spoolman.init_spoolman_client",
  1500. AsyncMock(return_value=mock_client),
  1501. ),
  1502. ):
  1503. resp = await async_client.post(
  1504. f"{API}/nfc/write-tag",
  1505. json={"device_id": "sb-write-sm", "spool_id": 77},
  1506. )
  1507. assert resp.status_code == 200
  1508. assert resp.json()["status"] == "queued"
  1509. mock_client.get_spool.assert_called_once_with(77)
  1510. @pytest.mark.asyncio
  1511. @pytest.mark.integration
  1512. async def test_data_origin_spoolman_stored_in_payload(
  1513. self, async_client: AsyncClient, device_factory, db_session, spoolman_settings
  1514. ):
  1515. """Pending write payload records data_origin=spoolman for Spoolman spools."""
  1516. import json as _json
  1517. device = await device_factory(device_id="sb-origin")
  1518. sm_spool = _full_spoolman_spool(88)
  1519. mock_client = _mock_spoolman_client()
  1520. mock_client.get_spool = AsyncMock(return_value=sm_spool)
  1521. with (
  1522. patch(
  1523. "backend.app.services.spoolman.get_spoolman_client",
  1524. AsyncMock(return_value=mock_client),
  1525. ),
  1526. patch(
  1527. "backend.app.services.spoolman.init_spoolman_client",
  1528. AsyncMock(return_value=mock_client),
  1529. ),
  1530. ):
  1531. await async_client.post(
  1532. f"{API}/nfc/write-tag",
  1533. json={"device_id": "sb-origin", "spool_id": 88},
  1534. )
  1535. await db_session.refresh(device)
  1536. payload = _json.loads(device.pending_write_payload)
  1537. assert payload["data_origin"] == "spoolman"
  1538. assert payload["spool_id"] == 88
  1539. assert "ndef_data_hex" in payload
  1540. @pytest.mark.asyncio
  1541. @pytest.mark.integration
  1542. async def test_404_when_neither_local_nor_spoolman(
  1543. self, async_client: AsyncClient, device_factory, spoolman_settings
  1544. ):
  1545. """404 returned when spool is missing from both local DB and Spoolman."""
  1546. await device_factory(device_id="sb-miss")
  1547. mock_client = _mock_spoolman_client()
  1548. mock_client.get_spool = AsyncMock(side_effect=SpoolmanNotFoundError("Spool 9999 not found"))
  1549. with (
  1550. patch(
  1551. "backend.app.services.spoolman.get_spoolman_client",
  1552. AsyncMock(return_value=mock_client),
  1553. ),
  1554. patch(
  1555. "backend.app.services.spoolman.init_spoolman_client",
  1556. AsyncMock(return_value=mock_client),
  1557. ),
  1558. ):
  1559. resp = await async_client.post(
  1560. f"{API}/nfc/write-tag",
  1561. json={"device_id": "sb-miss", "spool_id": 9999},
  1562. )
  1563. assert resp.status_code == 404
  1564. @pytest.mark.asyncio
  1565. @pytest.mark.integration
  1566. async def test_local_spool_used_when_present(self, async_client: AsyncClient, device_factory, spool_factory):
  1567. """Local DB spool is encoded directly without contacting Spoolman."""
  1568. await device_factory(device_id="sb-local-write")
  1569. spool = await spool_factory(material="PETG")
  1570. resp = await async_client.post(
  1571. f"{API}/nfc/write-tag",
  1572. json={"device_id": "sb-local-write", "spool_id": spool.id},
  1573. )
  1574. assert resp.status_code == 200
  1575. assert resp.json()["status"] == "queued"
  1576. class TestNfcWriteResultSpoolman:
  1577. """nfc/write-result updates Spoolman extra.tag on success for Spoolman spools."""
  1578. @pytest.mark.asyncio
  1579. @pytest.mark.integration
  1580. async def test_success_updates_spoolman_extra_tag(
  1581. self, async_client: AsyncClient, device_factory, spoolman_settings
  1582. ):
  1583. """Successful write for a Spoolman spool calls merge_spool_extra with extra.tag."""
  1584. import json as _json
  1585. await device_factory(
  1586. device_id="sb-wr-sm",
  1587. pending_command="write_tag",
  1588. pending_write_payload=_json.dumps({"spool_id": 55, "ndef_data_hex": "deadbeef", "data_origin": "spoolman"}),
  1589. )
  1590. mock_client = _mock_spoolman_client()
  1591. mock_client.merge_spool_extra = AsyncMock(return_value={"id": 55})
  1592. with (
  1593. patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws,
  1594. patch(
  1595. "backend.app.services.spoolman.get_spoolman_client",
  1596. AsyncMock(return_value=mock_client),
  1597. ),
  1598. patch(
  1599. "backend.app.services.spoolman.init_spoolman_client",
  1600. AsyncMock(return_value=mock_client),
  1601. ),
  1602. ):
  1603. mock_ws.broadcast = AsyncMock()
  1604. resp = await async_client.post(
  1605. f"{API}/nfc/write-result",
  1606. json={
  1607. "device_id": "sb-wr-sm",
  1608. "spool_id": 55,
  1609. "tag_uid": "AABBCCDD11223344",
  1610. "success": True,
  1611. },
  1612. )
  1613. assert resp.status_code == 200
  1614. mock_client.merge_spool_extra.assert_called_once_with(55, {"tag": '"AABBCCDD11223344"'})
  1615. msg = mock_ws.broadcast.call_args[0][0]
  1616. assert msg["type"] == "spoolbuddy_tag_written"
  1617. assert msg["tag_uid"] == "AABBCCDD11223344"
  1618. @pytest.mark.asyncio
  1619. @pytest.mark.integration
  1620. async def test_failure_does_not_call_spoolman(self, async_client: AsyncClient, device_factory, spoolman_settings):
  1621. """Failed write never calls Spoolman update."""
  1622. import json as _json
  1623. await device_factory(
  1624. device_id="sb-wr-fail",
  1625. pending_command="write_tag",
  1626. pending_write_payload=_json.dumps({"spool_id": 66, "ndef_data_hex": "deadbeef", "data_origin": "spoolman"}),
  1627. )
  1628. mock_client = _mock_spoolman_client()
  1629. with (
  1630. patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws,
  1631. patch(
  1632. "backend.app.services.spoolman.get_spoolman_client",
  1633. AsyncMock(return_value=mock_client),
  1634. ),
  1635. ):
  1636. mock_ws.broadcast = AsyncMock()
  1637. resp = await async_client.post(
  1638. f"{API}/nfc/write-result",
  1639. json={
  1640. "device_id": "sb-wr-fail",
  1641. "spool_id": 66,
  1642. "tag_uid": "AABBCCDD11223344",
  1643. "success": False,
  1644. "message": "write timeout",
  1645. },
  1646. )
  1647. assert resp.status_code == 200
  1648. mock_client.update_spool.assert_not_called()
  1649. msg = mock_ws.broadcast.call_args[0][0]
  1650. assert msg["type"] == "spoolbuddy_tag_write_failed"
  1651. @pytest.mark.asyncio
  1652. @pytest.mark.integration
  1653. async def test_success_local_spool_writes_to_db(
  1654. self, async_client: AsyncClient, device_factory, spool_factory, db_session
  1655. ):
  1656. """Successful write for a local spool still updates local DB tag_uid."""
  1657. import json as _json
  1658. spool = await spool_factory()
  1659. await device_factory(
  1660. device_id="sb-wr-local",
  1661. pending_command="write_tag",
  1662. pending_write_payload=_json.dumps(
  1663. {"spool_id": spool.id, "ndef_data_hex": "deadbeef", "data_origin": "local"}
  1664. ),
  1665. )
  1666. with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
  1667. mock_ws.broadcast = AsyncMock()
  1668. resp = await async_client.post(
  1669. f"{API}/nfc/write-result",
  1670. json={
  1671. "device_id": "sb-wr-local",
  1672. "spool_id": spool.id,
  1673. "tag_uid": "DEADBEEF12345678",
  1674. "success": True,
  1675. },
  1676. )
  1677. assert resp.status_code == 200
  1678. await db_session.refresh(spool)
  1679. assert spool.tag_uid == "DEADBEEF12345678"
  1680. assert spool.tag_type == "ntag"
  1681. # ============================================================================
  1682. # Security fix tests — write-tag ValueError + write-result exception safety
  1683. # ============================================================================
  1684. class TestNfcWriteTagSpoolmanSecurityFixes:
  1685. """Regression tests for security fixes in nfc/write-tag Spoolman path."""
  1686. @pytest.mark.asyncio
  1687. @pytest.mark.integration
  1688. async def test_invalid_spoolman_spool_id_returns_502(
  1689. self, async_client: AsyncClient, device_factory, spoolman_settings
  1690. ):
  1691. """Malformed Spoolman spool (invalid id=0) raises 502, not 404 — spool exists but is bad data."""
  1692. await device_factory(device_id="sb-invalid-id")
  1693. # Spoolman returns spool with id=0 (invalid — caught by _map_spoolman_spool guard)
  1694. bad_spool = {**_full_spoolman_spool(1), "id": 0}
  1695. mock_client = _mock_spoolman_client()
  1696. mock_client.get_spool = AsyncMock(return_value=bad_spool)
  1697. with (
  1698. patch(
  1699. "backend.app.services.spoolman.get_spoolman_client",
  1700. AsyncMock(return_value=mock_client),
  1701. ),
  1702. patch(
  1703. "backend.app.services.spoolman.init_spoolman_client",
  1704. AsyncMock(return_value=mock_client),
  1705. ),
  1706. ):
  1707. resp = await async_client.post(
  1708. f"{API}/nfc/write-tag",
  1709. json={"device_id": "sb-invalid-id", "spool_id": 99},
  1710. )
  1711. # 502: spool exists in Spoolman but its data is malformed — not a "not found"
  1712. assert resp.status_code == 502
  1713. @pytest.mark.asyncio
  1714. @pytest.mark.integration
  1715. async def test_oversized_label_weight_does_not_crash(
  1716. self, async_client: AsyncClient, device_factory, spoolman_settings
  1717. ):
  1718. """label_weight > 65535 from Spoolman must not crash with struct.error."""
  1719. await device_factory(device_id="sb-overflow")
  1720. big_weight_spool = {
  1721. **_full_spoolman_spool(42),
  1722. "filament": {**_full_spoolman_spool(42)["filament"], "weight": 70000},
  1723. }
  1724. mock_client = _mock_spoolman_client()
  1725. mock_client.get_spool = AsyncMock(return_value=big_weight_spool)
  1726. with (
  1727. patch(
  1728. "backend.app.services.spoolman.get_spoolman_client",
  1729. AsyncMock(return_value=mock_client),
  1730. ),
  1731. patch(
  1732. "backend.app.services.spoolman.init_spoolman_client",
  1733. AsyncMock(return_value=mock_client),
  1734. ),
  1735. ):
  1736. resp = await async_client.post(
  1737. f"{API}/nfc/write-tag",
  1738. json={"device_id": "sb-overflow", "spool_id": 42},
  1739. )
  1740. assert resp.status_code == 200
  1741. assert resp.json()["status"] == "queued"
  1742. class TestNfcWriteResultSpoolmanSecurityFixes:
  1743. """Regression tests for transaction safety in nfc/write-result Spoolman path."""
  1744. @pytest.mark.asyncio
  1745. @pytest.mark.integration
  1746. async def test_spoolman_client_exception_still_clears_device_state(
  1747. self, async_client: AsyncClient, device_factory, db_session, spoolman_settings
  1748. ):
  1749. """If Spoolman client raises, device pending_command is still cleared in DB."""
  1750. import json as _json
  1751. device = await device_factory(
  1752. device_id="sb-exc-safe",
  1753. pending_command="write_tag",
  1754. pending_write_payload=_json.dumps({"spool_id": 77, "ndef_data_hex": "deadbeef", "data_origin": "spoolman"}),
  1755. )
  1756. mock_client = _mock_spoolman_client()
  1757. mock_client.merge_spool_extra = AsyncMock(side_effect=Exception("connection refused"))
  1758. with (
  1759. patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws,
  1760. patch(
  1761. "backend.app.services.spoolman.get_spoolman_client",
  1762. AsyncMock(return_value=mock_client),
  1763. ),
  1764. patch(
  1765. "backend.app.services.spoolman.init_spoolman_client",
  1766. AsyncMock(return_value=mock_client),
  1767. ),
  1768. ):
  1769. mock_ws.broadcast = AsyncMock()
  1770. resp = await async_client.post(
  1771. f"{API}/nfc/write-result",
  1772. json={
  1773. "device_id": "sb-exc-safe",
  1774. "spool_id": 77,
  1775. "tag_uid": "AABBCCDD11223344",
  1776. "success": True,
  1777. },
  1778. )
  1779. # 502: tag written to NFC but Spoolman link failed (not best-effort — caller must retry)
  1780. assert resp.status_code == 502
  1781. # Device state must be cleared despite the exception (no spurious re-write)
  1782. await db_session.refresh(device)
  1783. assert device.pending_command is None
  1784. assert device.pending_write_payload is None
  1785. # Failure broadcast fires so the UI can show the error
  1786. msg = mock_ws.broadcast.call_args[0][0]
  1787. assert msg["type"] == "spoolbuddy_tag_link_failed"
  1788. @pytest.mark.asyncio
  1789. @pytest.mark.integration
  1790. async def test_spoolman_not_found_error_broadcasts_link_failed(
  1791. self, async_client: AsyncClient, device_factory, db_session, spoolman_settings
  1792. ):
  1793. """SpoolmanNotFoundError from merge_spool_extra must clear device state and broadcast link_failed."""
  1794. import json as _json
  1795. device = await device_factory(
  1796. device_id="sb-notfound",
  1797. pending_command="write_tag",
  1798. pending_write_payload=_json.dumps({"spool_id": 55, "ndef_data_hex": "deadbeef", "data_origin": "spoolman"}),
  1799. )
  1800. mock_client = _mock_spoolman_client()
  1801. mock_client.merge_spool_extra = AsyncMock(side_effect=SpoolmanNotFoundError("Spool 55 not found"))
  1802. with (
  1803. patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws,
  1804. patch(
  1805. "backend.app.services.spoolman.get_spoolman_client",
  1806. AsyncMock(return_value=mock_client),
  1807. ),
  1808. patch(
  1809. "backend.app.services.spoolman.init_spoolman_client",
  1810. AsyncMock(return_value=mock_client),
  1811. ),
  1812. ):
  1813. mock_ws.broadcast = AsyncMock()
  1814. resp = await async_client.post(
  1815. f"{API}/nfc/write-result",
  1816. json={
  1817. "device_id": "sb-notfound",
  1818. "spool_id": 55,
  1819. "tag_uid": "AABBCCDD11223344",
  1820. "success": True,
  1821. },
  1822. )
  1823. assert resp.status_code == 502
  1824. await db_session.refresh(device)
  1825. assert device.pending_command is None
  1826. assert device.pending_write_payload is None
  1827. msg = mock_ws.broadcast.call_args[0][0]
  1828. assert msg["type"] == "spoolbuddy_tag_link_failed"
  1829. assert msg["spool_id"] == 55
  1830. class TestNfcWriteResultOrphanedSpool:
  1831. """nfc/write-result when the local spool was deleted between write-queue and write-result."""
  1832. @pytest.mark.asyncio
  1833. @pytest.mark.integration
  1834. async def test_local_spool_deleted_before_write_back(self, async_client: AsyncClient, device_factory, db_session):
  1835. """When local spool is deleted between write-queue and write-result, return linked=False and broadcast link_failed."""
  1836. import json as _json
  1837. device = await device_factory(
  1838. device_id="sb-orphan",
  1839. pending_command="write_tag",
  1840. pending_write_payload=_json.dumps(
  1841. {
  1842. "spool_id": 99999, # non-existent spool
  1843. "ndef_data_hex": "aabbccdd",
  1844. "data_origin": "local",
  1845. }
  1846. ),
  1847. )
  1848. with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
  1849. mock_ws.broadcast = AsyncMock()
  1850. resp = await async_client.post(
  1851. f"{API}/nfc/write-result",
  1852. json={"device_id": device.device_id, "spool_id": 99999, "success": True, "tag_uid": "AABBCCDD"},
  1853. )
  1854. assert resp.status_code == 200
  1855. data = resp.json()
  1856. assert data["linked"] is False
  1857. # pending command should be cleared
  1858. await db_session.refresh(device)
  1859. assert device.pending_command is None
  1860. # broadcast should be spoolbuddy_tag_link_failed
  1861. broadcast_calls = mock_ws.broadcast.call_args_list
  1862. link_failed = [c[0][0] for c in broadcast_calls if c[0][0].get("type") == "spoolbuddy_tag_link_failed"]
  1863. assert len(link_failed) >= 1
  1864. class TestNfcWriteResultInputValidation:
  1865. """Input validation and JSON safety for nfc/write-result."""
  1866. @pytest.mark.asyncio
  1867. @pytest.mark.integration
  1868. async def test_tag_uid_too_long_rejected(self, async_client: AsyncClient, device_factory):
  1869. """tag_uid longer than 32 chars must be rejected with 422."""
  1870. import json as _json
  1871. await device_factory(
  1872. device_id="sb-uid-long",
  1873. pending_command="write_tag",
  1874. pending_write_payload=_json.dumps({"spool_id": 1, "ndef_data_hex": "dead", "data_origin": "local"}),
  1875. )
  1876. resp = await async_client.post(
  1877. f"{API}/nfc/write-result",
  1878. json={
  1879. "device_id": "sb-uid-long",
  1880. "spool_id": 1,
  1881. "tag_uid": "A" * 65,
  1882. "success": True,
  1883. },
  1884. )
  1885. assert resp.status_code == 422
  1886. @pytest.mark.asyncio
  1887. @pytest.mark.integration
  1888. async def test_malformed_pending_payload_falls_back_to_local(
  1889. self, async_client: AsyncClient, device_factory, spool_factory, db_session
  1890. ):
  1891. """Corrupted pending_write_payload JSON falls back to local mode gracefully."""
  1892. spool = await spool_factory()
  1893. await device_factory(
  1894. device_id="sb-corrupt-json",
  1895. pending_command="write_tag",
  1896. pending_write_payload="{not valid json!!!",
  1897. )
  1898. with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
  1899. mock_ws.broadcast = AsyncMock()
  1900. resp = await async_client.post(
  1901. f"{API}/nfc/write-result",
  1902. json={
  1903. "device_id": "sb-corrupt-json",
  1904. "spool_id": spool.id,
  1905. "tag_uid": "DEADBEEF12345678",
  1906. "success": True,
  1907. },
  1908. )
  1909. # Must return 200, not 500
  1910. assert resp.status_code == 200
  1911. # Falls back to local mode — tag written to DB
  1912. await db_session.refresh(spool)
  1913. assert spool.tag_uid == "DEADBEEF12345678"
  1914. # ============================================================================
  1915. # B1: NFC write-tag warnings appear in response body
  1916. # ============================================================================
  1917. class TestNfcWriteTagWarningsBody:
  1918. """B1: resp.json()['warnings'] is populated when Spoolman fields are absent."""
  1919. @pytest.mark.asyncio
  1920. @pytest.mark.integration
  1921. async def test_warnings_returned_for_missing_color_and_temp(
  1922. self, async_client: AsyncClient, device_factory, spoolman_settings
  1923. ):
  1924. """Both color_name=None and settings_extruder_temp=None produce 2 warnings."""
  1925. await device_factory(device_id="sb-warn-b1")
  1926. # Spoolman spool with no color_name or nozzle temp
  1927. sparse_spool = {
  1928. "id": 99,
  1929. "filament": {
  1930. "material": "PLA",
  1931. "name": "PLA Basic",
  1932. "color_hex": "808080",
  1933. # color_name absent → None after mapping
  1934. # settings_extruder_temp absent → nozzle_temp_min=None
  1935. "weight": 1000.0,
  1936. "vendor": {"name": "Bambu Lab"},
  1937. },
  1938. "used_weight": 0.0,
  1939. "archived": False,
  1940. "registered": "2024-01-01T00:00:00Z",
  1941. }
  1942. mock_client = _mock_spoolman_client()
  1943. mock_client.get_spool = AsyncMock(return_value=sparse_spool)
  1944. with (
  1945. patch(
  1946. "backend.app.services.spoolman.get_spoolman_client",
  1947. AsyncMock(return_value=mock_client),
  1948. ),
  1949. patch(
  1950. "backend.app.services.spoolman.init_spoolman_client",
  1951. AsyncMock(return_value=mock_client),
  1952. ),
  1953. ):
  1954. resp = await async_client.post(
  1955. f"{API}/nfc/write-tag",
  1956. json={"device_id": "sb-warn-b1", "spool_id": 99},
  1957. )
  1958. assert resp.status_code == 200
  1959. body = resp.json()
  1960. assert "warnings" in body, "Response should contain 'warnings' key when fields are absent"
  1961. warnings = body["warnings"]
  1962. assert len(warnings) >= 2, f"Expected at least 2 warnings for missing color_name + nozzle_temp, got: {warnings}"
  1963. # Confirm the specific fields are mentioned
  1964. warn_text = " ".join(warnings)
  1965. assert "color_name" in warn_text
  1966. assert "nozzle_temp" in warn_text
  1967. @pytest.mark.asyncio
  1968. @pytest.mark.integration
  1969. async def test_no_warnings_key_when_all_fields_present(
  1970. self, async_client: AsyncClient, device_factory, spoolman_settings
  1971. ):
  1972. """No 'warnings' key in response when all fields are populated."""
  1973. await device_factory(device_id="sb-nowarn")
  1974. full_spool = _full_spoolman_spool(100)
  1975. # Add color_name and extruder temp
  1976. full_spool["filament"]["color_name"] = "Red"
  1977. full_spool["filament"]["settings_extruder_temp"] = 220
  1978. mock_client = _mock_spoolman_client()
  1979. mock_client.get_spool = AsyncMock(return_value=full_spool)
  1980. with (
  1981. patch(
  1982. "backend.app.services.spoolman.get_spoolman_client",
  1983. AsyncMock(return_value=mock_client),
  1984. ),
  1985. patch(
  1986. "backend.app.services.spoolman.init_spoolman_client",
  1987. AsyncMock(return_value=mock_client),
  1988. ),
  1989. ):
  1990. resp = await async_client.post(
  1991. f"{API}/nfc/write-tag",
  1992. json={"device_id": "sb-nowarn", "spool_id": 100},
  1993. )
  1994. assert resp.status_code == 200
  1995. body = resp.json()
  1996. assert "warnings" not in body or body["warnings"] == []
  1997. # ============================================================================
  1998. # B5: Exception text scrubbed from WebSocket broadcast message
  1999. # ============================================================================
  2000. class TestNfcWriteResultExceptionScrubbing:
  2001. """B5: Internal exception details must not appear in WebSocket 'message' field."""
  2002. @pytest.mark.asyncio
  2003. @pytest.mark.integration
  2004. async def test_exception_text_not_leaked_in_ws_message(
  2005. self, async_client: AsyncClient, device_factory, db_session, spoolman_settings
  2006. ):
  2007. """When Spoolman merge raises, WS message is generic; 'connection refused' absent."""
  2008. import json as _json
  2009. await device_factory(
  2010. device_id="sb-scrub-b5",
  2011. pending_command="write_tag",
  2012. pending_write_payload=_json.dumps({"spool_id": 77, "ndef_data_hex": "deadbeef", "data_origin": "spoolman"}),
  2013. )
  2014. mock_client = _mock_spoolman_client()
  2015. mock_client.merge_spool_extra = AsyncMock(side_effect=Exception("connection refused to 192.168.1.1:7912"))
  2016. with (
  2017. patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws,
  2018. patch(
  2019. "backend.app.services.spoolman.get_spoolman_client",
  2020. AsyncMock(return_value=mock_client),
  2021. ),
  2022. patch(
  2023. "backend.app.services.spoolman.init_spoolman_client",
  2024. AsyncMock(return_value=mock_client),
  2025. ),
  2026. ):
  2027. mock_ws.broadcast = AsyncMock()
  2028. resp = await async_client.post(
  2029. f"{API}/nfc/write-result",
  2030. json={
  2031. "device_id": "sb-scrub-b5",
  2032. "spool_id": 77,
  2033. "tag_uid": "AABBCCDD11223344",
  2034. "success": True,
  2035. },
  2036. )
  2037. assert resp.status_code == 502
  2038. msg = mock_ws.broadcast.call_args[0][0]
  2039. assert msg["type"] == "spoolbuddy_tag_link_failed"
  2040. # Generic message — no internal exception details leaked
  2041. assert msg["message"] == "Spoolman link failed", f"Expected generic message but got: {msg['message']!r}"
  2042. assert "connection refused" not in str(msg), f"Exception text must not appear in WS message: {msg}"
  2043. assert "192.168.1" not in str(msg), f"Internal IP must not appear in WS message: {msg}"
  2044. # ============================================================================
  2045. # _get_spoolman_client_or_none: graceful degradation on ValueError during reinit
  2046. # ============================================================================
  2047. class TestSpoolmanClientOrNoneGraceful:
  2048. """_get_spoolman_client_or_none returns None when init_spoolman_client raises ValueError."""
  2049. @pytest.mark.asyncio
  2050. @pytest.mark.integration
  2051. async def test_returns_none_when_init_raises_value_error(self, async_client: AsyncClient, db_session):
  2052. """_get_spoolman_client_or_none returns None when init_spoolman_client raises ValueError,
  2053. so the device endpoint degrades gracefully instead of propagating a 500 error."""
  2054. from backend.app.models.settings import Settings
  2055. db_session.add(Settings(key="spoolman_enabled", value="true"))
  2056. db_session.add(Settings(key="spoolman_url", value="http://spoolman.local:7912"))
  2057. await db_session.commit()
  2058. with (
  2059. patch("backend.app.api.routes._spoolman_helpers.assert_safe_spoolman_url"),
  2060. patch(
  2061. "backend.app.services.spoolman.get_spoolman_client",
  2062. AsyncMock(return_value=None),
  2063. ),
  2064. patch(
  2065. "backend.app.services.spoolman.init_spoolman_client",
  2066. AsyncMock(side_effect=ValueError("invalid URL")),
  2067. ),
  2068. patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws,
  2069. ):
  2070. mock_ws.broadcast = AsyncMock()
  2071. # nfc/tag-scanned calls _get_spoolman_client_or_none; with None returned it
  2072. # must broadcast unknown_tag (not raise 500 due to ValueError propagating).
  2073. resp = await async_client.post(
  2074. f"{API}/nfc/tag-scanned",
  2075. json={"device_id": "sb-vale", "tag_uid": "AABBCCDD"},
  2076. )
  2077. # Must not be 500 — ValueError is caught and client returns None, degrading gracefully
  2078. assert resp.status_code == 200
  2079. data = resp.json()
  2080. assert data["matched"] is False