| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569 |
- """Integration tests for SpoolBuddy API endpoints."""
- from datetime import datetime, timedelta, timezone
- from unittest.mock import AsyncMock, MagicMock, patch
- import pytest
- from httpx import AsyncClient
- from sqlalchemy.ext.asyncio import AsyncSession
- import backend.app.services.spoolbuddy_ssh # noqa: F401 — ensures patch() can resolve the dotted path
- from backend.app.api.routes import spoolbuddy as spoolbuddy_routes
- from backend.app.models.spool import Spool
- from backend.app.models.spoolbuddy_device import SpoolBuddyDevice
- from backend.app.services.spoolman import SpoolmanNotFoundError, SpoolmanUnavailableError
- API = "/api/v1/spoolbuddy"
- @pytest.fixture
- def device_factory(db_session: AsyncSession):
- """Factory to create SpoolBuddyDevice records."""
- _counter = [0]
- async def _create(**kwargs):
- _counter[0] += 1
- n = _counter[0]
- defaults = {
- "device_id": f"sb-{n:04d}",
- "hostname": f"spoolbuddy-{n}",
- "ip_address": f"10.0.0.{n}",
- "firmware_version": "1.0.0",
- "has_nfc": True,
- "has_scale": True,
- "tare_offset": 0,
- "calibration_factor": 1.0,
- "last_seen": datetime.now(timezone.utc),
- }
- defaults.update(kwargs)
- device = SpoolBuddyDevice(**defaults)
- db_session.add(device)
- await db_session.commit()
- await db_session.refresh(device)
- return device
- return _create
- @pytest.fixture
- def spool_factory(db_session: AsyncSession):
- """Factory to create Spool records."""
- _counter = [0]
- async def _create(**kwargs):
- _counter[0] += 1
- defaults = {
- "material": "PLA",
- "subtype": "Basic",
- "brand": "Polymaker",
- "color_name": "Red",
- "rgba": "FF0000FF",
- "label_weight": 1000,
- "core_weight": 250,
- "weight_used": 0,
- }
- defaults.update(kwargs)
- spool = Spool(**defaults)
- db_session.add(spool)
- await db_session.commit()
- await db_session.refresh(spool)
- return spool
- return _create
- # ============================================================================
- # Device endpoints
- # ============================================================================
- class TestDeviceEndpoints:
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_register_new_device(self, async_client: AsyncClient):
- with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
- mock_ws.broadcast = AsyncMock()
- resp = await async_client.post(
- f"{API}/devices/register",
- json={
- "device_id": "sb-new",
- "hostname": "spoolbuddy-new",
- "ip_address": "10.0.0.99",
- "firmware_version": "1.2.0",
- },
- )
- assert resp.status_code == 200
- data = resp.json()
- assert data["device_id"] == "sb-new"
- assert data["hostname"] == "spoolbuddy-new"
- assert data["online"] is True
- mock_ws.broadcast.assert_called_once()
- msg = mock_ws.broadcast.call_args[0][0]
- assert msg["type"] == "spoolbuddy_online"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_re_register_existing_device(self, async_client: AsyncClient, device_factory):
- device = await device_factory(
- device_id="sb-exist",
- tare_offset=12345,
- calibration_factor=0.0042,
- )
- with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
- mock_ws.broadcast = AsyncMock()
- resp = await async_client.post(
- f"{API}/devices/register",
- json={
- "device_id": "sb-exist",
- "hostname": "updated-host",
- "ip_address": "10.0.0.200",
- "firmware_version": "2.0.0",
- },
- )
- assert resp.status_code == 200
- data = resp.json()
- assert data["id"] == device.id
- assert data["hostname"] == "updated-host"
- assert data["ip_address"] == "10.0.0.200"
- assert data["firmware_version"] == "2.0.0"
- # Calibration preserved on re-register
- assert data["tare_offset"] == 12345
- assert data["calibration_factor"] == pytest.approx(0.0042)
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_list_devices_empty(self, async_client: AsyncClient):
- resp = await async_client.get(f"{API}/devices")
- assert resp.status_code == 200
- assert resp.json() == []
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_list_devices(self, async_client: AsyncClient, device_factory):
- await device_factory(device_id="sb-a", hostname="alpha")
- await device_factory(device_id="sb-b", hostname="beta")
- resp = await async_client.get(f"{API}/devices")
- assert resp.status_code == 200
- devices = resp.json()
- assert len(devices) == 2
- hostnames = {d["hostname"] for d in devices}
- assert hostnames == {"alpha", "beta"}
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_unregister_device(self, async_client: AsyncClient, device_factory, db_session):
- await device_factory(device_id="sb-keep", hostname="keep")
- await device_factory(device_id="sb-drop", hostname="drop")
- spoolbuddy_routes._spoolbuddy_online_last_broadcast["sb-drop"] = 123.0
- with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
- mock_ws.broadcast = AsyncMock()
- resp = await async_client.delete(f"{API}/devices/sb-drop")
- assert resp.status_code == 200
- assert resp.json() == {"status": "deleted", "device_id": "sb-drop"}
- assert "sb-drop" not in spoolbuddy_routes._spoolbuddy_online_last_broadcast
- mock_ws.broadcast.assert_called_once()
- msg = mock_ws.broadcast.call_args[0][0]
- assert msg["type"] == "spoolbuddy_unregistered"
- assert msg["device_id"] == "sb-drop"
- # Other device still present
- resp = await async_client.get(f"{API}/devices")
- remaining = {d["device_id"] for d in resp.json()}
- assert remaining == {"sb-keep"}
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_unregister_device_not_found(self, async_client: AsyncClient):
- resp = await async_client.delete(f"{API}/devices/sb-ghost")
- assert resp.status_code == 404
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_heartbeat_updates_status(self, async_client: AsyncClient, device_factory):
- device = await device_factory(device_id="sb-hb")
- spoolbuddy_routes._spoolbuddy_online_last_broadcast.clear()
- with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
- mock_ws.broadcast = AsyncMock()
- resp = await async_client.post(
- f"{API}/devices/sb-hb/heartbeat",
- json={"nfc_ok": True, "scale_ok": True, "uptime_s": 600},
- )
- assert resp.status_code == 200
- data = resp.json()
- assert data["tare_offset"] == device.tare_offset
- assert data["calibration_factor"] == pytest.approx(device.calibration_factor)
- mock_ws.broadcast.assert_called_once()
- msg = mock_ws.broadcast.call_args[0][0]
- assert msg["type"] == "spoolbuddy_online"
- assert msg["device_id"] == "sb-hb"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_heartbeat_returns_ssh_public_key(self, async_client: AsyncClient, device_factory):
- """Heartbeat response carries the current SSH public key so the daemon
- can re-deploy it whenever Bambuddy's keypair rotates without waiting
- for a service restart."""
- await device_factory(device_id="sb-ssh-hb")
- fake_key = "ssh-ed25519 AAAATESTKEY bambuddy-spoolbuddy"
- with (
- patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws,
- patch(
- "backend.app.services.spoolbuddy_ssh.get_public_key",
- AsyncMock(return_value=fake_key),
- ),
- ):
- mock_ws.broadcast = AsyncMock()
- resp = await async_client.post(
- f"{API}/devices/sb-ssh-hb/heartbeat",
- json={"nfc_ok": True, "scale_ok": True, "uptime_s": 5},
- )
- assert resp.status_code == 200
- assert resp.json()["ssh_public_key"] == fake_key
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_heartbeat_ssh_key_failure_does_not_break_heartbeat(self, async_client: AsyncClient, device_factory):
- """If the backend can't read its own SSH key, the heartbeat must still
- succeed — telemetry/commands are far more critical than key sync."""
- await device_factory(device_id="sb-ssh-fail")
- with (
- patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws,
- patch(
- "backend.app.services.spoolbuddy_ssh.get_public_key",
- AsyncMock(side_effect=OSError("disk full")),
- ),
- ):
- mock_ws.broadcast = AsyncMock()
- resp = await async_client.post(
- f"{API}/devices/sb-ssh-fail/heartbeat",
- json={"nfc_ok": True, "scale_ok": True, "uptime_s": 5},
- )
- assert resp.status_code == 200
- assert resp.json()["ssh_public_key"] is None
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_heartbeat_returns_pending_command(self, async_client: AsyncClient, device_factory):
- await device_factory(device_id="sb-cmd", pending_command="tare")
- with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
- mock_ws.broadcast = AsyncMock()
- resp = await async_client.post(
- f"{API}/devices/sb-cmd/heartbeat",
- json={"nfc_ok": True, "scale_ok": True, "uptime_s": 10},
- )
- assert resp.status_code == 200
- assert resp.json()["pending_command"] == "tare"
- # Second heartbeat should have no pending command (cleared)
- with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
- mock_ws.broadcast = AsyncMock()
- resp2 = await async_client.post(
- f"{API}/devices/sb-cmd/heartbeat",
- json={"nfc_ok": True, "scale_ok": True, "uptime_s": 20},
- )
- assert resp2.json()["pending_command"] is None
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_heartbeat_unknown_device_404(self, async_client: AsyncClient):
- with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
- mock_ws.broadcast = AsyncMock()
- resp = await async_client.post(
- f"{API}/devices/nonexistent/heartbeat",
- json={"nfc_ok": False, "scale_ok": False, "uptime_s": 0},
- )
- assert resp.status_code == 404
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_heartbeat_broadcasts_online_when_was_offline(self, async_client: AsyncClient, device_factory):
- # Create device with last_seen far in the past (offline)
- spoolbuddy_routes._spoolbuddy_online_last_broadcast.clear()
- await device_factory(
- device_id="sb-offline",
- last_seen=datetime.now(timezone.utc) - timedelta(seconds=120),
- )
- with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
- mock_ws.broadcast = AsyncMock()
- resp = await async_client.post(
- f"{API}/devices/sb-offline/heartbeat",
- json={"nfc_ok": True, "scale_ok": True, "uptime_s": 5},
- )
- assert resp.status_code == 200
- # Should broadcast online since device was offline
- mock_ws.broadcast.assert_called_once()
- msg = mock_ws.broadcast.call_args[0][0]
- assert msg["type"] == "spoolbuddy_online"
- assert msg["device_id"] == "sb-offline"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_heartbeat_broadcasts_online_when_already_online(self, async_client: AsyncClient, device_factory):
- spoolbuddy_routes._spoolbuddy_online_last_broadcast.clear()
- await device_factory(
- device_id="sb-already-online",
- last_seen=datetime.now(timezone.utc),
- )
- with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
- mock_ws.broadcast = AsyncMock()
- resp = await async_client.post(
- f"{API}/devices/sb-already-online/heartbeat",
- json={"nfc_ok": True, "scale_ok": True, "uptime_s": 42},
- )
- assert resp.status_code == 200
- mock_ws.broadcast.assert_called_once()
- msg = mock_ws.broadcast.call_args[0][0]
- assert msg["type"] == "spoolbuddy_online"
- assert msg["device_id"] == "sb-already-online"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_heartbeat_online_broadcast_is_throttled(self, async_client: AsyncClient, device_factory):
- spoolbuddy_routes._spoolbuddy_online_last_broadcast.clear()
- await device_factory(
- device_id="sb-throttle",
- last_seen=datetime.now(timezone.utc),
- )
- with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
- mock_ws.broadcast = AsyncMock()
- resp1 = await async_client.post(
- f"{API}/devices/sb-throttle/heartbeat",
- json={"nfc_ok": True, "scale_ok": True, "uptime_s": 10},
- )
- resp2 = await async_client.post(
- f"{API}/devices/sb-throttle/heartbeat",
- json={"nfc_ok": True, "scale_ok": True, "uptime_s": 11},
- )
- assert resp1.status_code == 200
- assert resp2.status_code == 200
- mock_ws.broadcast.assert_called_once()
- msg = mock_ws.broadcast.call_args[0][0]
- assert msg["type"] == "spoolbuddy_online"
- assert msg["device_id"] == "sb-throttle"
- # ============================================================================
- # NFC endpoints
- # ============================================================================
- class TestNfcEndpoints:
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_tag_scanned_matched(self, async_client: AsyncClient, spool_factory):
- spool = await spool_factory(tag_uid="AABB1122", material="PLA")
- mock_spool = MagicMock()
- mock_spool.id = spool.id
- mock_spool.material = spool.material
- mock_spool.subtype = spool.subtype
- mock_spool.color_name = spool.color_name
- mock_spool.rgba = spool.rgba
- mock_spool.brand = spool.brand
- mock_spool.label_weight = spool.label_weight
- mock_spool.core_weight = spool.core_weight
- mock_spool.weight_used = spool.weight_used
- with (
- patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws,
- patch("backend.app.api.routes.spoolbuddy.get_spool_by_tag", new_callable=AsyncMock) as mock_lookup,
- ):
- mock_ws.broadcast = AsyncMock()
- mock_lookup.return_value = mock_spool
- resp = await async_client.post(
- f"{API}/nfc/tag-scanned",
- json={"device_id": "sb-1", "tag_uid": "AABB1122"},
- )
- assert resp.status_code == 200
- data = resp.json()
- assert data["matched"] is True
- assert data["spool_id"] == spool.id
- msg = mock_ws.broadcast.call_args[0][0]
- assert msg["type"] == "spoolbuddy_tag_matched"
- assert msg["spool"]["id"] == spool.id
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_tag_scanned_unmatched(self, async_client: AsyncClient):
- with (
- patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws,
- patch("backend.app.api.routes.spoolbuddy.get_spool_by_tag", new_callable=AsyncMock) as mock_lookup,
- ):
- mock_ws.broadcast = AsyncMock()
- mock_lookup.return_value = None
- resp = await async_client.post(
- f"{API}/nfc/tag-scanned",
- json={"device_id": "sb-1", "tag_uid": "DEADBEEF"},
- )
- assert resp.status_code == 200
- data = resp.json()
- assert data["matched"] is False
- assert data["spool_id"] is None
- msg = mock_ws.broadcast.call_args[0][0]
- assert msg["type"] == "spoolbuddy_unknown_tag"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_tag_scanned_spoolman_mode_skips_local_lookup(self, async_client: AsyncClient, db_session):
- """When spoolman_enabled=true, /nfc/tag-scanned must use Spoolman
- exclusively — local DB lookup must not be consulted at all. The
- previous always-local-first behaviour caused stale local rows to
- win over the authoritative Spoolman data (#1228 follow-up).
- """
- from backend.app.models.settings import Settings
- db_session.add(Settings(key="spoolman_enabled", value="true"))
- db_session.add(Settings(key="spoolman_url", value="http://127.0.0.1:7912"))
- await db_session.commit()
- # Mock Spoolman match and verify get_spool_by_tag (the local-DB lookup)
- # is never called in Spoolman-enabled mode.
- sm_match = {
- "id": 7,
- "filament": {
- "material": "PLA",
- "name": "PLA Basic Red",
- "color_hex": "FF0000",
- "weight": 1000.0,
- "vendor": {"name": "Bambu Lab"},
- },
- "extra": {"tag": '"AABB1122"'},
- "used_weight": 0.0,
- }
- mock_client = MagicMock()
- mock_client.get_spools = AsyncMock(return_value=[sm_match])
- mock_client.find_spool_by_tag = AsyncMock(return_value=sm_match)
- with (
- patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws,
- patch(
- "backend.app.api.routes.spoolbuddy._get_spoolman_client_or_none",
- new_callable=AsyncMock,
- ) as mock_get_client,
- patch(
- "backend.app.api.routes.spoolbuddy.get_spool_by_tag",
- new_callable=AsyncMock,
- ) as mock_local_lookup,
- ):
- mock_ws.broadcast = AsyncMock()
- mock_get_client.return_value = mock_client
- # Sentinel so a misrouted call would surface as a wrong spool_id.
- mock_local_lookup.return_value = MagicMock(id=999)
- resp = await async_client.post(
- f"{API}/nfc/tag-scanned",
- json={"device_id": "sb-1", "tag_uid": "AABB1122"},
- )
- assert resp.status_code == 200
- data = resp.json()
- assert data["matched"] is True
- # Spoolman result, not local DB sentinel — proves the local lookup was skipped.
- assert data["spool_id"] == 7
- mock_local_lookup.assert_not_called()
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_write_result_clears_duplicate_tag_binding(
- self, async_client: AsyncClient, db_session, device_factory
- ):
- """Writing a tag for spool B must clear the same tag binding from any
- other spool that currently has it. Without this guard, find_spool_by_tag
- returns whichever spool comes first in the cached list (typically the
- older one), so the dashboard shows the wrong spool when the tag is
- scanned.
- """
- import json as _json
- from backend.app.models.settings import Settings
- from backend.app.models.spoolbuddy_device import SpoolBuddyDevice
- db_session.add(Settings(key="spoolman_enabled", value="true"))
- db_session.add(Settings(key="spoolman_url", value="http://127.0.0.1:7912"))
- await device_factory(
- device_id="sb-write",
- pending_command="write_tag",
- pending_write_payload=_json.dumps({"spool_id": 22, "ndef_data_hex": "DEAD", "data_origin": "spoolman"}),
- )
- await db_session.commit()
- # Spool A (id=11) currently holds the tag we're about to bind to spool B (id=22).
- spool_a_with_tag = {
- "id": 11,
- "filament": {"material": "PLA", "name": "PLA Old", "color_hex": "AAAAAA", "weight": 1000.0},
- "extra": {"tag": '"DEADBEEF"'},
- }
- mock_client = MagicMock()
- mock_client.get_spools = AsyncMock(return_value=[spool_a_with_tag])
- mock_client.find_spool_by_tag = AsyncMock(return_value=spool_a_with_tag)
- mock_client.merge_spool_extra = AsyncMock(return_value={})
- with (
- patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws,
- patch(
- "backend.app.api.routes.spoolbuddy._get_spoolman_client_or_none",
- new_callable=AsyncMock,
- ) as mock_get_client,
- ):
- mock_ws.broadcast = AsyncMock()
- mock_get_client.return_value = mock_client
- resp = await async_client.post(
- f"{API}/nfc/write-result",
- json={
- "device_id": "sb-write",
- "spool_id": 22,
- "tag_uid": "DEADBEEF",
- "success": True,
- },
- )
- assert resp.status_code == 200
- # merge_spool_extra was called twice:
- # 1. clear tag from spool A (id=11) — set tag to ""
- # 2. set tag on spool B (id=22) — set tag to "DEADBEEF" (JSON-encoded)
- assert mock_client.merge_spool_extra.await_count == 2
- clear_call, bind_call = mock_client.merge_spool_extra.await_args_list
- assert clear_call.args[0] == 11
- assert clear_call.args[1] == {"tag": ""}
- assert bind_call.args[0] == 22
- assert bind_call.args[1] == {"tag": '"DEADBEEF"'}
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_tag_removed(self, async_client: AsyncClient):
- with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
- mock_ws.broadcast = AsyncMock()
- resp = await async_client.post(
- f"{API}/nfc/tag-removed",
- json={"device_id": "sb-1", "tag_uid": "AABB1122"},
- )
- assert resp.status_code == 200
- msg = mock_ws.broadcast.call_args[0][0]
- assert msg["type"] == "spoolbuddy_tag_removed"
- assert msg["device_id"] == "sb-1"
- assert msg["tag_uid"] == "AABB1122"
- # ============================================================================
- # NFC write-tag endpoints
- # ============================================================================
- class TestWriteTagEndpoints:
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_write_tag_queues_command(self, async_client: AsyncClient, device_factory, spool_factory):
- device = await device_factory(device_id="sb-wt")
- spool = await spool_factory(material="PLA", brand="Polymaker", color_name="Red", rgba="FF0000FF")
- resp = await async_client.post(
- f"{API}/nfc/write-tag",
- json={"device_id": device.device_id, "spool_id": spool.id},
- )
- assert resp.status_code == 200
- assert resp.json()["status"] == "queued"
- # Verify heartbeat returns write_tag command with payload
- with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
- mock_ws.broadcast = AsyncMock()
- hb = await async_client.post(
- f"{API}/devices/{device.device_id}/heartbeat",
- json={"nfc_ok": True, "scale_ok": True, "uptime_s": 10},
- )
- hb_data = hb.json()
- assert hb_data["pending_command"] == "write_tag"
- assert hb_data["pending_write_payload"] is not None
- assert hb_data["pending_write_payload"]["spool_id"] == spool.id
- assert "ndef_data_hex" in hb_data["pending_write_payload"]
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_write_tag_heartbeat_not_cleared(self, async_client: AsyncClient, device_factory, spool_factory):
- """write_tag command persists across heartbeats until write-result clears it."""
- device = await device_factory(device_id="sb-wt-persist")
- spool = await spool_factory(material="PETG")
- await async_client.post(
- f"{API}/nfc/write-tag",
- json={"device_id": device.device_id, "spool_id": spool.id},
- )
- # First heartbeat — command present
- with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
- mock_ws.broadcast = AsyncMock()
- hb1 = await async_client.post(
- f"{API}/devices/{device.device_id}/heartbeat",
- json={"nfc_ok": True, "scale_ok": True, "uptime_s": 10},
- )
- assert hb1.json()["pending_command"] == "write_tag"
- # Second heartbeat — should still be present (not cleared like tare)
- with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
- mock_ws.broadcast = AsyncMock()
- hb2 = await async_client.post(
- f"{API}/devices/{device.device_id}/heartbeat",
- json={"nfc_ok": True, "scale_ok": True, "uptime_s": 20},
- )
- assert hb2.json()["pending_command"] == "write_tag"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_write_tag_missing_spool_404(self, async_client: AsyncClient, device_factory):
- device = await device_factory(device_id="sb-wt-nospool")
- resp = await async_client.post(
- f"{API}/nfc/write-tag",
- json={"device_id": device.device_id, "spool_id": 99999},
- )
- assert resp.status_code == 404
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_write_tag_missing_device_404(self, async_client: AsyncClient, spool_factory):
- spool = await spool_factory()
- resp = await async_client.post(
- f"{API}/nfc/write-tag",
- json={"device_id": "nonexistent", "spool_id": spool.id},
- )
- assert resp.status_code == 404
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_write_result_success_links_tag(self, async_client: AsyncClient, device_factory, spool_factory):
- device = await device_factory(device_id="sb-wr", pending_command="write_tag")
- spool = await spool_factory(material="PLA", tag_uid=None)
- with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
- mock_ws.broadcast = AsyncMock()
- resp = await async_client.post(
- f"{API}/nfc/write-result",
- json={
- "device_id": device.device_id,
- "spool_id": spool.id,
- "tag_uid": "04AABB11223344",
- "success": True,
- },
- )
- assert resp.status_code == 200
- msg = mock_ws.broadcast.call_args[0][0]
- assert msg["type"] == "spoolbuddy_tag_written"
- assert msg["spool_id"] == spool.id
- assert msg["tag_uid"] == "04AABB11223344"
- # Verify spool got tag linked
- spool_resp = await async_client.get(f"/api/v1/inventory/spools/{spool.id}")
- spool_data = spool_resp.json()
- assert spool_data["tag_uid"] == "04AABB11223344"
- assert spool_data["tag_type"] == "ntag"
- assert spool_data["data_origin"] == "opentag3d"
- assert spool_data["encode_time"] is not None
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_write_result_failure_broadcasts_error(
- self, async_client: AsyncClient, device_factory, spool_factory
- ):
- device = await device_factory(device_id="sb-wr-fail", pending_command="write_tag")
- spool = await spool_factory(material="PLA", tag_uid=None)
- with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
- mock_ws.broadcast = AsyncMock()
- resp = await async_client.post(
- f"{API}/nfc/write-result",
- json={
- "device_id": device.device_id,
- "spool_id": spool.id,
- "tag_uid": "04AABBCC",
- "success": False,
- "message": "Write or verification failed",
- },
- )
- assert resp.status_code == 200
- msg = mock_ws.broadcast.call_args[0][0]
- assert msg["type"] == "spoolbuddy_tag_write_failed"
- assert msg["message"] == "Write or verification failed"
- # Verify spool NOT linked
- spool_resp = await async_client.get(f"/api/v1/inventory/spools/{spool.id}")
- assert spool_resp.json()["tag_uid"] is None
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_write_result_clears_pending_command(self, async_client: AsyncClient, device_factory, spool_factory):
- device = await device_factory(
- device_id="sb-wr-clear",
- pending_command="write_tag",
- pending_write_payload='{"spool_id": 1, "ndef_data_hex": "E110120003"}',
- )
- spool = await spool_factory()
- with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
- mock_ws.broadcast = AsyncMock()
- await async_client.post(
- f"{API}/nfc/write-result",
- json={
- "device_id": device.device_id,
- "spool_id": spool.id,
- "tag_uid": "AABBCCDD",
- "success": True,
- },
- )
- # Heartbeat should have no pending command
- with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
- mock_ws.broadcast = AsyncMock()
- hb = await async_client.post(
- f"{API}/devices/{device.device_id}/heartbeat",
- json={"nfc_ok": True, "scale_ok": True, "uptime_s": 30},
- )
- assert hb.json()["pending_command"] is None
- assert hb.json()["pending_write_payload"] is None
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_cancel_write(self, async_client: AsyncClient, device_factory, spool_factory):
- device = await device_factory(device_id="sb-cancel")
- spool = await spool_factory()
- # Queue a write
- await async_client.post(
- f"{API}/nfc/write-tag",
- json={"device_id": device.device_id, "spool_id": spool.id},
- )
- # Cancel it
- resp = await async_client.post(f"{API}/devices/{device.device_id}/cancel-write", json={})
- assert resp.status_code == 200
- # Heartbeat should have no pending command
- with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
- mock_ws.broadcast = AsyncMock()
- hb = await async_client.post(
- f"{API}/devices/{device.device_id}/heartbeat",
- json={"nfc_ok": True, "scale_ok": True, "uptime_s": 10},
- )
- assert hb.json()["pending_command"] is None
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_cancel_write_unknown_device_404(self, async_client: AsyncClient):
- resp = await async_client.post(f"{API}/devices/ghost/cancel-write", json={})
- assert resp.status_code == 404
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_write_tag_ndef_data_is_valid(self, async_client: AsyncClient, device_factory, spool_factory):
- """Verify the NDEF data in the heartbeat is a valid OpenTag3D message."""
- device = await device_factory(device_id="sb-wt-ndef")
- spool = await spool_factory(
- material="PLA",
- brand="Polymaker",
- color_name="White",
- rgba="FFFFFFFF",
- label_weight=1000,
- )
- await async_client.post(
- f"{API}/nfc/write-tag",
- json={"device_id": device.device_id, "spool_id": spool.id},
- )
- with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
- mock_ws.broadcast = AsyncMock()
- hb = await async_client.post(
- f"{API}/devices/{device.device_id}/heartbeat",
- json={"nfc_ok": True, "scale_ok": True, "uptime_s": 10},
- )
- payload = hb.json()["pending_write_payload"]
- ndef_bytes = bytes.fromhex(payload["ndef_data_hex"])
- # CC bytes
- assert ndef_bytes[:4] == bytes([0xE1, 0x10, 0x12, 0x00])
- # TLV type
- assert ndef_bytes[4] == 0x03
- # NDEF record: TNF=MIME, type=application/opentag3d
- assert ndef_bytes[6] == 0xD2
- assert ndef_bytes[9:30] == b"application/opentag3d"
- # Terminator
- assert ndef_bytes[-1] == 0xFE
- # Total size fits NTAG213
- assert len(ndef_bytes) <= 144
- # ============================================================================
- # Scale endpoints
- # ============================================================================
- class TestScaleEndpoints:
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_scale_reading_broadcast(self, async_client: AsyncClient):
- with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
- mock_ws.broadcast = AsyncMock()
- resp = await async_client.post(
- f"{API}/scale/reading",
- json={
- "device_id": "sb-1",
- "weight_grams": 823.5,
- "stable": True,
- "raw_adc": 456789,
- },
- )
- assert resp.status_code == 200
- msg = mock_ws.broadcast.call_args[0][0]
- assert msg["type"] == "spoolbuddy_weight"
- assert msg["device_id"] == "sb-1"
- assert msg["weight_grams"] == 823.5
- assert msg["stable"] is True
- assert msg["raw_adc"] == 456789
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_update_spool_weight_calculates_correctly(self, async_client: AsyncClient, spool_factory):
- # label=1000g, core=250g, scale reads 750g
- # net_filament = max(0, 750 - 250) = 500
- # weight_used = max(0, 1000 - 500) = 500
- spool = await spool_factory(label_weight=1000, core_weight=250, weight_used=0)
- resp = await async_client.post(
- f"{API}/scale/update-spool-weight",
- json={"spool_id": spool.id, "weight_grams": 750},
- )
- assert resp.status_code == 200
- data = resp.json()
- assert data["weight_used"] == 500
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_update_spool_weight_full_spool(self, async_client: AsyncClient, spool_factory):
- # label=1000g, core=250g, scale reads 1250g (full spool)
- # net_filament = max(0, 1250 - 250) = 1000
- # weight_used = max(0, 1000 - 1000) = 0
- spool = await spool_factory(label_weight=1000, core_weight=250, weight_used=200)
- resp = await async_client.post(
- f"{API}/scale/update-spool-weight",
- json={"spool_id": spool.id, "weight_grams": 1250},
- )
- assert resp.status_code == 200
- data = resp.json()
- assert data["weight_used"] == 0
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_update_spool_weight_stores_scale_reading(self, async_client: AsyncClient, spool_factory):
- """Verify last_scale_weight and last_weighed_at are stored after weight sync."""
- spool = await spool_factory(label_weight=1000, core_weight=250, weight_used=0)
- resp = await async_client.post(
- f"{API}/scale/update-spool-weight",
- json={"spool_id": spool.id, "weight_grams": 750},
- )
- assert resp.status_code == 200
- # Fetch the spool via inventory API to verify stored fields
- spool_resp = await async_client.get(f"/api/v1/inventory/spools/{spool.id}")
- assert spool_resp.status_code == 200
- spool_data = spool_resp.json()
- assert spool_data["last_scale_weight"] == 750
- assert spool_data["last_weighed_at"] is not None
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_update_spool_weight_missing_spool_404(self, async_client: AsyncClient):
- resp = await async_client.post(
- f"{API}/scale/update-spool-weight",
- json={"spool_id": 99999, "weight_grams": 500},
- )
- assert resp.status_code == 404
- # ============================================================================
- # Calibration endpoints
- # ============================================================================
- class TestCalibrationEndpoints:
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_tare_queues_command(self, async_client: AsyncClient, device_factory):
- await device_factory(device_id="sb-tare")
- resp = await async_client.post(f"{API}/devices/sb-tare/calibration/tare", json={})
- assert resp.status_code == 200
- assert resp.json()["status"] == "ok"
- # Verify pending_command via heartbeat
- with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
- mock_ws.broadcast = AsyncMock()
- hb = await async_client.post(
- f"{API}/devices/sb-tare/heartbeat",
- json={"nfc_ok": True, "scale_ok": True, "uptime_s": 1},
- )
- assert hb.json()["pending_command"] == "tare"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_tare_unknown_device_404(self, async_client: AsyncClient):
- resp = await async_client.post(f"{API}/devices/ghost/calibration/tare", json={})
- assert resp.status_code == 404
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_set_tare_offset(self, async_client: AsyncClient, device_factory):
- await device_factory(device_id="sb-st", calibration_factor=0.005)
- resp = await async_client.post(
- f"{API}/devices/sb-st/calibration/set-tare",
- json={"tare_offset": 54321},
- )
- assert resp.status_code == 200
- data = resp.json()
- assert data["tare_offset"] == 54321
- assert data["calibration_factor"] == pytest.approx(0.005)
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_set_calibration_factor(self, async_client: AsyncClient, device_factory):
- # known_weight=200g, raw_adc=50000, tare=10000 → factor=200/(50000-10000)=0.005
- await device_factory(device_id="sb-cf", tare_offset=10000)
- resp = await async_client.post(
- f"{API}/devices/sb-cf/calibration/set-factor",
- json={"known_weight_grams": 200, "raw_adc": 50000},
- )
- assert resp.status_code == 200
- data = resp.json()
- assert data["calibration_factor"] == pytest.approx(0.005)
- assert data["tare_offset"] == 10000
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_set_calibration_factor_zero_delta_400(self, async_client: AsyncClient, device_factory):
- # raw_adc == tare_offset → delta is 0 → 400 error
- await device_factory(device_id="sb-zero", tare_offset=5000)
- resp = await async_client.post(
- f"{API}/devices/sb-zero/calibration/set-factor",
- json={"known_weight_grams": 100, "raw_adc": 5000},
- )
- assert resp.status_code == 400
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_get_calibration(self, async_client: AsyncClient, device_factory):
- await device_factory(
- device_id="sb-gcal",
- tare_offset=11111,
- calibration_factor=0.0042,
- )
- resp = await async_client.get(f"{API}/devices/sb-gcal/calibration")
- assert resp.status_code == 200
- data = resp.json()
- assert data["tare_offset"] == 11111
- assert data["calibration_factor"] == pytest.approx(0.0042)
- # ============================================================================
- # Display endpoints
- # ============================================================================
- class TestDisplayEndpoints:
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_update_display_settings(self, async_client: AsyncClient, device_factory):
- await device_factory(device_id="sb-disp", display_brightness=100, display_blank_timeout=0)
- resp = await async_client.put(
- f"{API}/devices/sb-disp/display",
- json={"brightness": 75, "blank_timeout": 300},
- )
- assert resp.status_code == 200
- data = resp.json()
- assert data["brightness"] == 75
- assert data["blank_timeout"] == 300
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_update_display_persists_via_heartbeat(self, async_client: AsyncClient, device_factory):
- await device_factory(device_id="sb-disp-hb")
- await async_client.put(
- f"{API}/devices/sb-disp-hb/display",
- json={"brightness": 50, "blank_timeout": 600},
- )
- with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
- mock_ws.broadcast = AsyncMock()
- hb = await async_client.post(
- f"{API}/devices/sb-disp-hb/heartbeat",
- json={"nfc_ok": True, "scale_ok": True, "uptime_s": 10},
- )
- assert hb.json()["display_brightness"] == 50
- assert hb.json()["display_blank_timeout"] == 600
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_update_display_unknown_device_404(self, async_client: AsyncClient):
- resp = await async_client.put(
- f"{API}/devices/ghost/display",
- json={"brightness": 50, "blank_timeout": 60},
- )
- assert resp.status_code == 404
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_update_display_validates_brightness(self, async_client: AsyncClient, device_factory):
- await device_factory(device_id="sb-disp-val")
- resp = await async_client.put(
- f"{API}/devices/sb-disp-val/display",
- json={"brightness": 150, "blank_timeout": 0},
- )
- assert resp.status_code == 422 # Validation error: brightness > 100
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_get_display_settings(self, async_client: AsyncClient, device_factory):
- """The kiosk idle watchdog (install/spoolbuddy-idle.sh) reads this
- endpoint on autostart to configure swayidle with the user-selected
- blank timeout before launching. See issue #937."""
- await device_factory(device_id="sb-disp-get", display_brightness=60, display_blank_timeout=450)
- resp = await async_client.get(f"{API}/devices/sb-disp-get/display")
- assert resp.status_code == 200
- data = resp.json()
- assert data["brightness"] == 60
- assert data["blank_timeout"] == 450
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_get_display_unknown_device_404(self, async_client: AsyncClient):
- resp = await async_client.get(f"{API}/devices/ghost/display")
- assert resp.status_code == 404
- # ============================================================================
- # Update endpoints
- # ============================================================================
- class TestUpdateEndpoints:
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_trigger_update_starts_ssh_update(self, async_client: AsyncClient, device_factory):
- await device_factory(device_id="sb-upd")
- with (
- patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws,
- patch("backend.app.services.spoolbuddy_ssh.perform_ssh_update", new_callable=AsyncMock),
- ):
- mock_ws.broadcast = AsyncMock()
- resp = await async_client.post(f"{API}/devices/sb-upd/update")
- assert resp.status_code == 200
- assert resp.json()["status"] == "ok"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_trigger_update_offline_device_409(self, async_client: AsyncClient, device_factory):
- await device_factory(
- device_id="sb-upd-off",
- last_seen=datetime.now(timezone.utc) - timedelta(seconds=120),
- )
- resp = await async_client.post(f"{API}/devices/sb-upd-off/update")
- assert resp.status_code == 409
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_trigger_update_unknown_device_404(self, async_client: AsyncClient):
- resp = await async_client.post(f"{API}/devices/ghost/update")
- assert resp.status_code == 404
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_trigger_update_already_updating(self, async_client: AsyncClient, device_factory):
- await device_factory(device_id="sb-upd-dup", update_status="updating")
- with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
- mock_ws.broadcast = AsyncMock()
- resp = await async_client.post(f"{API}/devices/sb-upd-dup/update")
- assert resp.status_code == 200
- assert resp.json()["status"] == "already_updating"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_report_update_status_updating(self, async_client: AsyncClient, device_factory):
- await device_factory(device_id="sb-upd-st", pending_command="update", update_status="pending")
- with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
- mock_ws.broadcast = AsyncMock()
- resp = await async_client.post(
- f"{API}/devices/sb-upd-st/update-status",
- json={"status": "updating", "message": "Fetching latest code..."},
- )
- assert resp.status_code == 200
- mock_ws.broadcast.assert_called_once()
- msg = mock_ws.broadcast.call_args[0][0]
- assert msg["type"] == "spoolbuddy_update"
- assert msg["update_status"] == "updating"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_report_update_status_complete_clears_command(self, async_client: AsyncClient, device_factory):
- await device_factory(device_id="sb-upd-done", pending_command="update", update_status="updating")
- with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
- mock_ws.broadcast = AsyncMock()
- await async_client.post(
- f"{API}/devices/sb-upd-done/update-status",
- json={"status": "complete", "message": "Update complete, restarting..."},
- )
- # Heartbeat should have no pending command
- with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
- mock_ws.broadcast = AsyncMock()
- hb = await async_client.post(
- f"{API}/devices/sb-upd-done/heartbeat",
- json={"nfc_ok": True, "scale_ok": True, "uptime_s": 10},
- )
- assert hb.json()["pending_command"] is None
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_report_update_status_error(self, async_client: AsyncClient, device_factory):
- await device_factory(device_id="sb-upd-err", pending_command="update", update_status="updating")
- with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
- mock_ws.broadcast = AsyncMock()
- resp = await async_client.post(
- f"{API}/devices/sb-upd-err/update-status",
- json={"status": "error", "message": "git fetch failed: network unreachable"},
- )
- assert resp.status_code == 200
- msg = mock_ws.broadcast.call_args[0][0]
- assert msg["update_status"] == "error"
- assert "git fetch failed" in msg["update_message"]
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_report_update_status_unknown_device_404(self, async_client: AsyncClient):
- resp = await async_client.post(
- f"{API}/devices/ghost/update-status",
- json={"status": "updating", "message": "test"},
- )
- assert resp.status_code == 404
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_report_update_status_invalid_status_422(self, async_client: AsyncClient, device_factory):
- """Arbitrary status strings must be rejected with 422 (H2: UpdateStatusRequest validation)."""
- await device_factory(device_id="sb-upd-inv")
- resp = await async_client.post(
- f"{API}/devices/sb-upd-inv/update-status",
- json={"status": "hacked", "message": "injected"},
- )
- assert resp.status_code == 422
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_report_update_status_oversized_message_422(self, async_client: AsyncClient, device_factory):
- """Message exceeding 255 chars must be rejected with 422 (H2/M4)."""
- await device_factory(device_id="sb-upd-big")
- resp = await async_client.post(
- f"{API}/devices/sb-upd-big/update-status",
- json={"status": "updating", "message": "x" * 256},
- )
- assert resp.status_code == 422
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_ssh_public_key_error_does_not_leak_exception_text(self, async_client: AsyncClient):
- """SSH public-key 500 must not expose raw exception details (M3)."""
- from backend.app.services.spoolbuddy_ssh import get_public_key
- with patch(
- "backend.app.services.spoolbuddy_ssh.get_public_key",
- AsyncMock(side_effect=RuntimeError("REDACT_ME internal path /data/keys/id_ed25519")),
- ):
- resp = await async_client.get(f"{API}/ssh/public-key")
- assert resp.status_code == 500
- body = resp.json()["detail"]
- assert "REDACT_ME" not in body
- assert "/data/keys" not in body
- assert "id_ed25519" not in body
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_device_response_includes_update_fields(self, async_client: AsyncClient, device_factory):
- await device_factory(device_id="sb-upd-resp", update_status="complete", update_message="Done!")
- resp = await async_client.get(f"{API}/devices")
- assert resp.status_code == 200
- device = next(d for d in resp.json() if d["device_id"] == "sb-upd-resp")
- assert device["update_status"] == "complete"
- assert device["update_message"] == "Done!"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_update_check_returns_version_info(self, async_client: AsyncClient, device_factory):
- """GET /devices/{id}/update-check compares device version against APP_VERSION."""
- await device_factory(device_id="sb-uc", firmware_version="0.1.0")
- resp = await async_client.get(f"{API}/devices/sb-uc/update-check")
- assert resp.status_code == 200
- data = resp.json()
- assert data["current_version"] == "0.1.0"
- assert data["latest_version"] is not None
- assert data["update_available"] is True
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_update_check_up_to_date(self, async_client: AsyncClient, device_factory):
- from backend.app.core.config import APP_VERSION
- await device_factory(device_id="sb-uc2", firmware_version=APP_VERSION)
- resp = await async_client.get(f"{API}/devices/sb-uc2/update-check")
- assert resp.status_code == 200
- assert resp.json()["update_available"] is False
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_update_check_unknown_device_404(self, async_client: AsyncClient):
- resp = await async_client.get(f"{API}/devices/ghost/update-check")
- assert resp.status_code == 404
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_trigger_update_broadcasts_websocket(self, async_client: AsyncClient, device_factory):
- await device_factory(device_id="sb-upd-ws")
- with (
- patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws,
- patch("backend.app.services.spoolbuddy_ssh.perform_ssh_update", new_callable=AsyncMock),
- ):
- mock_ws.broadcast = AsyncMock()
- await async_client.post(f"{API}/devices/sb-upd-ws/update")
- mock_ws.broadcast.assert_called_once()
- msg = mock_ws.broadcast.call_args[0][0]
- assert msg["type"] == "spoolbuddy_update"
- assert msg["device_id"] == "sb-upd-ws"
- assert msg["update_status"] == "pending"
- # ============================================================================
- # System command endpoints
- # ============================================================================
- class TestSystemCommandEndpoints:
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_queue_reboot(self, async_client: AsyncClient, device_factory):
- await device_factory(device_id="sb-reboot")
- resp = await async_client.post(
- f"{API}/devices/sb-reboot/system/command",
- json={"command": "reboot"},
- )
- assert resp.status_code == 200
- data = resp.json()
- assert data["status"] == "queued"
- assert data["command"] == "reboot"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_queue_shutdown(self, async_client: AsyncClient, device_factory):
- await device_factory(device_id="sb-shutdown")
- resp = await async_client.post(
- f"{API}/devices/sb-shutdown/system/command",
- json={"command": "shutdown"},
- )
- assert resp.status_code == 200
- assert resp.json()["command"] == "shutdown"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_queue_restart_daemon(self, async_client: AsyncClient, device_factory):
- await device_factory(device_id="sb-rd")
- resp = await async_client.post(
- f"{API}/devices/sb-rd/system/command",
- json={"command": "restart_daemon"},
- )
- assert resp.status_code == 200
- assert resp.json()["command"] == "restart_daemon"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_queue_restart_browser(self, async_client: AsyncClient, device_factory):
- await device_factory(device_id="sb-rb")
- resp = await async_client.post(
- f"{API}/devices/sb-rb/system/command",
- json={"command": "restart_browser"},
- )
- assert resp.status_code == 200
- assert resp.json()["command"] == "restart_browser"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_invalid_command_rejected(self, async_client: AsyncClient, device_factory):
- await device_factory(device_id="sb-invalid")
- resp = await async_client.post(
- f"{API}/devices/sb-invalid/system/command",
- json={"command": "format_disk"},
- )
- assert resp.status_code == 400
- assert "Invalid command" in resp.json()["detail"]
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_command_unknown_device_404(self, async_client: AsyncClient):
- resp = await async_client.post(
- f"{API}/devices/ghost/system/command",
- json={"command": "reboot"},
- )
- assert resp.status_code == 404
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_command_offline_device_409(self, async_client: AsyncClient, device_factory):
- await device_factory(
- device_id="sb-offline-cmd",
- last_seen=datetime.now(timezone.utc) - timedelta(seconds=120),
- )
- resp = await async_client.post(
- f"{API}/devices/sb-offline-cmd/system/command",
- json={"command": "reboot"},
- )
- assert resp.status_code == 409
- assert "offline" in resp.json()["detail"].lower()
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_command_sets_pending_command(self, async_client: AsyncClient, device_factory, db_session):
- device = await device_factory(device_id="sb-pending")
- await async_client.post(
- f"{API}/devices/sb-pending/system/command",
- json={"command": "restart_daemon"},
- )
- await db_session.refresh(device)
- assert device.pending_command == "restart_daemon"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_heartbeat_clears_system_command(self, async_client: AsyncClient, device_factory):
- """System commands (reboot/shutdown/restart_*) are fire-and-forget — heartbeat clears them."""
- await device_factory(device_id="sb-hb-clear")
- # Queue a command
- await async_client.post(
- f"{API}/devices/sb-hb-clear/system/command",
- json={"command": "restart_browser"},
- )
- # Heartbeat should return the command and clear it
- resp = await async_client.post(
- f"{API}/devices/sb-hb-clear/heartbeat",
- json={"nfc_ok": True, "scale_ok": True, "uptime_s": 100},
- )
- assert resp.status_code == 200
- data = resp.json()
- assert data["pending_command"] == "restart_browser"
- # ============================================================================
- # Spoolman-aware SpoolBuddy endpoints
- # ============================================================================
- @pytest.fixture
- async def spoolman_settings(db_session: AsyncSession):
- """Create Spoolman settings in the database (enabled with URL)."""
- from backend.app.models.settings import Settings
- settings = [
- Settings(key="spoolman_enabled", value="true"),
- Settings(key="spoolman_url", value="http://spoolman.local:7912"),
- ]
- for s in settings:
- db_session.add(s)
- await db_session.commit()
- return settings
- def _mock_spoolman_client(base_url: str = "http://spoolman.local:7912") -> MagicMock:
- client = MagicMock()
- client.base_url = base_url
- client.get_spools = AsyncMock(return_value=[])
- client.get_spool = AsyncMock(return_value={})
- client.find_spool_by_tag = AsyncMock(return_value=None)
- client.update_spool = AsyncMock(return_value=None)
- client.merge_spool_extra = AsyncMock(return_value={"id": 0})
- return client
- def _spoolman_spool_fixture(
- spool_id: int,
- spool_weight: float = 196.0,
- filament_weight: float = 1000.0,
- spool_level_spool_weight=None,
- ) -> dict:
- """Build a minimal Spoolman spool dict with realistic core weight from filament.spool_weight."""
- raw = {
- "id": spool_id,
- "filament": {"weight": filament_weight, "spool_weight": spool_weight},
- "used_weight": 0.0,
- }
- if spool_level_spool_weight is not None:
- raw["spool_weight"] = spool_level_spool_weight
- return raw
- class TestUpdateSpoolWeightSpoolman:
- """update-spool-weight routes to Spoolman when Spoolman mode is active."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_spoolman_mode_uses_filament_spool_weight(self, async_client: AsyncClient, spoolman_settings):
- """core_weight comes from filament.spool_weight, not a hardcoded constant."""
- sm_spool = _spoolman_spool_fixture(42, spool_weight=196.0, filament_weight=1000.0)
- mock_client = _mock_spoolman_client()
- mock_client.get_spool = AsyncMock(return_value=sm_spool)
- mock_client.update_spool = AsyncMock(return_value=sm_spool)
- with (
- patch(
- "backend.app.services.spoolman.get_spoolman_client",
- AsyncMock(return_value=mock_client),
- ),
- patch(
- "backend.app.services.spoolman.init_spoolman_client",
- AsyncMock(return_value=mock_client),
- ),
- ):
- resp = await async_client.post(
- f"{API}/scale/update-spool-weight",
- json={"spool_id": 42, "weight_grams": 750},
- )
- assert resp.status_code == 200
- data = resp.json()
- assert data["status"] == "ok"
- # remaining = max(0, 750 - 196) = 554 → weight_used = 1000 - 554 = 446
- assert data["weight_used"] == pytest.approx(446.0)
- mock_client.update_spool.assert_called_once_with(spool_id=42, remaining_weight=pytest.approx(554.0))
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_spoolman_mode_clamps_remaining_to_zero(self, async_client: AsyncClient, spoolman_settings):
- """Scale weight below core weight → remaining_weight = 0."""
- sm_spool = _spoolman_spool_fixture(7, spool_weight=196.0, filament_weight=1000.0)
- mock_client = _mock_spoolman_client()
- mock_client.get_spool = AsyncMock(return_value=sm_spool)
- mock_client.update_spool = AsyncMock(return_value=sm_spool)
- with (
- patch(
- "backend.app.services.spoolman.get_spoolman_client",
- AsyncMock(return_value=mock_client),
- ),
- patch(
- "backend.app.services.spoolman.init_spoolman_client",
- AsyncMock(return_value=mock_client),
- ),
- ):
- resp = await async_client.post(
- f"{API}/scale/update-spool-weight",
- json={"spool_id": 7, "weight_grams": 100},
- )
- assert resp.status_code == 200
- mock_client.update_spool.assert_called_once_with(spool_id=7, remaining_weight=0.0)
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_spoolman_mode_404_when_spool_not_found(self, async_client: AsyncClient, spoolman_settings):
- """404 when Spoolman doesn't know the spool."""
- mock_client = _mock_spoolman_client()
- mock_client.get_spool = AsyncMock(side_effect=SpoolmanNotFoundError("Spool 9999 not found"))
- with (
- patch(
- "backend.app.services.spoolman.get_spoolman_client",
- AsyncMock(return_value=mock_client),
- ),
- patch(
- "backend.app.services.spoolman.init_spoolman_client",
- AsyncMock(return_value=mock_client),
- ),
- ):
- resp = await async_client.post(
- f"{API}/scale/update-spool-weight",
- json={"spool_id": 9999, "weight_grams": 500},
- )
- assert resp.status_code == 404
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_spoolman_mode_503_on_client_failure(self, async_client: AsyncClient, spoolman_settings):
- """503 is returned when Spoolman is unreachable during weight update."""
- sm_spool = _spoolman_spool_fixture(99)
- mock_client = _mock_spoolman_client()
- mock_client.get_spool = AsyncMock(return_value=sm_spool)
- mock_client.update_spool = AsyncMock(side_effect=SpoolmanUnavailableError("Spoolman down"))
- with (
- patch(
- "backend.app.services.spoolman.get_spoolman_client",
- AsyncMock(return_value=mock_client),
- ),
- patch(
- "backend.app.services.spoolman.init_spoolman_client",
- AsyncMock(return_value=mock_client),
- ),
- ):
- resp = await async_client.post(
- f"{API}/scale/update-spool-weight",
- json={"spool_id": 99, "weight_grams": 500},
- )
- assert resp.status_code == 503
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_local_mode_unchanged(self, async_client: AsyncClient, spool_factory):
- """When Spoolman is NOT enabled, local DB update still works."""
- spool = await spool_factory(label_weight=1000, core_weight=250, weight_used=0)
- resp = await async_client.post(
- f"{API}/scale/update-spool-weight",
- json={"spool_id": spool.id, "weight_grams": 750},
- )
- assert resp.status_code == 200
- assert resp.json()["weight_used"] == 500
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_spool_level_spool_weight_takes_priority(self, async_client: AsyncClient, spoolman_settings):
- """spool.spool_weight overrides filament.spool_weight for tare calculation."""
- sm_spool = _spoolman_spool_fixture(42, spool_weight=196.0, filament_weight=1000.0, spool_level_spool_weight=300)
- mock_client = _mock_spoolman_client()
- mock_client.get_spool = AsyncMock(return_value=sm_spool)
- mock_client.update_spool = AsyncMock(return_value=sm_spool)
- with (
- patch("backend.app.services.spoolman.get_spoolman_client", AsyncMock(return_value=mock_client)),
- patch("backend.app.services.spoolman.init_spoolman_client", AsyncMock(return_value=mock_client)),
- ):
- resp = await async_client.post(
- f"{API}/scale/update-spool-weight",
- json={"spool_id": 42, "weight_grams": 750},
- )
- assert resp.status_code == 200
- # remaining = 750 - 300 = 450; weight_used = 1000 - 450 = 550
- assert resp.json()["weight_used"] == pytest.approx(550.0)
- mock_client.update_spool.assert_called_once_with(spool_id=42, remaining_weight=pytest.approx(450.0))
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_spool_level_zero_spool_weight_not_treated_as_missing(
- self, async_client: AsyncClient, spoolman_settings
- ):
- """spool.spool_weight=0 is valid (0g tare), not treated as missing/fallback."""
- sm_spool = _spoolman_spool_fixture(42, spool_weight=196.0, filament_weight=1000.0, spool_level_spool_weight=0)
- mock_client = _mock_spoolman_client()
- mock_client.get_spool = AsyncMock(return_value=sm_spool)
- mock_client.update_spool = AsyncMock(return_value=sm_spool)
- with (
- patch("backend.app.services.spoolman.get_spoolman_client", AsyncMock(return_value=mock_client)),
- patch("backend.app.services.spoolman.init_spoolman_client", AsyncMock(return_value=mock_client)),
- ):
- resp = await async_client.post(
- f"{API}/scale/update-spool-weight",
- json={"spool_id": 42, "weight_grams": 750},
- )
- assert resp.status_code == 200
- # remaining = 750 - 0 = 750; weight_used = 1000 - 750 = 250
- assert resp.json()["weight_used"] == pytest.approx(250.0)
- mock_client.update_spool.assert_called_once_with(spool_id=42, remaining_weight=pytest.approx(750.0))
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_both_levels_none_uses_250g_fallback_and_warns(self, async_client: AsyncClient, spoolman_settings):
- """When both spool_weight and filament.spool_weight are None, 250g fallback is used with a warning."""
- sm_spool = {"id": 42, "filament": {"weight": 1000.0, "spool_weight": None}, "used_weight": 0.0}
- mock_client = _mock_spoolman_client()
- mock_client.get_spool = AsyncMock(return_value=sm_spool)
- mock_client.update_spool = AsyncMock(return_value=sm_spool)
- with (
- patch("backend.app.services.spoolman.get_spoolman_client", AsyncMock(return_value=mock_client)),
- patch("backend.app.services.spoolman.init_spoolman_client", AsyncMock(return_value=mock_client)),
- ):
- resp = await async_client.post(
- f"{API}/scale/update-spool-weight",
- json={"spool_id": 42, "weight_grams": 750},
- )
- assert resp.status_code == 200
- # remaining = 750 - 250 = 500; weight_used = 1000 - 500 = 500
- assert resp.json()["weight_used"] == pytest.approx(500.0)
- assert resp.json().get("warnings")
- class TestTagScannedSpoolmanFallback:
- """nfc/tag-scanned falls back to Spoolman when local DB has no match."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_spoolman_fallback_on_local_miss(self, async_client: AsyncClient, spoolman_settings):
- raw_spool = {
- "id": 5,
- "filament": {
- "material": "PETG",
- "name": "PETG Basic",
- "color_hex": "00FF00",
- "weight": 1000,
- "vendor": {"name": "Polymaker"},
- },
- "used_weight": 100.0,
- "archived": False,
- "registered": "2024-01-01T00:00:00+00:00",
- "extra": {"tag": '"DEADBEEF12345678"'},
- }
- mock_client = _mock_spoolman_client()
- mock_client.get_spools = AsyncMock(return_value=[raw_spool])
- mock_client.find_spool_by_tag = AsyncMock(return_value=raw_spool)
- with (
- patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws,
- patch(
- "backend.app.api.routes.spoolbuddy.get_spool_by_tag",
- new_callable=AsyncMock,
- return_value=None,
- ),
- patch(
- "backend.app.services.spoolman.get_spoolman_client",
- AsyncMock(return_value=mock_client),
- ),
- patch(
- "backend.app.services.spoolman.init_spoolman_client",
- AsyncMock(return_value=mock_client),
- ),
- ):
- mock_ws.broadcast = AsyncMock()
- resp = await async_client.post(
- f"{API}/nfc/tag-scanned",
- json={"device_id": "sb-1", "tag_uid": "DEADBEEF12345678"},
- )
- assert resp.status_code == 200
- data = resp.json()
- assert data["matched"] is True
- assert data["spool_id"] == 5
- mock_ws.broadcast.assert_called_once()
- msg = mock_ws.broadcast.call_args[0][0]
- assert msg["type"] == "spoolbuddy_tag_matched"
- assert msg["spool"]["id"] == 5
- assert msg["spool"]["material"] == "PETG"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_spoolman_fallback_unknown_when_no_spoolman_match(self, async_client: AsyncClient, spoolman_settings):
- """Unknown tag broadcast when both local DB and Spoolman miss."""
- mock_client = _mock_spoolman_client()
- mock_client.get_spools = AsyncMock(return_value=[])
- mock_client.find_spool_by_tag = AsyncMock(return_value=None)
- with (
- patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws,
- patch(
- "backend.app.api.routes.spoolbuddy.get_spool_by_tag",
- new_callable=AsyncMock,
- return_value=None,
- ),
- patch(
- "backend.app.services.spoolman.get_spoolman_client",
- AsyncMock(return_value=mock_client),
- ),
- patch(
- "backend.app.services.spoolman.init_spoolman_client",
- AsyncMock(return_value=mock_client),
- ),
- ):
- mock_ws.broadcast = AsyncMock()
- resp = await async_client.post(
- f"{API}/nfc/tag-scanned",
- json={"device_id": "sb-1", "tag_uid": "UNKNOWN0000000FF"},
- )
- assert resp.status_code == 200
- data = resp.json()
- assert data["matched"] is False
- assert data["spool_id"] is None
- mock_ws.broadcast.assert_called_once()
- msg = mock_ws.broadcast.call_args[0][0]
- assert msg["type"] == "spoolbuddy_unknown_tag"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_malformed_spoolman_data_degrades_gracefully(self, async_client: AsyncClient, spoolman_settings):
- """ValueError from _map_spoolman_spool (e.g. spool_id=0) must return matched=False without broadcasting unknown_tag."""
- bad_spool = {
- "id": 0, # _map_spoolman_spool raises ValueError for id <= 0
- "filament": {"material": "PLA", "name": "PLA Basic", "color_hex": "FF0000", "weight": 1000},
- "used_weight": 0.0,
- "archived": False,
- "registered": "2024-01-01T00:00:00Z",
- "extra": {"tag": '"DEADBEEF12345678"'},
- }
- mock_client = _mock_spoolman_client()
- mock_client.find_spool_by_tag = AsyncMock(return_value=bad_spool)
- with (
- patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws,
- patch(
- "backend.app.api.routes.spoolbuddy.get_spool_by_tag",
- new_callable=AsyncMock,
- return_value=None,
- ),
- patch(
- "backend.app.services.spoolman.get_spoolman_client",
- AsyncMock(return_value=mock_client),
- ),
- patch(
- "backend.app.services.spoolman.init_spoolman_client",
- AsyncMock(return_value=mock_client),
- ),
- ):
- mock_ws.broadcast = AsyncMock()
- resp = await async_client.post(
- f"{API}/nfc/tag-scanned",
- json={"device_id": "sb-1", "tag_uid": "DEADBEEF12345678"},
- )
- assert resp.status_code == 200
- data = resp.json()
- assert data["matched"] is False
- assert data["spool_id"] is None
- # No broadcast: UI must not get a spurious unknown_tag event on Spoolman data errors
- mock_ws.broadcast.assert_not_called()
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_local_match_skips_spoolman(self, async_client: AsyncClient, spool_factory):
- """When local DB matches, Spoolman is never queried."""
- spool = await spool_factory(tag_uid="AABB1122", material="PLA")
- mock_spool = MagicMock()
- mock_spool.id = spool.id
- mock_spool.material = spool.material
- mock_spool.subtype = spool.subtype
- mock_spool.color_name = spool.color_name
- mock_spool.rgba = spool.rgba
- mock_spool.brand = spool.brand
- mock_spool.label_weight = spool.label_weight
- mock_spool.core_weight = spool.core_weight
- mock_spool.weight_used = spool.weight_used
- with (
- patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws,
- patch(
- "backend.app.api.routes.spoolbuddy.get_spool_by_tag",
- new_callable=AsyncMock,
- return_value=mock_spool,
- ),
- ):
- mock_ws.broadcast = AsyncMock()
- resp = await async_client.post(
- f"{API}/nfc/tag-scanned",
- json={"device_id": "sb-1", "tag_uid": "AABB1122"},
- )
- assert resp.status_code == 200
- data = resp.json()
- assert data["matched"] is True
- assert data["spool_id"] == spool.id
- # ============================================================================
- # NFC write-tag / write-result — Spoolman-aware
- # ============================================================================
- def _full_spoolman_spool(spool_id: int) -> dict:
- """Complete Spoolman spool dict sufficient for NDEF encoding."""
- return {
- "id": spool_id,
- "filament": {
- "material": "PLA",
- "name": "PLA Basic",
- "color_hex": "FF0000",
- "weight": 1000.0,
- "spool_weight": 196.0,
- "vendor": {"name": "Bambu Lab"},
- },
- "used_weight": 0.0,
- "archived": False,
- "registered": "2024-01-01T00:00:00Z",
- }
- class TestNfcWriteTagSpoolman:
- """nfc/write-tag falls back to Spoolman when local DB has no matching spool."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_spoolman_spool_queued_when_local_miss(
- self, async_client: AsyncClient, device_factory, spoolman_settings
- ):
- """write-tag encodes NDEF from Spoolman data when spool not in local DB."""
- await device_factory(device_id="sb-write-sm")
- sm_spool = _full_spoolman_spool(77)
- mock_client = _mock_spoolman_client()
- mock_client.get_spool = AsyncMock(return_value=sm_spool)
- with (
- patch(
- "backend.app.services.spoolman.get_spoolman_client",
- AsyncMock(return_value=mock_client),
- ),
- patch(
- "backend.app.services.spoolman.init_spoolman_client",
- AsyncMock(return_value=mock_client),
- ),
- ):
- resp = await async_client.post(
- f"{API}/nfc/write-tag",
- json={"device_id": "sb-write-sm", "spool_id": 77},
- )
- assert resp.status_code == 200
- assert resp.json()["status"] == "queued"
- mock_client.get_spool.assert_called_once_with(77)
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_data_origin_spoolman_stored_in_payload(
- self, async_client: AsyncClient, device_factory, db_session, spoolman_settings
- ):
- """Pending write payload records data_origin=spoolman for Spoolman spools."""
- import json as _json
- device = await device_factory(device_id="sb-origin")
- sm_spool = _full_spoolman_spool(88)
- mock_client = _mock_spoolman_client()
- mock_client.get_spool = AsyncMock(return_value=sm_spool)
- with (
- patch(
- "backend.app.services.spoolman.get_spoolman_client",
- AsyncMock(return_value=mock_client),
- ),
- patch(
- "backend.app.services.spoolman.init_spoolman_client",
- AsyncMock(return_value=mock_client),
- ),
- ):
- await async_client.post(
- f"{API}/nfc/write-tag",
- json={"device_id": "sb-origin", "spool_id": 88},
- )
- await db_session.refresh(device)
- payload = _json.loads(device.pending_write_payload)
- assert payload["data_origin"] == "spoolman"
- assert payload["spool_id"] == 88
- assert "ndef_data_hex" in payload
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_404_when_neither_local_nor_spoolman(
- self, async_client: AsyncClient, device_factory, spoolman_settings
- ):
- """404 returned when spool is missing from both local DB and Spoolman."""
- await device_factory(device_id="sb-miss")
- mock_client = _mock_spoolman_client()
- mock_client.get_spool = AsyncMock(side_effect=SpoolmanNotFoundError("Spool 9999 not found"))
- with (
- patch(
- "backend.app.services.spoolman.get_spoolman_client",
- AsyncMock(return_value=mock_client),
- ),
- patch(
- "backend.app.services.spoolman.init_spoolman_client",
- AsyncMock(return_value=mock_client),
- ),
- ):
- resp = await async_client.post(
- f"{API}/nfc/write-tag",
- json={"device_id": "sb-miss", "spool_id": 9999},
- )
- assert resp.status_code == 404
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_local_spool_used_when_present(self, async_client: AsyncClient, device_factory, spool_factory):
- """Local DB spool is encoded directly without contacting Spoolman."""
- await device_factory(device_id="sb-local-write")
- spool = await spool_factory(material="PETG")
- resp = await async_client.post(
- f"{API}/nfc/write-tag",
- json={"device_id": "sb-local-write", "spool_id": spool.id},
- )
- assert resp.status_code == 200
- assert resp.json()["status"] == "queued"
- class TestNfcWriteResultSpoolman:
- """nfc/write-result updates Spoolman extra.tag on success for Spoolman spools."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_success_updates_spoolman_extra_tag(
- self, async_client: AsyncClient, device_factory, spoolman_settings
- ):
- """Successful write for a Spoolman spool calls merge_spool_extra with extra.tag."""
- import json as _json
- await device_factory(
- device_id="sb-wr-sm",
- pending_command="write_tag",
- pending_write_payload=_json.dumps({"spool_id": 55, "ndef_data_hex": "deadbeef", "data_origin": "spoolman"}),
- )
- mock_client = _mock_spoolman_client()
- mock_client.merge_spool_extra = AsyncMock(return_value={"id": 55})
- with (
- patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws,
- patch(
- "backend.app.services.spoolman.get_spoolman_client",
- AsyncMock(return_value=mock_client),
- ),
- patch(
- "backend.app.services.spoolman.init_spoolman_client",
- AsyncMock(return_value=mock_client),
- ),
- ):
- mock_ws.broadcast = AsyncMock()
- resp = await async_client.post(
- f"{API}/nfc/write-result",
- json={
- "device_id": "sb-wr-sm",
- "spool_id": 55,
- "tag_uid": "AABBCCDD11223344",
- "success": True,
- },
- )
- assert resp.status_code == 200
- mock_client.merge_spool_extra.assert_called_once_with(55, {"tag": '"AABBCCDD11223344"'})
- msg = mock_ws.broadcast.call_args[0][0]
- assert msg["type"] == "spoolbuddy_tag_written"
- assert msg["tag_uid"] == "AABBCCDD11223344"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_failure_does_not_call_spoolman(self, async_client: AsyncClient, device_factory, spoolman_settings):
- """Failed write never calls Spoolman update."""
- import json as _json
- await device_factory(
- device_id="sb-wr-fail",
- pending_command="write_tag",
- pending_write_payload=_json.dumps({"spool_id": 66, "ndef_data_hex": "deadbeef", "data_origin": "spoolman"}),
- )
- mock_client = _mock_spoolman_client()
- with (
- patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws,
- patch(
- "backend.app.services.spoolman.get_spoolman_client",
- AsyncMock(return_value=mock_client),
- ),
- ):
- mock_ws.broadcast = AsyncMock()
- resp = await async_client.post(
- f"{API}/nfc/write-result",
- json={
- "device_id": "sb-wr-fail",
- "spool_id": 66,
- "tag_uid": "AABBCCDD11223344",
- "success": False,
- "message": "write timeout",
- },
- )
- assert resp.status_code == 200
- mock_client.update_spool.assert_not_called()
- msg = mock_ws.broadcast.call_args[0][0]
- assert msg["type"] == "spoolbuddy_tag_write_failed"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_success_local_spool_writes_to_db(
- self, async_client: AsyncClient, device_factory, spool_factory, db_session
- ):
- """Successful write for a local spool still updates local DB tag_uid."""
- import json as _json
- spool = await spool_factory()
- await device_factory(
- device_id="sb-wr-local",
- pending_command="write_tag",
- pending_write_payload=_json.dumps(
- {"spool_id": spool.id, "ndef_data_hex": "deadbeef", "data_origin": "local"}
- ),
- )
- with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
- mock_ws.broadcast = AsyncMock()
- resp = await async_client.post(
- f"{API}/nfc/write-result",
- json={
- "device_id": "sb-wr-local",
- "spool_id": spool.id,
- "tag_uid": "DEADBEEF12345678",
- "success": True,
- },
- )
- assert resp.status_code == 200
- await db_session.refresh(spool)
- assert spool.tag_uid == "DEADBEEF12345678"
- assert spool.tag_type == "ntag"
- # ============================================================================
- # Security fix tests — write-tag ValueError + write-result exception safety
- # ============================================================================
- class TestNfcWriteTagSpoolmanSecurityFixes:
- """Regression tests for security fixes in nfc/write-tag Spoolman path."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_invalid_spoolman_spool_id_returns_502(
- self, async_client: AsyncClient, device_factory, spoolman_settings
- ):
- """Malformed Spoolman spool (invalid id=0) raises 502, not 404 — spool exists but is bad data."""
- await device_factory(device_id="sb-invalid-id")
- # Spoolman returns spool with id=0 (invalid — caught by _map_spoolman_spool guard)
- bad_spool = {**_full_spoolman_spool(1), "id": 0}
- mock_client = _mock_spoolman_client()
- mock_client.get_spool = AsyncMock(return_value=bad_spool)
- with (
- patch(
- "backend.app.services.spoolman.get_spoolman_client",
- AsyncMock(return_value=mock_client),
- ),
- patch(
- "backend.app.services.spoolman.init_spoolman_client",
- AsyncMock(return_value=mock_client),
- ),
- ):
- resp = await async_client.post(
- f"{API}/nfc/write-tag",
- json={"device_id": "sb-invalid-id", "spool_id": 99},
- )
- # 502: spool exists in Spoolman but its data is malformed — not a "not found"
- assert resp.status_code == 502
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_oversized_label_weight_does_not_crash(
- self, async_client: AsyncClient, device_factory, spoolman_settings
- ):
- """label_weight > 65535 from Spoolman must not crash with struct.error."""
- await device_factory(device_id="sb-overflow")
- big_weight_spool = {
- **_full_spoolman_spool(42),
- "filament": {**_full_spoolman_spool(42)["filament"], "weight": 70000},
- }
- mock_client = _mock_spoolman_client()
- mock_client.get_spool = AsyncMock(return_value=big_weight_spool)
- with (
- patch(
- "backend.app.services.spoolman.get_spoolman_client",
- AsyncMock(return_value=mock_client),
- ),
- patch(
- "backend.app.services.spoolman.init_spoolman_client",
- AsyncMock(return_value=mock_client),
- ),
- ):
- resp = await async_client.post(
- f"{API}/nfc/write-tag",
- json={"device_id": "sb-overflow", "spool_id": 42},
- )
- assert resp.status_code == 200
- assert resp.json()["status"] == "queued"
- class TestNfcWriteResultSpoolmanSecurityFixes:
- """Regression tests for transaction safety in nfc/write-result Spoolman path."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_spoolman_client_exception_still_clears_device_state(
- self, async_client: AsyncClient, device_factory, db_session, spoolman_settings
- ):
- """If Spoolman client raises, device pending_command is still cleared in DB."""
- import json as _json
- device = await device_factory(
- device_id="sb-exc-safe",
- pending_command="write_tag",
- pending_write_payload=_json.dumps({"spool_id": 77, "ndef_data_hex": "deadbeef", "data_origin": "spoolman"}),
- )
- mock_client = _mock_spoolman_client()
- mock_client.merge_spool_extra = AsyncMock(side_effect=Exception("connection refused"))
- with (
- patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws,
- patch(
- "backend.app.services.spoolman.get_spoolman_client",
- AsyncMock(return_value=mock_client),
- ),
- patch(
- "backend.app.services.spoolman.init_spoolman_client",
- AsyncMock(return_value=mock_client),
- ),
- ):
- mock_ws.broadcast = AsyncMock()
- resp = await async_client.post(
- f"{API}/nfc/write-result",
- json={
- "device_id": "sb-exc-safe",
- "spool_id": 77,
- "tag_uid": "AABBCCDD11223344",
- "success": True,
- },
- )
- # 502: tag written to NFC but Spoolman link failed (not best-effort — caller must retry)
- assert resp.status_code == 502
- # Device state must be cleared despite the exception (no spurious re-write)
- await db_session.refresh(device)
- assert device.pending_command is None
- assert device.pending_write_payload is None
- # Failure broadcast fires so the UI can show the error
- msg = mock_ws.broadcast.call_args[0][0]
- assert msg["type"] == "spoolbuddy_tag_link_failed"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_spoolman_not_found_error_broadcasts_link_failed(
- self, async_client: AsyncClient, device_factory, db_session, spoolman_settings
- ):
- """SpoolmanNotFoundError from merge_spool_extra must clear device state and broadcast link_failed."""
- import json as _json
- device = await device_factory(
- device_id="sb-notfound",
- pending_command="write_tag",
- pending_write_payload=_json.dumps({"spool_id": 55, "ndef_data_hex": "deadbeef", "data_origin": "spoolman"}),
- )
- mock_client = _mock_spoolman_client()
- mock_client.merge_spool_extra = AsyncMock(side_effect=SpoolmanNotFoundError("Spool 55 not found"))
- with (
- patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws,
- patch(
- "backend.app.services.spoolman.get_spoolman_client",
- AsyncMock(return_value=mock_client),
- ),
- patch(
- "backend.app.services.spoolman.init_spoolman_client",
- AsyncMock(return_value=mock_client),
- ),
- ):
- mock_ws.broadcast = AsyncMock()
- resp = await async_client.post(
- f"{API}/nfc/write-result",
- json={
- "device_id": "sb-notfound",
- "spool_id": 55,
- "tag_uid": "AABBCCDD11223344",
- "success": True,
- },
- )
- assert resp.status_code == 502
- await db_session.refresh(device)
- assert device.pending_command is None
- assert device.pending_write_payload is None
- msg = mock_ws.broadcast.call_args[0][0]
- assert msg["type"] == "spoolbuddy_tag_link_failed"
- assert msg["spool_id"] == 55
- class TestNfcWriteResultOrphanedSpool:
- """nfc/write-result when the local spool was deleted between write-queue and write-result."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_local_spool_deleted_before_write_back(self, async_client: AsyncClient, device_factory, db_session):
- """When local spool is deleted between write-queue and write-result, return linked=False and broadcast link_failed."""
- import json as _json
- device = await device_factory(
- device_id="sb-orphan",
- pending_command="write_tag",
- pending_write_payload=_json.dumps(
- {
- "spool_id": 99999, # non-existent spool
- "ndef_data_hex": "aabbccdd",
- "data_origin": "local",
- }
- ),
- )
- with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
- mock_ws.broadcast = AsyncMock()
- resp = await async_client.post(
- f"{API}/nfc/write-result",
- json={"device_id": device.device_id, "spool_id": 99999, "success": True, "tag_uid": "AABBCCDD"},
- )
- assert resp.status_code == 200
- data = resp.json()
- assert data["linked"] is False
- # pending command should be cleared
- await db_session.refresh(device)
- assert device.pending_command is None
- # broadcast should be spoolbuddy_tag_link_failed
- broadcast_calls = mock_ws.broadcast.call_args_list
- link_failed = [c[0][0] for c in broadcast_calls if c[0][0].get("type") == "spoolbuddy_tag_link_failed"]
- assert len(link_failed) >= 1
- class TestNfcWriteResultInputValidation:
- """Input validation and JSON safety for nfc/write-result."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_tag_uid_too_long_rejected(self, async_client: AsyncClient, device_factory):
- """tag_uid longer than 32 chars must be rejected with 422."""
- import json as _json
- await device_factory(
- device_id="sb-uid-long",
- pending_command="write_tag",
- pending_write_payload=_json.dumps({"spool_id": 1, "ndef_data_hex": "dead", "data_origin": "local"}),
- )
- resp = await async_client.post(
- f"{API}/nfc/write-result",
- json={
- "device_id": "sb-uid-long",
- "spool_id": 1,
- "tag_uid": "A" * 65,
- "success": True,
- },
- )
- assert resp.status_code == 422
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_malformed_pending_payload_falls_back_to_local(
- self, async_client: AsyncClient, device_factory, spool_factory, db_session
- ):
- """Corrupted pending_write_payload JSON falls back to local mode gracefully."""
- spool = await spool_factory()
- await device_factory(
- device_id="sb-corrupt-json",
- pending_command="write_tag",
- pending_write_payload="{not valid json!!!",
- )
- with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
- mock_ws.broadcast = AsyncMock()
- resp = await async_client.post(
- f"{API}/nfc/write-result",
- json={
- "device_id": "sb-corrupt-json",
- "spool_id": spool.id,
- "tag_uid": "DEADBEEF12345678",
- "success": True,
- },
- )
- # Must return 200, not 500
- assert resp.status_code == 200
- # Falls back to local mode — tag written to DB
- await db_session.refresh(spool)
- assert spool.tag_uid == "DEADBEEF12345678"
- # ============================================================================
- # B1: NFC write-tag warnings appear in response body
- # ============================================================================
- class TestNfcWriteTagWarningsBody:
- """B1: resp.json()['warnings'] is populated when Spoolman fields are absent."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_warnings_returned_for_missing_color_and_temp(
- self, async_client: AsyncClient, device_factory, spoolman_settings
- ):
- """Both color_name=None and settings_extruder_temp=None produce 2 warnings."""
- await device_factory(device_id="sb-warn-b1")
- # Spoolman spool with no color_name or nozzle temp
- sparse_spool = {
- "id": 99,
- "filament": {
- "material": "PLA",
- "name": "PLA Basic",
- "color_hex": "808080",
- # color_name absent → None after mapping
- # settings_extruder_temp absent → nozzle_temp_min=None
- "weight": 1000.0,
- "vendor": {"name": "Bambu Lab"},
- },
- "used_weight": 0.0,
- "archived": False,
- "registered": "2024-01-01T00:00:00Z",
- }
- mock_client = _mock_spoolman_client()
- mock_client.get_spool = AsyncMock(return_value=sparse_spool)
- with (
- patch(
- "backend.app.services.spoolman.get_spoolman_client",
- AsyncMock(return_value=mock_client),
- ),
- patch(
- "backend.app.services.spoolman.init_spoolman_client",
- AsyncMock(return_value=mock_client),
- ),
- ):
- resp = await async_client.post(
- f"{API}/nfc/write-tag",
- json={"device_id": "sb-warn-b1", "spool_id": 99},
- )
- assert resp.status_code == 200
- body = resp.json()
- assert "warnings" in body, "Response should contain 'warnings' key when fields are absent"
- warnings = body["warnings"]
- assert len(warnings) >= 2, f"Expected at least 2 warnings for missing color_name + nozzle_temp, got: {warnings}"
- # Confirm the specific fields are mentioned
- warn_text = " ".join(warnings)
- assert "color_name" in warn_text
- assert "nozzle_temp" in warn_text
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_no_warnings_key_when_all_fields_present(
- self, async_client: AsyncClient, device_factory, spoolman_settings
- ):
- """No 'warnings' key in response when all fields are populated."""
- await device_factory(device_id="sb-nowarn")
- full_spool = _full_spoolman_spool(100)
- # Add color_name and extruder temp
- full_spool["filament"]["color_name"] = "Red"
- full_spool["filament"]["settings_extruder_temp"] = 220
- mock_client = _mock_spoolman_client()
- mock_client.get_spool = AsyncMock(return_value=full_spool)
- with (
- patch(
- "backend.app.services.spoolman.get_spoolman_client",
- AsyncMock(return_value=mock_client),
- ),
- patch(
- "backend.app.services.spoolman.init_spoolman_client",
- AsyncMock(return_value=mock_client),
- ),
- ):
- resp = await async_client.post(
- f"{API}/nfc/write-tag",
- json={"device_id": "sb-nowarn", "spool_id": 100},
- )
- assert resp.status_code == 200
- body = resp.json()
- assert "warnings" not in body or body["warnings"] == []
- # ============================================================================
- # B5: Exception text scrubbed from WebSocket broadcast message
- # ============================================================================
- class TestNfcWriteResultExceptionScrubbing:
- """B5: Internal exception details must not appear in WebSocket 'message' field."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_exception_text_not_leaked_in_ws_message(
- self, async_client: AsyncClient, device_factory, db_session, spoolman_settings
- ):
- """When Spoolman merge raises, WS message is generic; 'connection refused' absent."""
- import json as _json
- await device_factory(
- device_id="sb-scrub-b5",
- pending_command="write_tag",
- pending_write_payload=_json.dumps({"spool_id": 77, "ndef_data_hex": "deadbeef", "data_origin": "spoolman"}),
- )
- mock_client = _mock_spoolman_client()
- mock_client.merge_spool_extra = AsyncMock(side_effect=Exception("connection refused to 192.168.1.1:7912"))
- with (
- patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws,
- patch(
- "backend.app.services.spoolman.get_spoolman_client",
- AsyncMock(return_value=mock_client),
- ),
- patch(
- "backend.app.services.spoolman.init_spoolman_client",
- AsyncMock(return_value=mock_client),
- ),
- ):
- mock_ws.broadcast = AsyncMock()
- resp = await async_client.post(
- f"{API}/nfc/write-result",
- json={
- "device_id": "sb-scrub-b5",
- "spool_id": 77,
- "tag_uid": "AABBCCDD11223344",
- "success": True,
- },
- )
- assert resp.status_code == 502
- msg = mock_ws.broadcast.call_args[0][0]
- assert msg["type"] == "spoolbuddy_tag_link_failed"
- # Generic message — no internal exception details leaked
- assert msg["message"] == "Spoolman link failed", f"Expected generic message but got: {msg['message']!r}"
- assert "connection refused" not in str(msg), f"Exception text must not appear in WS message: {msg}"
- assert "192.168.1" not in str(msg), f"Internal IP must not appear in WS message: {msg}"
- # ============================================================================
- # _get_spoolman_client_or_none: graceful degradation on ValueError during reinit
- # ============================================================================
- class TestSpoolmanClientOrNoneGraceful:
- """_get_spoolman_client_or_none returns None when init_spoolman_client raises ValueError."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_returns_none_when_init_raises_value_error(self, async_client: AsyncClient, db_session):
- """_get_spoolman_client_or_none returns None when init_spoolman_client raises ValueError,
- so the device endpoint degrades gracefully instead of propagating a 500 error."""
- from backend.app.models.settings import Settings
- db_session.add(Settings(key="spoolman_enabled", value="true"))
- db_session.add(Settings(key="spoolman_url", value="http://spoolman.local:7912"))
- await db_session.commit()
- with (
- patch("backend.app.api.routes._spoolman_helpers.assert_safe_spoolman_url"),
- patch(
- "backend.app.services.spoolman.get_spoolman_client",
- AsyncMock(return_value=None),
- ),
- patch(
- "backend.app.services.spoolman.init_spoolman_client",
- AsyncMock(side_effect=ValueError("invalid URL")),
- ),
- patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws,
- ):
- mock_ws.broadcast = AsyncMock()
- # nfc/tag-scanned calls _get_spoolman_client_or_none; with None returned it
- # must broadcast unknown_tag (not raise 500 due to ValueError propagating).
- resp = await async_client.post(
- f"{API}/nfc/tag-scanned",
- json={"device_id": "sb-vale", "tag_uid": "AABBCCDD"},
- )
- # Must not be 500 — ValueError is caught and client returns None, degrading gracefully
- assert resp.status_code == 200
- data = resp.json()
- assert data["matched"] is False
|