test_diagnostic_snapshot.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. """Tests for the diagnostic snapshot helper that aggregates connection,
  2. virtual-printer, and log-health diagnostics for the support bundle and
  3. bug-report submission paths (#1506 follow-up).
  4. The helper has three hard requirements:
  5. - Always returns the three top-level keys, even when sections are empty.
  6. - Fail-soft per probe — a single crash doesn't break the snapshot.
  7. - Bounded total runtime — concurrent gather caps wall-clock to the slowest probe.
  8. """
  9. from types import SimpleNamespace
  10. from unittest.mock import AsyncMock, MagicMock, patch
  11. import pytest
  12. from backend.app.services.diagnostic_snapshot import collect_diagnostic_snapshot
  13. def _make_db_with_printers_and_vps(printers: list, vps: list):
  14. """Stub AsyncSession whose two .execute() calls return printers then VPs."""
  15. printers_result = MagicMock()
  16. printers_result.scalars.return_value.all.return_value = printers
  17. vps_result = MagicMock()
  18. vps_result.scalars.return_value.all.return_value = vps
  19. db = MagicMock()
  20. # Two execute calls — printer query, then VP query (order matches the
  21. # helper). side_effect cycles through the queue.
  22. db.execute = AsyncMock(side_effect=[printers_result, vps_result])
  23. return db
  24. @pytest.mark.asyncio
  25. async def test_snapshot_always_returns_three_top_level_keys_when_empty():
  26. """No printers, no VPs — still get the three keys (empty lists, empty
  27. log-health). Callers downstream rely on the shape being stable."""
  28. db = _make_db_with_printers_and_vps([], [])
  29. with patch(
  30. "backend.app.services.diagnostic_snapshot._run_log_health",
  31. new=AsyncMock(return_value={"findings": []}),
  32. ):
  33. out = await collect_diagnostic_snapshot(db)
  34. assert set(out.keys()) == {"connection_diagnostics", "vp_diagnostics", "log_health"}
  35. assert out["connection_diagnostics"] == []
  36. assert out["vp_diagnostics"] == []
  37. assert out["log_health"] == {"findings": []}
  38. @pytest.mark.asyncio
  39. async def test_snapshot_runs_diagnostic_per_active_printer():
  40. """Each active printer gets a connection check; each enabled VP gets a
  41. setup check. Result list length matches the input lists."""
  42. printers = [
  43. SimpleNamespace(id=1, name="P1S", ip_address="192.168.1.10", serial_number="01S00A", access_code="abc123"),
  44. SimpleNamespace(id=2, name="X1C", ip_address="192.168.1.11", serial_number="C11Y00", access_code="xyz456"),
  45. ]
  46. vps = [SimpleNamespace(id=10, name="VP-1")]
  47. db = _make_db_with_printers_and_vps(printers, vps)
  48. fake_conn = SimpleNamespace(model_dump=lambda: {"checks": []})
  49. fake_vp = SimpleNamespace(model_dump=lambda: {"checks": []})
  50. with (
  51. patch(
  52. "backend.app.services.printer_diagnostic.run_connection_diagnostic",
  53. new=AsyncMock(return_value=fake_conn),
  54. ),
  55. patch(
  56. "backend.app.services.virtual_printer.virtual_printer_manager.get_instance",
  57. return_value=None,
  58. ),
  59. patch(
  60. "backend.app.services.virtual_printer.diagnostic.run_vp_diagnostic",
  61. new=AsyncMock(return_value=fake_vp),
  62. ),
  63. patch(
  64. "backend.app.services.diagnostic_snapshot._run_log_health",
  65. new=AsyncMock(return_value={"findings": []}),
  66. ),
  67. ):
  68. out = await collect_diagnostic_snapshot(db)
  69. assert len(out["connection_diagnostics"]) == 2
  70. assert out["connection_diagnostics"][0]["printer_id"] == 1
  71. assert out["connection_diagnostics"][1]["printer_id"] == 2
  72. assert all("result" in entry for entry in out["connection_diagnostics"])
  73. assert len(out["vp_diagnostics"]) == 1
  74. assert out["vp_diagnostics"][0]["vp_id"] == 10
  75. assert "result" in out["vp_diagnostics"][0]
  76. @pytest.mark.asyncio
  77. async def test_snapshot_fails_soft_when_single_printer_diagnostic_raises():
  78. """A crash inside one printer's diagnostic emits an error marker for that
  79. printer, but the snapshot's other sections still complete. This is the
  80. whole point of including diagnostics in the bundle — a partial result
  81. beats a 500."""
  82. printers = [
  83. SimpleNamespace(id=1, name="ok", ip_address="1.1.1.1", serial_number="s1", access_code="a"),
  84. SimpleNamespace(id=2, name="bad", ip_address="2.2.2.2", serial_number="s2", access_code="b"),
  85. ]
  86. db = _make_db_with_printers_and_vps(printers, [])
  87. fake_ok = SimpleNamespace(model_dump=lambda: {"status": "ok"})
  88. async def diag(ip_address, **_):
  89. if ip_address == "2.2.2.2":
  90. raise RuntimeError("simulated crash")
  91. return fake_ok
  92. with (
  93. patch("backend.app.services.printer_diagnostic.run_connection_diagnostic", new=AsyncMock(side_effect=diag)),
  94. patch(
  95. "backend.app.services.diagnostic_snapshot._run_log_health",
  96. new=AsyncMock(return_value={"findings": []}),
  97. ),
  98. ):
  99. out = await collect_diagnostic_snapshot(db)
  100. # Both printers represented; the crashing one carries an `error` field.
  101. assert len(out["connection_diagnostics"]) == 2
  102. ok_entry = next(e for e in out["connection_diagnostics"] if e["printer_id"] == 1)
  103. bad_entry = next(e for e in out["connection_diagnostics"] if e["printer_id"] == 2)
  104. assert "result" in ok_entry
  105. assert "error" not in ok_entry
  106. assert "error" in bad_entry
  107. assert "simulated crash" in bad_entry["error"]
  108. # Log-health still completes despite the per-printer crash.
  109. assert out["log_health"] == {"findings": []}
  110. @pytest.mark.asyncio
  111. async def test_snapshot_emits_timed_out_marker_when_probe_exceeds_cap():
  112. """If a single probe stalls past the per-diagnostic timeout, the entry
  113. is marked `timed_out` rather than blocking the whole snapshot. Patch
  114. the timeout small so the test runs fast."""
  115. printers = [SimpleNamespace(id=1, name="slow", ip_address="1.1.1.1", serial_number="s", access_code="a")]
  116. db = _make_db_with_printers_and_vps(printers, [])
  117. async def slow_diag(*a, **k):
  118. import asyncio
  119. await asyncio.sleep(5) # well past the patched cap below
  120. return SimpleNamespace(model_dump=lambda: {})
  121. with (
  122. patch(
  123. "backend.app.services.printer_diagnostic.run_connection_diagnostic", new=AsyncMock(side_effect=slow_diag)
  124. ),
  125. patch("backend.app.services.diagnostic_snapshot._PER_DIAGNOSTIC_TIMEOUT_SECONDS", 0.05),
  126. patch(
  127. "backend.app.services.diagnostic_snapshot._run_log_health",
  128. new=AsyncMock(return_value={"findings": []}),
  129. ),
  130. ):
  131. out = await collect_diagnostic_snapshot(db)
  132. assert len(out["connection_diagnostics"]) == 1
  133. assert out["connection_diagnostics"][0]["error"] == "timed_out"
  134. @pytest.mark.asyncio
  135. async def test_snapshot_masks_ip_addresses_in_all_diagnostic_fields():
  136. """The diagnostic schemas embed raw IPv4 in three places — the top-level
  137. ``PrinterDiagnosticResult.ip_address``, the network-mode check's
  138. ``params.{printer_ip, host_ip}``, and the VP diagnostic's
  139. ``params.bind_ip``. None of those should leak into the submitted
  140. snapshot. Sanitization runs after the per-probe gather; both DB-known
  141. IPs (covered by sensitive_strings → "[IP]") and host / VP-bind IPs
  142. (caught by the IPv4 regex fallback) end up redacted.
  143. """
  144. printers = [
  145. SimpleNamespace(
  146. id=1, name="Workshop", ip_address="192.168.255.131", serial_number="01S00ABC123", access_code="abcd1234"
  147. )
  148. ]
  149. vps = [SimpleNamespace(id=10, name="VP-Workshop")]
  150. db = _make_db_with_printers_and_vps(printers, vps)
  151. fake_conn = SimpleNamespace(
  152. model_dump=lambda: {
  153. "ip_address": "192.168.255.131",
  154. "overall": "ok",
  155. "checks": [
  156. {
  157. "id": "network_mode",
  158. "status": "warn",
  159. "params": {"printer_ip": "192.168.255.131", "host_ip": "192.168.255.16"},
  160. }
  161. ],
  162. }
  163. )
  164. fake_vp = SimpleNamespace(
  165. model_dump=lambda: {
  166. "overall": "ok",
  167. "checks": [{"id": "bind_interface", "status": "pass", "params": {"bind_ip": "192.168.254.2"}}],
  168. }
  169. )
  170. with (
  171. patch(
  172. "backend.app.services.log_reader.collect_sensitive_strings",
  173. new=AsyncMock(
  174. return_value={
  175. "Workshop": "[PRINTER]",
  176. "192.168.255.131": "[IP]",
  177. "01S00ABC123": "[SERIAL]",
  178. "abcd1234": "[ACCESS_CODE]",
  179. }
  180. ),
  181. ),
  182. patch(
  183. "backend.app.services.printer_diagnostic.run_connection_diagnostic", new=AsyncMock(return_value=fake_conn)
  184. ),
  185. patch("backend.app.services.virtual_printer.virtual_printer_manager.get_instance", return_value=None),
  186. patch("backend.app.services.virtual_printer.diagnostic.run_vp_diagnostic", new=AsyncMock(return_value=fake_vp)),
  187. patch(
  188. "backend.app.services.diagnostic_snapshot._run_log_health",
  189. new=AsyncMock(return_value={"findings": [{"sample": "Connecting to 10.0.0.5..."}]}),
  190. ),
  191. ):
  192. out = await collect_diagnostic_snapshot(db)
  193. # Conection diagnostic — top-level ip_address and check params both masked.
  194. conn_entry = out["connection_diagnostics"][0]
  195. assert conn_entry["printer_name"] == "[PRINTER]"
  196. assert conn_entry["result"]["ip_address"] == "[IP]"
  197. check_params = conn_entry["result"]["checks"][0]["params"]
  198. assert check_params["printer_ip"] == "[IP]"
  199. assert check_params["host_ip"] == "[IP]" # not in DB; caught by regex fallback
  200. # VP diagnostic — bind_ip masked (regex fallback; never in DB).
  201. vp_entry = out["vp_diagnostics"][0]
  202. assert vp_entry["result"]["checks"][0]["params"]["bind_ip"] == "[IP]"
  203. # Log-health findings — IPs in log samples also masked (regex applies
  204. # recursively through the dict, not just to known fields).
  205. assert "10.0.0.5" not in str(out["log_health"])
  206. assert "[IP]" in out["log_health"]["findings"][0]["sample"]
  207. # Sanity: no raw IPv4 anywhere in the serialized snapshot.
  208. import json
  209. import re as _re
  210. serialized = json.dumps(out)
  211. raw_ipv4 = _re.search(
  212. r"\b(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)(?:\.(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)){3}\b", serialized
  213. )
  214. assert raw_ipv4 is None, f"raw IPv4 leaked into snapshot: {raw_ipv4.group()}"
  215. @pytest.mark.asyncio
  216. async def test_snapshot_runs_probes_concurrently_not_sequentially():
  217. """Total wall-clock for N printers should be O(slowest), not O(sum) —
  218. this is what makes the feature usable on a fleet. Set each probe to
  219. take 0.2 s; with 4 printers, sequential is 0.8 s, concurrent is 0.2 s.
  220. Allow margin for scheduling and the test still catches a regression
  221. to sequential execution.
  222. """
  223. import time
  224. printers = [
  225. SimpleNamespace(id=i, name=f"P{i}", ip_address=f"1.1.1.{i}", serial_number=f"s{i}", access_code="a")
  226. for i in range(4)
  227. ]
  228. db = _make_db_with_printers_and_vps(printers, [])
  229. async def slow_diag(*a, **k):
  230. import asyncio
  231. await asyncio.sleep(0.2)
  232. return SimpleNamespace(model_dump=lambda: {"ok": True})
  233. with (
  234. patch(
  235. "backend.app.services.printer_diagnostic.run_connection_diagnostic", new=AsyncMock(side_effect=slow_diag)
  236. ),
  237. patch(
  238. "backend.app.services.diagnostic_snapshot._run_log_health",
  239. new=AsyncMock(return_value={"findings": []}),
  240. ),
  241. ):
  242. start = time.monotonic()
  243. out = await collect_diagnostic_snapshot(db)
  244. elapsed = time.monotonic() - start
  245. assert len(out["connection_diagnostics"]) == 4
  246. # Concurrent should be ~0.2 s; sequential would be ~0.8 s. Use 0.5 s
  247. # as the threshold — slack enough for slow CI, tight enough to catch
  248. # a regression to sequential execution.
  249. assert elapsed < 0.5, f"snapshot ran sequentially: {elapsed:.2f}s for 4 x 0.2s probes"