| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762
27722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441 |
- """Integration tests for SpoolBuddy API endpoints."""
- from datetime import datetime, timedelta, timezone
- from unittest.mock import AsyncMock, MagicMock, patch
- import pytest
- from httpx import AsyncClient
- from sqlalchemy.ext.asyncio import AsyncSession
- import backend.app.services.spoolbuddy_ssh # noqa: F401 — ensures patch() can resolve the dotted path
- from backend.app.api.routes import spoolbuddy as spoolbuddy_routes
- from backend.app.models.spool import Spool
- from backend.app.models.spoolbuddy_device import SpoolBuddyDevice
- from backend.app.services.spoolman import SpoolmanNotFoundError, SpoolmanUnavailableError
- API = "/api/v1/spoolbuddy"
@pytest.fixture
def device_factory(db_session: AsyncSession):
    """Provide an async callable that persists ``SpoolBuddyDevice`` rows.

    Each call gets the next sequence number, which is baked into the default
    ``device_id``, ``hostname`` and ``ip_address`` so that devices created in
    the same test do not collide. Any keyword argument passed to the callable
    overrides the corresponding default column value.
    """
    created = 0

    async def _create(**kwargs):
        nonlocal created
        created += 1
        seq = created

        # Caller-supplied values win over the generated defaults.
        columns = {
            "device_id": f"sb-{seq:04d}",
            "hostname": f"spoolbuddy-{seq}",
            "ip_address": f"10.0.0.{seq}",
            "firmware_version": "1.0.0",
            "has_nfc": True,
            "has_scale": True,
            "tare_offset": 0,
            "calibration_factor": 1.0,
            "last_seen": datetime.now(timezone.utc),
            **kwargs,
        }

        record = SpoolBuddyDevice(**columns)
        db_session.add(record)
        await db_session.commit()
        # Reload so database-populated attributes are available to the test.
        await db_session.refresh(record)
        return record

    return _create
@pytest.fixture
def spool_factory(db_session: AsyncSession):
    """Provide an async callable that persists ``Spool`` rows.

    The callable builds a spool from a fixed set of defaults (a red Polymaker
    PLA Basic spool with a 1000 label weight, 250 core weight and nothing
    used), applies any keyword overrides, commits it through ``db_session``
    and returns the refreshed instance.
    """

    async def _create(**kwargs):
        # Caller-supplied values win over the defaults.
        columns = {
            "material": "PLA",
            "subtype": "Basic",
            "brand": "Polymaker",
            "color_name": "Red",
            "rgba": "FF0000FF",
            "label_weight": 1000,
            "core_weight": 250,
            "weight_used": 0,
            **kwargs,
        }
        spool = Spool(**columns)
        db_session.add(spool)
        await db_session.commit()
        # Reload so database-populated attributes (e.g. the id) are available.
        await db_session.refresh(spool)
        return spool

    return _create
- # ============================================================================
- # Device endpoints
- # ============================================================================
class TestDeviceEndpoints:
    """Device registration, listing, removal and heartbeat endpoints."""

    # Dotted path under which the routes module looks up its websocket
    # manager; patched so broadcasts can be inspected instead of sent.
    _WS_MANAGER = "backend.app.api.routes.spoolbuddy.ws_manager"
    # Dotted path of the SSH public-key getter patched by the key-sync tests.
    _SSH_GET_PUBLIC_KEY = "backend.app.services.spoolbuddy_ssh.get_public_key"

    @classmethod
    def _patch_ws(cls):
        """Return a patcher that swaps ``ws_manager`` for a mock with an awaitable ``broadcast``."""
        return patch(cls._WS_MANAGER, broadcast=AsyncMock())

    @staticmethod
    async def _heartbeat(
        client: AsyncClient,
        device_id: str,
        *,
        uptime_s: int,
        nfc_ok: bool = True,
        scale_ok: bool = True,
    ):
        """POST a heartbeat for ``device_id`` and return the raw response."""
        return await client.post(
            f"{API}/devices/{device_id}/heartbeat",
            json={"nfc_ok": nfc_ok, "scale_ok": scale_ok, "uptime_s": uptime_s},
        )

    @staticmethod
    def _single_broadcast(mock_ws) -> dict:
        """Assert exactly one broadcast was made and return its message."""
        mock_ws.broadcast.assert_called_once()
        return mock_ws.broadcast.call_args[0][0]

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_register_new_device(self, async_client: AsyncClient):
        """Registering an unknown device reports it online and broadcasts once."""
        registration = {
            "device_id": "sb-new",
            "hostname": "spoolbuddy-new",
            "ip_address": "10.0.0.99",
            "firmware_version": "1.2.0",
        }
        with self._patch_ws() as mock_ws:
            resp = await async_client.post(f"{API}/devices/register", json=registration)

        assert resp.status_code == 200
        data = resp.json()
        assert data["device_id"] == "sb-new"
        assert data["hostname"] == "spoolbuddy-new"
        assert data["online"] is True

        msg = self._single_broadcast(mock_ws)
        assert msg["type"] == "spoolbuddy_online"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_re_register_existing_device(self, async_client: AsyncClient, device_factory):
        """Re-registering updates host/IP/firmware on the same row but keeps calibration."""
        device = await device_factory(
            device_id="sb-exist",
            tare_offset=12345,
            calibration_factor=0.0042,
        )
        registration = {
            "device_id": "sb-exist",
            "hostname": "updated-host",
            "ip_address": "10.0.0.200",
            "firmware_version": "2.0.0",
        }
        with self._patch_ws():
            resp = await async_client.post(f"{API}/devices/register", json=registration)

        assert resp.status_code == 200
        data = resp.json()
        assert data["id"] == device.id
        assert data["hostname"] == "updated-host"
        assert data["ip_address"] == "10.0.0.200"
        assert data["firmware_version"] == "2.0.0"
        # Calibration preserved on re-register
        assert data["tare_offset"] == 12345
        assert data["calibration_factor"] == pytest.approx(0.0042)

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_list_devices_empty(self, async_client: AsyncClient):
        """With no devices stored the listing is an empty JSON array."""
        resp = await async_client.get(f"{API}/devices")
        assert resp.status_code == 200
        assert resp.json() == []

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_list_devices(self, async_client: AsyncClient, device_factory):
        """Every stored device appears in the listing."""
        await device_factory(device_id="sb-a", hostname="alpha")
        await device_factory(device_id="sb-b", hostname="beta")

        resp = await async_client.get(f"{API}/devices")

        assert resp.status_code == 200
        devices = resp.json()
        assert len(devices) == 2
        assert {d["hostname"] for d in devices} == {"alpha", "beta"}

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_unregister_device(self, async_client: AsyncClient, device_factory):
        """Deleting a device drops its throttle entry, broadcasts, and leaves other devices alone."""
        await device_factory(device_id="sb-keep", hostname="keep")
        await device_factory(device_id="sb-drop", hostname="drop")
        # Seed the online-broadcast throttle map so its cleanup can be verified.
        spoolbuddy_routes._spoolbuddy_online_last_broadcast["sb-drop"] = 123.0

        with self._patch_ws() as mock_ws:
            resp = await async_client.delete(f"{API}/devices/sb-drop")

        assert resp.status_code == 200
        assert resp.json() == {"status": "deleted", "device_id": "sb-drop"}
        assert "sb-drop" not in spoolbuddy_routes._spoolbuddy_online_last_broadcast

        msg = self._single_broadcast(mock_ws)
        assert msg["type"] == "spoolbuddy_unregistered"
        assert msg["device_id"] == "sb-drop"

        # Other device still present
        listing = await async_client.get(f"{API}/devices")
        remaining = {d["device_id"] for d in listing.json()}
        assert remaining == {"sb-keep"}

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_unregister_device_not_found(self, async_client: AsyncClient):
        """Deleting an unknown device id yields 404."""
        resp = await async_client.delete(f"{API}/devices/sb-ghost")
        assert resp.status_code == 404

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_heartbeat_updates_status(self, async_client: AsyncClient, device_factory):
        """A heartbeat echoes the stored calibration and announces the device once."""
        device = await device_factory(device_id="sb-hb")
        # Start from an empty throttle map so the broadcast is not suppressed.
        spoolbuddy_routes._spoolbuddy_online_last_broadcast.clear()

        with self._patch_ws() as mock_ws:
            resp = await self._heartbeat(async_client, "sb-hb", uptime_s=600)

        assert resp.status_code == 200
        data = resp.json()
        assert data["tare_offset"] == device.tare_offset
        assert data["calibration_factor"] == pytest.approx(device.calibration_factor)

        msg = self._single_broadcast(mock_ws)
        assert msg["type"] == "spoolbuddy_online"
        assert msg["device_id"] == "sb-hb"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_heartbeat_returns_ssh_public_key(self, async_client: AsyncClient, device_factory):
        """Heartbeat response carries the current SSH public key so the daemon
        can re-deploy it whenever Bambuddy's keypair rotates without waiting
        for a service restart."""
        await device_factory(device_id="sb-ssh-hb")
        fake_key = "ssh-ed25519 AAAATESTKEY bambuddy-spoolbuddy"

        with (
            self._patch_ws(),
            patch(self._SSH_GET_PUBLIC_KEY, AsyncMock(return_value=fake_key)),
        ):
            resp = await self._heartbeat(async_client, "sb-ssh-hb", uptime_s=5)

        assert resp.status_code == 200
        assert resp.json()["ssh_public_key"] == fake_key

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_heartbeat_ssh_key_failure_does_not_break_heartbeat(self, async_client: AsyncClient, device_factory):
        """If the backend can't read its own SSH key, the heartbeat must still
        succeed — telemetry/commands are far more critical than key sync."""
        await device_factory(device_id="sb-ssh-fail")

        with (
            self._patch_ws(),
            patch(self._SSH_GET_PUBLIC_KEY, AsyncMock(side_effect=OSError("disk full"))),
        ):
            resp = await self._heartbeat(async_client, "sb-ssh-fail", uptime_s=5)

        assert resp.status_code == 200
        assert resp.json()["ssh_public_key"] is None

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_heartbeat_returns_pending_command(self, async_client: AsyncClient, device_factory):
        """A pending ``tare`` command is delivered on one heartbeat and gone on the next."""
        await device_factory(device_id="sb-cmd", pending_command="tare")

        with self._patch_ws():
            first = await self._heartbeat(async_client, "sb-cmd", uptime_s=10)
        assert first.status_code == 200
        assert first.json()["pending_command"] == "tare"

        # Second heartbeat should have no pending command (cleared)
        with self._patch_ws():
            second = await self._heartbeat(async_client, "sb-cmd", uptime_s=20)
        assert second.json()["pending_command"] is None

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_heartbeat_unknown_device_404(self, async_client: AsyncClient):
        """A heartbeat for an unregistered device id yields 404."""
        with self._patch_ws():
            resp = await self._heartbeat(
                async_client,
                "nonexistent",
                uptime_s=0,
                nfc_ok=False,
                scale_ok=False,
            )
        assert resp.status_code == 404

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_heartbeat_broadcasts_online_when_was_offline(self, async_client: AsyncClient, device_factory):
        """A device last seen two minutes ago is announced online on its next heartbeat."""
        spoolbuddy_routes._spoolbuddy_online_last_broadcast.clear()
        # Create device with last_seen far in the past (offline)
        await device_factory(
            device_id="sb-offline",
            last_seen=datetime.now(timezone.utc) - timedelta(seconds=120),
        )

        with self._patch_ws() as mock_ws:
            resp = await self._heartbeat(async_client, "sb-offline", uptime_s=5)

        assert resp.status_code == 200
        # Should broadcast online since device was offline
        msg = self._single_broadcast(mock_ws)
        assert msg["type"] == "spoolbuddy_online"
        assert msg["device_id"] == "sb-offline"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_heartbeat_broadcasts_online_when_already_online(self, async_client: AsyncClient, device_factory):
        """With an empty throttle map, even a just-seen device is announced online."""
        spoolbuddy_routes._spoolbuddy_online_last_broadcast.clear()
        await device_factory(
            device_id="sb-already-online",
            last_seen=datetime.now(timezone.utc),
        )

        with self._patch_ws() as mock_ws:
            resp = await self._heartbeat(async_client, "sb-already-online", uptime_s=42)

        assert resp.status_code == 200
        msg = self._single_broadcast(mock_ws)
        assert msg["type"] == "spoolbuddy_online"
        assert msg["device_id"] == "sb-already-online"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_heartbeat_online_broadcast_is_throttled(self, async_client: AsyncClient, device_factory):
        """Two back-to-back heartbeats produce only a single online broadcast."""
        spoolbuddy_routes._spoolbuddy_online_last_broadcast.clear()
        await device_factory(
            device_id="sb-throttle",
            last_seen=datetime.now(timezone.utc),
        )

        # Both heartbeats share one patched manager so calls accumulate.
        with self._patch_ws() as mock_ws:
            resp1 = await self._heartbeat(async_client, "sb-throttle", uptime_s=10)
            resp2 = await self._heartbeat(async_client, "sb-throttle", uptime_s=11)

        assert resp1.status_code == 200
        assert resp2.status_code == 200
        msg = self._single_broadcast(mock_ws)
        assert msg["type"] == "spoolbuddy_online"
        assert msg["device_id"] == "sb-throttle"
- # ============================================================================
- # NFC endpoints
- # ============================================================================
class TestNfcEndpoints:
    """NFC tag-scanned / tag-removed notification endpoints."""

    # Dotted paths patched inside the routes module.
    _WS_MANAGER = "backend.app.api.routes.spoolbuddy.ws_manager"
    _TAG_LOOKUP = "backend.app.api.routes.spoolbuddy.get_spool_by_tag"

    # Spool attributes copied onto the stand-in returned by the patched lookup.
    _SPOOL_FIELDS = (
        "id",
        "material",
        "subtype",
        "color_name",
        "rgba",
        "brand",
        "label_weight",
        "core_weight",
        "weight_used",
    )

    @classmethod
    def _patch_ws(cls):
        """Return a patcher that swaps ``ws_manager`` for a mock with an awaitable ``broadcast``."""
        return patch(cls._WS_MANAGER, broadcast=AsyncMock())

    @classmethod
    def _patch_lookup(cls, result):
        """Return a patcher that makes the routes' tag lookup resolve to ``result``."""
        return patch(cls._TAG_LOOKUP, new_callable=AsyncMock, return_value=result)

    @staticmethod
    def _last_broadcast(mock_ws) -> dict:
        """Return the most recent broadcast message, failing clearly if none was sent."""
        # Without this guard a missing broadcast shows up as a TypeError on
        # ``call_args`` (which is None) rather than as an assertion failure.
        mock_ws.broadcast.assert_called()
        return mock_ws.broadcast.call_args[0][0]

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_tag_scanned_matched(self, async_client: AsyncClient, spool_factory):
        """A scan whose lookup finds a spool reports a match and broadcasts the spool."""
        spool = await spool_factory(tag_uid="AABB1122", material="PLA")

        # Stand-in for the lookup result, mirroring the persisted spool's fields.
        mock_spool = MagicMock()
        for field in self._SPOOL_FIELDS:
            setattr(mock_spool, field, getattr(spool, field))

        with self._patch_ws() as mock_ws, self._patch_lookup(mock_spool):
            resp = await async_client.post(
                f"{API}/nfc/tag-scanned",
                json={"device_id": "sb-1", "tag_uid": "AABB1122"},
            )

        assert resp.status_code == 200
        data = resp.json()
        assert data["matched"] is True
        assert data["spool_id"] == spool.id

        msg = self._last_broadcast(mock_ws)
        assert msg["type"] == "spoolbuddy_tag_matched"
        assert msg["spool"]["id"] == spool.id

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_tag_scanned_unmatched(self, async_client: AsyncClient):
        """A scan whose lookup finds nothing reports no match and broadcasts an unknown tag."""
        with self._patch_ws() as mock_ws, self._patch_lookup(None):
            resp = await async_client.post(
                f"{API}/nfc/tag-scanned",
                json={"device_id": "sb-1", "tag_uid": "DEADBEEF"},
            )

        assert resp.status_code == 200
        data = resp.json()
        assert data["matched"] is False
        assert data["spool_id"] is None

        msg = self._last_broadcast(mock_ws)
        assert msg["type"] == "spoolbuddy_unknown_tag"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_tag_removed(self, async_client: AsyncClient):
        """Tag removal is relayed as a broadcast carrying the device and tag ids."""
        with self._patch_ws() as mock_ws:
            resp = await async_client.post(
                f"{API}/nfc/tag-removed",
                json={"device_id": "sb-1", "tag_uid": "AABB1122"},
            )

        assert resp.status_code == 200
        msg = self._last_broadcast(mock_ws)
        assert msg["type"] == "spoolbuddy_tag_removed"
        assert msg["device_id"] == "sb-1"
        assert msg["tag_uid"] == "AABB1122"
- # ============================================================================
- # NFC write-tag endpoints
- # ============================================================================
class TestWriteTagEndpoints:
    """NFC write-tag queueing, write-result reporting and cancel-write endpoints."""

    # Dotted path under which the routes module looks up its websocket
    # manager; patched so broadcasts can be inspected instead of sent.
    _WS_MANAGER = "backend.app.api.routes.spoolbuddy.ws_manager"

    @classmethod
    def _patch_ws(cls):
        """Return a patcher that swaps ``ws_manager`` for a mock with an awaitable ``broadcast``."""
        return patch(cls._WS_MANAGER, broadcast=AsyncMock())

    @staticmethod
    async def _heartbeat(client: AsyncClient, device_id: str, *, uptime_s: int):
        """POST a healthy heartbeat for ``device_id`` and return the raw response."""
        return await client.post(
            f"{API}/devices/{device_id}/heartbeat",
            json={"nfc_ok": True, "scale_ok": True, "uptime_s": uptime_s},
        )

    @staticmethod
    async def _queue_write(client: AsyncClient, device_id: str, spool_id: int):
        """POST a write-tag request for the given device/spool and return the raw response."""
        return await client.post(
            f"{API}/nfc/write-tag",
            json={"device_id": device_id, "spool_id": spool_id},
        )

    @staticmethod
    def _last_broadcast(mock_ws) -> dict:
        """Return the most recent broadcast message, failing clearly if none was sent."""
        # Without this guard a missing broadcast shows up as a TypeError on
        # ``call_args`` (which is None) rather than as an assertion failure.
        mock_ws.broadcast.assert_called()
        return mock_ws.broadcast.call_args[0][0]

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_write_tag_queues_command(self, async_client: AsyncClient, device_factory, spool_factory):
        """Queuing a write makes the next heartbeat carry ``write_tag`` plus its payload."""
        device = await device_factory(device_id="sb-wt")
        spool = await spool_factory(material="PLA", brand="Polymaker", color_name="Red", rgba="FF0000FF")

        resp = await self._queue_write(async_client, device.device_id, spool.id)
        assert resp.status_code == 200
        assert resp.json()["status"] == "queued"

        # Verify heartbeat returns write_tag command with payload
        with self._patch_ws():
            hb = await self._heartbeat(async_client, device.device_id, uptime_s=10)
        hb_data = hb.json()
        assert hb_data["pending_command"] == "write_tag"
        payload = hb_data["pending_write_payload"]
        assert payload is not None
        assert payload["spool_id"] == spool.id
        assert "ndef_data_hex" in payload

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_write_tag_heartbeat_not_cleared(self, async_client: AsyncClient, device_factory, spool_factory):
        """write_tag command persists across heartbeats until write-result clears it."""
        device = await device_factory(device_id="sb-wt-persist")
        spool = await spool_factory(material="PETG")

        queued = await self._queue_write(async_client, device.device_id, spool.id)
        # Fail here, not on a later heartbeat, if the setup request was rejected.
        assert queued.status_code == 200

        # First heartbeat — command present
        with self._patch_ws():
            hb1 = await self._heartbeat(async_client, device.device_id, uptime_s=10)
        assert hb1.json()["pending_command"] == "write_tag"

        # Second heartbeat — should still be present (not cleared like tare)
        with self._patch_ws():
            hb2 = await self._heartbeat(async_client, device.device_id, uptime_s=20)
        assert hb2.json()["pending_command"] == "write_tag"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_write_tag_missing_spool_404(self, async_client: AsyncClient, device_factory):
        """Queuing a write for a spool id that does not exist yields 404."""
        device = await device_factory(device_id="sb-wt-nospool")
        resp = await self._queue_write(async_client, device.device_id, 99999)
        assert resp.status_code == 404

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_write_tag_missing_device_404(self, async_client: AsyncClient, spool_factory):
        """Queuing a write for an unregistered device id yields 404."""
        spool = await spool_factory()
        resp = await self._queue_write(async_client, "nonexistent", spool.id)
        assert resp.status_code == 404

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_write_result_success_links_tag(self, async_client: AsyncClient, device_factory, spool_factory):
        """A successful write result is broadcast and the tag is linked to the spool."""
        device = await device_factory(device_id="sb-wr", pending_command="write_tag")
        spool = await spool_factory(material="PLA", tag_uid=None)
        result = {
            "device_id": device.device_id,
            "spool_id": spool.id,
            "tag_uid": "04AABB11223344",
            "success": True,
        }

        with self._patch_ws() as mock_ws:
            resp = await async_client.post(f"{API}/nfc/write-result", json=result)

        assert resp.status_code == 200
        msg = self._last_broadcast(mock_ws)
        assert msg["type"] == "spoolbuddy_tag_written"
        assert msg["spool_id"] == spool.id
        assert msg["tag_uid"] == "04AABB11223344"

        # Verify spool got tag linked
        spool_resp = await async_client.get(f"/api/v1/inventory/spools/{spool.id}")
        spool_data = spool_resp.json()
        assert spool_data["tag_uid"] == "04AABB11223344"
        assert spool_data["tag_type"] == "ntag"
        assert spool_data["data_origin"] == "opentag3d"
        assert spool_data["encode_time"] is not None

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_write_result_failure_broadcasts_error(
        self, async_client: AsyncClient, device_factory, spool_factory
    ):
        """A failed write result broadcasts the failure message and leaves the spool untagged."""
        device = await device_factory(device_id="sb-wr-fail", pending_command="write_tag")
        spool = await spool_factory(material="PLA", tag_uid=None)
        result = {
            "device_id": device.device_id,
            "spool_id": spool.id,
            "tag_uid": "04AABBCC",
            "success": False,
            "message": "Write or verification failed",
        }

        with self._patch_ws() as mock_ws:
            resp = await async_client.post(f"{API}/nfc/write-result", json=result)

        assert resp.status_code == 200
        msg = self._last_broadcast(mock_ws)
        assert msg["type"] == "spoolbuddy_tag_write_failed"
        assert msg["message"] == "Write or verification failed"

        # Verify spool NOT linked
        spool_resp = await async_client.get(f"/api/v1/inventory/spools/{spool.id}")
        assert spool_resp.json()["tag_uid"] is None

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_write_result_clears_pending_command(self, async_client: AsyncClient, device_factory, spool_factory):
        """After a write result is reported, heartbeats carry neither command nor payload."""
        device = await device_factory(
            device_id="sb-wr-clear",
            pending_command="write_tag",
            pending_write_payload='{"spool_id": 1, "ndef_data_hex": "E110120003"}',
        )
        spool = await spool_factory()
        result = {
            "device_id": device.device_id,
            "spool_id": spool.id,
            "tag_uid": "AABBCCDD",
            "success": True,
        }

        with self._patch_ws():
            await async_client.post(f"{API}/nfc/write-result", json=result)

        # Heartbeat should have no pending command
        with self._patch_ws():
            hb = await self._heartbeat(async_client, device.device_id, uptime_s=30)
        hb_data = hb.json()
        assert hb_data["pending_command"] is None
        assert hb_data["pending_write_payload"] is None

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_cancel_write(self, async_client: AsyncClient, device_factory, spool_factory):
        """Cancelling a queued write removes the pending command from later heartbeats."""
        device = await device_factory(device_id="sb-cancel")
        spool = await spool_factory()

        # Queue a write
        queued = await self._queue_write(async_client, device.device_id, spool.id)
        # "No pending command" is also what a device looks like when nothing
        # was ever queued, so make sure the write really was accepted first.
        assert queued.status_code == 200

        # Cancel it
        resp = await async_client.post(f"{API}/devices/{device.device_id}/cancel-write", json={})
        assert resp.status_code == 200

        # Heartbeat should have no pending command
        with self._patch_ws():
            hb = await self._heartbeat(async_client, device.device_id, uptime_s=10)
        assert hb.json()["pending_command"] is None

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_cancel_write_unknown_device_404(self, async_client: AsyncClient):
        """Cancelling a write on an unregistered device id yields 404."""
        resp = await async_client.post(f"{API}/devices/ghost/cancel-write", json={})
        assert resp.status_code == 404

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_write_tag_ndef_data_is_valid(self, async_client: AsyncClient, device_factory, spool_factory):
        """Verify the NDEF data in the heartbeat is a valid OpenTag3D message."""
        device = await device_factory(device_id="sb-wt-ndef")
        spool = await spool_factory(
            material="PLA",
            brand="Polymaker",
            color_name="White",
            rgba="FFFFFFFF",
            label_weight=1000,
        )

        queued = await self._queue_write(async_client, device.device_id, spool.id)
        # Fail here, not on a missing payload below, if queuing was rejected.
        assert queued.status_code == 200

        with self._patch_ws():
            hb = await self._heartbeat(async_client, device.device_id, uptime_s=10)
        payload = hb.json()["pending_write_payload"]
        ndef_bytes = bytes.fromhex(payload["ndef_data_hex"])

        # CC bytes
        assert ndef_bytes[:4] == bytes([0xE1, 0x10, 0x12, 0x00])
        # TLV type
        assert ndef_bytes[4] == 0x03
        # NDEF record: TNF=MIME, type=application/opentag3d
        assert ndef_bytes[6] == 0xD2
        assert ndef_bytes[9:30] == b"application/opentag3d"
        # Terminator
        assert ndef_bytes[-1] == 0xFE
        # Total size fits NTAG213
        assert len(ndef_bytes) <= 144
- # ============================================================================
- # Scale endpoints
- # ============================================================================
class TestScaleEndpoints:
    """Integration tests for the scale reading and spool-weight update endpoints."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_scale_reading_broadcast(self, async_client: AsyncClient):
        """Posting a scale reading broadcasts a ``spoolbuddy_weight`` message carrying the same values."""
        reading = {
            "device_id": "sb-1",
            "weight_grams": 823.5,
            "stable": True,
            "raw_adc": 456789,
        }
        with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
            mock_ws.broadcast = AsyncMock()
            response = await async_client.post(f"{API}/scale/reading", json=reading)

        assert response.status_code == 200
        # The first positional argument of the broadcast call is the message dict.
        broadcast_msg = mock_ws.broadcast.call_args[0][0]
        assert broadcast_msg["type"] == "spoolbuddy_weight"
        assert broadcast_msg["device_id"] == "sb-1"
        assert broadcast_msg["weight_grams"] == 823.5
        assert broadcast_msg["stable"] is True
        assert broadcast_msg["raw_adc"] == 456789

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_update_spool_weight_calculates_correctly(self, async_client: AsyncClient, spool_factory):
        """A 750 g reading on a 1000 g spool with a 250 g core reports 500 g used."""
        # label=1000g, core=250g, scale reads 750g
        # net_filament = max(0, 750 - 250) = 500
        # weight_used = max(0, 1000 - 500) = 500
        spool = await spool_factory(label_weight=1000, core_weight=250, weight_used=0)
        body = {"spool_id": spool.id, "weight_grams": 750}
        response = await async_client.post(f"{API}/scale/update-spool-weight", json=body)
        assert response.status_code == 200
        result = response.json()
        assert result["weight_used"] == 500

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_update_spool_weight_full_spool(self, async_client: AsyncClient, spool_factory):
        """A reading equal to label + core weight resets ``weight_used`` to 0."""
        # label=1000g, core=250g, scale reads 1250g (full spool)
        # net_filament = max(0, 1250 - 250) = 1000
        # weight_used = max(0, 1000 - 1000) = 0
        spool = await spool_factory(label_weight=1000, core_weight=250, weight_used=200)
        body = {"spool_id": spool.id, "weight_grams": 1250}
        response = await async_client.post(f"{API}/scale/update-spool-weight", json=body)
        assert response.status_code == 200
        result = response.json()
        assert result["weight_used"] == 0

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_update_spool_weight_stores_scale_reading(self, async_client: AsyncClient, spool_factory):
        """Verify last_scale_weight and last_weighed_at are stored after weight sync."""
        spool = await spool_factory(label_weight=1000, core_weight=250, weight_used=0)
        body = {"spool_id": spool.id, "weight_grams": 750}
        sync_response = await async_client.post(f"{API}/scale/update-spool-weight", json=body)
        assert sync_response.status_code == 200

        # Fetch the spool via inventory API to verify stored fields
        inventory_response = await async_client.get(f"/api/v1/inventory/spools/{spool.id}")
        assert inventory_response.status_code == 200
        stored = inventory_response.json()
        assert stored["last_scale_weight"] == 750
        assert stored["last_weighed_at"] is not None

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_update_spool_weight_missing_spool_404(self, async_client: AsyncClient):
        """Updating the weight of spool ID 99999, which this test never creates, returns 404."""
        body = {"spool_id": 99999, "weight_grams": 500}
        response = await async_client.post(f"{API}/scale/update-spool-weight", json=body)
        assert response.status_code == 404
- # ============================================================================
- # Calibration endpoints
- # ============================================================================
class TestCalibrationEndpoints:
    """Integration tests for the per-device scale calibration endpoints."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_tare_queues_command(self, async_client: AsyncClient, device_factory):
        """Requesting a tare returns ``ok`` and the next heartbeat reports ``tare`` as pending."""
        await device_factory(device_id="sb-tare")
        tare_response = await async_client.post(f"{API}/devices/sb-tare/calibration/tare", json={})
        assert tare_response.status_code == 200
        assert tare_response.json()["status"] == "ok"

        # Verify pending_command via heartbeat
        heartbeat_body = {"nfc_ok": True, "scale_ok": True, "uptime_s": 1}
        with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
            mock_ws.broadcast = AsyncMock()
            hb = await async_client.post(f"{API}/devices/sb-tare/heartbeat", json=heartbeat_body)
        assert hb.json()["pending_command"] == "tare"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_tare_unknown_device_404(self, async_client: AsyncClient):
        """Taring a device ID that this test never creates (``ghost``) returns 404."""
        response = await async_client.post(f"{API}/devices/ghost/calibration/tare", json={})
        assert response.status_code == 404

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_set_tare_offset(self, async_client: AsyncClient, device_factory):
        """Setting the tare offset echoes the new offset and leaves the calibration factor as created."""
        await device_factory(device_id="sb-st", calibration_factor=0.005)
        body = {"tare_offset": 54321}
        response = await async_client.post(f"{API}/devices/sb-st/calibration/set-tare", json=body)
        assert response.status_code == 200
        calibration = response.json()
        assert calibration["tare_offset"] == 54321
        assert calibration["calibration_factor"] == pytest.approx(0.005)

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_set_calibration_factor(self, async_client: AsyncClient, device_factory):
        """The calibration factor is derived from the known weight and the tare-adjusted ADC value."""
        # known_weight=200g, raw_adc=50000, tare=10000 → factor=200/(50000-10000)=0.005
        await device_factory(device_id="sb-cf", tare_offset=10000)
        body = {"known_weight_grams": 200, "raw_adc": 50000}
        response = await async_client.post(f"{API}/devices/sb-cf/calibration/set-factor", json=body)
        assert response.status_code == 200
        calibration = response.json()
        assert calibration["calibration_factor"] == pytest.approx(0.005)
        assert calibration["tare_offset"] == 10000

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_set_calibration_factor_zero_delta_400(self, async_client: AsyncClient, device_factory):
        """A raw ADC value equal to the tare offset is rejected with 400."""
        # raw_adc == tare_offset → delta is 0 → 400 error
        await device_factory(device_id="sb-zero", tare_offset=5000)
        body = {"known_weight_grams": 100, "raw_adc": 5000}
        response = await async_client.post(f"{API}/devices/sb-zero/calibration/set-factor", json=body)
        assert response.status_code == 400

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_get_calibration(self, async_client: AsyncClient, device_factory):
        """Reading calibration returns the tare offset and factor the device was created with."""
        await device_factory(
            device_id="sb-gcal",
            tare_offset=11111,
            calibration_factor=0.0042,
        )
        response = await async_client.get(f"{API}/devices/sb-gcal/calibration")
        assert response.status_code == 200
        calibration = response.json()
        assert calibration["tare_offset"] == 11111
        assert calibration["calibration_factor"] == pytest.approx(0.0042)
- # ============================================================================
- # Display endpoints
- # ============================================================================
class TestDisplayEndpoints:
    """Integration tests for reading and updating a device's display settings."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_update_display_settings(self, async_client: AsyncClient, device_factory):
        """A PUT with new brightness and blank timeout echoes both values back."""
        await device_factory(device_id="sb-disp", display_brightness=100, display_blank_timeout=0)
        new_settings = {"brightness": 75, "blank_timeout": 300}
        response = await async_client.put(f"{API}/devices/sb-disp/display", json=new_settings)
        assert response.status_code == 200
        returned = response.json()
        assert returned["brightness"] == 75
        assert returned["blank_timeout"] == 300

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_update_display_persists_via_heartbeat(self, async_client: AsyncClient, device_factory):
        """Display settings written via PUT are reported by the device's next heartbeat."""
        await device_factory(device_id="sb-disp-hb")
        new_settings = {"brightness": 50, "blank_timeout": 600}
        await async_client.put(f"{API}/devices/sb-disp-hb/display", json=new_settings)

        heartbeat_body = {"nfc_ok": True, "scale_ok": True, "uptime_s": 10}
        with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
            mock_ws.broadcast = AsyncMock()
            hb = await async_client.post(f"{API}/devices/sb-disp-hb/heartbeat", json=heartbeat_body)

        heartbeat_state = hb.json()
        assert heartbeat_state["display_brightness"] == 50
        assert heartbeat_state["display_blank_timeout"] == 600

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_update_display_unknown_device_404(self, async_client: AsyncClient):
        """Updating the display of a device ID that this test never creates (``ghost``) returns 404."""
        new_settings = {"brightness": 50, "blank_timeout": 60}
        response = await async_client.put(f"{API}/devices/ghost/display", json=new_settings)
        assert response.status_code == 404

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_update_display_validates_brightness(self, async_client: AsyncClient, device_factory):
        """A brightness of 150 is rejected with 422."""
        await device_factory(device_id="sb-disp-val")
        too_bright = {"brightness": 150, "blank_timeout": 0}
        response = await async_client.put(f"{API}/devices/sb-disp-val/display", json=too_bright)
        assert response.status_code == 422  # Validation error: brightness > 100

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_get_display_settings(self, async_client: AsyncClient, device_factory):
        """Reading display settings returns the values the device was created with.

        The kiosk idle watchdog (install/spoolbuddy-idle.sh) reads this
        endpoint on autostart to configure swayidle with the user-selected
        blank timeout before launching. See issue #937.
        """
        await device_factory(device_id="sb-disp-get", display_brightness=60, display_blank_timeout=450)
        response = await async_client.get(f"{API}/devices/sb-disp-get/display")
        assert response.status_code == 200
        returned = response.json()
        assert returned["brightness"] == 60
        assert returned["blank_timeout"] == 450

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_get_display_unknown_device_404(self, async_client: AsyncClient):
        """Reading the display of a device ID that this test never creates (``ghost``) returns 404."""
        response = await async_client.get(f"{API}/devices/ghost/display")
        assert response.status_code == 404
- # ============================================================================
- # Update endpoints
- # ============================================================================
class TestUpdateEndpoints:
    """Integration tests for triggering, reporting and checking device software updates."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_trigger_update_starts_ssh_update(self, async_client: AsyncClient, device_factory):
        """Triggering an update returns ``ok`` when the SSH update routine is stubbed out."""
        await device_factory(device_id="sb-upd")
        ws_patch = patch("backend.app.api.routes.spoolbuddy.ws_manager")
        ssh_patch = patch("backend.app.services.spoolbuddy_ssh.perform_ssh_update", new_callable=AsyncMock)
        with ws_patch as mock_ws, ssh_patch:
            mock_ws.broadcast = AsyncMock()
            response = await async_client.post(f"{API}/devices/sb-upd/update")
        assert response.status_code == 200
        assert response.json()["status"] == "ok"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_trigger_update_offline_device_409(self, async_client: AsyncClient, device_factory):
        """A device last seen 120 seconds ago cannot be updated and yields 409."""
        stale_timestamp = datetime.now(timezone.utc) - timedelta(seconds=120)
        await device_factory(device_id="sb-upd-off", last_seen=stale_timestamp)
        response = await async_client.post(f"{API}/devices/sb-upd-off/update")
        assert response.status_code == 409

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_trigger_update_unknown_device_404(self, async_client: AsyncClient):
        """Triggering an update for a device ID that this test never creates (``ghost``) returns 404."""
        response = await async_client.post(f"{API}/devices/ghost/update")
        assert response.status_code == 404

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_trigger_update_already_updating(self, async_client: AsyncClient, device_factory):
        """A device whose status is already ``updating`` answers 200 with ``already_updating``."""
        await device_factory(device_id="sb-upd-dup", update_status="updating")
        with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
            mock_ws.broadcast = AsyncMock()
            response = await async_client.post(f"{API}/devices/sb-upd-dup/update")
        assert response.status_code == 200
        assert response.json()["status"] == "already_updating"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_report_update_status_updating(self, async_client: AsyncClient, device_factory):
        """Reporting ``updating`` triggers exactly one ``spoolbuddy_update`` broadcast."""
        await device_factory(device_id="sb-upd-st", pending_command="update", update_status="pending")
        report = {"status": "updating", "message": "Fetching latest code..."}
        with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
            mock_ws.broadcast = AsyncMock()
            response = await async_client.post(f"{API}/devices/sb-upd-st/update-status", json=report)
        assert response.status_code == 200
        mock_ws.broadcast.assert_called_once()
        broadcast_msg = mock_ws.broadcast.call_args[0][0]
        assert broadcast_msg["type"] == "spoolbuddy_update"
        assert broadcast_msg["update_status"] == "updating"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_report_update_status_complete_clears_command(self, async_client: AsyncClient, device_factory):
        """After ``complete`` is reported, the next heartbeat shows no pending command."""
        await device_factory(device_id="sb-upd-done", pending_command="update", update_status="updating")
        report = {"status": "complete", "message": "Update complete, restarting..."}
        with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
            mock_ws.broadcast = AsyncMock()
            await async_client.post(f"{API}/devices/sb-upd-done/update-status", json=report)

        # Heartbeat should have no pending command
        heartbeat_body = {"nfc_ok": True, "scale_ok": True, "uptime_s": 10}
        with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
            mock_ws.broadcast = AsyncMock()
            hb = await async_client.post(f"{API}/devices/sb-upd-done/heartbeat", json=heartbeat_body)
        assert hb.json()["pending_command"] is None

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_report_update_status_error(self, async_client: AsyncClient, device_factory):
        """An ``error`` report is broadcast together with its message text."""
        await device_factory(device_id="sb-upd-err", pending_command="update", update_status="updating")
        report = {"status": "error", "message": "git fetch failed: network unreachable"}
        with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
            mock_ws.broadcast = AsyncMock()
            response = await async_client.post(f"{API}/devices/sb-upd-err/update-status", json=report)
        assert response.status_code == 200
        broadcast_msg = mock_ws.broadcast.call_args[0][0]
        assert broadcast_msg["update_status"] == "error"
        assert "git fetch failed" in broadcast_msg["update_message"]

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_report_update_status_unknown_device_404(self, async_client: AsyncClient):
        """Reporting status for a device ID that this test never creates (``ghost``) returns 404."""
        report = {"status": "updating", "message": "test"}
        response = await async_client.post(f"{API}/devices/ghost/update-status", json=report)
        assert response.status_code == 404

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_report_update_status_invalid_status_422(self, async_client: AsyncClient, device_factory):
        """Arbitrary status strings must be rejected with 422 (H2: UpdateStatusRequest validation)."""
        await device_factory(device_id="sb-upd-inv")
        report = {"status": "hacked", "message": "injected"}
        response = await async_client.post(f"{API}/devices/sb-upd-inv/update-status", json=report)
        assert response.status_code == 422

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_report_update_status_oversized_message_422(self, async_client: AsyncClient, device_factory):
        """Message exceeding 255 chars must be rejected with 422 (H2/M4)."""
        await device_factory(device_id="sb-upd-big")
        report = {"status": "updating", "message": "x" * 256}
        response = await async_client.post(f"{API}/devices/sb-upd-big/update-status", json=report)
        assert response.status_code == 422

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_ssh_public_key_error_does_not_leak_exception_text(self, async_client: AsyncClient):
        """SSH public-key 500 must not expose raw exception details (M3)."""
        # NOTE(review): this name is imported but not referenced below; presumably it only
        # ensures the module is loaded before patching — confirm before removing.
        from backend.app.services.spoolbuddy_ssh import get_public_key

        failing_lookup = AsyncMock(side_effect=RuntimeError("REDACT_ME internal path /data/keys/id_ed25519"))
        with patch("backend.app.services.spoolbuddy_ssh.get_public_key", failing_lookup):
            response = await async_client.get(f"{API}/ssh/public-key")

        assert response.status_code == 500
        detail = response.json()["detail"]
        assert "REDACT_ME" not in detail
        assert "/data/keys" not in detail
        assert "id_ed25519" not in detail

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_device_response_includes_update_fields(self, async_client: AsyncClient, device_factory):
        """The device listing exposes ``update_status`` and ``update_message`` for each device."""
        await device_factory(device_id="sb-upd-resp", update_status="complete", update_message="Done!")
        response = await async_client.get(f"{API}/devices")
        assert response.status_code == 200
        listed = next(entry for entry in response.json() if entry["device_id"] == "sb-upd-resp")
        assert listed["update_status"] == "complete"
        assert listed["update_message"] == "Done!"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_update_check_returns_version_info(self, async_client: AsyncClient, device_factory):
        """GET /devices/{id}/update-check compares device version against APP_VERSION."""
        await device_factory(device_id="sb-uc", firmware_version="0.1.0")
        response = await async_client.get(f"{API}/devices/sb-uc/update-check")
        assert response.status_code == 200
        version_info = response.json()
        assert version_info["current_version"] == "0.1.0"
        assert version_info["latest_version"] is not None
        assert version_info["update_available"] is True

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_update_check_up_to_date(self, async_client: AsyncClient, device_factory):
        """A device already running ``APP_VERSION`` has no update available."""
        from backend.app.core.config import APP_VERSION

        await device_factory(device_id="sb-uc2", firmware_version=APP_VERSION)
        response = await async_client.get(f"{API}/devices/sb-uc2/update-check")
        assert response.status_code == 200
        assert response.json()["update_available"] is False

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_update_check_unknown_device_404(self, async_client: AsyncClient):
        """An update check for a device ID that this test never creates (``ghost``) returns 404."""
        response = await async_client.get(f"{API}/devices/ghost/update-check")
        assert response.status_code == 404

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_trigger_update_broadcasts_websocket(self, async_client: AsyncClient, device_factory):
        """Triggering an update broadcasts one ``spoolbuddy_update`` message with status ``pending``."""
        await device_factory(device_id="sb-upd-ws")
        ws_patch = patch("backend.app.api.routes.spoolbuddy.ws_manager")
        ssh_patch = patch("backend.app.services.spoolbuddy_ssh.perform_ssh_update", new_callable=AsyncMock)
        with ws_patch as mock_ws, ssh_patch:
            mock_ws.broadcast = AsyncMock()
            await async_client.post(f"{API}/devices/sb-upd-ws/update")
        mock_ws.broadcast.assert_called_once()
        broadcast_msg = mock_ws.broadcast.call_args[0][0]
        assert broadcast_msg["type"] == "spoolbuddy_update"
        assert broadcast_msg["device_id"] == "sb-upd-ws"
        assert broadcast_msg["update_status"] == "pending"
- # ============================================================================
- # System command endpoints
- # ============================================================================
class TestSystemCommandEndpoints:
    """Integration tests for queueing system commands and delivering them via heartbeat."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_queue_reboot(self, async_client: AsyncClient, device_factory):
        """Queueing ``reboot`` returns status ``queued`` and echoes the command."""
        await device_factory(device_id="sb-reboot")
        resp = await async_client.post(
            f"{API}/devices/sb-reboot/system/command",
            json={"command": "reboot"},
        )
        assert resp.status_code == 200
        data = resp.json()
        assert data["status"] == "queued"
        assert data["command"] == "reboot"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_queue_shutdown(self, async_client: AsyncClient, device_factory):
        """Queueing ``shutdown`` is accepted and echoed back."""
        await device_factory(device_id="sb-shutdown")
        resp = await async_client.post(
            f"{API}/devices/sb-shutdown/system/command",
            json={"command": "shutdown"},
        )
        assert resp.status_code == 200
        assert resp.json()["command"] == "shutdown"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_queue_restart_daemon(self, async_client: AsyncClient, device_factory):
        """Queueing ``restart_daemon`` is accepted and echoed back."""
        await device_factory(device_id="sb-rd")
        resp = await async_client.post(
            f"{API}/devices/sb-rd/system/command",
            json={"command": "restart_daemon"},
        )
        assert resp.status_code == 200
        assert resp.json()["command"] == "restart_daemon"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_queue_restart_browser(self, async_client: AsyncClient, device_factory):
        """Queueing ``restart_browser`` is accepted and echoed back."""
        await device_factory(device_id="sb-rb")
        resp = await async_client.post(
            f"{API}/devices/sb-rb/system/command",
            json={"command": "restart_browser"},
        )
        assert resp.status_code == 200
        assert resp.json()["command"] == "restart_browser"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_invalid_command_rejected(self, async_client: AsyncClient, device_factory):
        """The command ``format_disk`` is rejected with 400 and an "Invalid command" detail."""
        await device_factory(device_id="sb-invalid")
        resp = await async_client.post(
            f"{API}/devices/sb-invalid/system/command",
            json={"command": "format_disk"},
        )
        assert resp.status_code == 400
        assert "Invalid command" in resp.json()["detail"]

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_command_unknown_device_404(self, async_client: AsyncClient):
        """Queueing a command for a device ID that this test never creates (``ghost``) returns 404."""
        resp = await async_client.post(
            f"{API}/devices/ghost/system/command",
            json={"command": "reboot"},
        )
        assert resp.status_code == 404

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_command_offline_device_409(self, async_client: AsyncClient, device_factory):
        """A device last seen 120 seconds ago is reported as offline with 409."""
        await device_factory(
            device_id="sb-offline-cmd",
            last_seen=datetime.now(timezone.utc) - timedelta(seconds=120),
        )
        resp = await async_client.post(
            f"{API}/devices/sb-offline-cmd/system/command",
            json={"command": "reboot"},
        )
        assert resp.status_code == 409
        assert "offline" in resp.json()["detail"].lower()

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_command_sets_pending_command(self, async_client: AsyncClient, device_factory, db_session):
        """Queueing a command persists it in the device row's ``pending_command`` column."""
        device = await device_factory(device_id="sb-pending")
        await async_client.post(
            f"{API}/devices/sb-pending/system/command",
            json={"command": "restart_daemon"},
        )
        # Reload the row so the assertion sees what the endpoint wrote.
        await db_session.refresh(device)
        assert device.pending_command == "restart_daemon"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_heartbeat_clears_system_command(self, async_client: AsyncClient, device_factory):
        """System commands (reboot/shutdown/restart_*) are fire-and-forget — heartbeat clears them."""
        await device_factory(device_id="sb-hb-clear")
        heartbeat_url = f"{API}/devices/sb-hb-clear/heartbeat"
        heartbeat_body = {"nfc_ok": True, "scale_ok": True, "uptime_s": 100}

        # Queue a command
        await async_client.post(
            f"{API}/devices/sb-hb-clear/system/command",
            json={"command": "restart_browser"},
        )

        # Heartbeat should return the command and clear it
        first = await async_client.post(heartbeat_url, json=heartbeat_body)
        assert first.status_code == 200
        assert first.json()["pending_command"] == "restart_browser"

        # Without this second heartbeat the "clears" half of the contract is never
        # checked: a command that stayed queued would be re-delivered on every beat.
        second = await async_client.post(heartbeat_url, json=heartbeat_body)
        assert second.status_code == 200
        assert second.json()["pending_command"] is None
- # ============================================================================
- # Spoolman-aware SpoolBuddy endpoints
- # ============================================================================
@pytest.fixture
async def spoolman_settings(db_session: AsyncSession):
    """Insert and commit the two settings rows that enable Spoolman with a URL; return the rows."""
    from backend.app.models.settings import Settings

    key_value_pairs = (
        ("spoolman_enabled", "true"),
        ("spoolman_url", "http://spoolman.local:7912"),
    )
    rows = [Settings(key=name, value=text) for name, text in key_value_pairs]
    for row in rows:
        db_session.add(row)
    await db_session.commit()
    return rows
- def _mock_spoolman_client(base_url: str = "http://spoolman.local:7912") -> MagicMock:
- client = MagicMock()
- client.base_url = base_url
- client.get_spools = AsyncMock(return_value=[])
- client.get_spool = AsyncMock(return_value={})
- client.find_spool_by_tag = AsyncMock(return_value=None)
- client.update_spool = AsyncMock(return_value=None)
- client.merge_spool_extra = AsyncMock(return_value={"id": 0})
- return client
- def _spoolman_spool_fixture(
- spool_id: int,
- spool_weight: float = 196.0,
- filament_weight: float = 1000.0,
- spool_level_spool_weight=None,
- ) -> dict:
- """Build a minimal Spoolman spool dict with realistic core weight from filament.spool_weight."""
- raw = {
- "id": spool_id,
- "filament": {"weight": filament_weight, "spool_weight": spool_weight},
- "used_weight": 0.0,
- }
- if spool_level_spool_weight is not None:
- raw["spool_weight"] = spool_level_spool_weight
- return raw
class TestUpdateSpoolWeightSpoolman:
    """update-spool-weight routes to Spoolman when Spoolman mode is active."""

    @staticmethod
    def _client_serving(sm_spool: dict) -> MagicMock:
        """Return a mock client whose ``get_spool`` and ``update_spool`` both yield *sm_spool*."""
        client = _mock_spoolman_client()
        client.get_spool = AsyncMock(return_value=sm_spool)
        client.update_spool = AsyncMock(return_value=sm_spool)
        return client

    @staticmethod
    async def _post_weight(async_client: AsyncClient, client: MagicMock, spool_id: int, weight_grams):
        """POST to update-spool-weight while both Spoolman client accessors return *client*."""
        with (
            patch("backend.app.services.spoolman.get_spoolman_client", AsyncMock(return_value=client)),
            patch("backend.app.services.spoolman.init_spoolman_client", AsyncMock(return_value=client)),
        ):
            return await async_client.post(
                f"{API}/scale/update-spool-weight",
                json={"spool_id": spool_id, "weight_grams": weight_grams},
            )

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_spoolman_mode_uses_filament_spool_weight(self, async_client: AsyncClient, spoolman_settings):
        """core_weight comes from filament.spool_weight, not a hardcoded constant."""
        sm_spool = _spoolman_spool_fixture(42, spool_weight=196.0, filament_weight=1000.0)
        client = self._client_serving(sm_spool)

        response = await self._post_weight(async_client, client, 42, 750)

        assert response.status_code == 200
        result = response.json()
        assert result["status"] == "ok"
        # remaining = max(0, 750 - 196) = 554 → weight_used = 1000 - 554 = 446
        assert result["weight_used"] == pytest.approx(446.0)
        client.update_spool.assert_called_once_with(spool_id=42, remaining_weight=pytest.approx(554.0))

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_spoolman_mode_clamps_remaining_to_zero(self, async_client: AsyncClient, spoolman_settings):
        """Scale weight below core weight → remaining_weight = 0."""
        sm_spool = _spoolman_spool_fixture(7, spool_weight=196.0, filament_weight=1000.0)
        client = self._client_serving(sm_spool)

        response = await self._post_weight(async_client, client, 7, 100)

        assert response.status_code == 200
        client.update_spool.assert_called_once_with(spool_id=7, remaining_weight=0.0)

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_spoolman_mode_404_when_spool_not_found(self, async_client: AsyncClient, spoolman_settings):
        """404 when Spoolman doesn't know the spool."""
        client = _mock_spoolman_client()
        client.get_spool = AsyncMock(side_effect=SpoolmanNotFoundError("Spool 9999 not found"))

        response = await self._post_weight(async_client, client, 9999, 500)

        assert response.status_code == 404

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_spoolman_mode_503_on_client_failure(self, async_client: AsyncClient, spoolman_settings):
        """503 is returned when Spoolman is unreachable during weight update."""
        sm_spool = _spoolman_spool_fixture(99)
        client = _mock_spoolman_client()
        client.get_spool = AsyncMock(return_value=sm_spool)
        client.update_spool = AsyncMock(side_effect=SpoolmanUnavailableError("Spoolman down"))

        response = await self._post_weight(async_client, client, 99, 500)

        assert response.status_code == 503

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_local_mode_unchanged(self, async_client: AsyncClient, spool_factory):
        """When Spoolman is NOT enabled, local DB update still works."""
        spool = await spool_factory(label_weight=1000, core_weight=250, weight_used=0)
        body = {"spool_id": spool.id, "weight_grams": 750}
        response = await async_client.post(f"{API}/scale/update-spool-weight", json=body)
        assert response.status_code == 200
        assert response.json()["weight_used"] == 500

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_spool_level_spool_weight_takes_priority(self, async_client: AsyncClient, spoolman_settings):
        """spool.spool_weight overrides filament.spool_weight for tare calculation."""
        sm_spool = _spoolman_spool_fixture(42, spool_weight=196.0, filament_weight=1000.0, spool_level_spool_weight=300)
        client = self._client_serving(sm_spool)

        response = await self._post_weight(async_client, client, 42, 750)

        assert response.status_code == 200
        # remaining = 750 - 300 = 450; weight_used = 1000 - 450 = 550
        assert response.json()["weight_used"] == pytest.approx(550.0)
        client.update_spool.assert_called_once_with(spool_id=42, remaining_weight=pytest.approx(450.0))

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_spool_level_zero_spool_weight_not_treated_as_missing(
        self, async_client: AsyncClient, spoolman_settings
    ):
        """spool.spool_weight=0 is valid (0g tare), not treated as missing/fallback."""
        sm_spool = _spoolman_spool_fixture(42, spool_weight=196.0, filament_weight=1000.0, spool_level_spool_weight=0)
        client = self._client_serving(sm_spool)

        response = await self._post_weight(async_client, client, 42, 750)

        assert response.status_code == 200
        # remaining = 750 - 0 = 750; weight_used = 1000 - 750 = 250
        assert response.json()["weight_used"] == pytest.approx(250.0)
        client.update_spool.assert_called_once_with(spool_id=42, remaining_weight=pytest.approx(750.0))

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_both_levels_none_uses_250g_fallback_and_warns(self, async_client: AsyncClient, spoolman_settings):
        """When both spool_weight and filament.spool_weight are None, 250g fallback is used with a warning."""
        # Built by hand: no spool-level key at all, and the filament-level value is None.
        sm_spool = {"id": 42, "filament": {"weight": 1000.0, "spool_weight": None}, "used_weight": 0.0}
        client = self._client_serving(sm_spool)

        response = await self._post_weight(async_client, client, 42, 750)

        assert response.status_code == 200
        # remaining = 750 - 250 = 500; weight_used = 1000 - 500 = 500
        assert response.json()["weight_used"] == pytest.approx(500.0)
        assert response.json().get("warnings")
- class TestTagScannedSpoolmanFallback:
- """nfc/tag-scanned falls back to Spoolman when local DB has no match."""
@pytest.mark.asyncio
@pytest.mark.integration
async def test_spoolman_fallback_on_local_miss(self, async_client: AsyncClient, spoolman_settings):
    """A tag with no local match that Spoolman knows is reported as matched and broadcast as such."""
    filament = {
        "material": "PETG",
        "name": "PETG Basic",
        "color_hex": "00FF00",
        "weight": 1000,
        "vendor": {"name": "Polymaker"},
    }
    raw_spool = {
        "id": 5,
        "filament": filament,
        "used_weight": 100.0,
        "archived": False,
        "registered": "2024-01-01T00:00:00+00:00",
        "extra": {"tag": '"DEADBEEF12345678"'},
    }
    spoolman_client = _mock_spoolman_client()
    spoolman_client.get_spools = AsyncMock(return_value=[raw_spool])
    spoolman_client.find_spool_by_tag = AsyncMock(return_value=raw_spool)

    scan = {"device_id": "sb-1", "tag_uid": "DEADBEEF12345678"}
    with (
        patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws,
        # Force the local lookup to miss so the Spoolman path is exercised.
        patch("backend.app.api.routes.spoolbuddy.get_spool_by_tag", new_callable=AsyncMock, return_value=None),
        patch("backend.app.services.spoolman.get_spoolman_client", AsyncMock(return_value=spoolman_client)),
        patch("backend.app.services.spoolman.init_spoolman_client", AsyncMock(return_value=spoolman_client)),
    ):
        mock_ws.broadcast = AsyncMock()
        response = await async_client.post(f"{API}/nfc/tag-scanned", json=scan)

    assert response.status_code == 200
    result = response.json()
    assert result["matched"] is True
    assert result["spool_id"] == 5
    mock_ws.broadcast.assert_called_once()
    broadcast_msg = mock_ws.broadcast.call_args[0][0]
    assert broadcast_msg["type"] == "spoolbuddy_tag_matched"
    assert broadcast_msg["spool"]["id"] == 5
    assert broadcast_msg["spool"]["material"] == "PETG"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_spoolman_fallback_unknown_when_no_spoolman_match(self, async_client: AsyncClient, spoolman_settings):
- """Unknown tag broadcast when both local DB and Spoolman miss."""
- mock_client = _mock_spoolman_client()
- mock_client.get_spools = AsyncMock(return_value=[])
- mock_client.find_spool_by_tag = AsyncMock(return_value=None)
- with (
- patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws,
- patch(
- "backend.app.api.routes.spoolbuddy.get_spool_by_tag",
- new_callable=AsyncMock,
- return_value=None,
- ),
- patch(
- "backend.app.services.spoolman.get_spoolman_client",
- AsyncMock(return_value=mock_client),
- ),
- patch(
- "backend.app.services.spoolman.init_spoolman_client",
- AsyncMock(return_value=mock_client),
- ),
- ):
- mock_ws.broadcast = AsyncMock()
- resp = await async_client.post(
- f"{API}/nfc/tag-scanned",
- json={"device_id": "sb-1", "tag_uid": "UNKNOWN0000000FF"},
- )
- assert resp.status_code == 200
- data = resp.json()
- assert data["matched"] is False
- assert data["spool_id"] is None
- mock_ws.broadcast.assert_called_once()
- msg = mock_ws.broadcast.call_args[0][0]
- assert msg["type"] == "spoolbuddy_unknown_tag"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_malformed_spoolman_data_degrades_gracefully(self, async_client: AsyncClient, spoolman_settings):
- """ValueError from _map_spoolman_spool (e.g. spool_id=0) must return matched=False without broadcasting unknown_tag."""
- bad_spool = {
- "id": 0, # _map_spoolman_spool raises ValueError for id <= 0
- "filament": {"material": "PLA", "name": "PLA Basic", "color_hex": "FF0000", "weight": 1000},
- "used_weight": 0.0,
- "archived": False,
- "registered": "2024-01-01T00:00:00Z",
- "extra": {"tag": '"DEADBEEF12345678"'},
- }
- mock_client = _mock_spoolman_client()
- mock_client.find_spool_by_tag = AsyncMock(return_value=bad_spool)
- with (
- patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws,
- patch(
- "backend.app.api.routes.spoolbuddy.get_spool_by_tag",
- new_callable=AsyncMock,
- return_value=None,
- ),
- patch(
- "backend.app.services.spoolman.get_spoolman_client",
- AsyncMock(return_value=mock_client),
- ),
- patch(
- "backend.app.services.spoolman.init_spoolman_client",
- AsyncMock(return_value=mock_client),
- ),
- ):
- mock_ws.broadcast = AsyncMock()
- resp = await async_client.post(
- f"{API}/nfc/tag-scanned",
- json={"device_id": "sb-1", "tag_uid": "DEADBEEF12345678"},
- )
- assert resp.status_code == 200
- data = resp.json()
- assert data["matched"] is False
- assert data["spool_id"] is None
- # No broadcast: UI must not get a spurious unknown_tag event on Spoolman data errors
- mock_ws.broadcast.assert_not_called()
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_local_match_skips_spoolman(self, async_client: AsyncClient, spool_factory):
- """When local DB matches, Spoolman is never queried."""
- spool = await spool_factory(tag_uid="AABB1122", material="PLA")
- mock_spool = MagicMock()
- mock_spool.id = spool.id
- mock_spool.material = spool.material
- mock_spool.subtype = spool.subtype
- mock_spool.color_name = spool.color_name
- mock_spool.rgba = spool.rgba
- mock_spool.brand = spool.brand
- mock_spool.label_weight = spool.label_weight
- mock_spool.core_weight = spool.core_weight
- mock_spool.weight_used = spool.weight_used
- with (
- patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws,
- patch(
- "backend.app.api.routes.spoolbuddy.get_spool_by_tag",
- new_callable=AsyncMock,
- return_value=mock_spool,
- ),
- ):
- mock_ws.broadcast = AsyncMock()
- resp = await async_client.post(
- f"{API}/nfc/tag-scanned",
- json={"device_id": "sb-1", "tag_uid": "AABB1122"},
- )
- assert resp.status_code == 200
- data = resp.json()
- assert data["matched"] is True
- assert data["spool_id"] == spool.id
- # ============================================================================
- # NFC write-tag / write-result — Spoolman-aware
- # ============================================================================
- def _full_spoolman_spool(spool_id: int) -> dict:
- """Complete Spoolman spool dict sufficient for NDEF encoding."""
- return {
- "id": spool_id,
- "filament": {
- "material": "PLA",
- "name": "PLA Basic",
- "color_hex": "FF0000",
- "weight": 1000.0,
- "spool_weight": 196.0,
- "vendor": {"name": "Bambu Lab"},
- },
- "used_weight": 0.0,
- "archived": False,
- "registered": "2024-01-01T00:00:00Z",
- }
class TestNfcWriteTagSpoolman:
    """nfc/write-tag falls back to Spoolman when local DB has no matching spool."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_spoolman_spool_queued_when_local_miss(
        self, async_client: AsyncClient, device_factory, spoolman_settings
    ):
        """write-tag encodes NDEF from Spoolman data when spool not in local DB."""
        await device_factory(device_id="sb-write-sm")
        sm_spool = _full_spoolman_spool(77)
        mock_client = _mock_spoolman_client()
        mock_client.get_spool = AsyncMock(return_value=sm_spool)
        with (
            patch(
                "backend.app.services.spoolman.get_spoolman_client",
                AsyncMock(return_value=mock_client),
            ),
            patch(
                "backend.app.services.spoolman.init_spoolman_client",
                AsyncMock(return_value=mock_client),
            ),
        ):
            resp = await async_client.post(
                f"{API}/nfc/write-tag",
                json={"device_id": "sb-write-sm", "spool_id": 77},
            )
        assert resp.status_code == 200
        assert resp.json()["status"] == "queued"
        mock_client.get_spool.assert_called_once_with(77)

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_data_origin_spoolman_stored_in_payload(
        self, async_client: AsyncClient, device_factory, db_session, spoolman_settings
    ):
        """Pending write payload records data_origin=spoolman for Spoolman spools."""
        import json as _json

        device = await device_factory(device_id="sb-origin")
        sm_spool = _full_spoolman_spool(88)
        mock_client = _mock_spoolman_client()
        mock_client.get_spool = AsyncMock(return_value=sm_spool)
        with (
            patch(
                "backend.app.services.spoolman.get_spoolman_client",
                AsyncMock(return_value=mock_client),
            ),
            patch(
                "backend.app.services.spoolman.init_spoolman_client",
                AsyncMock(return_value=mock_client),
            ),
        ):
            resp = await async_client.post(
                f"{API}/nfc/write-tag",
                json={"device_id": "sb-origin", "spool_id": 88},
            )
        # Fail on the HTTP status first: otherwise a rejected request only
        # shows up as an opaque error when decoding a missing payload below.
        assert resp.status_code == 200
        await db_session.refresh(device)
        payload = _json.loads(device.pending_write_payload)
        assert payload["data_origin"] == "spoolman"
        assert payload["spool_id"] == 88
        assert "ndef_data_hex" in payload

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_404_when_neither_local_nor_spoolman(
        self, async_client: AsyncClient, device_factory, spoolman_settings
    ):
        """404 returned when spool is missing from both local DB and Spoolman."""
        await device_factory(device_id="sb-miss")
        mock_client = _mock_spoolman_client()
        mock_client.get_spool = AsyncMock(side_effect=SpoolmanNotFoundError("Spool 9999 not found"))
        with (
            patch(
                "backend.app.services.spoolman.get_spoolman_client",
                AsyncMock(return_value=mock_client),
            ),
            patch(
                "backend.app.services.spoolman.init_spoolman_client",
                AsyncMock(return_value=mock_client),
            ),
        ):
            resp = await async_client.post(
                f"{API}/nfc/write-tag",
                json={"device_id": "sb-miss", "spool_id": 9999},
            )
        assert resp.status_code == 404

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_local_spool_used_when_present(self, async_client: AsyncClient, device_factory, spool_factory):
        """Local DB spool is encoded directly without contacting Spoolman."""
        await device_factory(device_id="sb-local-write")
        spool = await spool_factory(material="PETG")
        resp = await async_client.post(
            f"{API}/nfc/write-tag",
            json={"device_id": "sb-local-write", "spool_id": spool.id},
        )
        assert resp.status_code == 200
        assert resp.json()["status"] == "queued"
class TestNfcWriteResultSpoolman:
    """nfc/write-result updates Spoolman extra.tag on success for Spoolman spools."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_success_updates_spoolman_extra_tag(
        self, async_client: AsyncClient, device_factory, spoolman_settings
    ):
        """Successful write for a Spoolman spool calls merge_spool_extra with extra.tag."""
        import json as _json

        await device_factory(
            device_id="sb-wr-sm",
            pending_command="write_tag",
            pending_write_payload=_json.dumps({"spool_id": 55, "ndef_data_hex": "deadbeef", "data_origin": "spoolman"}),
        )
        mock_client = _mock_spoolman_client()
        mock_client.merge_spool_extra = AsyncMock(return_value={"id": 55})
        with (
            patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws,
            patch(
                "backend.app.services.spoolman.get_spoolman_client",
                AsyncMock(return_value=mock_client),
            ),
            patch(
                "backend.app.services.spoolman.init_spoolman_client",
                AsyncMock(return_value=mock_client),
            ),
        ):
            mock_ws.broadcast = AsyncMock()
            resp = await async_client.post(
                f"{API}/nfc/write-result",
                json={
                    "device_id": "sb-wr-sm",
                    "spool_id": 55,
                    "tag_uid": "AABBCCDD11223344",
                    "success": True,
                },
            )
        assert resp.status_code == 200
        # The tag value is sent as a JSON-encoded string, hence the embedded quotes.
        mock_client.merge_spool_extra.assert_called_once_with(55, {"tag": '"AABBCCDD11223344"'})
        msg = mock_ws.broadcast.call_args[0][0]
        assert msg["type"] == "spoolbuddy_tag_written"
        assert msg["tag_uid"] == "AABBCCDD11223344"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_failure_does_not_call_spoolman(self, async_client: AsyncClient, device_factory, spoolman_settings):
        """Failed write never calls Spoolman update."""
        import json as _json

        await device_factory(
            device_id="sb-wr-fail",
            pending_command="write_tag",
            pending_write_payload=_json.dumps({"spool_id": 66, "ndef_data_hex": "deadbeef", "data_origin": "spoolman"}),
        )
        mock_client = _mock_spoolman_client()
        # merge_spool_extra is the call the success path makes (see the test
        # above), so it is the one that must be observed here.
        mock_client.merge_spool_extra = AsyncMock(return_value={"id": 66})
        with (
            patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws,
            patch(
                "backend.app.services.spoolman.get_spoolman_client",
                AsyncMock(return_value=mock_client),
            ),
            patch(
                "backend.app.services.spoolman.init_spoolman_client",
                AsyncMock(return_value=mock_client),
            ),
        ):
            mock_ws.broadcast = AsyncMock()
            resp = await async_client.post(
                f"{API}/nfc/write-result",
                json={
                    "device_id": "sb-wr-fail",
                    "spool_id": 66,
                    "tag_uid": "AABBCCDD11223344",
                    "success": False,
                    "message": "write timeout",
                },
            )
        assert resp.status_code == 200
        mock_client.merge_spool_extra.assert_not_called()
        mock_client.update_spool.assert_not_called()
        msg = mock_ws.broadcast.call_args[0][0]
        assert msg["type"] == "spoolbuddy_tag_write_failed"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_success_local_spool_writes_to_db(
        self, async_client: AsyncClient, device_factory, spool_factory, db_session
    ):
        """Successful write for a local spool still updates local DB tag_uid."""
        import json as _json

        spool = await spool_factory()
        await device_factory(
            device_id="sb-wr-local",
            pending_command="write_tag",
            pending_write_payload=_json.dumps(
                {"spool_id": spool.id, "ndef_data_hex": "deadbeef", "data_origin": "local"}
            ),
        )
        with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
            mock_ws.broadcast = AsyncMock()
            resp = await async_client.post(
                f"{API}/nfc/write-result",
                json={
                    "device_id": "sb-wr-local",
                    "spool_id": spool.id,
                    "tag_uid": "DEADBEEF12345678",
                    "success": True,
                },
            )
        assert resp.status_code == 200
        await db_session.refresh(spool)
        assert spool.tag_uid == "DEADBEEF12345678"
        assert spool.tag_type == "ntag"
- # ============================================================================
- # Security fix tests — write-tag ValueError + write-result exception safety
- # ============================================================================
class TestNfcWriteTagSpoolmanSecurityFixes:
    """Regression coverage for the hardened Spoolman branch of nfc/write-tag."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_invalid_spoolman_spool_id_returns_502(
        self, async_client: AsyncClient, device_factory, spoolman_settings
    ):
        """A Spoolman spool carrying the invalid id 0 must produce 502 rather than 404."""
        await device_factory(device_id="sb-invalid-id")
        # Take an otherwise valid spool and corrupt only its id; the original
        # test notes that id=0 is caught by the _map_spoolman_spool guard.
        malformed = _full_spoolman_spool(1)
        malformed["id"] = 0
        spoolman = _mock_spoolman_client()
        spoolman.get_spool = AsyncMock(return_value=malformed)
        get_patch = patch(
            "backend.app.services.spoolman.get_spoolman_client",
            AsyncMock(return_value=spoolman),
        )
        init_patch = patch(
            "backend.app.services.spoolman.init_spoolman_client",
            AsyncMock(return_value=spoolman),
        )
        with get_patch, init_patch:
            response = await async_client.post(
                f"{API}/nfc/write-tag",
                json={"device_id": "sb-invalid-id", "spool_id": 99},
            )
        # The spool exists upstream but its data is unusable, so this is a
        # bad-gateway condition, not a "not found".
        assert response.status_code == 502

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_oversized_label_weight_does_not_crash(
        self, async_client: AsyncClient, device_factory, spoolman_settings
    ):
        """A filament weight above 65535 from Spoolman must still be queued (no struct.error)."""
        await device_factory(device_id="sb-overflow")
        template = _full_spoolman_spool(42)
        heavy_filament = {**template["filament"], "weight": 70000}
        heavy_spool = {**template, "filament": heavy_filament}
        spoolman = _mock_spoolman_client()
        spoolman.get_spool = AsyncMock(return_value=heavy_spool)
        get_patch = patch(
            "backend.app.services.spoolman.get_spoolman_client",
            AsyncMock(return_value=spoolman),
        )
        init_patch = patch(
            "backend.app.services.spoolman.init_spoolman_client",
            AsyncMock(return_value=spoolman),
        )
        with get_patch, init_patch:
            response = await async_client.post(
                f"{API}/nfc/write-tag",
                json={"device_id": "sb-overflow", "spool_id": 42},
            )
        assert response.status_code == 200
        assert response.json()["status"] == "queued"
class TestNfcWriteResultSpoolmanSecurityFixes:
    """Regression coverage for state cleanup when the Spoolman link step of nfc/write-result fails."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_spoolman_client_exception_still_clears_device_state(
        self, async_client: AsyncClient, device_factory, db_session, spoolman_settings
    ):
        """A raising Spoolman client yields 502 yet the device's pending write is still cleared."""
        import json as _json

        pending = {"spool_id": 77, "ndef_data_hex": "deadbeef", "data_origin": "spoolman"}
        device = await device_factory(
            device_id="sb-exc-safe",
            pending_command="write_tag",
            pending_write_payload=_json.dumps(pending),
        )
        spoolman = _mock_spoolman_client()
        spoolman.merge_spool_extra = AsyncMock(side_effect=Exception("connection refused"))
        get_patch = patch(
            "backend.app.services.spoolman.get_spoolman_client",
            AsyncMock(return_value=spoolman),
        )
        init_patch = patch(
            "backend.app.services.spoolman.init_spoolman_client",
            AsyncMock(return_value=spoolman),
        )
        with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws, get_patch, init_patch:
            mock_ws.broadcast = AsyncMock()
            request_body = {
                "device_id": "sb-exc-safe",
                "spool_id": 77,
                "tag_uid": "AABBCCDD11223344",
                "success": True,
            }
            response = await async_client.post(f"{API}/nfc/write-result", json=request_body)
        # The physical tag was written but linking it in Spoolman failed, which
        # the endpoint reports as 502 so the caller can retry.
        assert response.status_code == 502
        # Pending state has to be gone regardless, otherwise the device would re-write.
        await db_session.refresh(device)
        assert device.pending_command is None
        assert device.pending_write_payload is None
        # The UI learns about the failure through the link_failed broadcast.
        broadcast_msg = mock_ws.broadcast.call_args[0][0]
        assert broadcast_msg["type"] == "spoolbuddy_tag_link_failed"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_spoolman_not_found_error_broadcasts_link_failed(
        self, async_client: AsyncClient, device_factory, db_session, spoolman_settings
    ):
        """SpoolmanNotFoundError from merge_spool_extra clears device state and broadcasts link_failed."""
        import json as _json

        pending = {"spool_id": 55, "ndef_data_hex": "deadbeef", "data_origin": "spoolman"}
        device = await device_factory(
            device_id="sb-notfound",
            pending_command="write_tag",
            pending_write_payload=_json.dumps(pending),
        )
        spoolman = _mock_spoolman_client()
        spoolman.merge_spool_extra = AsyncMock(side_effect=SpoolmanNotFoundError("Spool 55 not found"))
        get_patch = patch(
            "backend.app.services.spoolman.get_spoolman_client",
            AsyncMock(return_value=spoolman),
        )
        init_patch = patch(
            "backend.app.services.spoolman.init_spoolman_client",
            AsyncMock(return_value=spoolman),
        )
        with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws, get_patch, init_patch:
            mock_ws.broadcast = AsyncMock()
            request_body = {
                "device_id": "sb-notfound",
                "spool_id": 55,
                "tag_uid": "AABBCCDD11223344",
                "success": True,
            }
            response = await async_client.post(f"{API}/nfc/write-result", json=request_body)
        assert response.status_code == 502
        await db_session.refresh(device)
        assert device.pending_command is None
        assert device.pending_write_payload is None
        broadcast_msg = mock_ws.broadcast.call_args[0][0]
        assert broadcast_msg["type"] == "spoolbuddy_tag_link_failed"
        assert broadcast_msg["spool_id"] == 55
class TestNfcWriteResultOrphanedSpool:
    """nfc/write-result behaviour when the queued local spool no longer exists."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_local_spool_deleted_before_write_back(self, async_client: AsyncClient, device_factory, db_session):
        """A write-result for a vanished local spool returns linked=False and broadcasts link_failed."""
        import json as _json

        queued_write = {
            "spool_id": 99999,  # non-existent spool
            "ndef_data_hex": "aabbccdd",
            "data_origin": "local",
        }
        device = await device_factory(
            device_id="sb-orphan",
            pending_command="write_tag",
            pending_write_payload=_json.dumps(queued_write),
        )
        with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
            mock_ws.broadcast = AsyncMock()
            response = await async_client.post(
                f"{API}/nfc/write-result",
                json={"device_id": device.device_id, "spool_id": 99999, "success": True, "tag_uid": "AABBCCDD"},
            )
        assert response.status_code == 200
        assert response.json()["linked"] is False
        # The pending command must not survive the failed link.
        await db_session.refresh(device)
        assert device.pending_command is None
        # At least one broadcast has to be the link_failed notification.
        link_failed = []
        for call in mock_ws.broadcast.call_args_list:
            message = call[0][0]
            if message.get("type") == "spoolbuddy_tag_link_failed":
                link_failed.append(message)
        assert len(link_failed) >= 1
class TestNfcWriteResultInputValidation:
    """Request validation and pending-payload JSON robustness for nfc/write-result."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_tag_uid_too_long_rejected(self, async_client: AsyncClient, device_factory):
        """An over-long tag_uid (65 characters here) must be rejected with 422."""
        import json as _json

        queued_write = {"spool_id": 1, "ndef_data_hex": "dead", "data_origin": "local"}
        await device_factory(
            device_id="sb-uid-long",
            pending_command="write_tag",
            pending_write_payload=_json.dumps(queued_write),
        )
        request_body = {
            "device_id": "sb-uid-long",
            "spool_id": 1,
            "tag_uid": "A" * 65,
            "success": True,
        }
        response = await async_client.post(f"{API}/nfc/write-result", json=request_body)
        assert response.status_code == 422

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_malformed_pending_payload_falls_back_to_local(
        self, async_client: AsyncClient, device_factory, spool_factory, db_session
    ):
        """An unparseable pending_write_payload is handled as a local write instead of erroring."""
        spool = await spool_factory()
        await device_factory(
            device_id="sb-corrupt-json",
            pending_command="write_tag",
            pending_write_payload="{not valid json!!!",
        )
        with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
            mock_ws.broadcast = AsyncMock()
            request_body = {
                "device_id": "sb-corrupt-json",
                "spool_id": spool.id,
                "tag_uid": "DEADBEEF12345678",
                "success": True,
            }
            response = await async_client.post(f"{API}/nfc/write-result", json=request_body)
        # A corrupt stored payload must not surface as a server error.
        assert response.status_code == 200
        # Local fallback: the tag UID ends up on the local spool row.
        await db_session.refresh(spool)
        assert spool.tag_uid == "DEADBEEF12345678"
- # ============================================================================
- # B1: NFC write-tag warnings appear in response body
- # ============================================================================
class TestNfcWriteTagWarningsBody:
    """B1: the write-tag response carries 'warnings' when Spoolman fields are missing."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_warnings_returned_for_missing_color_and_temp(
        self, async_client: AsyncClient, device_factory, spoolman_settings
    ):
        """A spool lacking color_name and settings_extruder_temp yields at least two warnings."""
        await device_factory(device_id="sb-warn-b1")
        # Deliberately sparse filament: no color_name and no settings_extruder_temp.
        sparse_filament = {
            "material": "PLA",
            "name": "PLA Basic",
            "color_hex": "808080",
            "weight": 1000.0,
            "vendor": {"name": "Bambu Lab"},
        }
        sparse_spool = {
            "id": 99,
            "filament": sparse_filament,
            "used_weight": 0.0,
            "archived": False,
            "registered": "2024-01-01T00:00:00Z",
        }
        spoolman = _mock_spoolman_client()
        spoolman.get_spool = AsyncMock(return_value=sparse_spool)
        get_patch = patch(
            "backend.app.services.spoolman.get_spoolman_client",
            AsyncMock(return_value=spoolman),
        )
        init_patch = patch(
            "backend.app.services.spoolman.init_spoolman_client",
            AsyncMock(return_value=spoolman),
        )
        with get_patch, init_patch:
            response = await async_client.post(
                f"{API}/nfc/write-tag",
                json={"device_id": "sb-warn-b1", "spool_id": 99},
            )
        assert response.status_code == 200
        body = response.json()
        assert "warnings" in body, "Response should contain 'warnings' key when fields are absent"
        warnings = body["warnings"]
        assert len(warnings) >= 2, f"Expected at least 2 warnings for missing color_name + nozzle_temp, got: {warnings}"
        # Both missing fields must be named somewhere in the warning texts.
        combined = " ".join(warnings)
        for field_name in ("color_name", "nozzle_temp"):
            assert field_name in combined

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_no_warnings_key_when_all_fields_present(
        self, async_client: AsyncClient, device_factory, spoolman_settings
    ):
        """With every field populated the response has no 'warnings' key, or an empty list."""
        await device_factory(device_id="sb-nowarn")
        complete_spool = _full_spoolman_spool(100)
        # Fill in the two optional fields whose absence would trigger warnings.
        complete_spool["filament"].update(color_name="Red", settings_extruder_temp=220)
        spoolman = _mock_spoolman_client()
        spoolman.get_spool = AsyncMock(return_value=complete_spool)
        get_patch = patch(
            "backend.app.services.spoolman.get_spoolman_client",
            AsyncMock(return_value=spoolman),
        )
        init_patch = patch(
            "backend.app.services.spoolman.init_spoolman_client",
            AsyncMock(return_value=spoolman),
        )
        with get_patch, init_patch:
            response = await async_client.post(
                f"{API}/nfc/write-tag",
                json={"device_id": "sb-nowarn", "spool_id": 100},
            )
        assert response.status_code == 200
        body = response.json()
        assert "warnings" not in body or body["warnings"] == []
- # ============================================================================
- # B5: Exception text scrubbed from WebSocket broadcast message
- # ============================================================================
class TestNfcWriteResultExceptionScrubbing:
    """B5: the WebSocket 'message' field must never expose internal exception text."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_exception_text_not_leaked_in_ws_message(
        self, async_client: AsyncClient, device_factory, db_session, spoolman_settings
    ):
        """A failing Spoolman merge produces a generic broadcast without the exception text or IP."""
        import json as _json

        queued_write = {"spool_id": 77, "ndef_data_hex": "deadbeef", "data_origin": "spoolman"}
        await device_factory(
            device_id="sb-scrub-b5",
            pending_command="write_tag",
            pending_write_payload=_json.dumps(queued_write),
        )
        spoolman = _mock_spoolman_client()
        # The exception text embeds an address so that a leak is easy to detect.
        spoolman.merge_spool_extra = AsyncMock(side_effect=Exception("connection refused to 192.168.1.1:7912"))
        get_patch = patch(
            "backend.app.services.spoolman.get_spoolman_client",
            AsyncMock(return_value=spoolman),
        )
        init_patch = patch(
            "backend.app.services.spoolman.init_spoolman_client",
            AsyncMock(return_value=spoolman),
        )
        with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws, get_patch, init_patch:
            mock_ws.broadcast = AsyncMock()
            request_body = {
                "device_id": "sb-scrub-b5",
                "spool_id": 77,
                "tag_uid": "AABBCCDD11223344",
                "success": True,
            }
            response = await async_client.post(f"{API}/nfc/write-result", json=request_body)
        assert response.status_code == 502
        msg = mock_ws.broadcast.call_args[0][0]
        assert msg["type"] == "spoolbuddy_tag_link_failed"
        # Only the fixed, generic text is allowed through to clients.
        assert msg["message"] == "Spoolman link failed", f"Expected generic message but got: {msg['message']!r}"
        assert "connection refused" not in str(msg), f"Exception text must not appear in WS message: {msg}"
        assert "192.168.1" not in str(msg), f"Internal IP must not appear in WS message: {msg}"
- # ============================================================================
- # _get_spoolman_client_or_none: graceful degradation on ValueError during reinit
- # ============================================================================
class TestSpoolmanClientOrNoneGraceful:
    """A ValueError raised while (re)initialising the Spoolman client must degrade to 'no client'."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_returns_none_when_init_raises_value_error(self, async_client: AsyncClient, db_session):
        """nfc/tag-scanned answers 200/matched=False when init_spoolman_client raises ValueError.

        Spoolman is enabled through settings rows, no client is cached, and
        client initialisation is forced to fail; the endpoint must not
        propagate that failure as a 500.
        """
        from backend.app.models.settings import Settings

        enabling_rows = (
            ("spoolman_enabled", "true"),
            ("spoolman_url", "http://spoolman.local:7912"),
        )
        for key, value in enabling_rows:
            db_session.add(Settings(key=key, value=value))
        await db_session.commit()
        url_guard_patch = patch("backend.app.api.routes._spoolman_helpers.assert_safe_spoolman_url")
        get_patch = patch(
            "backend.app.services.spoolman.get_spoolman_client",
            AsyncMock(return_value=None),
        )
        init_patch = patch(
            "backend.app.services.spoolman.init_spoolman_client",
            AsyncMock(side_effect=ValueError("invalid URL")),
        )
        ws_patch = patch("backend.app.api.routes.spoolbuddy.ws_manager")
        with url_guard_patch, get_patch, init_patch, ws_patch as mock_ws:
            mock_ws.broadcast = AsyncMock()
            # The scan has no local match, so the route reaches the Spoolman
            # lookup and therefore the failing client initialisation.
            response = await async_client.post(
                f"{API}/nfc/tag-scanned",
                json={"device_id": "sb-vale", "tag_uid": "AABBCCDD"},
            )
        # Anything other than 200 means the ValueError escaped the helper.
        assert response.status_code == 200
        assert response.json()["matched"] is False
|