# test_vp_tls_ciphers.py
"""Regression tests for #1610: every slicer-facing VP TLS context must
explicitly include the plain-RSA AES-GCM cipher suites that real Bambu
printers (and therefore the BambuStudio / OrcaSlicer client paths) expect.

Real Bambu printers offer only ``AES256-GCM-SHA384`` / ``AES128-GCM-SHA256``
(plain RSA key exchange) on their TLS endpoints. Slicers built against
that surface assume the server side will accept those suites. On
distributions whose OpenSSL ``DEFAULT`` cipher list has been narrowed by a
system crypto policy (Fedora / RHEL ``update-crypto-policies``, hardened
Alpine builds), Python's stock ``SSLContext`` ends up offering only
ECDHE/DHE — no overlap with the slicer's ClientHello, the handshake
aborts, and the slicer reports a generic ``code=-1`` connect error.

The #620 patch fixed this for the printer-facing CLIENT context in
``tcp_proxy.py::_create_client_ssl_context``. #1610 audited the remaining
slicer-facing surface and applied the same explicit cipher pin to every
context that accepts a slicer connection:

* ``bind_server.py::_create_tls_context`` — port 3002 (bind/detect)
* ``mqtt_server.py`` (inline in ``start``) — port 8883 (MQTT-over-TLS)
* ``tcp_proxy.py::_create_server_ssl_context`` — proxy-mode 3002
* ``ftp_server.py`` (inline in ``start``) — port 990 (FTPS)

If any of these regress to a context that no longer offers
``AES256-GCM-SHA384`` / ``AES128-GCM-SHA256``, users on hardened distros
will hit the #1610 / #620 cipher-mismatch failure mode.
"""
  24. import ssl
  25. from pathlib import Path
  26. from unittest.mock import patch
  27. import pytest
  28. from backend.app.services.virtual_printer.bind_server import BindServer
  29. from backend.app.services.virtual_printer.certificate import CertificateService
  30. from backend.app.services.virtual_printer.ftp_server import VirtualPrinterFTPServer
  31. from backend.app.services.virtual_printer.mqtt_server import SimpleMQTTServer
  32. from backend.app.services.virtual_printer.tcp_proxy import TLSProxy
  33. REQUIRED_CIPHERS = ("AES256-GCM-SHA384", "AES128-GCM-SHA256")
  34. def _cert_pair(tmp_path: Path) -> tuple[Path, Path]:
  35. """Generate a real self-signed CA + per-VP cert pair via the production
  36. CertificateService. Returns ``(cert_path, key_path)`` suitable for
  37. ``load_cert_chain`` calls inside the VP services under test."""
  38. svc = CertificateService(cert_dir=tmp_path, serial="01P00A391800001")
  39. return svc.ensure_certificates()
  40. def _assert_required_ciphers(ctx: ssl.SSLContext, where: str) -> None:
  41. """Fail with a useful diagnostic if either required cipher is missing."""
  42. offered = {c["name"] for c in ctx.get_ciphers()}
  43. missing = [c for c in REQUIRED_CIPHERS if c not in offered]
  44. assert not missing, (
  45. f"{where}: missing plain-RSA AES-GCM cipher(s) {missing}. "
  46. f"Real Bambu printers / slicers require these on the slicer-facing "
  47. f"TLS surface — see #1610. Offered ciphers: {sorted(offered)}"
  48. )
  49. class TestBindServerTlsCiphers:
  50. def test_create_tls_context_offers_plain_rsa_aes_gcm(self, tmp_path):
  51. cert_path, key_path = _cert_pair(tmp_path)
  52. server = BindServer(
  53. serial="01P00A391800001",
  54. model="C12",
  55. name="vp",
  56. version="01.07.00.00",
  57. bind_address="127.0.0.1",
  58. cert_path=cert_path,
  59. key_path=key_path,
  60. )
  61. ctx = server._create_tls_context()
  62. assert ctx is not None
  63. _assert_required_ciphers(ctx, "bind_server._create_tls_context")
  64. class TestTlsProxyServerCiphers:
  65. """Slicer-facing side of proxy mode — was not patched by the #620 fix."""
  66. def test_create_server_ssl_context_offers_plain_rsa_aes_gcm(self, tmp_path):
  67. cert_path, key_path = _cert_pair(tmp_path)
  68. proxy = TLSProxy(
  69. name="Bind-TLS",
  70. listen_port=3002,
  71. target_host="127.0.0.1",
  72. target_port=3002,
  73. server_cert_path=str(cert_path),
  74. server_key_path=str(key_path),
  75. on_connect=lambda cid: None,
  76. on_disconnect=lambda cid: None,
  77. bind_address="127.0.0.1",
  78. )
  79. ctx = proxy._create_server_ssl_context()
  80. _assert_required_ciphers(ctx, "tcp_proxy._create_server_ssl_context")
  81. def test_create_client_ssl_context_still_offers_plain_rsa_aes_gcm(self, tmp_path):
  82. """The original #620 fix must remain in place."""
  83. cert_path, key_path = _cert_pair(tmp_path)
  84. proxy = TLSProxy(
  85. name="Bind-TLS",
  86. listen_port=3002,
  87. target_host="127.0.0.1",
  88. target_port=3002,
  89. server_cert_path=str(cert_path),
  90. server_key_path=str(key_path),
  91. on_connect=lambda cid: None,
  92. on_disconnect=lambda cid: None,
  93. bind_address="127.0.0.1",
  94. )
  95. ctx = proxy._create_client_ssl_context()
  96. _assert_required_ciphers(ctx, "tcp_proxy._create_client_ssl_context")
  97. class TestMqttServerTlsCiphers:
  98. """MQTT server builds its SSLContext inline in start(); intercept the
  99. ``asyncio.start_server`` call so the test doesn't actually bind a port."""
  100. @pytest.mark.asyncio
  101. async def test_start_configures_plain_rsa_aes_gcm(self, tmp_path):
  102. cert_path, key_path = _cert_pair(tmp_path)
  103. server = SimpleMQTTServer(
  104. serial="01P00A391800001",
  105. access_code="deadbeef",
  106. cert_path=cert_path,
  107. key_path=key_path,
  108. model="C12",
  109. bind_address="127.0.0.1",
  110. )
  111. captured: dict[str, ssl.SSLContext] = {}
  112. async def _capture(*_args, ssl=None, **_kwargs):
  113. captured["ctx"] = ssl
  114. class _FakeServer:
  115. sockets = []
  116. def close(self):
  117. pass
  118. async def wait_closed(self):
  119. pass
  120. def is_serving(self):
  121. return False
  122. return _FakeServer()
  123. with patch("asyncio.start_server", _capture):
  124. await server.start()
  125. try:
  126. assert "ctx" in captured, "asyncio.start_server was not invoked"
  127. _assert_required_ciphers(captured["ctx"], "mqtt_server.start")
  128. finally:
  129. await server.stop()
  130. class TestFtpServerTlsCiphers:
  131. """FTP server builds its SSLContext inline in start()."""
  132. @pytest.mark.asyncio
  133. async def test_start_configures_plain_rsa_aes_gcm(self, tmp_path):
  134. cert_path, key_path = _cert_pair(tmp_path)
  135. upload_dir = tmp_path / "uploads"
  136. upload_dir.mkdir()
  137. server = VirtualPrinterFTPServer(
  138. upload_dir=upload_dir,
  139. access_code="deadbeef",
  140. cert_path=cert_path,
  141. key_path=key_path,
  142. bind_address="127.0.0.1",
  143. )
  144. captured: dict[str, ssl.SSLContext] = {}
  145. async def _capture(*_args, ssl=None, **_kwargs):
  146. captured["ctx"] = ssl
  147. class _FakeServer:
  148. sockets = []
  149. def close(self):
  150. pass
  151. async def wait_closed(self):
  152. pass
  153. def is_serving(self):
  154. return False
  155. async def serve_forever(self):
  156. pass
  157. return _FakeServer()
  158. with patch("asyncio.start_server", _capture):
  159. await server.start()
  160. try:
  161. assert "ctx" in captured, "asyncio.start_server was not invoked"
  162. _assert_required_ciphers(captured["ctx"], "ftp_server.start")
  163. finally:
  164. await server.stop()