test_camera_diagnose.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. """Unit tests for the staged camera diagnostic.
  2. Covers the per-stage pass/fail contract that drives the frontend
  3. remediation hints. The live-stream shortcut and the failure-to-summary
  4. mapping are the load-bearing pieces — both are pinned with explicit
  5. tests so future profile/protocol changes don't silently turn
  6. "camera_port_closed" into "printer_unreachable".
  7. """
  8. from unittest.mock import AsyncMock, patch
  9. import pytest
  10. from backend.app.services.camera_diagnose import (
  11. _LIVE_FRAME_FRESHNESS_SECONDS,
  12. diagnose_camera,
  13. )
  14. class TestLiveStreamShortcut:
  15. """If a viewer is currently watching the camera with a fresh frame,
  16. diagnose must NOT open a fresh socket — single-camera-connection
  17. firmwares would kick the live viewer off. Trust the live evidence.
  18. """
  19. @pytest.mark.asyncio
  20. async def test_skips_test_when_fresh_frame_in_active_stream(self):
  21. result = await diagnose_camera(
  22. ip_address="192.0.2.1",
  23. access_code="x",
  24. model="X1C",
  25. printer_id=1,
  26. has_live_stream=True,
  27. live_frame_age_seconds=2.0,
  28. )
  29. assert result.overall_status == "ok"
  30. assert result.summary_code == "live_stream_active_healthy"
  31. assert len(result.stages) == 1
  32. assert result.stages[0].name == "live_stream_active"
  33. assert result.stages[0].status == "ok"
  34. @pytest.mark.asyncio
  35. async def test_runs_test_when_stale_frame_in_active_stream(self):
  36. """An active stream with a stale buffered frame (e.g. mid-
  37. reconnect) shouldn't short-circuit — the stream might be
  38. wedged and the user needs the real test."""
  39. with patch(
  40. "backend.app.services.camera_diagnose.asyncio.open_connection",
  41. new_callable=AsyncMock,
  42. side_effect=TimeoutError,
  43. ):
  44. result = await diagnose_camera(
  45. ip_address="192.0.2.1",
  46. access_code="x",
  47. model="X1C",
  48. printer_id=1,
  49. has_live_stream=True,
  50. live_frame_age_seconds=_LIVE_FRAME_FRESHNESS_SECONDS + 5,
  51. )
  52. # No short-circuit — we ran the real check and it failed.
  53. assert result.summary_code != "live_stream_active_healthy"
  54. assert any(s.name == "tcp_reachable" for s in result.stages)
  55. class TestTcpStage:
  56. """The first stage answers "can we even talk to the printer at all".
  57. The three failure modes (timeout / refused / unreachable) map to
  58. distinct user-facing remediation hints, so the codes must round-
  59. trip correctly through ``_summary_for_stages``."""
  60. @pytest.mark.asyncio
  61. async def test_timeout_maps_to_printer_unreachable(self):
  62. with patch(
  63. "backend.app.services.camera_diagnose.asyncio.open_connection",
  64. new_callable=AsyncMock,
  65. side_effect=TimeoutError,
  66. ):
  67. result = await diagnose_camera(
  68. ip_address="192.0.2.99",
  69. access_code="x",
  70. model="P2S",
  71. printer_id=1,
  72. )
  73. assert result.overall_status == "failed"
  74. assert result.summary_code == "printer_unreachable"
  75. first = result.stages[0]
  76. assert first.name == "tcp_reachable"
  77. assert first.code == "tcp_timeout"
  78. # Second stage was skipped — no point spawning ffmpeg with no socket.
  79. assert result.stages[1].name == "first_frame"
  80. assert result.stages[1].status == "skipped"
  81. @pytest.mark.asyncio
  82. async def test_connection_refused_maps_to_camera_port_closed(self):
  83. """ConnectionRefusedError = printer up, port closed. Common
  84. cause: LAN-only mode off, or developer mode off. The user
  85. sees a specific remediation hint, not the generic
  86. 'unreachable' message."""
  87. with patch(
  88. "backend.app.services.camera_diagnose.asyncio.open_connection",
  89. new_callable=AsyncMock,
  90. side_effect=ConnectionRefusedError(),
  91. ):
  92. result = await diagnose_camera(
  93. ip_address="192.0.2.1",
  94. access_code="x",
  95. model="P2S",
  96. printer_id=1,
  97. )
  98. assert result.summary_code == "camera_port_closed"
  99. assert result.stages[0].code == "tcp_refused"
  100. @pytest.mark.asyncio
  101. async def test_oserror_maps_to_printer_unreachable(self):
  102. """Generic OSError (no-route-to-host etc.) lumps under
  103. 'printer_unreachable' — same remediation as timeout."""
  104. with patch(
  105. "backend.app.services.camera_diagnose.asyncio.open_connection",
  106. new_callable=AsyncMock,
  107. side_effect=OSError("No route to host"),
  108. ):
  109. result = await diagnose_camera(
  110. ip_address="192.0.2.1",
  111. access_code="x",
  112. model="P2S",
  113. printer_id=1,
  114. )
  115. assert result.summary_code == "printer_unreachable"
  116. assert result.stages[0].code == "tcp_unreachable"
  117. class TestFirstFrameStage:
  118. """The second stage answers "is the camera actually producing
  119. frames". If TCP passes but no frame comes back, the answer is the
  120. same regardless of which sub-layer failed (auth, RTSP handshake,
  121. keyframe probe): the user can't see the camera."""
  122. @pytest.mark.asyncio
  123. async def test_no_frame_maps_to_no_frame_summary(self):
  124. async def _tcp_ok(*_a, **_kw):
  125. writer = AsyncMock()
  126. return AsyncMock(), writer
  127. with (
  128. patch(
  129. "backend.app.services.camera_diagnose.asyncio.open_connection",
  130. new=_tcp_ok,
  131. ),
  132. patch(
  133. "backend.app.services.camera_diagnose.capture_camera_frame_bytes",
  134. new_callable=AsyncMock,
  135. return_value=None,
  136. ),
  137. ):
  138. result = await diagnose_camera(
  139. ip_address="192.0.2.1",
  140. access_code="x",
  141. model="P2S",
  142. printer_id=1,
  143. )
  144. assert result.overall_status == "failed"
  145. assert result.summary_code == "no_frame"
  146. assert result.stages[0].status == "ok"
  147. assert result.stages[1].name == "first_frame"
  148. assert result.stages[1].code == "no_frame"
  149. @pytest.mark.asyncio
  150. async def test_capture_exception_maps_to_no_frame_summary(self):
  151. """ffmpeg crash / TLS proxy startup failure / etc. — all the
  152. sub-layer exceptions surface as 'no_frame' for the user, with
  153. a distinct ``capture_exception`` code in the stage so the
  154. support log retains the distinction."""
  155. async def _tcp_ok(*_a, **_kw):
  156. writer = AsyncMock()
  157. return AsyncMock(), writer
  158. with (
  159. patch(
  160. "backend.app.services.camera_diagnose.asyncio.open_connection",
  161. new=_tcp_ok,
  162. ),
  163. patch(
  164. "backend.app.services.camera_diagnose.capture_camera_frame_bytes",
  165. new_callable=AsyncMock,
  166. side_effect=RuntimeError("ffmpeg died"),
  167. ),
  168. ):
  169. result = await diagnose_camera(
  170. ip_address="192.0.2.1",
  171. access_code="x",
  172. model="P2S",
  173. printer_id=1,
  174. )
  175. assert result.summary_code == "no_frame"
  176. assert result.stages[1].code == "capture_exception"
  177. @pytest.mark.asyncio
  178. async def test_full_success_path(self):
  179. async def _tcp_ok(*_a, **_kw):
  180. writer = AsyncMock()
  181. return AsyncMock(), writer
  182. with (
  183. patch(
  184. "backend.app.services.camera_diagnose.asyncio.open_connection",
  185. new=_tcp_ok,
  186. ),
  187. patch(
  188. "backend.app.services.camera_diagnose.capture_camera_frame_bytes",
  189. new_callable=AsyncMock,
  190. return_value=b"\xff\xd8\xff\xd9", # tiny valid-looking JPEG
  191. ),
  192. ):
  193. result = await diagnose_camera(
  194. ip_address="192.0.2.1",
  195. access_code="x",
  196. model="P2S",
  197. printer_id=1,
  198. )
  199. assert result.overall_status == "ok"
  200. assert result.summary_code == "all_ok"
  201. assert all(s.status == "ok" for s in result.stages)
  202. class TestResultMetadata:
  203. """Surface fields the support triage relies on — protocol, port,
  204. profile name. The frontend renders these so we can ask the user
  205. 'is your profile 'P2S' or 'default'?' over a screenshot rather
  206. than asking for the support bundle."""
  207. @pytest.mark.asyncio
  208. async def test_p2s_reports_p2s_profile_and_rtsp_protocol(self):
  209. with patch(
  210. "backend.app.services.camera_diagnose.asyncio.open_connection",
  211. new_callable=AsyncMock,
  212. side_effect=TimeoutError,
  213. ):
  214. result = await diagnose_camera(
  215. ip_address="192.0.2.1",
  216. access_code="x",
  217. model="P2S",
  218. printer_id=1,
  219. )
  220. assert result.protocol == "rtsp"
  221. assert result.profile == "P2S"
  222. assert result.port == 322
  223. @pytest.mark.asyncio
  224. async def test_a1_reports_default_profile_and_chamber_protocol(self):
  225. with patch(
  226. "backend.app.services.camera_diagnose.asyncio.open_connection",
  227. new_callable=AsyncMock,
  228. side_effect=TimeoutError,
  229. ):
  230. result = await diagnose_camera(
  231. ip_address="192.0.2.1",
  232. access_code="x",
  233. model="A1",
  234. printer_id=1,
  235. )
  236. assert result.protocol == "chamber_image"
  237. assert result.profile == "default"
  238. assert result.port == 6000
  239. @pytest.mark.asyncio
  240. async def test_x1c_reports_default_profile_and_rtsp(self):
  241. with patch(
  242. "backend.app.services.camera_diagnose.asyncio.open_connection",
  243. new_callable=AsyncMock,
  244. side_effect=TimeoutError,
  245. ):
  246. result = await diagnose_camera(
  247. ip_address="192.0.2.1",
  248. access_code="x",
  249. model="X1C",
  250. printer_id=1,
  251. )
  252. assert result.protocol == "rtsp"
  253. assert result.profile == "default"
  254. assert result.port == 322