certificate.py 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. """TLS certificate generation for virtual printer services.
  2. Generates certificates that mimic real Bambu printer certificate format:
  3. - CA certificate mimics "BBL CA" from "BBL Technologies Co., Ltd"
  4. - Printer certificate has CN = serial number, signed by the CA
  5. """
  6. import logging
  7. import socket
  8. from datetime import UTC, datetime, timedelta
  9. from ipaddress import IPv4Address
  10. from pathlib import Path
  11. from cryptography import x509
  12. from cryptography.hazmat.primitives import hashes, serialization
  13. from cryptography.hazmat.primitives.asymmetric import rsa
  14. from cryptography.x509.oid import ExtendedKeyUsageOID, NameOID
  15. logger = logging.getLogger(__name__)
  16. # Default serial number for virtual printer (matches SSDP/MQTT config)
  17. DEFAULT_SERIAL = "00M09A391800001"
  18. def _get_local_ip() -> str:
  19. """Get the local IP address."""
  20. try:
  21. s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  22. s.connect(("8.8.8.8", 80))
  23. ip = s.getsockname()[0]
  24. s.close()
  25. return ip
  26. except Exception:
  27. return "127.0.0.1"
  28. class CertificateService:
  29. """Generate and manage TLS certificates for virtual printer.
  30. Creates a certificate chain mimicking real Bambu printers:
  31. - Root CA with CN="BBL CA", O="BBL Technologies Co., Ltd", C="CN"
  32. - Printer cert with CN=serial_number, signed by the CA
  33. """
  34. def __init__(self, cert_dir: Path, serial: str = DEFAULT_SERIAL):
  35. """Initialize the certificate service.
  36. Args:
  37. cert_dir: Directory to store certificates
  38. serial: Serial number to use as CN in printer certificate
  39. """
  40. self.cert_dir = cert_dir
  41. self.serial = serial
  42. self.ca_cert_path = cert_dir / "bbl_ca.crt"
  43. self.ca_key_path = cert_dir / "bbl_ca.key"
  44. self.cert_path = cert_dir / "virtual_printer.crt"
  45. self.key_path = cert_dir / "virtual_printer.key"
  46. def ensure_certificates(self) -> tuple[Path, Path]:
  47. """Ensure certificates exist, generate if needed.
  48. Returns:
  49. Tuple of (cert_path, key_path)
  50. """
  51. if self.cert_path.exists() and self.key_path.exists():
  52. logger.debug("Using existing virtual printer certificates")
  53. return self.cert_path, self.key_path
  54. return self.generate_certificates()
  55. def _generate_ca_certificate(self) -> tuple[rsa.RSAPrivateKey, x509.Certificate]:
  56. """Generate a CA certificate for the virtual printer.
  57. We use a generic name instead of mimicking BBL CA, since the slicer
  58. may specifically reject certificates claiming to be from BBL but
  59. with a different public key.
  60. Returns:
  61. Tuple of (ca_private_key, ca_certificate)
  62. """
  63. logger.info("Generating Virtual Printer CA certificate...")
  64. # Generate CA private key
  65. ca_key = rsa.generate_private_key(
  66. public_exponent=65537,
  67. key_size=2048,
  68. )
  69. # Use a generic CA name - NOT BBL to avoid being rejected as fake
  70. ca_name = x509.Name(
  71. [
  72. x509.NameAttribute(NameOID.COMMON_NAME, "Virtual Printer CA"),
  73. ]
  74. )
  75. now = datetime.now(UTC)
  76. ca_cert = (
  77. x509.CertificateBuilder()
  78. .subject_name(ca_name)
  79. .issuer_name(ca_name)
  80. .public_key(ca_key.public_key())
  81. .serial_number(x509.random_serial_number())
  82. .not_valid_before(now)
  83. .not_valid_after(now + timedelta(days=7300)) # 20 years
  84. .add_extension(
  85. x509.BasicConstraints(ca=True, path_length=0),
  86. critical=True,
  87. )
  88. .add_extension(
  89. x509.KeyUsage(
  90. digital_signature=True,
  91. content_commitment=False,
  92. key_encipherment=False,
  93. data_encipherment=False,
  94. key_agreement=False,
  95. key_cert_sign=True,
  96. crl_sign=True,
  97. encipher_only=False,
  98. decipher_only=False,
  99. ),
  100. critical=True,
  101. )
  102. .sign(ca_key, hashes.SHA256())
  103. )
  104. return ca_key, ca_cert
  105. def generate_certificates(self) -> tuple[Path, Path]:
  106. """Generate CA and printer certificates.
  107. Creates a certificate chain mimicking real Bambu printers:
  108. - BBL CA (self-signed root)
  109. - Printer certificate (CN=serial, signed by BBL CA)
  110. Returns:
  111. Tuple of (cert_path, key_path)
  112. """
  113. logger.info(f"Generating certificates for virtual printer (serial: {self.serial})...")
  114. # Ensure directory exists
  115. self.cert_dir.mkdir(parents=True, exist_ok=True)
  116. # Generate or load CA
  117. ca_key, ca_cert = self._generate_ca_certificate()
  118. # Save CA certificate and key
  119. self.ca_key_path.write_bytes(
  120. ca_key.private_bytes(
  121. encoding=serialization.Encoding.PEM,
  122. format=serialization.PrivateFormat.TraditionalOpenSSL,
  123. encryption_algorithm=serialization.NoEncryption(),
  124. )
  125. )
  126. self.ca_key_path.chmod(0o600)
  127. self.ca_cert_path.write_bytes(ca_cert.public_bytes(serialization.Encoding.PEM))
  128. # Generate printer private key
  129. printer_key = rsa.generate_private_key(
  130. public_exponent=65537,
  131. key_size=2048,
  132. )
  133. # Printer certificate subject - CN is the serial number (like real Bambu printers)
  134. printer_subject = x509.Name(
  135. [
  136. x509.NameAttribute(NameOID.COMMON_NAME, self.serial),
  137. ]
  138. )
  139. # Issuer is the CA
  140. issuer = ca_cert.subject
  141. now = datetime.now(UTC)
  142. local_ip = _get_local_ip()
  143. logger.info(f"Generating printer certificate with CN={self.serial}, local IP: {local_ip}")
  144. # Build printer certificate signed by CA
  145. printer_cert = (
  146. x509.CertificateBuilder()
  147. .subject_name(printer_subject)
  148. .issuer_name(issuer)
  149. .public_key(printer_key.public_key())
  150. .serial_number(x509.random_serial_number())
  151. .not_valid_before(now)
  152. .not_valid_after(now + timedelta(days=3650)) # 10 years
  153. .add_extension(
  154. x509.BasicConstraints(ca=False, path_length=None),
  155. critical=True,
  156. )
  157. .add_extension(
  158. x509.SubjectAlternativeName(
  159. [
  160. x509.DNSName("localhost"),
  161. x509.DNSName("bambuddy"),
  162. x509.DNSName(self.serial),
  163. x509.IPAddress(IPv4Address(local_ip)),
  164. x509.IPAddress(IPv4Address("127.0.0.1")),
  165. ]
  166. ),
  167. critical=False,
  168. )
  169. .add_extension(
  170. x509.ExtendedKeyUsage(
  171. [
  172. ExtendedKeyUsageOID.SERVER_AUTH,
  173. ExtendedKeyUsageOID.CLIENT_AUTH,
  174. ]
  175. ),
  176. critical=False,
  177. )
  178. .add_extension(
  179. x509.KeyUsage(
  180. digital_signature=True,
  181. content_commitment=False,
  182. key_encipherment=True,
  183. data_encipherment=False,
  184. key_agreement=False,
  185. key_cert_sign=False,
  186. crl_sign=False,
  187. encipher_only=False,
  188. decipher_only=False,
  189. ),
  190. critical=True,
  191. )
  192. .sign(ca_key, hashes.SHA256()) # Signed by CA, not self-signed
  193. )
  194. # Write printer private key
  195. self.key_path.write_bytes(
  196. printer_key.private_bytes(
  197. encoding=serialization.Encoding.PEM,
  198. format=serialization.PrivateFormat.TraditionalOpenSSL,
  199. encryption_algorithm=serialization.NoEncryption(),
  200. )
  201. )
  202. self.key_path.chmod(0o600)
  203. # Write printer certificate (include CA cert in chain for full chain)
  204. cert_chain = printer_cert.public_bytes(serialization.Encoding.PEM) + ca_cert.public_bytes(
  205. serialization.Encoding.PEM
  206. )
  207. self.cert_path.write_bytes(cert_chain)
  208. logger.info(f"Generated certificate chain at {self.cert_dir}")
  209. logger.info(" CA: CN=Virtual Printer CA")
  210. logger.info(f" Printer: CN={self.serial}")
  211. return self.cert_path, self.key_path
  212. def delete_certificates(self) -> None:
  213. """Delete existing certificates."""
  214. for path in [self.cert_path, self.key_path, self.ca_cert_path, self.ca_key_path]:
  215. if path.exists():
  216. path.unlink()
  217. logger.info("Deleted virtual printer certificates")