test_support_helpers.py 39 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961
  1. """Unit tests for support module helper functions.
  2. Tests _anonymize_mqtt_broker, _check_port, _get_container_memory_limit,
  3. _format_bytes, and _collect_support_info diagnostic sections.
  4. """
  5. import asyncio
  6. import tempfile
  7. from pathlib import Path
  8. from unittest.mock import AsyncMock, MagicMock, patch
  9. import pytest
  10. class TestApplyLogLevel:
  11. """Tests for _apply_log_level() debug noise suppression."""
  12. def test_debug_mode_suppresses_sqlalchemy_to_warning(self):
  13. """Verify sqlalchemy.engine is set to WARNING (not INFO) in debug mode."""
  14. import logging
  15. from backend.app.api.routes.support import _apply_log_level
  16. _apply_log_level(True)
  17. assert logging.getLogger("sqlalchemy.engine").level == logging.WARNING
  18. def test_debug_mode_suppresses_aiosqlite(self):
  19. """Verify aiosqlite is set to WARNING in debug mode to prevent cursor noise."""
  20. import logging
  21. from backend.app.api.routes.support import _apply_log_level
  22. _apply_log_level(True)
  23. assert logging.getLogger("aiosqlite").level == logging.WARNING
  24. def test_debug_mode_keeps_httpx_pinned_to_warning(self):
  25. """httpx/httpcore must stay at WARNING even in debug mode — at INFO/DEBUG
  26. they log full request URLs, leaking webhook tokens (Discord etc.)."""
  27. import logging
  28. from backend.app.api.routes.support import _apply_log_level
  29. _apply_log_level(True)
  30. assert logging.getLogger("httpcore").level == logging.WARNING
  31. assert logging.getLogger("httpx").level == logging.WARNING
  32. def test_non_debug_mode_suppresses_all_noisy_loggers(self):
  33. """Verify all noisy loggers are set to WARNING in non-debug mode."""
  34. import logging
  35. from backend.app.api.routes.support import _apply_log_level
  36. _apply_log_level(False)
  37. assert logging.getLogger("sqlalchemy.engine").level == logging.WARNING
  38. assert logging.getLogger("httpcore").level == logging.WARNING
  39. assert logging.getLogger("httpx").level == logging.WARNING
  40. assert logging.getLogger("paho.mqtt").level == logging.WARNING
  41. class TestAnonymizeMqttBroker:
  42. """Tests for _anonymize_mqtt_broker()."""
  43. def test_empty_string(self):
  44. from backend.app.api.routes.support import _anonymize_mqtt_broker
  45. assert _anonymize_mqtt_broker("") == ""
  46. def test_ipv4_address(self):
  47. from backend.app.api.routes.support import _anonymize_mqtt_broker
  48. assert _anonymize_mqtt_broker("192.168.1.100") == "[IP]"
  49. def test_ipv6_address(self):
  50. from backend.app.api.routes.support import _anonymize_mqtt_broker
  51. assert _anonymize_mqtt_broker("::1") == "[IP]"
  52. def test_hostname_with_domain(self):
  53. from backend.app.api.routes.support import _anonymize_mqtt_broker
  54. assert _anonymize_mqtt_broker("mqtt.example.com") == "*.example.com"
  55. def test_hostname_with_subdomain(self):
  56. from backend.app.api.routes.support import _anonymize_mqtt_broker
  57. assert _anonymize_mqtt_broker("broker.mqtt.example.com") == "*.example.com"
  58. def test_single_part_hostname(self):
  59. from backend.app.api.routes.support import _anonymize_mqtt_broker
  60. assert _anonymize_mqtt_broker("localhost") == "localhost"
  61. class TestCheckPort:
  62. """Tests for _check_port()."""
  63. @pytest.mark.asyncio
  64. @pytest.mark.unit
  65. async def test_reachable_port(self):
  66. from backend.app.api.routes.support import _check_port
  67. # Mock a successful connection
  68. mock_writer = AsyncMock()
  69. mock_writer.close = MagicMock()
  70. mock_writer.wait_closed = AsyncMock()
  71. with patch("backend.app.api.routes.support.asyncio.open_connection", return_value=(AsyncMock(), mock_writer)):
  72. result = await _check_port("192.168.1.1", 8883, timeout=1.0)
  73. assert result is True
  74. @pytest.mark.asyncio
  75. @pytest.mark.unit
  76. async def test_unreachable_port(self):
  77. from backend.app.api.routes.support import _check_port
  78. with (
  79. patch(
  80. "backend.app.api.routes.support.asyncio.open_connection",
  81. side_effect=ConnectionRefusedError,
  82. ),
  83. patch(
  84. "backend.app.api.routes.support.asyncio.wait_for",
  85. side_effect=ConnectionRefusedError,
  86. ),
  87. ):
  88. result = await _check_port("192.168.1.1", 8883, timeout=1.0)
  89. assert result is False
  90. @pytest.mark.asyncio
  91. @pytest.mark.unit
  92. async def test_timeout(self):
  93. from backend.app.api.routes.support import _check_port
  94. with patch(
  95. "backend.app.api.routes.support.asyncio.wait_for",
  96. side_effect=asyncio.TimeoutError,
  97. ):
  98. result = await _check_port("192.168.1.1", 8883, timeout=0.1)
  99. assert result is False
  100. class TestGetContainerMemoryLimit:
  101. """Tests for _get_container_memory_limit()."""
  102. def test_cgroup_v2_with_limit(self):
  103. from backend.app.api.routes.support import _get_container_memory_limit
  104. with tempfile.TemporaryDirectory() as tmpdir:
  105. v2_path = Path(tmpdir) / "memory.max"
  106. v2_path.write_text("1073741824\n")
  107. with patch("backend.app.api.routes.support.Path") as mock_path:
  108. # v2 path exists with value
  109. v2_mock = MagicMock()
  110. v2_mock.exists.return_value = True
  111. v2_mock.read_text.return_value = "1073741824\n"
  112. v1_mock = MagicMock()
  113. v1_mock.exists.return_value = False
  114. mock_path.side_effect = lambda p: v2_mock if "memory.max" in p else v1_mock
  115. result = _get_container_memory_limit()
  116. assert result == 1073741824
  117. def test_cgroup_v2_unlimited(self):
  118. from backend.app.api.routes.support import _get_container_memory_limit
  119. with patch("backend.app.api.routes.support.Path") as mock_path:
  120. v2_mock = MagicMock()
  121. v2_mock.exists.return_value = True
  122. v2_mock.read_text.return_value = "max\n"
  123. v1_mock = MagicMock()
  124. v1_mock.exists.return_value = False
  125. mock_path.side_effect = lambda p: v2_mock if "memory.max" in p else v1_mock
  126. result = _get_container_memory_limit()
  127. assert result is None
  128. def test_no_cgroup_files(self):
  129. from backend.app.api.routes.support import _get_container_memory_limit
  130. with patch("backend.app.api.routes.support.Path") as mock_path:
  131. mock_instance = MagicMock()
  132. mock_instance.exists.return_value = False
  133. mock_path.return_value = mock_instance
  134. result = _get_container_memory_limit()
  135. assert result is None
  136. class TestFormatBytes:
  137. """Tests for _format_bytes()."""
  138. def test_bytes(self):
  139. from backend.app.api.routes.support import _format_bytes
  140. assert _format_bytes(500) == "500 B"
  141. def test_kilobytes(self):
  142. from backend.app.api.routes.support import _format_bytes
  143. assert _format_bytes(2048) == "2.0 KB"
  144. def test_megabytes(self):
  145. from backend.app.api.routes.support import _format_bytes
  146. assert _format_bytes(10 * 1024 * 1024) == "10.0 MB"
  147. def test_gigabytes(self):
  148. from backend.app.api.routes.support import _format_bytes
  149. assert _format_bytes(2 * 1024 * 1024 * 1024) == "2.00 GB"
  150. def test_zero(self):
  151. from backend.app.api.routes.support import _format_bytes
  152. assert _format_bytes(0) == "0 B"
  153. class TestSanitizeLogContent:
  154. """Tests for _sanitize_log_content() redaction."""
  155. def test_ipv4_addresses_redacted(self):
  156. """IPv4 addresses in log lines are replaced with [IP]."""
  157. from backend.app.api.routes.support import _sanitize_log_content
  158. content = "2024-01-15 Connected to printer at 192.168.1.100 on port 8883"
  159. result = _sanitize_log_content(content)
  160. assert "192.168.1.100" not in result
  161. assert "[IP]" in result
  162. assert "on port 8883" in result
  163. def test_multiple_ipv4_addresses_redacted(self):
  164. """Multiple different IPs in the same line are all redacted."""
  165. from backend.app.api.routes.support import _sanitize_log_content
  166. content = "Proxy 10.0.0.1 -> 192.168.1.50"
  167. result = _sanitize_log_content(content)
  168. assert result == "Proxy [IP] -> [IP]"
  169. def test_firmware_versions_with_leading_zeros_preserved(self):
  170. """Firmware versions like 01.09.01.00 have leading zeros and should NOT be redacted."""
  171. from backend.app.api.routes.support import _sanitize_log_content
  172. content = "Firmware version: 01.09.01.00"
  173. result = _sanitize_log_content(content)
  174. assert "01.09.01.00" in result
  175. def test_firmware_version_mixed_with_ip(self):
  176. """Firmware versions preserved while real IPs are redacted in the same line."""
  177. from backend.app.api.routes.support import _sanitize_log_content
  178. content = "Printer at 192.168.1.5 running firmware 01.07.02.00"
  179. result = _sanitize_log_content(content)
  180. assert "192.168.1.5" not in result
  181. assert "01.07.02.00" in result
  182. assert "[IP] running firmware 01.07.02.00" in result
  183. def test_printer_ip_from_sensitive_strings(self):
  184. """Printer IPs in sensitive_strings are replaced before regex pass."""
  185. from backend.app.api.routes.support import _sanitize_log_content
  186. content = "Connecting to 192.168.1.100"
  187. result = _sanitize_log_content(content, sensitive_strings={"192.168.1.100": "[IP]"})
  188. assert result == "Connecting to [IP]"
  189. def test_edge_case_zero_ip(self):
  190. """0.0.0.0 is a valid IP and should be redacted."""
  191. from backend.app.api.routes.support import _sanitize_log_content
  192. content = "Binding to 0.0.0.0"
  193. result = _sanitize_log_content(content)
  194. assert result == "Binding to [IP]"
  195. def test_edge_case_broadcast_ip(self):
  196. """255.255.255.255 is a valid IP and should be redacted."""
  197. from backend.app.api.routes.support import _sanitize_log_content
  198. content = "Broadcast to 255.255.255.255"
  199. result = _sanitize_log_content(content)
  200. assert result == "Broadcast to [IP]"
  201. def test_invalid_octet_not_redacted(self):
  202. """Octets >255 are not valid IPs and should not be redacted."""
  203. from backend.app.api.routes.support import _sanitize_log_content
  204. content = "Value 999.999.999.999"
  205. result = _sanitize_log_content(content)
  206. assert "999.999.999.999" in result
  207. def test_existing_serial_redaction_still_works(self):
  208. """Serial number redaction still functions alongside IP redaction."""
  209. from backend.app.api.routes.support import _sanitize_log_content
  210. content = "Printer 01SABCDEF1234 at 10.0.0.5"
  211. result = _sanitize_log_content(content)
  212. assert "[SERIAL]" in result
  213. assert "[IP]" in result
  214. assert "01SABCDEF1234" not in result
  215. assert "10.0.0.5" not in result
  216. def test_existing_email_redaction_still_works(self):
  217. """Email redaction still functions alongside IP redaction."""
  218. from backend.app.api.routes.support import _sanitize_log_content
  219. content = "User user@example.com from 172.16.0.1"
  220. result = _sanitize_log_content(content)
  221. assert "[EMAIL]" in result
  222. assert "[IP]" in result
  223. def test_existing_path_redaction_still_works(self):
  224. """Path redaction still functions alongside IP redaction."""
  225. from backend.app.api.routes.support import _sanitize_log_content
  226. content = "Config at /home/john/config.yaml from 192.168.0.1"
  227. result = _sanitize_log_content(content)
  228. assert "/home/[user]/" in result
  229. assert "[IP]" in result
  230. class TestCollectSupportInfo:
  231. """Tests for _collect_support_info() new diagnostic sections."""
  232. @pytest.mark.asyncio
  233. @pytest.mark.unit
  234. async def test_environment_has_timezone(self):
  235. """Verify environment section includes timezone."""
  236. from backend.app.api.routes.support import _collect_support_info
  237. with (
  238. patch("backend.app.api.routes.support.is_running_in_docker", return_value=False),
  239. patch("backend.app.api.routes.support.async_session") as mock_session_ctx,
  240. patch("backend.app.api.routes.support.printer_manager") as mock_pm,
  241. patch("backend.app.api.routes.support.get_network_interfaces", return_value=[]),
  242. patch("backend.app.api.routes.support.ws_manager") as mock_ws,
  243. patch.dict("os.environ", {"TZ": "America/New_York"}),
  244. ):
  245. mock_pm.get_all_statuses.return_value = {}
  246. mock_ws.active_connections = []
  247. mock_db = AsyncMock()
  248. mock_result = MagicMock()
  249. mock_result.scalar.return_value = 0
  250. mock_result.scalar_one_or_none.return_value = None
  251. mock_result.scalars.return_value.all.return_value = []
  252. mock_db.execute = AsyncMock(return_value=mock_result)
  253. mock_session_ctx.return_value.__aenter__ = AsyncMock(return_value=mock_db)
  254. mock_session_ctx.return_value.__aexit__ = AsyncMock(return_value=False)
  255. info = await _collect_support_info()
  256. assert info["environment"]["timezone"] == "America/New_York"
  257. assert info["environment"]["docker"] is False
  258. @pytest.mark.asyncio
  259. @pytest.mark.unit
  260. async def test_docker_section_present_when_in_docker(self):
  261. """Verify docker section is added when running in Docker."""
  262. from backend.app.api.routes.support import _collect_support_info
  263. with (
  264. patch("backend.app.api.routes.support.is_running_in_docker", return_value=True),
  265. patch("backend.app.api.routes.support._get_container_memory_limit", return_value=1073741824),
  266. patch("backend.app.api.routes.support._detect_docker_network_mode", return_value="bridge"),
  267. patch("backend.app.api.routes.support.async_session") as mock_session_ctx,
  268. patch("backend.app.api.routes.support.printer_manager") as mock_pm,
  269. patch(
  270. "backend.app.api.routes.support.get_network_interfaces",
  271. return_value=[{"name": "eth0", "subnet": "172.17.0.0/16"}],
  272. ),
  273. patch("backend.app.api.routes.support.ws_manager") as mock_ws,
  274. ):
  275. mock_pm.get_all_statuses.return_value = {}
  276. mock_ws.active_connections = []
  277. mock_db = AsyncMock()
  278. mock_result = MagicMock()
  279. mock_result.scalar.return_value = 0
  280. mock_result.scalar_one_or_none.return_value = None
  281. mock_result.scalars.return_value.all.return_value = []
  282. mock_db.execute = AsyncMock(return_value=mock_result)
  283. mock_session_ctx.return_value.__aenter__ = AsyncMock(return_value=mock_db)
  284. mock_session_ctx.return_value.__aexit__ = AsyncMock(return_value=False)
  285. info = await _collect_support_info()
  286. assert "docker" in info
  287. assert info["docker"]["container_memory_limit_bytes"] == 1073741824
  288. assert info["docker"]["container_memory_limit_formatted"] == "1.00 GB"
  289. assert info["docker"]["network_mode_hint"] == "bridge"
  290. @pytest.mark.asyncio
  291. @pytest.mark.unit
  292. async def test_docker_section_absent_when_not_docker(self):
  293. """Verify docker section is absent when not in Docker."""
  294. from backend.app.api.routes.support import _collect_support_info
  295. with (
  296. patch("backend.app.api.routes.support.is_running_in_docker", return_value=False),
  297. patch("backend.app.api.routes.support.async_session") as mock_session_ctx,
  298. patch("backend.app.api.routes.support.printer_manager") as mock_pm,
  299. patch("backend.app.api.routes.support.get_network_interfaces", return_value=[]),
  300. patch("backend.app.api.routes.support.ws_manager") as mock_ws,
  301. ):
  302. mock_pm.get_all_statuses.return_value = {}
  303. mock_ws.active_connections = []
  304. mock_db = AsyncMock()
  305. mock_result = MagicMock()
  306. mock_result.scalar.return_value = 0
  307. mock_result.scalar_one_or_none.return_value = None
  308. mock_result.scalars.return_value.all.return_value = []
  309. mock_db.execute = AsyncMock(return_value=mock_result)
  310. mock_session_ctx.return_value.__aenter__ = AsyncMock(return_value=mock_db)
  311. mock_session_ctx.return_value.__aexit__ = AsyncMock(return_value=False)
  312. info = await _collect_support_info()
  313. assert "docker" not in info
  314. @pytest.mark.asyncio
  315. @pytest.mark.unit
  316. async def test_dependencies_section(self):
  317. """Verify dependencies section lists package versions."""
  318. from backend.app.api.routes.support import _collect_support_info
  319. with (
  320. patch("backend.app.api.routes.support.is_running_in_docker", return_value=False),
  321. patch("backend.app.api.routes.support.async_session") as mock_session_ctx,
  322. patch("backend.app.api.routes.support.printer_manager") as mock_pm,
  323. patch("backend.app.api.routes.support.get_network_interfaces", return_value=[]),
  324. patch("backend.app.api.routes.support.ws_manager") as mock_ws,
  325. ):
  326. mock_pm.get_all_statuses.return_value = {}
  327. mock_ws.active_connections = []
  328. mock_db = AsyncMock()
  329. mock_result = MagicMock()
  330. mock_result.scalar.return_value = 0
  331. mock_result.scalar_one_or_none.return_value = None
  332. mock_result.scalars.return_value.all.return_value = []
  333. mock_db.execute = AsyncMock(return_value=mock_result)
  334. mock_session_ctx.return_value.__aenter__ = AsyncMock(return_value=mock_db)
  335. mock_session_ctx.return_value.__aexit__ = AsyncMock(return_value=False)
  336. info = await _collect_support_info()
  337. assert "dependencies" in info
  338. # fastapi should be installed in test environment
  339. assert "fastapi" in info["dependencies"]
  340. assert info["dependencies"]["fastapi"] is not None
  341. @pytest.mark.asyncio
  342. @pytest.mark.unit
  343. async def test_websockets_section(self):
  344. """Verify websockets section shows connection count."""
  345. from backend.app.api.routes.support import _collect_support_info
  346. with (
  347. patch("backend.app.api.routes.support.is_running_in_docker", return_value=False),
  348. patch("backend.app.api.routes.support.async_session") as mock_session_ctx,
  349. patch("backend.app.api.routes.support.printer_manager") as mock_pm,
  350. patch("backend.app.api.routes.support.get_network_interfaces", return_value=[]),
  351. patch("backend.app.api.routes.support.ws_manager") as mock_ws,
  352. ):
  353. mock_pm.get_all_statuses.return_value = {}
  354. mock_ws.active_connections = ["conn1", "conn2"]
  355. mock_db = AsyncMock()
  356. mock_result = MagicMock()
  357. mock_result.scalar.return_value = 0
  358. mock_result.scalar_one_or_none.return_value = None
  359. mock_result.scalars.return_value.all.return_value = []
  360. mock_db.execute = AsyncMock(return_value=mock_result)
  361. mock_session_ctx.return_value.__aenter__ = AsyncMock(return_value=mock_db)
  362. mock_session_ctx.return_value.__aexit__ = AsyncMock(return_value=False)
  363. info = await _collect_support_info()
  364. assert info["websockets"]["active_connections"] == 2
  365. @pytest.mark.asyncio
  366. @pytest.mark.unit
  367. async def test_network_section(self):
  368. """Verify network section shows interface subnets."""
  369. from backend.app.api.routes.support import _collect_support_info
  370. mock_interfaces = [
  371. {"name": "eth0", "ip": "192.168.1.100", "netmask": "255.255.255.0", "subnet": "192.168.1.0/24"},
  372. {"name": "wlan0", "ip": "10.0.0.50", "netmask": "255.255.255.0", "subnet": "10.0.0.0/24"},
  373. ]
  374. with (
  375. patch("backend.app.api.routes.support.is_running_in_docker", return_value=False),
  376. patch("backend.app.api.routes.support.async_session") as mock_session_ctx,
  377. patch("backend.app.api.routes.support.printer_manager") as mock_pm,
  378. patch("backend.app.api.routes.support.get_network_interfaces", return_value=mock_interfaces),
  379. patch("backend.app.api.routes.support.ws_manager") as mock_ws,
  380. ):
  381. mock_pm.get_all_statuses.return_value = {}
  382. mock_ws.active_connections = []
  383. mock_db = AsyncMock()
  384. mock_result = MagicMock()
  385. mock_result.scalar.return_value = 0
  386. mock_result.scalar_one_or_none.return_value = None
  387. mock_result.scalars.return_value.all.return_value = []
  388. mock_db.execute = AsyncMock(return_value=mock_result)
  389. mock_session_ctx.return_value.__aenter__ = AsyncMock(return_value=mock_db)
  390. mock_session_ctx.return_value.__aexit__ = AsyncMock(return_value=False)
  391. info = await _collect_support_info()
  392. assert info["network"]["interface_count"] == 2
  393. assert info["network"]["interfaces"][0]["name"] == "eth0"
  394. assert info["network"]["interfaces"][0]["subnet"] == "x.x.1.0/24"
  395. # Verify IP addresses are NOT included (first two octets masked)
  396. for iface in info["network"]["interfaces"]:
  397. assert "ip" not in iface
  398. assert iface["subnet"].startswith("x.x.")
  399. @pytest.mark.asyncio
  400. @pytest.mark.unit
  401. async def test_log_file_section(self):
  402. """Verify log file section shows size info."""
  403. from backend.app.api.routes.support import _collect_support_info
  404. with tempfile.TemporaryDirectory() as tmpdir:
  405. log_dir = Path(tmpdir)
  406. log_file = log_dir / "bambuddy.log"
  407. log_file.write_text("some log content\n" * 100)
  408. with (
  409. patch("backend.app.api.routes.support.is_running_in_docker", return_value=False),
  410. patch("backend.app.api.routes.support.async_session") as mock_session_ctx,
  411. patch("backend.app.api.routes.support.printer_manager") as mock_pm,
  412. patch("backend.app.api.routes.support.get_network_interfaces", return_value=[]),
  413. patch("backend.app.api.routes.support.ws_manager") as mock_ws,
  414. patch("backend.app.api.routes.support.settings") as mock_settings,
  415. ):
  416. mock_settings.base_dir = Path(tmpdir)
  417. mock_settings.log_dir = log_dir
  418. mock_settings.debug = False
  419. mock_pm.get_all_statuses.return_value = {}
  420. mock_ws.active_connections = []
  421. mock_db = AsyncMock()
  422. mock_result = MagicMock()
  423. mock_result.scalar.return_value = 0
  424. mock_result.scalar_one_or_none.return_value = None
  425. mock_result.scalars.return_value.all.return_value = []
  426. mock_db.execute = AsyncMock(return_value=mock_result)
  427. mock_session_ctx.return_value.__aenter__ = AsyncMock(return_value=mock_db)
  428. mock_session_ctx.return_value.__aexit__ = AsyncMock(return_value=False)
  429. info = await _collect_support_info()
  430. assert "log_file" in info
  431. assert info["log_file"]["size_bytes"] > 0
  432. assert "B" in info["log_file"]["size_formatted"] or "KB" in info["log_file"]["size_formatted"]
  433. @pytest.mark.asyncio
  434. @pytest.mark.unit
  435. async def test_settings_include_all_keys_with_sensitive_redacted(self):
  436. """All settings keys must appear in output; sensitive values are replaced with [REDACTED]."""
  437. from backend.app.api.routes.support import _collect_support_info
  438. fake_settings = [
  439. MagicMock(key="benign_flag", value="true"),
  440. MagicMock(key="bambu_cloud_token", value="super-secret"),
  441. MagicMock(key="github_webhook", value="https://hooks.example/abc"),
  442. MagicMock(key="empty_password", value=""),
  443. MagicMock(key="local_backup_path", value="/data/backups"),
  444. # Regression: setting was leaking before the `broker` keyword was added.
  445. MagicMock(key="mqtt_broker", value="192.168.255.16"),
  446. # Regression: setting was leaking before the `auth_key` keyword was
  447. # added — and a value-prefix safety net (`tskey-`) was introduced
  448. # so future Tailscale settings auto-redact even if we forget the key.
  449. MagicMock(key="virtual_printer_tailscale_auth_key", value="tskey-auth-secrettokenhere"),
  450. # Value-prefix safety net standalone: a hypothetical future setting
  451. # named without "auth_key" but whose value starts with the Tailscale
  452. # prefix must still redact.
  453. MagicMock(key="some_future_ts_setting", value="tskey-other-secret"),
  454. ]
  455. def make_result(rows=None):
  456. r = MagicMock()
  457. r.scalar.return_value = 0
  458. r.scalar_one_or_none.return_value = None
  459. r.scalars.return_value.all.return_value = rows or []
  460. r.all.return_value = []
  461. return r
  462. async def fake_execute(stmt, *_a, **_kw):
  463. sql = str(stmt).lower()
  464. # Route by table name in the compiled SQL
  465. if "from settings" in sql or "settings.key" in sql:
  466. return make_result(fake_settings)
  467. return make_result([])
  468. with (
  469. tempfile.TemporaryDirectory() as tmpdir,
  470. patch("backend.app.api.routes.support.is_running_in_docker", return_value=False),
  471. patch("backend.app.api.routes.support.async_session") as mock_session_ctx,
  472. patch("backend.app.api.routes.support.printer_manager") as mock_pm,
  473. patch("backend.app.api.routes.support.get_network_interfaces", return_value=[]),
  474. patch("backend.app.api.routes.support.ws_manager") as mock_ws,
  475. patch("backend.app.api.routes.support.settings") as mock_settings,
  476. ):
  477. mock_settings.base_dir = Path(tmpdir)
  478. mock_settings.log_dir = Path(tmpdir)
  479. mock_settings.debug = False
  480. mock_pm.get_all_statuses.return_value = {}
  481. mock_ws.active_connections = []
  482. mock_db = AsyncMock()
  483. mock_db.execute = fake_execute
  484. mock_session_ctx.return_value.__aenter__ = AsyncMock(return_value=mock_db)
  485. mock_session_ctx.return_value.__aexit__ = AsyncMock(return_value=False)
  486. info = await _collect_support_info()
  487. s = info["settings"]
  488. assert s.get("bambu_cloud_token") == "[REDACTED]"
  489. assert s.get("github_webhook") == "[REDACTED]"
  490. assert s.get("local_backup_path") == "[REDACTED]"
  491. assert s.get("empty_password") == ""
  492. assert s.get("benign_flag") == "true"
  493. assert s.get("mqtt_broker") == "[REDACTED]"
  494. assert s.get("virtual_printer_tailscale_auth_key") == "[REDACTED]"
  495. assert s.get("some_future_ts_setting") == "[REDACTED]"
  496. class TestParseObicoEnabledPrinters:
  497. """Tests for the per-printer obico flag parser used by the bundle."""
  498. def test_empty_string_returns_empty_set(self):
  499. from backend.app.api.routes.support import _parse_obico_enabled_printers
  500. assert _parse_obico_enabled_printers("") == set()
  501. assert _parse_obico_enabled_printers(" ") == set()
  502. def test_comma_separated_ids(self):
  503. from backend.app.api.routes.support import _parse_obico_enabled_printers
  504. assert _parse_obico_enabled_printers("1,2,3") == {1, 2, 3}
  505. # Whitespace around tokens is forgiven (matches obico_detection's parser).
  506. assert _parse_obico_enabled_printers("1, 2 ,3") == {1, 2, 3}
  507. def test_non_integer_tokens_are_skipped(self):
  508. # Defensive against legacy/manually-edited setting values.
  509. from backend.app.api.routes.support import _parse_obico_enabled_printers
  510. assert _parse_obico_enabled_printers("1,abc,2") == {1, 2}
  511. assert _parse_obico_enabled_printers(",,1,") == {1}
  512. class TestCheckUrlReachable:
  513. """Tests for the slicer-API reachability ping."""
  514. @pytest.mark.asyncio
  515. async def test_empty_url_returns_none(self):
  516. from backend.app.api.routes.support import _check_url_reachable
  517. assert await _check_url_reachable("") is None
  518. assert await _check_url_reachable(" ") is None
  519. @pytest.mark.asyncio
  520. async def test_successful_response_is_reachable_even_on_404(self):
  521. # A 404 means the API is up; we want to separate network failure from
  522. # configuration mistakes, so non-empty status counts as reachable.
  523. from backend.app.api.routes.support import _check_url_reachable
  524. with patch("httpx.AsyncClient") as mock_client_cls:
  525. mock_client = AsyncMock()
  526. mock_client_cls.return_value.__aenter__.return_value = mock_client
  527. mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False)
  528. mock_response = MagicMock()
  529. mock_response.status_code = 404
  530. mock_client.get = AsyncMock(return_value=mock_response)
  531. result = await _check_url_reachable("http://localhost:3001/api")
  532. assert result is True
  533. @pytest.mark.asyncio
  534. async def test_connection_error_returns_false(self):
  535. from backend.app.api.routes.support import _check_url_reachable
  536. with patch("httpx.AsyncClient") as mock_client_cls:
  537. mock_client_cls.return_value.__aenter__.side_effect = ConnectionError("boom")
  538. result = await _check_url_reachable("http://nowhere:9999")
  539. assert result is False
  540. class TestCollectSlicerApiInfo:
  541. """Tests for the slicer-API info block (configured URLs + reachability).
  542. The collector reads URLs DIRECTLY from the DB rather than from the
  543. already-redacted ``info["settings"]`` dict — the previous version was
  544. pinging the literal string "[REDACTED]" (which httpx rejects) and getting
  545. ``False`` for any installation that actually had a slicer-API configured.
  546. These tests inject the raw URLs via a mocked `async_session` so the
  547. collector sees them as if they came from the unredacted Settings table.
  548. """
  549. def _make_settings_session(self, settings_dict):
  550. rows = [MagicMock(key=k, value=v) for k, v in settings_dict.items()]
  551. result = MagicMock()
  552. result.scalars.return_value.all.return_value = rows
  553. mock_db = AsyncMock()
  554. mock_db.execute = AsyncMock(return_value=result)
  555. ctx = MagicMock()
  556. ctx.return_value.__aenter__ = AsyncMock(return_value=mock_db)
  557. ctx.return_value.__aexit__ = AsyncMock(return_value=False)
  558. return ctx
  559. @pytest.mark.asyncio
  560. async def test_disabled_does_not_run_reachability_check(self):
  561. from backend.app.api.routes.support import _collect_slicer_api_info
  562. session_ctx = self._make_settings_session({"use_slicer_api": "false", "preferred_slicer": "bambu_studio"})
  563. with (
  564. patch("backend.app.api.routes.support.async_session", session_ctx),
  565. patch("backend.app.api.routes.support._check_url_reachable") as mock_check,
  566. ):
  567. info = await _collect_slicer_api_info()
  568. mock_check.assert_not_called()
  569. assert info["enabled"] is False
  570. assert info["preferred"] == "bambu_studio"
  571. assert info["bambu_studio_url_set_in_db"] is False
  572. assert info["orcaslicer_url_set_in_db"] is False
  573. assert "bambu_studio_reachable" not in info
  574. assert "orcaslicer_reachable" not in info
  575. @pytest.mark.asyncio
  576. async def test_enabled_runs_reachability_check_for_both_urls(self):
  577. from backend.app.api.routes.support import _collect_slicer_api_info
  578. async def fake_check(url, timeout=2.0):
  579. return "orca" in url
  580. session_ctx = self._make_settings_session(
  581. {
  582. "use_slicer_api": "true",
  583. "preferred_slicer": "orcaslicer",
  584. "bambu_studio_api_url": "http://bs:3001",
  585. "orcaslicer_api_url": "http://orca:3003",
  586. }
  587. )
  588. with (
  589. patch("backend.app.api.routes.support.async_session", session_ctx),
  590. patch("backend.app.api.routes.support._check_url_reachable", side_effect=fake_check),
  591. ):
  592. info = await _collect_slicer_api_info()
  593. assert info["enabled"] is True
  594. assert info["bambu_studio_url_set_in_db"] is True
  595. assert info["orcaslicer_url_set_in_db"] is True
  596. assert info["bambu_studio_url_source"] == "db"
  597. assert info["orcaslicer_url_source"] == "db"
  598. assert info["bambu_studio_reachable"] is False
  599. assert info["orcaslicer_reachable"] is True
  600. @pytest.mark.asyncio
  601. async def test_env_var_fallback_url_pinged_when_db_setting_empty(self):
  602. """Regression for the second pass on #support-bundle audit: the
  603. previous version returned `null` for `bambu_studio_reachable` on every
  604. installation that ran the sidecar via env var rather than via the DB
  605. setting (the common case for the default `http://localhost:3001`).
  606. The resolver now mirrors the precedence used by `archives.py:3174-3180`
  607. — DB setting first, then `app_settings.bambu_studio_api_url` (which
  608. reads the `BAMBU_STUDIO_API_URL` env var or the built-in default).
  609. """
  610. from backend.app.api.routes.support import _collect_slicer_api_info
  611. seen_urls: list[str] = []
  612. async def fake_check(url, timeout=2.0):
  613. seen_urls.append(url)
  614. return True
  615. # DB has use_slicer_api=true but NO bambu_studio_api_url row, simulating
  616. # a user who set the URL via the BAMBU_STUDIO_API_URL env var.
  617. session_ctx = self._make_settings_session({"use_slicer_api": "true", "preferred_slicer": "bambu_studio"})
  618. with (
  619. patch("backend.app.api.routes.support.async_session", session_ctx),
  620. patch("backend.app.api.routes.support._check_url_reachable", side_effect=fake_check),
  621. patch("backend.app.api.routes.support.settings") as mock_app_settings,
  622. ):
  623. # Pydantic-settings would normally do this for us when reading the
  624. # env var — we mock the resolved value directly.
  625. mock_app_settings.bambu_studio_api_url = "http://my-sidecar:3001"
  626. mock_app_settings.slicer_api_url = "http://localhost:3003"
  627. info = await _collect_slicer_api_info()
  628. # The env-var URL was the one actually pinged.
  629. assert "http://my-sidecar:3001" in seen_urls
  630. # And the source-tracking field shows we fell back from the DB to env.
  631. assert info["bambu_studio_url_set_in_db"] is False
  632. assert info["bambu_studio_url_source"] == "env_or_default"
  633. assert info["bambu_studio_reachable"] is True
  634. @pytest.mark.asyncio
  635. async def test_reachability_uses_unredacted_url(self):
  636. """Regression: the collector previously pinged the literal '[REDACTED]'
  637. from the already-sanitized info["settings"] dict and always returned
  638. False. The collector must read the un-redacted URL fresh from the DB.
  639. """
  640. from backend.app.api.routes.support import _collect_slicer_api_info
  641. seen_urls: list[str] = []
  642. async def fake_check(url, timeout=2.0):
  643. seen_urls.append(url)
  644. return True
  645. session_ctx = self._make_settings_session(
  646. {
  647. "use_slicer_api": "true",
  648. "bambu_studio_api_url": "http://real-bs-host:3001",
  649. "orcaslicer_api_url": "http://real-orca-host:3003",
  650. }
  651. )
  652. with (
  653. patch("backend.app.api.routes.support.async_session", session_ctx),
  654. patch("backend.app.api.routes.support._check_url_reachable", side_effect=fake_check),
  655. ):
  656. await _collect_slicer_api_info()
  657. assert "http://real-bs-host:3001" in seen_urls
  658. assert "http://real-orca-host:3003" in seen_urls
  659. assert "[REDACTED]" not in seen_urls
  660. class TestCollectAuthInfo:
  661. """Tests for the OIDC / 2FA / API-key / group bundle block."""
  662. @pytest.mark.asyncio
  663. async def test_empty_database_returns_zero_counts_and_empty_list(self):
  664. from backend.app.api.routes.support import _collect_auth_info
  665. def make_count(value):
  666. r = MagicMock()
  667. r.scalar.return_value = value
  668. r.scalar_one_or_none.return_value = None
  669. r.scalars.return_value.all.return_value = []
  670. r.all.return_value = []
  671. return r
  672. async def fake_execute(stmt, *_a, **_kw):
  673. return make_count(0)
  674. db = AsyncMock()
  675. db.execute = fake_execute
  676. info = await _collect_auth_info(db)
  677. assert info["oidc_providers"] == []
  678. assert info["users_with_totp"] == 0
  679. assert info["email_otp_codes_pending"] == 0
  680. assert info["api_keys_total"] == 0
  681. assert info["api_keys_enabled"] == 0
  682. assert info["api_keys_expired"] == 0
  683. assert info["long_lived_tokens_total"] == 0
  684. assert info["long_lived_tokens_active"] == 0
  685. assert info["groups_system"] == 0
  686. assert info["groups_custom"] == 0
  687. @pytest.mark.asyncio
  688. async def test_oidc_provider_names_exported_in_cleartext(self):
  689. """Provider names are login-button labels — public, not a secret. Triage
  690. for SSO bugs is significantly easier when the provider is identified."""
  691. from backend.app.api.routes.support import _collect_auth_info
  692. provider = MagicMock()
  693. provider.id = 1
  694. provider.name = "PocketID"
  695. provider.is_enabled = True
  696. provider.scopes = "openid email profile"
  697. provider.email_claim = "email"
  698. provider.require_email_verified = True
  699. provider.auto_create_users = False
  700. provider.auto_link_existing_accounts = False
  701. provider.default_group_id = None
  702. provider.icon_url = None
  703. def make_result(rows=None, count=0):
  704. r = MagicMock()
  705. r.scalar.return_value = count
  706. r.scalar_one_or_none.return_value = None
  707. r.scalars.return_value.all.return_value = rows or []
  708. r.all.return_value = []
  709. return r
  710. async def fake_execute(stmt, *_a, **_kw):
  711. sql = str(stmt).lower()
  712. if "oidc_providers" in sql and "user_oidc_link" not in sql:
  713. return make_result([provider])
  714. return make_result(count=0)
  715. db = AsyncMock()
  716. db.execute = fake_execute
  717. info = await _collect_auth_info(db)
  718. assert len(info["oidc_providers"]) == 1
  719. oidc = info["oidc_providers"][0]
  720. assert oidc["name"] == "PocketID"
  721. # No secrets leak through — these fields don't exist on the dict.
  722. assert "client_id" not in oidc
  723. assert "client_secret" not in oidc
  724. assert "issuer_url" not in oidc
  725. class TestCollectGitHubBackupInfo:
  726. """Tests for the GitHub-backup provider/failure-count block."""
  727. @pytest.mark.asyncio
  728. async def test_aggregates_providers_and_recent_failures(self):
  729. from backend.app.api.routes.support import _collect_github_backup_info
  730. c1 = MagicMock(provider="github", last_backup_status="success", schedule_enabled=True)
  731. c2 = MagicMock(provider="github", last_backup_status="failed", schedule_enabled=False)
  732. c3 = MagicMock(provider="gitea", last_backup_status="failed", schedule_enabled=True)
  733. result = MagicMock()
  734. result.scalars.return_value.all.return_value = [c1, c2, c3]
  735. db = AsyncMock()
  736. db.execute = AsyncMock(return_value=result)
  737. info = await _collect_github_backup_info(db)
  738. assert info["configs_total"] == 3
  739. assert info["providers_used"] == {"github": 2, "gitea": 1}
  740. assert info["schedule_enabled_count"] == 2
  741. assert info["last_failure_count"] == 2