|
|
@@ -0,0 +1,291 @@
|
|
|
+"""Tests for the diagnostic snapshot helper that aggregates connection,
|
|
|
+virtual-printer, and log-health diagnostics for the support bundle and
|
|
|
+bug-report submission paths (#1506 follow-up).
|
|
|
+
|
|
|
+The helper has three hard requirements:
|
|
|
+
|
|
|
+- Always returns the three top-level keys, even when sections are empty.
|
|
|
+- Fail-soft per probe — a single crash doesn't break the snapshot.
|
|
|
+- Bounded total runtime — concurrent gather caps wall-clock to the slowest probe.
|
|
|
+"""
|
|
|
+
|
|
|
+from types import SimpleNamespace
|
|
|
+from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
+
|
|
|
+import pytest
|
|
|
+
|
|
|
+from backend.app.services.diagnostic_snapshot import collect_diagnostic_snapshot
|
|
|
+
|
|
|
+
|
|
|
+def _make_db_with_printers_and_vps(printers: list, vps: list):
|
|
|
+ """Stub AsyncSession whose two .execute() calls return printers then VPs."""
|
|
|
+ printers_result = MagicMock()
|
|
|
+ printers_result.scalars.return_value.all.return_value = printers
|
|
|
+ vps_result = MagicMock()
|
|
|
+ vps_result.scalars.return_value.all.return_value = vps
|
|
|
+ db = MagicMock()
|
|
|
+ # Two execute calls — printer query, then VP query (order matches the
|
|
|
+ # helper). side_effect cycles through the queue.
|
|
|
+ db.execute = AsyncMock(side_effect=[printers_result, vps_result])
|
|
|
+ return db
|
|
|
+
|
|
|
+
|
|
|
+@pytest.mark.asyncio
|
|
|
+async def test_snapshot_always_returns_three_top_level_keys_when_empty():
|
|
|
+ """No printers, no VPs — still get the three keys (empty lists, empty
|
|
|
+ log-health). Callers downstream rely on the shape being stable."""
|
|
|
+ db = _make_db_with_printers_and_vps([], [])
|
|
|
+ with patch(
|
|
|
+ "backend.app.services.diagnostic_snapshot._run_log_health",
|
|
|
+ new=AsyncMock(return_value={"findings": []}),
|
|
|
+ ):
|
|
|
+ out = await collect_diagnostic_snapshot(db)
|
|
|
+ assert set(out.keys()) == {"connection_diagnostics", "vp_diagnostics", "log_health"}
|
|
|
+ assert out["connection_diagnostics"] == []
|
|
|
+ assert out["vp_diagnostics"] == []
|
|
|
+ assert out["log_health"] == {"findings": []}
|
|
|
+
|
|
|
+
|
|
|
+@pytest.mark.asyncio
|
|
|
+async def test_snapshot_runs_diagnostic_per_active_printer():
|
|
|
+ """Each active printer gets a connection check; each enabled VP gets a
|
|
|
+ setup check. Result list length matches the input lists."""
|
|
|
+ printers = [
|
|
|
+ SimpleNamespace(id=1, name="P1S", ip_address="192.168.1.10", serial_number="01S00A", access_code="abc123"),
|
|
|
+ SimpleNamespace(id=2, name="X1C", ip_address="192.168.1.11", serial_number="C11Y00", access_code="xyz456"),
|
|
|
+ ]
|
|
|
+ vps = [SimpleNamespace(id=10, name="VP-1")]
|
|
|
+ db = _make_db_with_printers_and_vps(printers, vps)
|
|
|
+
|
|
|
+ fake_conn = SimpleNamespace(model_dump=lambda: {"checks": []})
|
|
|
+ fake_vp = SimpleNamespace(model_dump=lambda: {"checks": []})
|
|
|
+
|
|
|
+ with (
|
|
|
+ patch(
|
|
|
+ "backend.app.services.printer_diagnostic.run_connection_diagnostic",
|
|
|
+ new=AsyncMock(return_value=fake_conn),
|
|
|
+ ),
|
|
|
+ patch(
|
|
|
+ "backend.app.services.virtual_printer.virtual_printer_manager.get_instance",
|
|
|
+ return_value=None,
|
|
|
+ ),
|
|
|
+ patch(
|
|
|
+ "backend.app.services.virtual_printer.diagnostic.run_vp_diagnostic",
|
|
|
+ new=AsyncMock(return_value=fake_vp),
|
|
|
+ ),
|
|
|
+ patch(
|
|
|
+ "backend.app.services.diagnostic_snapshot._run_log_health",
|
|
|
+ new=AsyncMock(return_value={"findings": []}),
|
|
|
+ ),
|
|
|
+ ):
|
|
|
+ out = await collect_diagnostic_snapshot(db)
|
|
|
+
|
|
|
+ assert len(out["connection_diagnostics"]) == 2
|
|
|
+ assert out["connection_diagnostics"][0]["printer_id"] == 1
|
|
|
+ assert out["connection_diagnostics"][1]["printer_id"] == 2
|
|
|
+ assert all("result" in entry for entry in out["connection_diagnostics"])
|
|
|
+ assert len(out["vp_diagnostics"]) == 1
|
|
|
+ assert out["vp_diagnostics"][0]["vp_id"] == 10
|
|
|
+ assert "result" in out["vp_diagnostics"][0]
|
|
|
+
|
|
|
+
|
|
|
+@pytest.mark.asyncio
|
|
|
+async def test_snapshot_fails_soft_when_single_printer_diagnostic_raises():
|
|
|
+ """A crash inside one printer's diagnostic emits an error marker for that
|
|
|
+ printer, but the snapshot's other sections still complete. This is the
|
|
|
+ whole point of including diagnostics in the bundle — a partial result
|
|
|
+ beats a 500."""
|
|
|
+ printers = [
|
|
|
+ SimpleNamespace(id=1, name="ok", ip_address="1.1.1.1", serial_number="s1", access_code="a"),
|
|
|
+ SimpleNamespace(id=2, name="bad", ip_address="2.2.2.2", serial_number="s2", access_code="b"),
|
|
|
+ ]
|
|
|
+ db = _make_db_with_printers_and_vps(printers, [])
|
|
|
+
|
|
|
+ fake_ok = SimpleNamespace(model_dump=lambda: {"status": "ok"})
|
|
|
+
|
|
|
+ async def diag(ip_address, **_):
|
|
|
+ if ip_address == "2.2.2.2":
|
|
|
+ raise RuntimeError("simulated crash")
|
|
|
+ return fake_ok
|
|
|
+
|
|
|
+ with (
|
|
|
+ patch("backend.app.services.printer_diagnostic.run_connection_diagnostic", new=AsyncMock(side_effect=diag)),
|
|
|
+ patch(
|
|
|
+ "backend.app.services.diagnostic_snapshot._run_log_health",
|
|
|
+ new=AsyncMock(return_value={"findings": []}),
|
|
|
+ ),
|
|
|
+ ):
|
|
|
+ out = await collect_diagnostic_snapshot(db)
|
|
|
+
|
|
|
+ # Both printers represented; the crashing one carries an `error` field.
|
|
|
+ assert len(out["connection_diagnostics"]) == 2
|
|
|
+ ok_entry = next(e for e in out["connection_diagnostics"] if e["printer_id"] == 1)
|
|
|
+ bad_entry = next(e for e in out["connection_diagnostics"] if e["printer_id"] == 2)
|
|
|
+ assert "result" in ok_entry
|
|
|
+ assert "error" not in ok_entry
|
|
|
+ assert "error" in bad_entry
|
|
|
+ assert "simulated crash" in bad_entry["error"]
|
|
|
+ # Log-health still completes despite the per-printer crash.
|
|
|
+ assert out["log_health"] == {"findings": []}
|
|
|
+
|
|
|
+
|
|
|
+@pytest.mark.asyncio
|
|
|
+async def test_snapshot_emits_timed_out_marker_when_probe_exceeds_cap():
|
|
|
+ """If a single probe stalls past the per-diagnostic timeout, the entry
|
|
|
+ is marked `timed_out` rather than blocking the whole snapshot. Patch
|
|
|
+ the timeout small so the test runs fast."""
|
|
|
+ printers = [SimpleNamespace(id=1, name="slow", ip_address="1.1.1.1", serial_number="s", access_code="a")]
|
|
|
+ db = _make_db_with_printers_and_vps(printers, [])
|
|
|
+
|
|
|
+ async def slow_diag(*a, **k):
|
|
|
+ import asyncio
|
|
|
+
|
|
|
+ await asyncio.sleep(5) # well past the patched cap below
|
|
|
+ return SimpleNamespace(model_dump=lambda: {})
|
|
|
+
|
|
|
+ with (
|
|
|
+ patch(
|
|
|
+ "backend.app.services.printer_diagnostic.run_connection_diagnostic", new=AsyncMock(side_effect=slow_diag)
|
|
|
+ ),
|
|
|
+ patch("backend.app.services.diagnostic_snapshot._PER_DIAGNOSTIC_TIMEOUT_SECONDS", 0.05),
|
|
|
+ patch(
|
|
|
+ "backend.app.services.diagnostic_snapshot._run_log_health",
|
|
|
+ new=AsyncMock(return_value={"findings": []}),
|
|
|
+ ),
|
|
|
+ ):
|
|
|
+ out = await collect_diagnostic_snapshot(db)
|
|
|
+
|
|
|
+ assert len(out["connection_diagnostics"]) == 1
|
|
|
+ assert out["connection_diagnostics"][0]["error"] == "timed_out"
|
|
|
+
|
|
|
+
|
|
|
+@pytest.mark.asyncio
|
|
|
+async def test_snapshot_masks_ip_addresses_in_all_diagnostic_fields():
|
|
|
+ """The diagnostic schemas embed raw IPv4 in three places — the top-level
|
|
|
+ ``PrinterDiagnosticResult.ip_address``, the network-mode check's
|
|
|
+ ``params.{printer_ip, host_ip}``, and the VP diagnostic's
|
|
|
+ ``params.bind_ip``. None of those should leak into the submitted
|
|
|
+ snapshot. Sanitization runs after the per-probe gather; both DB-known
|
|
|
+ IPs (covered by sensitive_strings → "[IP]") and host / VP-bind IPs
|
|
|
+ (caught by the IPv4 regex fallback) end up redacted.
|
|
|
+ """
|
|
|
+ printers = [
|
|
|
+ SimpleNamespace(
|
|
|
+ id=1, name="Workshop", ip_address="192.168.255.131", serial_number="01S00ABC123", access_code="abcd1234"
|
|
|
+ )
|
|
|
+ ]
|
|
|
+ vps = [SimpleNamespace(id=10, name="VP-Workshop")]
|
|
|
+ db = _make_db_with_printers_and_vps(printers, vps)
|
|
|
+
|
|
|
+ fake_conn = SimpleNamespace(
|
|
|
+ model_dump=lambda: {
|
|
|
+ "ip_address": "192.168.255.131",
|
|
|
+ "overall": "ok",
|
|
|
+ "checks": [
|
|
|
+ {
|
|
|
+ "id": "network_mode",
|
|
|
+ "status": "warn",
|
|
|
+ "params": {"printer_ip": "192.168.255.131", "host_ip": "192.168.255.16"},
|
|
|
+ }
|
|
|
+ ],
|
|
|
+ }
|
|
|
+ )
|
|
|
+ fake_vp = SimpleNamespace(
|
|
|
+ model_dump=lambda: {
|
|
|
+ "overall": "ok",
|
|
|
+ "checks": [{"id": "bind_interface", "status": "pass", "params": {"bind_ip": "192.168.254.2"}}],
|
|
|
+ }
|
|
|
+ )
|
|
|
+
|
|
|
+ with (
|
|
|
+ patch(
|
|
|
+ "backend.app.services.log_reader.collect_sensitive_strings",
|
|
|
+ new=AsyncMock(
|
|
|
+ return_value={
|
|
|
+ "Workshop": "[PRINTER]",
|
|
|
+ "192.168.255.131": "[IP]",
|
|
|
+ "01S00ABC123": "[SERIAL]",
|
|
|
+ "abcd1234": "[ACCESS_CODE]",
|
|
|
+ }
|
|
|
+ ),
|
|
|
+ ),
|
|
|
+ patch(
|
|
|
+ "backend.app.services.printer_diagnostic.run_connection_diagnostic", new=AsyncMock(return_value=fake_conn)
|
|
|
+ ),
|
|
|
+ patch("backend.app.services.virtual_printer.virtual_printer_manager.get_instance", return_value=None),
|
|
|
+ patch("backend.app.services.virtual_printer.diagnostic.run_vp_diagnostic", new=AsyncMock(return_value=fake_vp)),
|
|
|
+ patch(
|
|
|
+ "backend.app.services.diagnostic_snapshot._run_log_health",
|
|
|
+ new=AsyncMock(return_value={"findings": [{"sample": "Connecting to 10.0.0.5..."}]}),
|
|
|
+ ),
|
|
|
+ ):
|
|
|
+ out = await collect_diagnostic_snapshot(db)
|
|
|
+
|
|
|
+ # Conection diagnostic — top-level ip_address and check params both masked.
|
|
|
+ conn_entry = out["connection_diagnostics"][0]
|
|
|
+ assert conn_entry["printer_name"] == "[PRINTER]"
|
|
|
+ assert conn_entry["result"]["ip_address"] == "[IP]"
|
|
|
+ check_params = conn_entry["result"]["checks"][0]["params"]
|
|
|
+ assert check_params["printer_ip"] == "[IP]"
|
|
|
+ assert check_params["host_ip"] == "[IP]" # not in DB; caught by regex fallback
|
|
|
+
|
|
|
+ # VP diagnostic — bind_ip masked (regex fallback; never in DB).
|
|
|
+ vp_entry = out["vp_diagnostics"][0]
|
|
|
+ assert vp_entry["result"]["checks"][0]["params"]["bind_ip"] == "[IP]"
|
|
|
+
|
|
|
+ # Log-health findings — IPs in log samples also masked (regex applies
|
|
|
+ # recursively through the dict, not just to known fields).
|
|
|
+ assert "10.0.0.5" not in str(out["log_health"])
|
|
|
+ assert "[IP]" in out["log_health"]["findings"][0]["sample"]
|
|
|
+
|
|
|
+ # Sanity: no raw IPv4 anywhere in the serialized snapshot.
|
|
|
+ import json
|
|
|
+ import re as _re
|
|
|
+
|
|
|
+ serialized = json.dumps(out)
|
|
|
+ raw_ipv4 = _re.search(
|
|
|
+ r"\b(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)(?:\.(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)){3}\b", serialized
|
|
|
+ )
|
|
|
+ assert raw_ipv4 is None, f"raw IPv4 leaked into snapshot: {raw_ipv4.group()}"
|
|
|
+
|
|
|
+
|
|
|
+@pytest.mark.asyncio
|
|
|
+async def test_snapshot_runs_probes_concurrently_not_sequentially():
|
|
|
+ """Total wall-clock for N printers should be O(slowest), not O(sum) —
|
|
|
+ this is what makes the feature usable on a fleet. Set each probe to
|
|
|
+ take 0.2 s; with 4 printers, sequential is 0.8 s, concurrent is 0.2 s.
|
|
|
+ Allow margin for scheduling and the test still catches a regression
|
|
|
+ to sequential execution.
|
|
|
+ """
|
|
|
+ import time
|
|
|
+
|
|
|
+ printers = [
|
|
|
+ SimpleNamespace(id=i, name=f"P{i}", ip_address=f"1.1.1.{i}", serial_number=f"s{i}", access_code="a")
|
|
|
+ for i in range(4)
|
|
|
+ ]
|
|
|
+ db = _make_db_with_printers_and_vps(printers, [])
|
|
|
+
|
|
|
+ async def slow_diag(*a, **k):
|
|
|
+ import asyncio
|
|
|
+
|
|
|
+ await asyncio.sleep(0.2)
|
|
|
+ return SimpleNamespace(model_dump=lambda: {"ok": True})
|
|
|
+
|
|
|
+ with (
|
|
|
+ patch(
|
|
|
+ "backend.app.services.printer_diagnostic.run_connection_diagnostic", new=AsyncMock(side_effect=slow_diag)
|
|
|
+ ),
|
|
|
+ patch(
|
|
|
+ "backend.app.services.diagnostic_snapshot._run_log_health",
|
|
|
+ new=AsyncMock(return_value={"findings": []}),
|
|
|
+ ),
|
|
|
+ ):
|
|
|
+ start = time.monotonic()
|
|
|
+ out = await collect_diagnostic_snapshot(db)
|
|
|
+ elapsed = time.monotonic() - start
|
|
|
+
|
|
|
+ assert len(out["connection_diagnostics"]) == 4
|
|
|
+ # Concurrent should be ~0.2 s; sequential would be ~0.8 s. Use 0.5 s
|
|
|
+ # as the threshold — slack enough for slow CI, tight enough to catch
|
|
|
+ # a regression to sequential execution.
|
|
|
+ assert elapsed < 0.5, f"snapshot ran sequentially: {elapsed:.2f}s for 4 x 0.2s probes"
|