certificate.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
"""TLS certificate generation for virtual printer services.

Generates a certificate chain shaped like the one a real Bambu printer presents:
- CA certificate: self-signed, CN="Virtual Printer CA". A generic name is used
  instead of mimicking "BBL CA", since the slicer may reject a certificate that
  claims to be from BBL but carries a different public key.
- Printer certificate: CN = serial number, signed by the CA.

The CA certificate is persistent and only regenerated if it is missing, cannot
be loaded, or is expired / close to expiry.
This allows users to add the CA to their slicer's trust store once.
"""
  8. import logging
  9. import socket
  10. from datetime import datetime, timedelta, timezone
  11. from ipaddress import IPv4Address
  12. from pathlib import Path
  13. from cryptography import x509
  14. from cryptography.hazmat.primitives import hashes, serialization
  15. from cryptography.hazmat.primitives.asymmetric import rsa
  16. from cryptography.x509.oid import ExtendedKeyUsageOID, NameOID
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Default serial number for virtual printer (matches SSDP/MQTT config).
# Used as the default `serial` argument of CertificateService.
DEFAULT_SERIAL = "00M09A391800001"

# Minimum days remaining before CA is considered expired and needs regeneration.
# A stored CA with fewer days of validity left than this is not reused.
CA_EXPIRY_THRESHOLD_DAYS = 30
  22. def _get_local_ip() -> str:
  23. """Get the local IP address."""
  24. try:
  25. s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  26. s.connect(("8.8.8.8", 80))
  27. ip = s.getsockname()[0]
  28. s.close()
  29. return ip
  30. except Exception:
  31. return "127.0.0.1"
  32. class CertificateService:
  33. """Generate and manage TLS certificates for virtual printer.
  34. Creates a certificate chain mimicking real Bambu printers:
  35. - Root CA with CN="BBL CA", O="BBL Technologies Co., Ltd", C="CN"
  36. - Printer cert with CN=serial_number, signed by the CA
  37. """
  38. def __init__(self, cert_dir: Path, serial: str = DEFAULT_SERIAL):
  39. """Initialize the certificate service.
  40. Args:
  41. cert_dir: Directory to store certificates
  42. serial: Serial number to use as CN in printer certificate
  43. """
  44. self.cert_dir = cert_dir
  45. self.serial = serial
  46. self.ca_cert_path = cert_dir / "bbl_ca.crt"
  47. self.ca_key_path = cert_dir / "bbl_ca.key"
  48. self.cert_path = cert_dir / "virtual_printer.crt"
  49. self.key_path = cert_dir / "virtual_printer.key"
  50. def ensure_certificates(self) -> tuple[Path, Path]:
  51. """Ensure certificates exist, generate if needed.
  52. Returns:
  53. Tuple of (cert_path, key_path)
  54. """
  55. if self.cert_path.exists() and self.key_path.exists():
  56. logger.debug("Using existing virtual printer certificates")
  57. return self.cert_path, self.key_path
  58. return self.generate_certificates()
  59. def _load_existing_ca(self) -> tuple[rsa.RSAPrivateKey, x509.Certificate] | None:
  60. """Try to load existing CA certificate and key.
  61. Returns:
  62. Tuple of (ca_private_key, ca_certificate) if valid CA exists, None otherwise
  63. """
  64. if not self.ca_cert_path.exists() or not self.ca_key_path.exists():
  65. logger.debug("CA certificate or key not found")
  66. return None
  67. try:
  68. # Load CA certificate
  69. ca_cert_pem = self.ca_cert_path.read_bytes()
  70. ca_cert = x509.load_pem_x509_certificate(ca_cert_pem)
  71. # Check if CA is expired or about to expire
  72. now = datetime.now(timezone.utc)
  73. days_remaining = (ca_cert.not_valid_after_utc - now).days
  74. if days_remaining < CA_EXPIRY_THRESHOLD_DAYS:
  75. logger.warning(f"CA certificate expires in {days_remaining} days, will regenerate")
  76. return None
  77. # Load CA private key
  78. ca_key_pem = self.ca_key_path.read_bytes()
  79. ca_key = serialization.load_pem_private_key(ca_key_pem, password=None)
  80. logger.info(f"Using existing CA certificate (expires in {days_remaining} days)")
  81. return ca_key, ca_cert
  82. except Exception as e:
  83. logger.warning(f"Failed to load existing CA: {e}")
  84. return None
  85. def _get_or_create_ca(self) -> tuple[rsa.RSAPrivateKey, x509.Certificate]:
  86. """Get existing CA or create a new one.
  87. Returns:
  88. Tuple of (ca_private_key, ca_certificate)
  89. """
  90. # Try to load existing CA first
  91. existing = self._load_existing_ca()
  92. if existing:
  93. return existing
  94. # Generate new CA
  95. ca_key, ca_cert = self._generate_ca_certificate()
  96. # Save CA certificate and key
  97. self.cert_dir.mkdir(parents=True, exist_ok=True)
  98. self.ca_key_path.write_bytes(
  99. ca_key.private_bytes(
  100. encoding=serialization.Encoding.PEM,
  101. format=serialization.PrivateFormat.TraditionalOpenSSL,
  102. encryption_algorithm=serialization.NoEncryption(),
  103. )
  104. )
  105. self.ca_key_path.chmod(0o600)
  106. self.ca_cert_path.write_bytes(ca_cert.public_bytes(serialization.Encoding.PEM))
  107. logger.info("Saved new CA certificate")
  108. return ca_key, ca_cert
  109. def _generate_ca_certificate(self) -> tuple[rsa.RSAPrivateKey, x509.Certificate]:
  110. """Generate a new CA certificate for the virtual printer.
  111. We use a generic name instead of mimicking BBL CA, since the slicer
  112. may specifically reject certificates claiming to be from BBL but
  113. with a different public key.
  114. Returns:
  115. Tuple of (ca_private_key, ca_certificate)
  116. """
  117. logger.info("Generating new Virtual Printer CA certificate...")
  118. # Generate CA private key
  119. ca_key = rsa.generate_private_key(
  120. public_exponent=65537,
  121. key_size=2048,
  122. )
  123. # Use a generic CA name - NOT BBL to avoid being rejected as fake
  124. ca_name = x509.Name(
  125. [
  126. x509.NameAttribute(NameOID.COMMON_NAME, "Virtual Printer CA"),
  127. ]
  128. )
  129. now = datetime.now(timezone.utc)
  130. ca_cert = (
  131. x509.CertificateBuilder()
  132. .subject_name(ca_name)
  133. .issuer_name(ca_name)
  134. .public_key(ca_key.public_key())
  135. .serial_number(x509.random_serial_number())
  136. .not_valid_before(now)
  137. .not_valid_after(now + timedelta(days=7300)) # 20 years
  138. .add_extension(
  139. x509.BasicConstraints(ca=True, path_length=0),
  140. critical=True,
  141. )
  142. .add_extension(
  143. x509.KeyUsage(
  144. digital_signature=True,
  145. content_commitment=False,
  146. key_encipherment=False,
  147. data_encipherment=False,
  148. key_agreement=False,
  149. key_cert_sign=True,
  150. crl_sign=True,
  151. encipher_only=False,
  152. decipher_only=False,
  153. ),
  154. critical=True,
  155. )
  156. .sign(ca_key, hashes.SHA256())
  157. )
  158. return ca_key, ca_cert
  159. def generate_certificates(self) -> tuple[Path, Path]:
  160. """Generate printer certificate (reusing existing CA if available).
  161. Creates a certificate chain mimicking real Bambu printers:
  162. - CA certificate (reused if exists and valid, otherwise generated)
  163. - Printer certificate (CN=serial, signed by CA)
  164. Returns:
  165. Tuple of (cert_path, key_path)
  166. """
  167. logger.info(f"Generating certificates for virtual printer (serial: {self.serial})...")
  168. # Ensure directory exists
  169. self.cert_dir.mkdir(parents=True, exist_ok=True)
  170. # Get or create CA (reuses existing if valid)
  171. ca_key, ca_cert = self._get_or_create_ca()
  172. # Generate printer private key
  173. printer_key = rsa.generate_private_key(
  174. public_exponent=65537,
  175. key_size=2048,
  176. )
  177. # Printer certificate subject - CN is the serial number (like real Bambu printers)
  178. printer_subject = x509.Name(
  179. [
  180. x509.NameAttribute(NameOID.COMMON_NAME, self.serial),
  181. ]
  182. )
  183. # Issuer is the CA
  184. issuer = ca_cert.subject
  185. now = datetime.now(timezone.utc)
  186. local_ip = _get_local_ip()
  187. logger.info(f"Generating printer certificate with CN={self.serial}, local IP: {local_ip}")
  188. # Build printer certificate signed by CA
  189. printer_cert = (
  190. x509.CertificateBuilder()
  191. .subject_name(printer_subject)
  192. .issuer_name(issuer)
  193. .public_key(printer_key.public_key())
  194. .serial_number(x509.random_serial_number())
  195. .not_valid_before(now)
  196. .not_valid_after(now + timedelta(days=3650)) # 10 years
  197. .add_extension(
  198. x509.BasicConstraints(ca=False, path_length=None),
  199. critical=True,
  200. )
  201. .add_extension(
  202. x509.SubjectAlternativeName(
  203. [
  204. x509.DNSName("localhost"),
  205. x509.DNSName("bambuddy"),
  206. x509.DNSName(self.serial),
  207. x509.IPAddress(IPv4Address(local_ip)),
  208. x509.IPAddress(IPv4Address("127.0.0.1")),
  209. ]
  210. ),
  211. critical=False,
  212. )
  213. .add_extension(
  214. x509.ExtendedKeyUsage(
  215. [
  216. ExtendedKeyUsageOID.SERVER_AUTH,
  217. ExtendedKeyUsageOID.CLIENT_AUTH,
  218. ]
  219. ),
  220. critical=False,
  221. )
  222. .add_extension(
  223. x509.KeyUsage(
  224. digital_signature=True,
  225. content_commitment=False,
  226. key_encipherment=True,
  227. data_encipherment=False,
  228. key_agreement=False,
  229. key_cert_sign=False,
  230. crl_sign=False,
  231. encipher_only=False,
  232. decipher_only=False,
  233. ),
  234. critical=True,
  235. )
  236. .sign(ca_key, hashes.SHA256()) # Signed by CA, not self-signed
  237. )
  238. # Write printer private key
  239. self.key_path.write_bytes(
  240. printer_key.private_bytes(
  241. encoding=serialization.Encoding.PEM,
  242. format=serialization.PrivateFormat.TraditionalOpenSSL,
  243. encryption_algorithm=serialization.NoEncryption(),
  244. )
  245. )
  246. self.key_path.chmod(0o600)
  247. # Write printer certificate (include CA cert in chain for full chain)
  248. cert_chain = printer_cert.public_bytes(serialization.Encoding.PEM) + ca_cert.public_bytes(
  249. serialization.Encoding.PEM
  250. )
  251. self.cert_path.write_bytes(cert_chain)
  252. logger.info(f"Generated certificate chain at {self.cert_dir}")
  253. logger.info(" CA: CN=Virtual Printer CA")
  254. logger.info(f" Printer: CN={self.serial}")
  255. return self.cert_path, self.key_path
  256. def delete_printer_certificate(self) -> None:
  257. """Delete only the printer certificate (preserves CA)."""
  258. for path in [self.cert_path, self.key_path]:
  259. if path.exists():
  260. path.unlink()
  261. logger.info("Deleted printer certificate (CA preserved)")
  262. def delete_certificates(self, include_ca: bool = False) -> None:
  263. """Delete existing certificates.
  264. Args:
  265. include_ca: If True, also delete CA certificate and key.
  266. If False (default), only delete printer certificate.
  267. """
  268. # Always delete printer certificate
  269. for path in [self.cert_path, self.key_path]:
  270. if path.exists():
  271. path.unlink()
  272. # Only delete CA if explicitly requested
  273. if include_ca:
  274. for path in [self.ca_cert_path, self.ca_key_path]:
  275. if path.exists():
  276. path.unlink()
  277. logger.info("Deleted all certificates including CA")
  278. else:
  279. logger.info("Deleted printer certificate (CA preserved)")