test_camera_tls_proxy.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. """Tests for the camera TLS proxy and RTSP URL rewriting."""
  2. import asyncio
  3. import pytest
  4. from backend.app.services.camera import create_tls_proxy, rewrite_rtsp_request_url
  5. class TestRewriteRtspRequestUrl:
  6. """Tests for RTSP request-line URL rewriting."""
  7. def test_rewrites_describe_request_line(self):
  8. proxy_url = b"rtsp://127.0.0.1:45221"
  9. real_url = b"rtsps://192.168.1.100:322"
  10. data = b"DESCRIBE rtsp://127.0.0.1:45221/streaming/live/1 RTSP/1.0\r\nCSeq: 1\r\n\r\n"
  11. result = rewrite_rtsp_request_url(data, proxy_url, real_url)
  12. assert b"DESCRIBE rtsps://192.168.1.100:322/streaming/live/1 RTSP/1.0\r\n" in result
  13. def test_rewrites_setup_request_line(self):
  14. proxy_url = b"rtsp://127.0.0.1:45221"
  15. real_url = b"rtsps://192.168.1.100:322"
  16. data = b"SETUP rtsp://127.0.0.1:45221/streaming/live/1/trackID=0 RTSP/1.0\r\nCSeq: 3\r\n\r\n"
  17. result = rewrite_rtsp_request_url(data, proxy_url, real_url)
  18. assert b"SETUP rtsps://192.168.1.100:322/streaming/live/1/trackID=0 RTSP/1.0\r\n" in result
  19. def test_rewrites_play_request_line(self):
  20. proxy_url = b"rtsp://127.0.0.1:45221"
  21. real_url = b"rtsps://192.168.1.100:322"
  22. data = b"PLAY rtsp://127.0.0.1:45221/streaming/live/1 RTSP/1.0\r\nCSeq: 5\r\n\r\n"
  23. result = rewrite_rtsp_request_url(data, proxy_url, real_url)
  24. assert b"PLAY rtsps://192.168.1.100:322/streaming/live/1 RTSP/1.0\r\n" in result
  25. def test_preserves_authorization_header(self):
  26. """Digest auth embeds the URI in a hash — rewriting it breaks auth."""
  27. proxy_url = b"rtsp://127.0.0.1:45221"
  28. real_url = b"rtsps://192.168.1.100:322"
  29. data = (
  30. b"DESCRIBE rtsp://127.0.0.1:45221/streaming/live/1 RTSP/1.0\r\n"
  31. b"CSeq: 2\r\n"
  32. b'Authorization: Digest username="bblp", '
  33. b'uri="rtsp://127.0.0.1:45221/streaming/live/1", '
  34. b'response="abc123"\r\n'
  35. b"\r\n"
  36. )
  37. result = rewrite_rtsp_request_url(data, proxy_url, real_url)
  38. # Request line IS rewritten
  39. assert b"DESCRIBE rtsps://192.168.1.100:322/streaming/live/1 RTSP/1.0\r\n" in result
  40. # Authorization header is NOT rewritten
  41. assert b'uri="rtsp://127.0.0.1:45221/streaming/live/1"' in result
  42. assert b'response="abc123"' in result
  43. def test_no_rewrite_on_non_rtsp_data(self):
  44. """Binary RTP data and other non-RTSP data should pass through unchanged."""
  45. proxy_url = b"rtsp://127.0.0.1:45221"
  46. real_url = b"rtsps://192.168.1.100:322"
  47. # Interleaved RTP data (starts with $)
  48. data = b"$\x00\x00\x10" + b"\x00" * 16
  49. result = rewrite_rtsp_request_url(data, proxy_url, real_url)
  50. assert result == data
  51. def test_no_rewrite_on_empty_data(self):
  52. proxy_url = b"rtsp://127.0.0.1:45221"
  53. real_url = b"rtsps://192.168.1.100:322"
  54. assert rewrite_rtsp_request_url(b"", proxy_url, real_url) == b""
  55. def test_only_first_rtsp_line_rewritten(self):
  56. """If somehow multiple RTSP/1.0 lines exist, only the first is rewritten."""
  57. proxy_url = b"rtsp://127.0.0.1:45221"
  58. real_url = b"rtsps://192.168.1.100:322"
  59. data = (
  60. b"DESCRIBE rtsp://127.0.0.1:45221/streaming/live/1 RTSP/1.0\r\n"
  61. b"CSeq: 1\r\n"
  62. b"X-Custom: rtsp://127.0.0.1:45221/other RTSP/1.0\r\n"
  63. b"\r\n"
  64. )
  65. result = rewrite_rtsp_request_url(data, proxy_url, real_url)
  66. lines = result.split(b"\r\n")
  67. # First line rewritten
  68. assert lines[0] == b"DESCRIBE rtsps://192.168.1.100:322/streaming/live/1 RTSP/1.0"
  69. # Hypothetical other line NOT rewritten
  70. assert lines[2] == b"X-Custom: rtsp://127.0.0.1:45221/other RTSP/1.0"
  71. def test_preserves_crlf_structure(self):
  72. proxy_url = b"rtsp://127.0.0.1:45221"
  73. real_url = b"rtsps://192.168.1.100:322"
  74. data = b"DESCRIBE rtsp://127.0.0.1:45221/streaming/live/1 RTSP/1.0\r\nCSeq: 1\r\n\r\n"
  75. result = rewrite_rtsp_request_url(data, proxy_url, real_url)
  76. # Must still end with double CRLF (empty line terminates headers)
  77. assert result.endswith(b"\r\n\r\n")
  78. # Must have CSeq intact
  79. assert b"CSeq: 1\r\n" in result
  80. class TestCreateTlsProxy:
  81. """Tests for TLS proxy server lifecycle."""
  82. @pytest.mark.asyncio
  83. async def test_proxy_returns_port_and_server(self):
  84. """Verify proxy creates a listening server on an ephemeral port."""
  85. # Use a non-routable target — we just test the server starts, not the TLS connection
  86. port, server = await create_tls_proxy("192.0.2.1", 322)
  87. assert isinstance(port, int)
  88. assert port > 0
  89. assert server.is_serving()
  90. server.close()
  91. await server.wait_closed()
  92. @pytest.mark.asyncio
  93. async def test_proxy_accepts_connection(self):
  94. """Verify proxy accepts TCP connections (TLS to target will fail, but accept works)."""
  95. port, server = await create_tls_proxy("192.0.2.1", 322)
  96. try:
  97. # Connect to the proxy — it should accept the connection
  98. reader, writer = await asyncio.wait_for(
  99. asyncio.open_connection("127.0.0.1", port),
  100. timeout=2.0,
  101. )
  102. # The proxy will try to connect to 192.0.2.1:322 (non-routable), fail,
  103. # and close our connection. That's expected.
  104. writer.close()
  105. await writer.wait_closed()
  106. except (ConnectionError, TimeoutError):
  107. pass # Expected — target is unreachable
  108. server.close()
  109. await server.wait_closed()
  110. @pytest.mark.asyncio
  111. async def test_proxy_cleanup(self):
  112. """Verify proxy stops serving after close."""
  113. port, server = await create_tls_proxy("192.0.2.1", 322)
  114. assert server.is_serving()
  115. server.close()
  116. await server.wait_closed()
  117. assert not server.is_serving()
  118. class TestForwardersCatchRuntimeError:
  119. """Regression contract: the bidirectional forwarders inside ``_handle``
  120. must catch ``RuntimeError``, not just the connection-error tuple.
  121. asyncio's default selector event loop reports a write-to-closed-handle as
  122. ``ConnectionResetError`` / ``OSError``. uvloop (which is what runs under
  123. uvicorn's ``--loop uvloop`` / when ``uvloop`` is installed) raises a plain
  124. ``RuntimeError`` from ``UVHandle._ensure_alive``. If the except clause
  125. drops ``RuntimeError`` the handler escapes the forwarder, asyncio's
  126. ``client_connected_cb`` task-exception handler logs an "Unhandled
  127. exception" stack, and the user sees noise like:
  128. ERROR [asyncio] Unhandled exception in client_connected_cb
  129. ...
  130. RuntimeError: unable to perform operation on
  131. <TCPTransport closed=True ...>; the handler is closed
  132. Regression guard for that path. Source-level check rather than a runtime
  133. test because the forwarders are nested closures inside ``_handle`` and
  134. extracting them just for testability would require a pure-cosmetic
  135. refactor of the proxy.
  136. """
  137. def test_fwd_to_server_catches_runtime_error(self):
  138. import inspect
  139. src = inspect.getsource(create_tls_proxy)
  140. fwd_section = src.split("async def _fwd_to_server")[1].split("async def _fwd_to_client")[0]
  141. assert "RuntimeError" in fwd_section, (
  142. "_fwd_to_server must catch RuntimeError to absorb uvloop's "
  143. "write-to-closed-handle error; otherwise it leaks to "
  144. "asyncio.client_connected_cb's unhandled-exception logger."
  145. )
  146. def test_fwd_to_client_catches_runtime_error(self):
  147. import inspect
  148. src = inspect.getsource(create_tls_proxy)
  149. # Slice from `_fwd_to_client` to `await asyncio.gather` so we only
  150. # inspect that closure's body.
  151. fwd_section = src.split("async def _fwd_to_client")[1].split("await asyncio.gather")[0]
  152. assert "RuntimeError" in fwd_section, (
  153. "_fwd_to_client must catch RuntimeError — that's the actual frame "
  154. "in the original bug report (camera.py:191 dst.write(data) under "
  155. "uvloop)."
  156. )