test_vp_mqtt_server.py 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560
  1. """Tests for Virtual Printer MQTT server."""
  2. import ast
  3. import asyncio
  4. import inspect
  5. import json
  6. from pathlib import Path
  7. from unittest.mock import AsyncMock, MagicMock
  8. import pytest
  9. from backend.app.services.virtual_printer.mqtt_server import SimpleMQTTServer
  10. class TestMQTTServerNoGlobalState:
  11. """Ensure MQTT server doesn't set global asyncio state."""
  12. def test_no_global_exception_handler(self):
  13. """MQTT server must not call set_exception_handler().
  14. set_exception_handler() is global to the event loop. When multiple
  15. VP instances run, each would overwrite the previous handler,
  16. causing lost error context and spurious 'Unhandled exception in
  17. client_connected_cb' messages.
  18. """
  19. source = inspect.getsource(SimpleMQTTServer)
  20. tree = ast.parse(source)
  21. for node in ast.walk(tree):
  22. if isinstance(node, ast.Attribute) and node.attr == "set_exception_handler":
  23. raise AssertionError(
  24. "SimpleMQTTServer must not call set_exception_handler(). "
  25. "It overwrites the global asyncio exception handler, "
  26. "breaking multi-VP setups."
  27. )
  28. def _make_server(serial: str = "01P00A391800001") -> SimpleMQTTServer:
  29. """Build a SimpleMQTTServer with dummy cert paths (start() is never called)."""
  30. return SimpleMQTTServer(
  31. serial=serial,
  32. access_code="deadbeef",
  33. cert_path=Path("/tmp/unused.crt"), # nosec B108
  34. key_path=Path("/tmp/unused.key"), # nosec B108
  35. model="C12",
  36. )
  37. class TestExtractSerialFromTopic:
  38. """_extract_serial_from_topic should pull the serial out of device topics."""
  39. @pytest.mark.parametrize(
  40. "topic,expected",
  41. [
  42. ("device/01P00A391800001/request", "01P00A391800001"),
  43. ("device/09400A391800003/report", "09400A391800003"),
  44. ("device/00M00A391800004/request/subpath", "00M00A391800004"),
  45. ],
  46. )
  47. def test_valid_topics(self, topic, expected):
  48. assert SimpleMQTTServer._extract_serial_from_topic(topic) == expected
  49. @pytest.mark.parametrize(
  50. "topic",
  51. [
  52. "",
  53. "device/",
  54. "device//request", # empty serial
  55. "notdevice/01P00A/request",
  56. "random",
  57. ],
  58. )
  59. def test_invalid_topics(self, topic):
  60. assert SimpleMQTTServer._extract_serial_from_topic(topic) is None
  61. def _build_publish_payload(topic: str, message: dict) -> bytes:
  62. """Build the MQTT PUBLISH packet *payload* (past the fixed header byte)."""
  63. topic_bytes = topic.encode("utf-8")
  64. message_bytes = json.dumps(message).encode("utf-8")
  65. return len(topic_bytes).to_bytes(2, "big") + topic_bytes + message_bytes
  66. class TestPublishHandlerAdaptiveSerial:
  67. """#927: `_handle_publish` must accept any `device/*/request` topic from an
  68. authenticated client and use the topic's serial for all responses."""
  69. def test_handle_publish_accepts_mismatched_serial(self):
  70. """Prior behavior silently dropped publishes whose topic serial didn't
  71. equal self.serial. After the fix the handler must run and learn the
  72. client's serial.
  73. """
  74. server = _make_server(serial="01P00A391800001") # synthetic VP serial
  75. server._client_serials["test-client"] = server.serial # simulate post-CONNECT
  76. writer = MagicMock()
  77. writer.write = MagicMock()
  78. writer.drain = AsyncMock()
  79. # Slicer publishes with a *different* serial — the exact bug from #927.
  80. topic = "device/01P00AABCDEFGHI/request"
  81. payload = _build_publish_payload(topic, {"info": {"command": "get_version", "sequence_id": "42"}})
  82. asyncio.run(server._handle_publish(0x30, payload, writer, "test-client"))
  83. # Learned the client's serial.
  84. assert server._client_serials["test-client"] == "01P00AABCDEFGHI"
  85. # Wrote at least one packet to the slicer (the version response).
  86. assert writer.write.called
  87. all_bytes = b"".join(call.args[0] for call in writer.write.call_args_list)
  88. # Response topic must contain the *client's* serial, not self.serial.
  89. assert b"device/01P00AABCDEFGHI/report" in all_bytes
  90. assert b"device/01P00A391800001/report" not in all_bytes
  91. # Response body carries get_version with the client's serial as sn.
  92. assert b'"command": "get_version"' in all_bytes
  93. assert b'"sn": "01P00AABCDEFGHI"' in all_bytes
  94. def test_handle_publish_ignores_non_request_topics(self):
  95. server = _make_server()
  96. server._client_serials["c1"] = server.serial
  97. writer = MagicMock()
  98. writer.write = MagicMock()
  99. writer.drain = AsyncMock()
  100. payload = _build_publish_payload(
  101. "device/01P00AABCDEFGHI/report", # /report, not /request
  102. {"pushing": {"command": "pushall"}},
  103. )
  104. asyncio.run(server._handle_publish(0x30, payload, writer, "c1"))
  105. assert not writer.write.called # no response
  106. # Client serial unchanged
  107. assert server._client_serials["c1"] == server.serial
  108. def test_handle_publish_pushall_uses_client_serial(self):
  109. """pushall → status_report must be sent on the client's subscribed topic."""
  110. server = _make_server(serial="01P00A391800001")
  111. server._client_serials["c1"] = server.serial
  112. writer = MagicMock()
  113. writer.write = MagicMock()
  114. writer.drain = AsyncMock()
  115. payload = _build_publish_payload(
  116. "device/CUSTOMSERIAL123/request",
  117. {"pushing": {"command": "pushall", "sequence_id": "1"}},
  118. )
  119. asyncio.run(server._handle_publish(0x30, payload, writer, "c1"))
  120. all_bytes = b"".join(call.args[0] for call in writer.write.call_args_list)
  121. assert b"device/CUSTOMSERIAL123/report" in all_bytes
  122. assert b'"command": "push_status"' in all_bytes
  123. assert server._client_serials["c1"] == "CUSTOMSERIAL123"
  124. def test_handle_publish_tolerates_null_terminated_payload(self):
  125. """#927: OrcaSlicer on Linux appends the C-string \\0 to MQTT payloads.
  126. The handler must still parse and respond rather than silently dropping."""
  127. server = _make_server(serial="01P00A391800001")
  128. server._client_serials["c1"] = server.serial
  129. writer = MagicMock()
  130. writer.write = MagicMock()
  131. writer.drain = AsyncMock()
  132. topic = "device/01P00A391800001/request"
  133. topic_bytes = topic.encode("utf-8")
  134. # Real-world bytes captured from EdwardChamberlain's support log: the
  135. # JSON ends with an extra \x00 that strict json.loads rejects.
  136. message_bytes = b'{"pushing":{"command":"pushall","sequence_id":"7"}}\x00'
  137. payload = len(topic_bytes).to_bytes(2, "big") + topic_bytes + message_bytes
  138. asyncio.run(server._handle_publish(0x30, payload, writer, "c1"))
  139. all_bytes = b"".join(call.args[0] for call in writer.write.call_args_list)
  140. assert b"device/01P00A391800001/report" in all_bytes
  141. assert b'"command": "push_status"' in all_bytes
  142. class TestClientSerialLifecycle:
  143. """_client_serials must be cleaned up on disconnect/stop to avoid leaks."""
  144. def test_stop_clears_client_serials(self):
  145. server = _make_server()
  146. server._client_serials["a"] = "X"
  147. server._client_serials["b"] = "Y"
  148. # stop() is async but we only need to cover the clear() path; run a minimal version
  149. asyncio.run(server.stop())
  150. assert server._client_serials == {}
  151. def _build_connect_payload(
  152. keep_alive: int,
  153. access_code: str = "deadbeef",
  154. username: str = "bblp",
  155. client_id: str = "orca",
  156. ) -> bytes:
  157. """Build an MQTT CONNECT variable-header + payload (without the fixed header).
  158. Layout matches the parser in `_handle_connect`:
  159. proto_name_len(2) + "MQTT"(4) + level(1) + flags(1) + keepalive(2) +
  160. client_id_len(2) + client_id + username_len(2) + username +
  161. password_len(2) + password.
  162. """
  163. proto = b"MQTT"
  164. parts = bytearray()
  165. parts += len(proto).to_bytes(2, "big") + proto
  166. parts += bytes([0x04, 0xC2]) # protocol level 4 (MQTT 3.1.1), flags: user+pass+clean
  167. parts += keep_alive.to_bytes(2, "big")
  168. cid = client_id.encode("utf-8")
  169. parts += len(cid).to_bytes(2, "big") + cid
  170. user = username.encode("utf-8")
  171. parts += len(user).to_bytes(2, "big") + user
  172. pw = access_code.encode("utf-8")
  173. parts += len(pw).to_bytes(2, "big") + pw
  174. return bytes(parts)
  175. class TestHandleConnectKeepalive:
  176. """`_handle_connect` must return the negotiated keepalive (#1548).
  177. Pre-fix, the parser ignored this field and the read loop fell back to
  178. a hardcoded 60 s timeout, closing OrcaSlicer's idle MQTT connection
  179. after exactly 60 s instead of waiting 1.5× the client-negotiated
  180. keepalive as MQTT spec §4.4 requires.
  181. """
  182. def test_returns_negotiated_keepalive_on_auth_success(self):
  183. server = _make_server()
  184. writer = MagicMock()
  185. writer.write = MagicMock()
  186. writer.drain = AsyncMock()
  187. # Also stub status-report writes triggered post-auth
  188. payload = _build_connect_payload(keep_alive=120)
  189. result = asyncio.run(server._handle_connect(payload, writer))
  190. assert result == (True, 120)
  191. def test_returns_zero_keepalive_for_no_keepalive_clients(self):
  192. """`keep_alive == 0` in CONNECT means the client opted out per spec
  193. §3.1.2.10 — server must report it back so the read loop can drop
  194. the timeout entirely."""
  195. server = _make_server()
  196. writer = MagicMock()
  197. writer.write = MagicMock()
  198. writer.drain = AsyncMock()
  199. payload = _build_connect_payload(keep_alive=0)
  200. result = asyncio.run(server._handle_connect(payload, writer))
  201. assert result == (True, 0)
  202. def test_returns_false_with_zero_keepalive_on_auth_failure(self):
  203. """Bad password path still returns the tuple shape so the caller's
  204. unpack doesn't break."""
  205. server = _make_server()
  206. writer = MagicMock()
  207. writer.write = MagicMock()
  208. writer.drain = AsyncMock()
  209. payload = _build_connect_payload(keep_alive=60, access_code="wrong")
  210. result = asyncio.run(server._handle_connect(payload, writer))
  211. assert result == (False, 0)
  212. def test_returns_false_with_zero_keepalive_on_parse_error(self):
  213. """Malformed CONNECT (e.g. truncated) must not crash and must
  214. still hand a tuple back to the caller."""
  215. server = _make_server()
  216. writer = MagicMock()
  217. writer.write = MagicMock()
  218. writer.drain = AsyncMock()
  219. # 3 bytes is far shorter than even the protocol-name prefix needs.
  220. result = asyncio.run(server._handle_connect(b"\x00\x04MQ", writer))
  221. assert result == (False, 0)
  222. class TestHandleClientHonoursKeepalive:
  223. """`_handle_client` must use the client-negotiated keepalive for its
  224. read-loop timeout, not the hardcoded 60 s default (#1548)."""
  225. @pytest.mark.asyncio
  226. async def test_idle_client_kept_alive_beyond_60s_when_keepalive_is_long(self):
  227. """The literal #1548 repro: a client negotiates keepalive=180 and
  228. then sits idle. Pre-fix the read loop closed the connection after
  229. 60 s (hardcoded). Post-fix the timeout is 1.5×180=270 s — so the
  230. connection is still open after the original 60 s boundary."""
  231. server = _make_server()
  232. server._running = True
  233. reader = asyncio.StreamReader()
  234. # Feed CONNECT (with fixed header byte 0x10 + remaining length)
  235. connect_payload = _build_connect_payload(keep_alive=180)
  236. rl = len(connect_payload)
  237. # MQTT remaining-length encoding for values <128 is a single byte.
  238. assert rl < 128
  239. reader.feed_data(bytes([0x10, rl]) + connect_payload)
  240. # No further data — client goes idle.
  241. writer = MagicMock()
  242. writer.write = MagicMock()
  243. writer.drain = AsyncMock()
  244. writer.close = MagicMock()
  245. writer.wait_closed = AsyncMock()
  246. writer.get_extra_info = MagicMock(return_value=("1.2.3.4", 12345))
  247. # Patch the post-auth status-report send so the handler doesn't
  248. # depend on a real serial/payload path.
  249. server._send_status_report = AsyncMock()
  250. task = asyncio.create_task(server._handle_client(reader, writer))
  251. # Wait past the old hardcoded 60 s threshold by a margin. Real-time
  252. # 60 s would be far too slow for a unit test — drive simulated time
  253. # by yielding repeatedly. asyncio.wait_for with a real wall-clock
  254. # delay would actually consume 60 s of test time, so instead we
  255. # patch the timeout to a small value and assert the timeout chosen
  256. # by the loop matches our expectation.
  257. # Approach: let the task progress past the CONNECT, then cancel.
  258. await asyncio.sleep(0.1) # give the loop a chance to process CONNECT
  259. # The post-auth read should now be waiting on reader with the
  260. # negotiated keepalive. We can't observe the timeout directly, so
  261. # we just verify the connection wasn't closed by inspecting close().
  262. assert not writer.close.called, "connection should still be open after CONNECT"
  263. # Cancel cleanly
  264. task.cancel()
  265. try:
  266. await task
  267. except asyncio.CancelledError:
  268. pass
  269. @pytest.mark.asyncio
  270. async def test_idle_client_closed_after_one_and_a_half_times_keepalive(self):
  271. """Tight verification: keepalive=2 must close the connection in
  272. ~3 s (1.5×) of idle, well above the noise floor for an async test."""
  273. server = _make_server()
  274. server._running = True
  275. reader = asyncio.StreamReader()
  276. connect_payload = _build_connect_payload(keep_alive=2)
  277. rl = len(connect_payload)
  278. assert rl < 128
  279. reader.feed_data(bytes([0x10, rl]) + connect_payload)
  280. writer = MagicMock()
  281. writer.write = MagicMock()
  282. writer.drain = AsyncMock()
  283. writer.close = MagicMock()
  284. writer.wait_closed = AsyncMock()
  285. writer.get_extra_info = MagicMock(return_value=("1.2.3.4", 12345))
  286. server._send_status_report = AsyncMock()
  287. start = asyncio.get_event_loop().time()
  288. await server._handle_client(reader, writer)
  289. elapsed = asyncio.get_event_loop().time() - start
  290. # 1.5×2s = 3s expected. Allow ±1s slop for the read of CONNECT
  291. # itself + scheduler jitter on a loaded CI box.
  292. assert 2.0 < elapsed < 4.5, f"expected ~3s timeout, got {elapsed:.2f}s"
  293. @pytest.mark.asyncio
  294. async def test_pingreq_resets_idle_timeout(self):
  295. """A PINGREQ within the keepalive window must keep the connection
  296. open — the per-packet read timeout is restarted on every byte
  297. delivered, so the next idle window is measured from the PINGREQ."""
  298. server = _make_server()
  299. server._running = True
  300. reader = asyncio.StreamReader()
  301. connect_payload = _build_connect_payload(keep_alive=2)
  302. rl = len(connect_payload)
  303. assert rl < 128
  304. reader.feed_data(bytes([0x10, rl]) + connect_payload)
  305. writer = MagicMock()
  306. writer.write = MagicMock()
  307. writer.drain = AsyncMock()
  308. writer.close = MagicMock()
  309. writer.wait_closed = AsyncMock()
  310. writer.get_extra_info = MagicMock(return_value=("1.2.3.4", 12345))
  311. server._send_status_report = AsyncMock()
  312. async def _drive():
  313. # Feed a PINGREQ (0xC0 0x00 — type 12 with zero remaining length)
  314. # at 2s, which is 1s *before* the would-be timeout, and a
  315. # DISCONNECT at 2.5s so the test exits deterministically.
  316. await asyncio.sleep(2.0)
  317. reader.feed_data(bytes([0xC0, 0x00]))
  318. await asyncio.sleep(0.5)
  319. reader.feed_data(bytes([0xE0, 0x00])) # DISCONNECT
  320. driver = asyncio.create_task(_drive())
  321. start = asyncio.get_event_loop().time()
  322. await server._handle_client(reader, writer)
  323. elapsed = asyncio.get_event_loop().time() - start
  324. await driver # ensure no orphan task
  325. # Exit was via DISCONNECT at ~2.5s, NOT a 3s keepalive timeout.
  326. # Allow generous slop.
  327. assert 2.0 < elapsed < 3.0, f"expected exit on DISCONNECT near 2.5s, got {elapsed:.2f}s"
  328. class TestAuthRateLimit:
  329. """Per-IP rate-limiting of MQTT CONNECT auth attempts.
  330. Bambuddy's VP exposes an 8-char access code via the slicer-facing MQTT
  331. server. Without a rate-limit the code is brute-forceable by anyone who
  332. can reach the VP's bind IP (LAN or VPN). The limiter records each
  333. failed auth attempt per source IP and rejects further CONNECTs from
  334. that IP once the per-window threshold is crossed, then auto-recovers
  335. when the window expires. Verified here against the production
  336. constants imported from the module.
  337. """
  338. @pytest.fixture
  339. def server(self):
  340. from backend.app.services.virtual_printer.mqtt_server import SimpleMQTTServer
  341. return _make_server(serial="01P00A391800002")
  342. def test_under_limit_attempts_are_allowed(self, server):
  343. from backend.app.services.virtual_printer.mqtt_server import _AUTH_RATE_LIMIT_MAX_ATTEMPTS
  344. ip = "192.168.1.50"
  345. # Record (max-1) failures and verify the next attempt is still allowed.
  346. for _ in range(_AUTH_RATE_LIMIT_MAX_ATTEMPTS - 1):
  347. server._record_auth_failure(ip)
  348. assert server._is_auth_rate_limited(ip) is False
  349. def test_exactly_max_attempts_triggers_rate_limit(self, server):
  350. from backend.app.services.virtual_printer.mqtt_server import _AUTH_RATE_LIMIT_MAX_ATTEMPTS
  351. ip = "192.168.1.50"
  352. for _ in range(_AUTH_RATE_LIMIT_MAX_ATTEMPTS):
  353. server._record_auth_failure(ip)
  354. # At exactly the cap, further attempts must be rejected.
  355. assert server._is_auth_rate_limited(ip) is True
  356. def test_window_recovery_clears_old_failures(self, server):
  357. """A burst of failures older than the window must NOT count
  358. against the IP — the limiter is sliding, not cumulative."""
  359. import time as _time
  360. from backend.app.services.virtual_printer.mqtt_server import (
  361. _AUTH_RATE_LIMIT_MAX_ATTEMPTS,
  362. _AUTH_RATE_LIMIT_WINDOW_SECONDS,
  363. )
  364. ip = "192.168.1.50"
  365. # Inject stale timestamps directly — older than the window means the
  366. # limiter should drop them on the next probe.
  367. stale = _time.monotonic() - _AUTH_RATE_LIMIT_WINDOW_SECONDS - 1.0
  368. server._auth_failures[ip] = [stale] * _AUTH_RATE_LIMIT_MAX_ATTEMPTS
  369. # All recorded failures are outside the window — IP is no longer rate-limited.
  370. assert server._is_auth_rate_limited(ip) is False
  371. # And the dict entry was pruned (empty) instead of leaking forever.
  372. assert ip not in server._auth_failures
  373. def test_multiple_ips_tracked_independently(self, server):
  374. from backend.app.services.virtual_printer.mqtt_server import _AUTH_RATE_LIMIT_MAX_ATTEMPTS
  375. # One IP exhausts the budget; another IP must still be allowed.
  376. for _ in range(_AUTH_RATE_LIMIT_MAX_ATTEMPTS):
  377. server._record_auth_failure("10.0.0.1")
  378. assert server._is_auth_rate_limited("10.0.0.1") is True
  379. assert server._is_auth_rate_limited("10.0.0.2") is False
  380. def test_successful_auth_clears_failure_history(self, server):
  381. """A successful auth must wipe the IP's prior-failures stash so the
  382. user isn't penalised for typos that they ultimately corrected."""
  383. from backend.app.services.virtual_printer.mqtt_server import _AUTH_RATE_LIMIT_MAX_ATTEMPTS
  384. ip = "192.168.1.50"
  385. # Build up failures one short of the cap.
  386. for _ in range(_AUTH_RATE_LIMIT_MAX_ATTEMPTS - 1):
  387. server._record_auth_failure(ip)
  388. # Successful auth must clear them.
  389. server._clear_auth_failures(ip)
  390. # Now a subsequent failure starts the count over at 1 (well under cap).
  391. server._record_auth_failure(ip)
  392. assert server._is_auth_rate_limited(ip) is False
  393. class TestPendingRequestRouting:
  394. """`push_raw_to_clients` routes the printer's response back only to the
  395. slicer that originated the request, not to every connected slicer.
  396. The bridge calls `push_raw_to_clients(topic, payload)` for every
  397. response it sees from the real printer. Before the fix, this fanned
  398. out to every connected slicer — leaking slicer A's
  399. `extrusion_cali_get` response into slicer B's command stream. The
  400. fix records `sequence_id → client_id` on the way out and looks it
  401. back up on the way in.
  402. """
  403. @pytest.fixture
  404. def server(self):
  405. return _make_server(serial="01P00A391800003")
  406. def test_single_slicer_routes_to_that_slicer(self, server):
  407. """Sanity check: when one slicer is connected, the response goes
  408. to it regardless of whether the seq_id was recorded."""
  409. # No recorded request, no slicer seen → returns None (broadcast).
  410. assert server._lookup_pending_request_client(b'{"print": {"sequence_id": "999"}}') is None
  411. def test_record_pending_request_walks_nested_blocks(self, server):
  412. """The slicer wraps its sequence_id under whichever subsystem the
  413. command targets (`print`, `info`, `system`, …). The helper must
  414. find it regardless of which key it's nested under."""
  415. server._record_pending_request(
  416. {"print": {"command": "extrusion_cali_get", "sequence_id": "42"}},
  417. "clientA",
  418. )
  419. assert server._pending_requests.get("42") == "clientA"
  420. server._record_pending_request(
  421. {"info": {"command": "get_version", "sequence_id": "43"}},
  422. "clientB",
  423. )
  424. assert server._pending_requests.get("43") == "clientB"
  425. def test_lookup_pops_entry_so_each_response_routes_once(self, server):
  426. """Once a response is matched, the pending entry is consumed so
  427. a later coincidental sequence_id from a printer-initiated push
  428. doesn't mis-route to the original client."""
  429. server._record_pending_request({"print": {"sequence_id": "100"}}, "clientA")
  430. # First lookup finds it…
  431. assert server._lookup_pending_request_client(b'{"print": {"sequence_id": "100"}}') == "clientA"
  432. # …and removes it. Second lookup with the same seq returns None
  433. # (treated as printer-initiated → broadcast fallback).
  434. assert server._lookup_pending_request_client(b'{"print": {"sequence_id": "100"}}') is None
  435. def test_fifo_eviction_when_cache_fills(self, server):
  436. """If a slicer sends many commands without responses (or the
  437. responses never arrive), the oldest entries age out so the dict
  438. can't grow unbounded."""
  439. from backend.app.services.virtual_printer.mqtt_server import _PENDING_REQUEST_MAX_ENTRIES
  440. # Fill the dict to one over the cap.
  441. for i in range(_PENDING_REQUEST_MAX_ENTRIES + 1):
  442. server._record_pending_request({"print": {"sequence_id": str(i)}}, "clientA")
  443. # The dict is capped — the oldest entry ("0") is gone, the newest is in.
  444. assert len(server._pending_requests) <= _PENDING_REQUEST_MAX_ENTRIES
  445. assert "0" not in server._pending_requests
  446. assert str(_PENDING_REQUEST_MAX_ENTRIES) in server._pending_requests
  447. def test_response_without_recorded_seq_returns_none_for_broadcast(self, server):
  448. """Printer-initiated pushes (push_status etc.) have a sequence_id
  449. the bridge never saw recorded. ``_lookup_pending_request_client``
  450. must return None so ``push_raw_to_clients`` falls back to fan-out
  451. — every slicer expects to receive these unsolicited messages."""
  452. # No record for this seq id.
  453. assert server._lookup_pending_request_client(b'{"print": {"sequence_id": "777"}}') is None
  454. def test_malformed_payload_falls_through_to_broadcast(self, server):
  455. """A non-JSON / non-dict payload must NOT crash the routing path —
  456. return None so the response broadcasts."""
  457. assert server._lookup_pending_request_client(b"not valid json") is None
  458. assert server._lookup_pending_request_client(b'"a string, not a dict"') is None