| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291 |
- """Tests for the diagnostic snapshot helper that aggregates connection,
- virtual-printer, and log-health diagnostics for the support bundle and
- bug-report submission paths (#1506 follow-up).
- The helper has three hard requirements:
- - Always returns the three top-level keys, even when sections are empty.
- - Fail-soft per probe — a single crash doesn't break the snapshot.
- - Bounded total runtime — concurrent gather caps wall-clock to the slowest probe.
- """
- from types import SimpleNamespace
- from unittest.mock import AsyncMock, MagicMock, patch
- import pytest
- from backend.app.services.diagnostic_snapshot import collect_diagnostic_snapshot
def _make_db_with_printers_and_vps(printers: list, vps: list):
    """Build a stand-in AsyncSession for the snapshot helper.

    The returned mock's ``execute`` is awaitable. Its first call returns a
    result whose ``.scalars().all()`` is ``printers``; its second call
    returns one whose ``.scalars().all()`` is ``vps``.
    """

    def _scalars_result(rows):
        # Mimic the ``result.scalars().all()`` chain with a canned row list.
        result = MagicMock()
        result.scalars.return_value.all.return_value = rows
        return result

    session = MagicMock()
    # A list side_effect is consumed one item per call, in order: printer
    # query first, then the VP query. Only two results are queued.
    session.execute = AsyncMock(
        side_effect=[_scalars_result(printers), _scalars_result(vps)]
    )
    return session
@pytest.mark.asyncio
async def test_snapshot_always_returns_three_top_level_keys_when_empty():
    """With no printers and no VPs the snapshot keeps its three-key shape.

    Both diagnostic sections are expected to be empty lists, and the
    stubbed log-health payload is expected under ``log_health``.
    """
    empty_session = _make_db_with_printers_and_vps([], [])
    log_health_stub = AsyncMock(return_value={"findings": []})

    with patch("backend.app.services.diagnostic_snapshot._run_log_health", new=log_health_stub):
        snapshot = await collect_diagnostic_snapshot(empty_session)

    expected_keys = {"connection_diagnostics", "vp_diagnostics", "log_health"}
    assert set(snapshot.keys()) == expected_keys
    assert snapshot["connection_diagnostics"] == []
    assert snapshot["vp_diagnostics"] == []
    assert snapshot["log_health"] == {"findings": []}
@pytest.mark.asyncio
async def test_snapshot_runs_diagnostic_per_active_printer():
    """One connection entry per printer row and one VP entry per VP row.

    Two printers and one VP go in; the snapshot is expected to contain two
    connection entries (ids 1 and 2, in that order) and one VP entry
    (id 10), each carrying a ``result`` key.
    """
    printer_rows = [
        SimpleNamespace(id=1, name="P1S", ip_address="192.168.1.10", serial_number="01S00A", access_code="abc123"),
        SimpleNamespace(id=2, name="X1C", ip_address="192.168.1.11", serial_number="C11Y00", access_code="xyz456"),
    ]
    vp_rows = [SimpleNamespace(id=10, name="VP-1")]
    session = _make_db_with_printers_and_vps(printer_rows, vp_rows)

    # Probe results only need a ``model_dump()`` method for this test.
    conn_result = SimpleNamespace(model_dump=lambda: {"checks": []})
    vp_result = SimpleNamespace(model_dump=lambda: {"checks": []})

    with (
        patch(
            "backend.app.services.printer_diagnostic.run_connection_diagnostic",
            new=AsyncMock(return_value=conn_result),
        ),
        patch(
            "backend.app.services.virtual_printer.virtual_printer_manager.get_instance",
            return_value=None,
        ),
        patch(
            "backend.app.services.virtual_printer.diagnostic.run_vp_diagnostic",
            new=AsyncMock(return_value=vp_result),
        ),
        patch(
            "backend.app.services.diagnostic_snapshot._run_log_health",
            new=AsyncMock(return_value={"findings": []}),
        ),
    ):
        snapshot = await collect_diagnostic_snapshot(session)

    conn_entries = snapshot["connection_diagnostics"]
    assert len(conn_entries) == 2
    assert conn_entries[0]["printer_id"] == 1
    assert conn_entries[1]["printer_id"] == 2
    for entry in conn_entries:
        assert "result" in entry

    vp_entries = snapshot["vp_diagnostics"]
    assert len(vp_entries) == 1
    only_vp = vp_entries[0]
    assert only_vp["vp_id"] == 10
    assert "result" in only_vp
@pytest.mark.asyncio
async def test_snapshot_fails_soft_when_single_printer_diagnostic_raises():
    """A probe that raises for one printer must not sink the snapshot.

    The stubbed connection probe raises ``RuntimeError`` for the printer at
    ``2.2.2.2`` and succeeds for the other. Both printers are expected in
    the output: the healthy one with ``result`` and no ``error``, the
    failing one with an ``error`` containing the exception text. The
    log-health section is expected to be present regardless.
    """
    healthy = SimpleNamespace(id=1, name="ok", ip_address="1.1.1.1", serial_number="s1", access_code="a")
    broken = SimpleNamespace(id=2, name="bad", ip_address="2.2.2.2", serial_number="s2", access_code="b")
    session = _make_db_with_printers_and_vps([healthy, broken], [])

    ok_result = SimpleNamespace(model_dump=lambda: {"status": "ok"})

    async def crash_for_second_printer(ip_address, **_):
        # Only the second printer's address triggers the simulated failure.
        if ip_address != "2.2.2.2":
            return ok_result
        raise RuntimeError("simulated crash")

    with (
        patch(
            "backend.app.services.printer_diagnostic.run_connection_diagnostic",
            new=AsyncMock(side_effect=crash_for_second_printer),
        ),
        patch(
            "backend.app.services.diagnostic_snapshot._run_log_health",
            new=AsyncMock(return_value={"findings": []}),
        ),
    ):
        snapshot = await collect_diagnostic_snapshot(session)

    # Both printers are represented; only the crashing one carries `error`.
    conn_entries = snapshot["connection_diagnostics"]
    assert len(conn_entries) == 2
    by_printer_id = {entry["printer_id"]: entry for entry in conn_entries}
    healthy_entry = by_printer_id[1]
    broken_entry = by_printer_id[2]

    assert "result" in healthy_entry
    assert "error" not in healthy_entry
    assert "error" in broken_entry
    assert "simulated crash" in broken_entry["error"]

    # The per-printer crash must not affect the log-health section.
    assert snapshot["log_health"] == {"findings": []}
@pytest.mark.asyncio
async def test_snapshot_emits_timed_out_marker_when_probe_exceeds_cap():
    """A probe that outlives the per-diagnostic cap is reported as timed out.

    The module-level ``_PER_DIAGNOSTIC_TIMEOUT_SECONDS`` is patched down to
    0.05 s while the stubbed probe sleeps for 5 s, so the single printer's
    entry is expected to carry ``error == "timed_out"``.
    """
    import asyncio

    stalled_printer = SimpleNamespace(id=1, name="slow", ip_address="1.1.1.1", serial_number="s", access_code="a")
    session = _make_db_with_printers_and_vps([stalled_printer], [])

    async def stalled_probe(*_args, **_kwargs):
        await asyncio.sleep(5)  # far beyond the cap patched in below
        return SimpleNamespace(model_dump=lambda: {})

    with (
        patch(
            "backend.app.services.printer_diagnostic.run_connection_diagnostic",
            new=AsyncMock(side_effect=stalled_probe),
        ),
        patch("backend.app.services.diagnostic_snapshot._PER_DIAGNOSTIC_TIMEOUT_SECONDS", 0.05),
        patch(
            "backend.app.services.diagnostic_snapshot._run_log_health",
            new=AsyncMock(return_value={"findings": []}),
        ),
    ):
        snapshot = await collect_diagnostic_snapshot(session)

    conn_entries = snapshot["connection_diagnostics"]
    assert len(conn_entries) == 1
    assert conn_entries[0]["error"] == "timed_out"
@pytest.mark.asyncio
async def test_snapshot_masks_ip_addresses_in_all_diagnostic_fields():
    """Raw IPv4 addresses must not survive into the returned snapshot.

    The stubbed probes plant addresses in four spots: the connection
    result's top-level ``ip_address``, the network-mode check's
    ``params.printer_ip`` / ``params.host_ip``, the VP check's
    ``params.bind_ip``, and a log-health sample string. Only
    ``192.168.255.131`` appears in the stubbed sensitive-strings map; the
    other addresses are expected to come back as ``[IP]`` as well.
    """
    import json
    import re

    printer_rows = [
        SimpleNamespace(
            id=1,
            name="Workshop",
            ip_address="192.168.255.131",
            serial_number="01S00ABC123",
            access_code="abcd1234",
        )
    ]
    vp_rows = [SimpleNamespace(id=10, name="VP-Workshop")]
    session = _make_db_with_printers_and_vps(printer_rows, vp_rows)

    def _conn_dump():
        # Fresh dict per call, like a real ``model_dump()``.
        return {
            "ip_address": "192.168.255.131",
            "overall": "ok",
            "checks": [
                {
                    "id": "network_mode",
                    "status": "warn",
                    "params": {"printer_ip": "192.168.255.131", "host_ip": "192.168.255.16"},
                }
            ],
        }

    def _vp_dump():
        return {
            "overall": "ok",
            "checks": [{"id": "bind_interface", "status": "pass", "params": {"bind_ip": "192.168.254.2"}}],
        }

    conn_result = SimpleNamespace(model_dump=_conn_dump)
    vp_result = SimpleNamespace(model_dump=_vp_dump)

    # Replacement map the helper would normally derive from the database.
    sensitive_map = {
        "Workshop": "[PRINTER]",
        "192.168.255.131": "[IP]",
        "01S00ABC123": "[SERIAL]",
        "abcd1234": "[ACCESS_CODE]",
    }

    with (
        patch(
            "backend.app.services.log_reader.collect_sensitive_strings",
            new=AsyncMock(return_value=sensitive_map),
        ),
        patch(
            "backend.app.services.printer_diagnostic.run_connection_diagnostic",
            new=AsyncMock(return_value=conn_result),
        ),
        patch("backend.app.services.virtual_printer.virtual_printer_manager.get_instance", return_value=None),
        patch(
            "backend.app.services.virtual_printer.diagnostic.run_vp_diagnostic",
            new=AsyncMock(return_value=vp_result),
        ),
        patch(
            "backend.app.services.diagnostic_snapshot._run_log_health",
            new=AsyncMock(return_value={"findings": [{"sample": "Connecting to 10.0.0.5..."}]}),
        ),
    ):
        snapshot = await collect_diagnostic_snapshot(session)

    # Connection diagnostic: printer name, top-level ip_address and both
    # check params are masked.
    conn_entry = snapshot["connection_diagnostics"][0]
    assert conn_entry["printer_name"] == "[PRINTER]"
    conn_payload = conn_entry["result"]
    assert conn_payload["ip_address"] == "[IP]"
    network_params = conn_payload["checks"][0]["params"]
    assert network_params["printer_ip"] == "[IP]"
    assert network_params["host_ip"] == "[IP]"  # absent from the sensitive map

    # VP diagnostic: bind_ip is masked although it is absent from the map.
    vp_payload = snapshot["vp_diagnostics"][0]["result"]
    assert vp_payload["checks"][0]["params"]["bind_ip"] == "[IP]"

    # Log-health findings: the address embedded in the sample text is
    # masked too.
    log_health = snapshot["log_health"]
    assert "10.0.0.5" not in str(log_health)
    assert "[IP]" in log_health["findings"][0]["sample"]

    # Sanity: no raw IPv4 anywhere in the serialized snapshot.
    ipv4_pattern = re.compile(
        r"\b(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)(?:\.(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)){3}\b"
    )
    leak = ipv4_pattern.search(json.dumps(snapshot))
    assert leak is None, f"raw IPv4 leaked into snapshot: {leak.group()}"
@pytest.mark.asyncio
async def test_snapshot_runs_probes_concurrently_not_sequentially():
    """Wall-clock for several probes should track the slowest, not the sum.

    Four printers are each given a stubbed probe that sleeps 0.2 s. Run one
    after another that would take about 0.8 s; run concurrently, about
    0.2 s. The 0.5 s ceiling leaves headroom for scheduling jitter while
    still failing if the probes are executed sequentially.
    """
    import asyncio
    import time

    printer_rows = [
        SimpleNamespace(
            id=idx,
            name=f"P{idx}",
            ip_address=f"1.1.1.{idx}",
            serial_number=f"s{idx}",
            access_code="a",
        )
        for idx in range(4)
    ]
    session = _make_db_with_printers_and_vps(printer_rows, [])

    async def delayed_probe(*_args, **_kwargs):
        await asyncio.sleep(0.2)
        return SimpleNamespace(model_dump=lambda: {"ok": True})

    with (
        patch(
            "backend.app.services.printer_diagnostic.run_connection_diagnostic",
            new=AsyncMock(side_effect=delayed_probe),
        ),
        patch(
            "backend.app.services.diagnostic_snapshot._run_log_health",
            new=AsyncMock(return_value={"findings": []}),
        ),
    ):
        started_at = time.monotonic()
        snapshot = await collect_diagnostic_snapshot(session)
        elapsed = time.monotonic() - started_at

    assert len(snapshot["connection_diagnostics"]) == 4
    # Concurrent should be ~0.2 s; sequential would be ~0.8 s. The 0.5 s
    # threshold tolerates a slow CI box yet still trips on sequential
    # execution.
    assert elapsed < 0.5, f"snapshot ran sequentially: {elapsed:.2f}s for 4 x 0.2s probes"
|