| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952 |
- """Tests for the VP MQTT bridge — non-proxy mirror of target printer state to slicer."""
- import asyncio
- import json
- from pathlib import Path
- from unittest.mock import AsyncMock, MagicMock, patch
- import pytest
- from backend.app.services.virtual_printer.mqtt_bridge import MQTTBridge, _ip_to_uint32_le
- from backend.app.services.virtual_printer.mqtt_server import SimpleMQTTServer
- H2D_SERIAL = "0948BB540200427"
- VP_SERIAL = "09400A391800003"
- H2D_IP = "192.168.255.133"
- VP_IP = "192.168.255.16"
- def _make_server(serial: str = VP_SERIAL, bind_address: str = VP_IP) -> SimpleMQTTServer:
- return SimpleMQTTServer(
- serial=serial,
- access_code="deadbeef",
- cert_path=Path("/tmp/unused.crt"), # nosec B108
- key_path=Path("/tmp/unused.key"), # nosec B108
- model="O1D",
- bind_address=bind_address,
- )
- def _make_paho_client(
- serial: str = H2D_SERIAL,
- ip: str = H2D_IP,
- *,
- connected: bool = True,
- ) -> MagicMock:
- """Build a mock BambuMQTTClient that satisfies MQTTBridge's interface."""
- client = MagicMock()
- client.serial_number = serial
- client.ip_address = ip
- client.state = MagicMock()
- client.state.connected = connected
- client.publish_raw = MagicMock(return_value=True)
- client._raw_handlers: list = []
- def _register(handler):
- client._raw_handlers.append(handler)
- def _unregister(handler):
- if handler in client._raw_handlers:
- client._raw_handlers.remove(handler)
- client.register_raw_message_handler.side_effect = _register
- client.unregister_raw_message_handler.side_effect = _unregister
- # No-op for _request_version / request_status_update so the post-bind nudge doesn't crash.
- client._request_version = MagicMock()
- client.request_status_update = MagicMock()
- return client
- def _make_printer_manager(client) -> MagicMock:
- pm = MagicMock()
- pm.get_client = MagicMock(return_value=client)
- return pm
- def _make_bridge(server: SimpleMQTTServer, target: MagicMock | None = None) -> MQTTBridge:
- target = target if target is not None else _make_paho_client()
- pm = _make_printer_manager(target)
- return MQTTBridge(
- vp_id=1,
- vp_name="vp1",
- vp_serial=VP_SERIAL,
- target_printer_id=42,
- mqtt_server=server,
- printer_manager=pm,
- )
- # ---------------------------------------------------------------------------
- # Lifecycle
- # ---------------------------------------------------------------------------
- class TestBridgeLifecycle:
- @pytest.mark.asyncio
- async def test_start_registers_handler_on_target_client(self):
- target = _make_paho_client()
- bridge = _make_bridge(_make_server(), target)
- await bridge.start()
- assert len(target._raw_handlers) == 1
- assert bridge.is_active is True
- await bridge.stop()
- assert len(target._raw_handlers) == 0
- @pytest.mark.asyncio
- async def test_start_with_no_target_client_does_not_crash(self):
- pm = MagicMock()
- pm.get_client = MagicMock(return_value=None)
- bridge = MQTTBridge(
- vp_id=1,
- vp_name="vp1",
- vp_serial=VP_SERIAL,
- target_printer_id=42,
- mqtt_server=_make_server(),
- printer_manager=pm,
- )
- await bridge.start()
- assert bridge.is_active is False
- await bridge.stop()
- @pytest.mark.asyncio
- async def test_resolve_rebinds_when_paho_client_replaced(self):
- """BambuMQTTClient is destroyed and recreated on connect_printer; bridge must rebind."""
- old_client = _make_paho_client(serial="REAL_OLD")
- new_client = _make_paho_client(serial="REAL_NEW")
- pm = _make_printer_manager(old_client)
- bridge = MQTTBridge(
- vp_id=1,
- vp_name="vp1",
- vp_serial=VP_SERIAL,
- target_printer_id=42,
- mqtt_server=_make_server(),
- printer_manager=pm,
- )
- await bridge.start()
- assert len(old_client._raw_handlers) == 1
- assert bridge._target_serial == "REAL_OLD"
- pm.get_client.return_value = new_client
- bridge._resolve_client()
- assert len(old_client._raw_handlers) == 0
- assert len(new_client._raw_handlers) == 1
- assert bridge._target_serial == "REAL_NEW"
- await bridge.stop()
- @pytest.mark.asyncio
- async def test_post_bind_nudge_requests_version_and_status(self):
- target = _make_paho_client()
- bridge = _make_bridge(_make_server(), target)
- await bridge.start()
- target._request_version.assert_called_once()
- target.request_status_update.assert_called_once()
- await bridge.stop()
- # ---------------------------------------------------------------------------
- # Caching: push_status
- # ---------------------------------------------------------------------------
- class TestPushStatusCache:
- """push_status snapshots feed `_send_status_report` via the cache, not a fan-out."""
- @pytest.mark.asyncio
- async def test_push_status_is_cached_not_fanned_out(self):
- server = _make_server()
- server.push_raw_to_clients = AsyncMock()
- bridge = _make_bridge(server)
- await bridge.start()
- payload = json.dumps({"print": {"command": "push_status", "ams": {"ams": []}, "gcode_state": "IDLE"}}).encode()
- bridge._on_printer_raw(f"device/{H2D_SERIAL}/report", payload)
- await asyncio.sleep(0.01)
- server.push_raw_to_clients.assert_not_awaited()
- cached = bridge.get_latest_print_state()
- assert cached is not None
- assert cached["command"] == "push_status"
- assert cached["gcode_state"] == "IDLE"
- await bridge.stop()
- @pytest.mark.asyncio
- async def test_serial_rewritten_in_cached_push(self):
- server = _make_server()
- bridge = _make_bridge(server)
- await bridge.start()
- payload = json.dumps(
- {
- "print": {
- "command": "push_status",
- "upgrade_state": {"sn": H2D_SERIAL, "status": "IDLE"},
- }
- }
- ).encode()
- bridge._on_printer_raw(f"device/{H2D_SERIAL}/report", payload)
- await asyncio.sleep(0.01)
- cached = bridge.get_latest_print_state()
- assert cached["upgrade_state"]["sn"] == VP_SERIAL
- await bridge.stop()
- @pytest.mark.asyncio
- async def test_net_info_ip_rewritten_to_vp_ip(self):
- """BambuStudio reads `net.info[].ip` (LE uint32) for the FTP destination —
- must be rewritten to the VP's bind IP or the slicer bypasses the VP."""
- server = _make_server(bind_address=VP_IP)
- bridge = _make_bridge(server)
- await bridge.start()
- h2d_le = _ip_to_uint32_le(H2D_IP)
- vp_le = _ip_to_uint32_le(VP_IP)
- payload = json.dumps(
- {
- "print": {
- "command": "push_status",
- "net": {"info": [{"ip": h2d_le, "mask": 0xFFFFFF}, {"ip": 0, "mask": 0}]},
- }
- }
- ).encode()
- bridge._on_printer_raw(f"device/{H2D_SERIAL}/report", payload)
- await asyncio.sleep(0.01)
- cached = bridge.get_latest_print_state()
- assert cached["net"]["info"][0]["ip"] == vp_le
- assert cached["net"]["info"][1]["ip"] == 0 # untouched
- await bridge.stop()
- @pytest.mark.asyncio
- async def test_request_topic_message_is_ignored(self):
- server = _make_server()
- bridge = _make_bridge(server)
- await bridge.start()
- payload = json.dumps({"print": {"command": "push_status"}}).encode()
- bridge._on_printer_raw(f"device/{H2D_SERIAL}/request", payload)
- await asyncio.sleep(0.01)
- assert bridge.get_latest_print_state() is None
- await bridge.stop()
- @pytest.mark.asyncio
- async def test_incremental_push_preserves_ams_from_previous_cache(self):
- """Regression for #1371: Bambu firmware sends FULL push_status on
- pushall (with AMS/vt_tray/net/etc.) but typically OMITS those fields
- from 1 Hz incremental push_status updates. Without preserving the
- sticky keys across pushes, the cache forgets AMS info after the first
- incremental update, and BambuStudio (which reads the cache via the
- VP's 1 Hz status push) sees no AMS info until the user power-cycles
- the printer (forcing a fresh pushall).
- """
- server = _make_server()
- bridge = _make_bridge(server)
- await bridge.start()
- # 1. Initial pushall response with full state, AMS included.
- full_push = json.dumps(
- {
- "print": {
- "command": "push_status",
- "gcode_state": "IDLE",
- "wifi_signal": "-50dBm",
- "ams": {
- "ams": [
- {
- "id": "0",
- "tray": [
- {"id": "0", "tray_type": "PLA", "tray_color": "FF0000FF"},
- {"id": "1", "tray_type": "PETG", "tray_color": "00FF00FF"},
- ],
- }
- ],
- "tray_exist_bits": "3",
- },
- "vt_tray": {"id": "254", "tray_type": ""},
- "lights_report": [{"node": "chamber_light", "mode": "on"}],
- }
- }
- ).encode()
- bridge._on_printer_raw(f"device/{H2D_SERIAL}/report", full_push)
- await asyncio.sleep(0.01)
- cached = bridge.get_latest_print_state()
- assert cached["ams"]["ams"][0]["tray"][0]["tray_type"] == "PLA"
- assert cached["vt_tray"]["id"] == "254"
- assert cached["lights_report"][0]["mode"] == "on"
- # 2. Incremental push with only temp/wifi changes — NO ams field.
- # This is what the printer sends every ~1 s between full pushalls.
- incremental_push = json.dumps(
- {
- "print": {
- "command": "push_status",
- "wifi_signal": "-55dBm",
- "chamber_temper": 26.0,
- }
- }
- ).encode()
- bridge._on_printer_raw(f"device/{H2D_SERIAL}/report", incremental_push)
- await asyncio.sleep(0.01)
- cached = bridge.get_latest_print_state()
- # New fields take effect.
- assert cached["wifi_signal"] == "-55dBm"
- assert cached["chamber_temper"] == 26.0
- # Sticky fields preserved from the previous cache (the #1371 fix).
- assert "ams" in cached, "AMS field must be preserved across incremental pushes (#1371)"
- assert cached["ams"]["ams"][0]["tray"][0]["tray_type"] == "PLA"
- assert cached["ams"]["tray_exist_bits"] == "3"
- assert cached["vt_tray"]["id"] == "254"
- assert cached["lights_report"][0]["mode"] == "on"
- await bridge.stop()
- @pytest.mark.asyncio
- async def test_partial_ams_status_update_preserves_unit_list(self):
- """#1387: Bambu firmware also sends `ams` updates where the key is
- present but the inner `ams` array is missing — e.g. just
- ``{ams_status: 1}`` or a humidity change. Before the deep-merge fix
- the bridge would overwrite the cached AMS with this stripped blob,
- the slicer would read it on the next 1 Hz push, and BambuStudio
- would drop the unit list and fall back to its "no AMS" render
- (only the external spool visible — the reporter's exact symptom).
- Now the partial update only mutates the fields it carries; the
- cached unit list survives.
- """
- server = _make_server()
- bridge = _make_bridge(server)
- await bridge.start()
- # 1. Pushall with full AMS state.
- bridge._on_printer_raw(
- f"device/{H2D_SERIAL}/report",
- json.dumps(
- {
- "print": {
- "command": "push_status",
- "ams": {
- "ams": [
- {
- "id": "0",
- "humidity": "1",
- "tray": [{"id": "0", "tray_type": "PLA", "tray_color": "FF0000FF"}],
- }
- ],
- "tray_exist_bits": "1",
- "ams_status": "0",
- },
- }
- }
- ).encode(),
- )
- await asyncio.sleep(0.01)
- # 2. Partial AMS update — only `ams_status` and `humidity` changed.
- # No `ams.ams` array, so prev's unit list must be preserved.
- bridge._on_printer_raw(
- f"device/{H2D_SERIAL}/report",
- json.dumps(
- {
- "print": {
- "command": "push_status",
- "ams": {"ams_status": "1", "humidity": "2"},
- }
- }
- ).encode(),
- )
- await asyncio.sleep(0.01)
- cached = bridge.get_latest_print_state()
- # Scalar fields take the new values.
- assert cached["ams"]["ams_status"] == "1"
- assert cached["ams"]["humidity"] == "2"
- # Unit + tray data preserved from the pushall.
- assert cached["ams"]["tray_exist_bits"] == "1"
- assert len(cached["ams"]["ams"]) == 1
- assert cached["ams"]["ams"][0]["tray"][0]["tray_type"] == "PLA"
- assert cached["ams"]["ams"][0]["tray"][0]["tray_color"] == "FF0000FF"
- await bridge.stop()
- @pytest.mark.asyncio
- async def test_partial_ams_unit_update_preserves_other_units(self):
- """#1387: when multiple AMS units are configured (e.g. H2D with two
- AMS), an incremental push during a print typically only carries the
- unit / tray that changed state. Naive replacement of `ams.ams` wipes
- the other unit. The bridge merges unit-by-unit by id, preserving
- units the incremental doesn't mention.
- """
- server = _make_server()
- bridge = _make_bridge(server)
- await bridge.start()
- # 1. Pushall with two AMS units configured.
- bridge._on_printer_raw(
- f"device/{H2D_SERIAL}/report",
- json.dumps(
- {
- "print": {
- "command": "push_status",
- "ams": {
- "ams": [
- {"id": "0", "tray": [{"id": "0", "tray_type": "PLA"}]},
- {"id": "1", "tray": [{"id": "0", "tray_type": "PETG"}]},
- ],
- "tray_exist_bits": "3",
- },
- }
- }
- ).encode(),
- )
- await asyncio.sleep(0.01)
- # 2. Tray-targeted incremental: unit 0 / tray 0 state changed.
- # Unit 1 is not in the update — must survive.
- bridge._on_printer_raw(
- f"device/{H2D_SERIAL}/report",
- json.dumps(
- {
- "print": {
- "command": "push_status",
- "ams": {"ams": [{"id": "0", "tray": [{"id": "0", "state": "11"}]}]},
- }
- }
- ).encode(),
- )
- await asyncio.sleep(0.01)
- cached = bridge.get_latest_print_state()
- units = {u["id"]: u for u in cached["ams"]["ams"]}
- # Unit 0 keeps its tray_type from the pushall + picks up the new state.
- assert units["0"]["tray"][0]["tray_type"] == "PLA"
- assert units["0"]["tray"][0]["state"] == "11"
- # Unit 1 survives the incremental.
- assert "1" in units
- assert units["1"]["tray"][0]["tray_type"] == "PETG"
- await bridge.stop()
- @pytest.mark.asyncio
- async def test_partial_ams_tray_update_preserves_other_trays(self):
- """Same shape as the unit-level test but at the tray level. AMS
- unit 0 has four trays; the incremental only mentions tray 0.
- Trays 1-3 must survive intact."""
- server = _make_server()
- bridge = _make_bridge(server)
- await bridge.start()
- bridge._on_printer_raw(
- f"device/{H2D_SERIAL}/report",
- json.dumps(
- {
- "print": {
- "command": "push_status",
- "ams": {
- "ams": [
- {
- "id": "0",
- "tray": [
- {"id": "0", "tray_type": "PLA", "tray_color": "FF0000FF"},
- {"id": "1", "tray_type": "PETG", "tray_color": "00FF00FF"},
- {"id": "2", "tray_type": "ABS", "tray_color": "0000FFFF"},
- {"id": "3", "tray_type": "TPU", "tray_color": "FFFF00FF"},
- ],
- }
- ],
- },
- }
- }
- ).encode(),
- )
- await asyncio.sleep(0.01)
- bridge._on_printer_raw(
- f"device/{H2D_SERIAL}/report",
- json.dumps(
- {
- "print": {
- "command": "push_status",
- "ams": {"ams": [{"id": "0", "tray": [{"id": "0", "state": "11"}]}]},
- }
- }
- ).encode(),
- )
- await asyncio.sleep(0.01)
- cached = bridge.get_latest_print_state()
- trays = {t["id"]: t for t in cached["ams"]["ams"][0]["tray"]}
- assert trays["0"]["tray_type"] == "PLA"
- assert trays["0"]["state"] == "11"
- # Trays not mentioned in the incremental survive intact.
- assert trays["1"]["tray_type"] == "PETG"
- assert trays["2"]["tray_type"] == "ABS"
- assert trays["3"]["tray_type"] == "TPU"
- await bridge.stop()
- @pytest.mark.asyncio
- async def test_incoming_ams_update_replaces_cached_ams(self):
- """Counterpart to the #1371 fix: preservation only kicks in when the
- incoming push OMITS a sticky key. When the printer DOES send a fresh
- `ams` value (e.g. on a pushall, or when AMS state genuinely changes),
- that value must take effect — the preservation must not shadow real
- updates.
- """
- server = _make_server()
- bridge = _make_bridge(server)
- await bridge.start()
- # 1. Initial state: PLA in tray 0.
- bridge._on_printer_raw(
- f"device/{H2D_SERIAL}/report",
- json.dumps(
- {
- "print": {
- "command": "push_status",
- "ams": {"ams": [{"id": "0", "tray": [{"id": "0", "tray_type": "PLA"}]}]},
- }
- }
- ).encode(),
- )
- await asyncio.sleep(0.01)
- # 2. Fresh push with PETG — must replace, not get shadowed by the old PLA.
- bridge._on_printer_raw(
- f"device/{H2D_SERIAL}/report",
- json.dumps(
- {
- "print": {
- "command": "push_status",
- "ams": {"ams": [{"id": "0", "tray": [{"id": "0", "tray_type": "PETG"}]}]},
- }
- }
- ).encode(),
- )
- await asyncio.sleep(0.01)
- cached = bridge.get_latest_print_state()
- assert cached["ams"]["ams"][0]["tray"][0]["tray_type"] == "PETG"
- await bridge.stop()
- # ---------------------------------------------------------------------------
- # Caching: get_version response
- # ---------------------------------------------------------------------------
- class TestVersionCache:
- @pytest.mark.asyncio
- async def test_get_version_response_caches_modules(self):
- server = _make_server()
- bridge = _make_bridge(server)
- await bridge.start()
- payload = json.dumps(
- {
- "info": {
- "command": "get_version",
- "module": [
- {"name": "ota", "sn": H2D_SERIAL, "sw_ver": "01.03.00.00"},
- {"name": "n3f/0", "sn": "AMS_HW_1", "sw_ver": "04.00.21.87"},
- ],
- }
- }
- ).encode()
- bridge._on_printer_raw(f"device/{H2D_SERIAL}/report", payload)
- await asyncio.sleep(0.01)
- modules = bridge.get_latest_version_modules()
- assert modules is not None
- assert len(modules) == 2
- # Device-level sn rewritten; AMS-hardware sn left alone.
- assert modules[0]["sn"] == VP_SERIAL
- assert modules[1]["sn"] == "AMS_HW_1"
- await bridge.stop()
- # ---------------------------------------------------------------------------
- # Selective fan-out (everything that's not push_status / get_version)
- # ---------------------------------------------------------------------------
- class TestCommandResponseFanout:
- @pytest.mark.asyncio
- async def test_extrusion_cali_get_response_is_fanned_out(self):
- """Slicer's extrusion_cali_get goes to the printer; the printer's response
- must reach the slicer or BambuStudio's pre-flight blocks Send."""
- server = _make_server()
- server.push_raw_to_clients = AsyncMock()
- bridge = _make_bridge(server)
- await bridge.start()
- body = json.dumps({"print": {"command": "extrusion_cali_get", "filaments": []}}).encode()
- bridge._on_printer_raw(f"device/{H2D_SERIAL}/report", body)
- await asyncio.sleep(0.01)
- server.push_raw_to_clients.assert_awaited_once()
- topic, _payload = server.push_raw_to_clients.await_args.args
- assert topic == f"device/{VP_SERIAL}/report"
- await bridge.stop()
- # ---------------------------------------------------------------------------
- # Forwarding: slicer → printer
- # ---------------------------------------------------------------------------
- class TestForwardToPrinter:
- @pytest.mark.asyncio
- async def test_forward_publishes_to_real_serial_request_topic(self):
- target = _make_paho_client()
- bridge = _make_bridge(_make_server(), target)
- await bridge.start()
- ok = bridge.forward_to_printer({"print": {"command": "stop"}})
- assert ok is True
- target.publish_raw.assert_called_once()
- topic, payload = target.publish_raw.call_args.args
- assert topic == f"device/{H2D_SERIAL}/request"
- assert json.loads(payload) == {"print": {"command": "stop"}}
- await bridge.stop()
- @pytest.mark.asyncio
- async def test_forward_returns_false_when_not_bound(self):
- pm = MagicMock()
- pm.get_client = MagicMock(return_value=None)
- bridge = MQTTBridge(
- vp_id=1,
- vp_name="vp1",
- vp_serial=VP_SERIAL,
- target_printer_id=42,
- mqtt_server=_make_server(),
- printer_manager=pm,
- )
- await bridge.start()
- assert bridge.forward_to_printer({"print": {"command": "stop"}}) is False
- await bridge.stop()
- # ---------------------------------------------------------------------------
- # SimpleMQTTServer status response: cached-as-base
- # ---------------------------------------------------------------------------
- class TestStatusReportCachedAsBase:
- """`_send_status_report` sends near-byte-identical real data when bridge cache exists."""
- def _capture_published(self, server: SimpleMQTTServer):
- """Wrap _publish_to_report to capture (topic, payload_dict)."""
- published: list = []
- async def _capture(writer, payload, serial=""):
- published.append((serial or server.serial, payload))
- server._publish_to_report = _capture # type: ignore[assignment]
- return published
- @pytest.mark.asyncio
- async def test_uses_real_cache_when_bridge_active(self):
- server = _make_server()
- bridge = MagicMock()
- bridge.get_latest_print_state.return_value = {
- "command": "push_status",
- "msg": 0,
- "ams": {"ams": [{"id": "0"}]},
- "device": {"extruder": {"info": [{"id": 0}, {"id": 1}]}},
- "nozzle_diameter": "0.4",
- "nozzle_type": "HH01", # real H2D value, not synthetic 'hardened_steel'
- }
- server.set_bridge(bridge)
- published = self._capture_published(server)
- await server._send_status_report(MagicMock())
- assert len(published) == 1
- _serial, payload = published[0]
- # AMS / device / nozzle_type all from cache
- assert payload["print"]["nozzle_type"] == "HH01"
- assert payload["print"]["device"]["extruder"]["info"][1]["id"] == 1
- # Protocol fields under our control
- assert payload["print"]["command"] == "push_status"
- assert payload["print"]["gcode_state"] == "IDLE"
- @pytest.mark.asyncio
- async def test_falls_back_to_synthetic_when_no_cache(self):
- server = _make_server()
- bridge = MagicMock()
- bridge.get_latest_print_state.return_value = None
- server.set_bridge(bridge)
- published = self._capture_published(server)
- await server._send_status_report(MagicMock())
- assert len(published) == 1
- _serial, payload = published[0]
- # Synthetic baseline has stub fields like nozzle_type='hardened_steel'
- # and a `storage` field that the real H2D doesn't push.
- assert payload["print"]["nozzle_type"] == "hardened_steel"
- assert "storage" in payload["print"]
- @pytest.mark.asyncio
- async def test_storage_indicators_overlaid_for_send_preflight(self):
- """#1228: P1S/A1-class firmware doesn't always include the SD/storage
- fields BambuStudio's "Send" pre-flight reads. Without these the
- slicer rejects with 'storage needs to be inserted' before even
- attempting FTP. The cached-as-base path now overlays them so the
- pre-flight passes regardless of what the real printer reports.
- """
- server = _make_server()
- bridge = MagicMock()
- # Real P1S push without SD card inserted: home_flag has other bits set
- # but the SD bit (0x100) is clear; sdcard is False; no storage field.
- bridge.get_latest_print_state.return_value = {
- "command": "push_status",
- "msg": 0,
- "home_flag": 0x42,
- "sdcard": False,
- }
- server.set_bridge(bridge)
- published = self._capture_published(server)
- await server._send_status_report(MagicMock())
- _serial, payload = published[0]
- # SD bit ORed onto whatever was there — other bits preserved.
- assert payload["print"]["home_flag"] & 0x100 == 0x100
- assert payload["print"]["home_flag"] & 0x42 == 0x42
- # Force-set so a False from the printer doesn't trip the pre-flight.
- assert payload["print"]["sdcard"] is True
- # storage was missing — the overlay must inject a non-empty default.
- assert "storage" in payload["print"]
- assert payload["print"]["storage"]["free"] > 0
- assert payload["print"]["storage"]["total"] > 0
- @pytest.mark.asyncio
- async def test_storage_indicators_preserve_real_storage_when_present(self):
- """When the real printer DOES report a storage block, pass it through
- unchanged (the overlay only fills in the missing field, not overrides).
- """
- server = _make_server()
- bridge = MagicMock()
- real_storage = {"free": 12345, "total": 67890}
- bridge.get_latest_print_state.return_value = {
- "command": "push_status",
- "msg": 0,
- "home_flag": 0x100, # SD bit already set on the real printer
- "sdcard": True,
- "storage": real_storage,
- }
- server.set_bridge(bridge)
- published = self._capture_published(server)
- await server._send_status_report(MagicMock())
- _serial, payload = published[0]
- # SD bit OR is idempotent — already-set bit stays set.
- assert payload["print"]["home_flag"] == 0x100
- assert payload["print"]["sdcard"] is True
- # Real values pass through, NOT the synthetic defaults.
- assert payload["print"]["storage"] == real_storage
- @pytest.mark.asyncio
- async def test_overrides_protocol_fields_even_when_cache_present(self):
- """Cached value's gcode_state must NOT win over our local upload-state-machine value."""
- server = _make_server()
- server._gcode_state = "PREPARE"
- server._current_file = "foo.3mf"
- bridge = MagicMock()
- bridge.get_latest_print_state.return_value = {
- "command": "push_status",
- "gcode_state": "IDLE", # printer is idle; we are mid-FTP-upload
- "gcode_file": "",
- "gcode_file_prepare_percent": "0",
- }
- server.set_bridge(bridge)
- published = self._capture_published(server)
- await server._send_status_report(MagicMock())
- _serial, payload = published[0]
- assert payload["print"]["gcode_state"] == "PREPARE"
- assert payload["print"]["gcode_file"] == "foo.3mf"
- # ---------------------------------------------------------------------------
- # Wire format
- # ---------------------------------------------------------------------------
- class TestWireFormat:
- """BambuStudio's Send pre-flight rejects compact JSON — must match real printer's
- indented format (32K bytes for an idle H2D vs 14K compact)."""
- @pytest.mark.asyncio
- async def test_publish_uses_indent_4_json_format(self):
- server = _make_server()
- captured: list = []
- async def _capture_drain():
- pass
- writer = MagicMock()
- writer.write = lambda data: captured.append(data)
- writer.drain = AsyncMock()
- await server._publish_to_report(writer, {"print": {"command": "push_status", "ams": {}}})
- body = b"".join(captured)
- assert b'\n "print"' in body, "publish_to_report must use indent=4 JSON"
- # ---------------------------------------------------------------------------
- # Routing: _handle_publish
- # ---------------------------------------------------------------------------
- class TestPublishRouting:
- """Slicer-issued commands: project_file/gcode_file handled locally, everything
- else forwarded to the real printer."""
- def _build_publish_payload(self, topic: str, body: bytes) -> bytes:
- topic_bytes = topic.encode("utf-8")
- return bytes([len(topic_bytes) >> 8, len(topic_bytes) & 0xFF]) + topic_bytes + body
- def _attach_active_bridge(self, server: SimpleMQTTServer) -> MagicMock:
- bridge = MagicMock()
- bridge.is_active = True
- bridge.forward_to_printer = MagicMock(return_value=True)
- server.set_bridge(bridge)
- return bridge
- @pytest.mark.asyncio
- async def test_project_file_handled_locally_not_forwarded(self):
- server = _make_server()
- bridge = self._attach_active_bridge(server)
- writer = MagicMock()
- writer.write = MagicMock()
- writer.drain = AsyncMock()
- body = json.dumps({"print": {"command": "project_file", "subtask_name": "f", "sequence_id": "1"}}).encode()
- payload = self._build_publish_payload(f"device/{VP_SERIAL}/request", body)
- with patch.object(server, "_send_print_response", new=AsyncMock()) as mock_resp:
- await server._handle_publish(0x30, payload, writer, "client1")
- bridge.forward_to_printer.assert_not_called()
- mock_resp.assert_awaited_once()
- @pytest.mark.asyncio
- async def test_gcode_file_handled_locally_not_forwarded(self):
- server = _make_server()
- bridge = self._attach_active_bridge(server)
- writer = MagicMock()
- writer.write = MagicMock()
- writer.drain = AsyncMock()
- body = json.dumps({"print": {"command": "gcode_file", "subtask_name": "f.gcode", "sequence_id": "1"}}).encode()
- payload = self._build_publish_payload(f"device/{VP_SERIAL}/request", body)
- with patch.object(server, "_send_print_response", new=AsyncMock()):
- await server._handle_publish(0x30, payload, writer, "client1")
- bridge.forward_to_printer.assert_not_called()
- @pytest.mark.asyncio
- async def test_pushall_handled_locally_not_forwarded(self):
- server = _make_server()
- bridge = self._attach_active_bridge(server)
- writer = MagicMock()
- writer.write = MagicMock()
- writer.drain = AsyncMock()
- body = json.dumps({"pushing": {"command": "pushall", "sequence_id": "0"}}).encode()
- payload = self._build_publish_payload(f"device/{VP_SERIAL}/request", body)
- with patch.object(server, "_send_status_report", new=AsyncMock()) as mock_status:
- await server._handle_publish(0x30, payload, writer, "client1")
- # Synthetic answer fires (fast, low latency); no forwarding (the
- # cache already mirrors what the printer would respond with).
- bridge.forward_to_printer.assert_not_called()
- mock_status.assert_awaited_once()
- @pytest.mark.asyncio
- async def test_get_version_handled_locally_not_forwarded(self):
- server = _make_server()
- bridge = self._attach_active_bridge(server)
- writer = MagicMock()
- writer.write = MagicMock()
- writer.drain = AsyncMock()
- body = json.dumps({"info": {"command": "get_version", "sequence_id": "1"}}).encode()
- payload = self._build_publish_payload(f"device/{VP_SERIAL}/request", body)
- with patch.object(server, "_send_version_response", new=AsyncMock()) as mock_ver:
- await server._handle_publish(0x30, payload, writer, "client1")
- bridge.forward_to_printer.assert_not_called()
- mock_ver.assert_awaited_once()
- @pytest.mark.asyncio
- async def test_extrusion_cali_get_is_forwarded(self):
- """extrusion_cali_get fetches per-filament k-profiles — must reach the printer."""
- server = _make_server()
- bridge = self._attach_active_bridge(server)
- writer = MagicMock()
- writer.write = MagicMock()
- writer.drain = AsyncMock()
- body = json.dumps(
- {
- "print": {
- "command": "extrusion_cali_get",
- "filament_id": "",
- "nozzle_diameter": "0.4",
- "sequence_id": "5",
- }
- }
- ).encode()
- payload = self._build_publish_payload(f"device/{VP_SERIAL}/request", body)
- await server._handle_publish(0x30, payload, writer, "client1")
- bridge.forward_to_printer.assert_called_once()
- forwarded = bridge.forward_to_printer.call_args.args[0]
- assert forwarded["print"]["command"] == "extrusion_cali_get"
- @pytest.mark.asyncio
- async def test_print_stop_is_forwarded(self):
- server = _make_server()
- bridge = self._attach_active_bridge(server)
- writer = MagicMock()
- writer.write = MagicMock()
- writer.drain = AsyncMock()
- body = json.dumps({"print": {"command": "stop", "sequence_id": "5"}}).encode()
- payload = self._build_publish_payload(f"device/{VP_SERIAL}/request", body)
- await server._handle_publish(0x30, payload, writer, "client1")
- bridge.forward_to_printer.assert_called_once()
- # ---------------------------------------------------------------------------
- # IP encoding helper
- # ---------------------------------------------------------------------------
- class TestIpEncoding:
- def test_le_uint32_matches_real_h2d_capture(self):
- # 192.168.255.133 captured from real H2D's net.info[0].ip = 2248124608
- assert _ip_to_uint32_le("192.168.255.133") == 2248124608
- def test_vp_ip_round_trip(self):
- assert _ip_to_uint32_le("192.168.255.16") == 285190336
- def test_invalid_ip_raises(self):
- with pytest.raises(ValueError):
- _ip_to_uint32_le("not.an.ip.actually")
|