certificate.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353
  1. """TLS certificate generation for virtual printer services.
  2. Generates certificates that mimic real Bambu printer certificate format:
  3. - CA certificate mimics "BBL CA" from "BBL Technologies Co., Ltd"
  4. - Printer certificate has CN = serial number, signed by the CA
  5. The CA certificate is persistent and only regenerated if missing or expired.
  6. This allows users to add the CA to their slicer's trust store once.
  7. """
  8. import logging
  9. import socket
  10. from datetime import datetime, timedelta, timezone
  11. from ipaddress import IPv4Address
  12. from pathlib import Path
  13. from cryptography import x509
  14. from cryptography.hazmat.primitives import hashes, serialization
  15. from cryptography.hazmat.primitives.asymmetric import rsa
  16. from cryptography.x509.oid import ExtendedKeyUsageOID, NameOID
  17. logger = logging.getLogger(__name__)
  18. # Default serial number for virtual printer (matches SSDP/MQTT config)
  19. DEFAULT_SERIAL = "00M09A391800001"
  20. # Minimum days remaining before CA is considered expired and needs regeneration
  21. CA_EXPIRY_THRESHOLD_DAYS = 30
  22. def _get_local_ip() -> str:
  23. """Get the local IP address."""
  24. try:
  25. s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  26. s.connect(("8.8.8.8", 80))
  27. ip = s.getsockname()[0]
  28. s.close()
  29. return ip
  30. except OSError:
  31. return "127.0.0.1"
  32. class CertificateService:
  33. """Generate and manage TLS certificates for virtual printer.
  34. Creates a certificate chain mimicking real Bambu printers:
  35. - Root CA with CN="BBL CA", O="BBL Technologies Co., Ltd", C="CN"
  36. - Printer cert with CN=serial_number, signed by the CA
  37. """
  38. def __init__(self, cert_dir: Path, serial: str = DEFAULT_SERIAL, shared_ca_dir: Path | None = None):
  39. """Initialize the certificate service.
  40. Args:
  41. cert_dir: Directory to store per-instance certificates
  42. serial: Serial number to use as CN in printer certificate
  43. shared_ca_dir: If set, CA cert/key are read from this directory
  44. instead of cert_dir (for multi-instance shared CA)
  45. """
  46. self.cert_dir = cert_dir
  47. self.serial = serial
  48. ca_dir = shared_ca_dir or cert_dir
  49. self.ca_cert_path = ca_dir / "bbl_ca.crt"
  50. self.ca_key_path = ca_dir / "bbl_ca.key"
  51. self.cert_path = cert_dir / "virtual_printer.crt"
  52. self.key_path = cert_dir / "virtual_printer.key"
  53. def ensure_certificates(self) -> tuple[Path, Path]:
  54. """Ensure certificates exist, generate if needed.
  55. Returns:
  56. Tuple of (cert_path, key_path)
  57. """
  58. if self.cert_path.exists() and self.key_path.exists():
  59. logger.debug("Using existing virtual printer certificates")
  60. return self.cert_path, self.key_path
  61. return self.generate_certificates()
  62. def _load_existing_ca(self) -> tuple[rsa.RSAPrivateKey, x509.Certificate] | None:
  63. """Try to load existing CA certificate and key.
  64. Returns:
  65. Tuple of (ca_private_key, ca_certificate) if valid CA exists, None otherwise
  66. """
  67. if not self.ca_cert_path.exists() or not self.ca_key_path.exists():
  68. logger.debug("CA certificate or key not found")
  69. return None
  70. try:
  71. # Load CA certificate
  72. ca_cert_pem = self.ca_cert_path.read_bytes()
  73. ca_cert = x509.load_pem_x509_certificate(ca_cert_pem)
  74. # Check if CA is expired or about to expire
  75. now = datetime.now(timezone.utc)
  76. days_remaining = (ca_cert.not_valid_after_utc - now).days
  77. if days_remaining < CA_EXPIRY_THRESHOLD_DAYS:
  78. logger.warning("CA certificate expires in %s days, will regenerate", days_remaining)
  79. return None
  80. # Load CA private key
  81. ca_key_pem = self.ca_key_path.read_bytes()
  82. ca_key = serialization.load_pem_private_key(ca_key_pem, password=None)
  83. logger.info("Using existing CA certificate (expires in %s days)", days_remaining)
  84. return ca_key, ca_cert
  85. except (OSError, ValueError) as e:
  86. logger.warning("Failed to load existing CA: %s", e)
  87. return None
  88. def _get_or_create_ca(self) -> tuple[rsa.RSAPrivateKey, x509.Certificate]:
  89. """Get existing CA or create a new one.
  90. Returns:
  91. Tuple of (ca_private_key, ca_certificate)
  92. """
  93. # Try to load existing CA first
  94. existing = self._load_existing_ca()
  95. if existing:
  96. return existing
  97. # Generate new CA
  98. ca_key, ca_cert = self._generate_ca_certificate()
  99. # Save CA certificate and key
  100. self.cert_dir.mkdir(parents=True, exist_ok=True)
  101. self.ca_key_path.write_bytes(
  102. ca_key.private_bytes(
  103. encoding=serialization.Encoding.PEM,
  104. format=serialization.PrivateFormat.TraditionalOpenSSL,
  105. encryption_algorithm=serialization.NoEncryption(),
  106. )
  107. )
  108. self.ca_key_path.chmod(0o600)
  109. self.ca_cert_path.write_bytes(ca_cert.public_bytes(serialization.Encoding.PEM))
  110. logger.info("Saved new CA certificate")
  111. return ca_key, ca_cert
  112. def _generate_ca_certificate(self) -> tuple[rsa.RSAPrivateKey, x509.Certificate]:
  113. """Generate a new CA certificate for the virtual printer.
  114. We use a generic name instead of mimicking BBL CA, since the slicer
  115. may specifically reject certificates claiming to be from BBL but
  116. with a different public key.
  117. Returns:
  118. Tuple of (ca_private_key, ca_certificate)
  119. """
  120. logger.info("Generating new Virtual Printer CA certificate...")
  121. # Generate CA private key
  122. ca_key = rsa.generate_private_key(
  123. public_exponent=65537,
  124. key_size=2048,
  125. )
  126. # Use a generic CA name - NOT BBL to avoid being rejected as fake
  127. ca_name = x509.Name(
  128. [
  129. x509.NameAttribute(NameOID.COMMON_NAME, "Virtual Printer CA"),
  130. ]
  131. )
  132. now = datetime.now(timezone.utc)
  133. ca_cert = (
  134. x509.CertificateBuilder()
  135. .subject_name(ca_name)
  136. .issuer_name(ca_name)
  137. .public_key(ca_key.public_key())
  138. .serial_number(x509.random_serial_number())
  139. .not_valid_before(now)
  140. .not_valid_after(now + timedelta(days=7300)) # 20 years
  141. .add_extension(
  142. x509.BasicConstraints(ca=True, path_length=0),
  143. critical=True,
  144. )
  145. .add_extension(
  146. x509.KeyUsage(
  147. digital_signature=True,
  148. content_commitment=False,
  149. key_encipherment=False,
  150. data_encipherment=False,
  151. key_agreement=False,
  152. key_cert_sign=True,
  153. crl_sign=True,
  154. encipher_only=False,
  155. decipher_only=False,
  156. ),
  157. critical=True,
  158. )
  159. .sign(ca_key, hashes.SHA256())
  160. )
  161. return ca_key, ca_cert
  162. def _build_san_entries(self, local_ip: str, additional_ips: list[str] | None) -> list[x509.GeneralName]:
  163. """Build Subject Alternative Name entries for the printer certificate."""
  164. entries: list[x509.GeneralName] = [
  165. x509.DNSName("localhost"),
  166. x509.DNSName("bambuddy"),
  167. x509.DNSName(self.serial),
  168. x509.IPAddress(IPv4Address(local_ip)),
  169. x509.IPAddress(IPv4Address("127.0.0.1")),
  170. ]
  171. seen_ips = {local_ip, "127.0.0.1"}
  172. if additional_ips:
  173. for ip in additional_ips:
  174. if ip and ip not in seen_ips:
  175. try:
  176. entries.append(x509.IPAddress(IPv4Address(ip)))
  177. seen_ips.add(ip)
  178. logger.info("Added additional SAN IP: %s", ip)
  179. except ValueError:
  180. logger.warning("Skipping invalid additional SAN IP: %s", ip)
  181. return entries
  182. def generate_certificates(self, additional_ips: list[str] | None = None) -> tuple[Path, Path]:
  183. """Generate printer certificate (reusing existing CA if available).
  184. Creates a certificate chain mimicking real Bambu printers:
  185. - CA certificate (reused if exists and valid, otherwise generated)
  186. - Printer certificate (CN=serial, signed by CA)
  187. Args:
  188. additional_ips: Extra IP addresses to include in certificate SAN.
  189. Used in proxy mode to include the remote interface IP so the
  190. slicer's TLS handshake succeeds when connecting to the proxy.
  191. Returns:
  192. Tuple of (cert_path, key_path)
  193. """
  194. logger.info("Generating certificates for virtual printer (serial: %s)...", self.serial)
  195. # Ensure directory exists
  196. self.cert_dir.mkdir(parents=True, exist_ok=True)
  197. # Get or create CA (reuses existing if valid)
  198. ca_key, ca_cert = self._get_or_create_ca()
  199. # Generate printer private key
  200. printer_key = rsa.generate_private_key(
  201. public_exponent=65537,
  202. key_size=2048,
  203. )
  204. # Printer certificate subject - CN is the serial number (like real Bambu printers)
  205. printer_subject = x509.Name(
  206. [
  207. x509.NameAttribute(NameOID.COMMON_NAME, self.serial),
  208. ]
  209. )
  210. # Issuer is the CA
  211. issuer = ca_cert.subject
  212. now = datetime.now(timezone.utc)
  213. local_ip = _get_local_ip()
  214. logger.info("Generating printer certificate with CN=%s, local IP: %s", self.serial, local_ip)
  215. # Build printer certificate signed by CA
  216. printer_cert = (
  217. x509.CertificateBuilder()
  218. .subject_name(printer_subject)
  219. .issuer_name(issuer)
  220. .public_key(printer_key.public_key())
  221. .serial_number(x509.random_serial_number())
  222. .not_valid_before(now)
  223. .not_valid_after(now + timedelta(days=3650)) # 10 years
  224. .add_extension(
  225. x509.BasicConstraints(ca=False, path_length=None),
  226. critical=True,
  227. )
  228. .add_extension(
  229. x509.SubjectAlternativeName(self._build_san_entries(local_ip, additional_ips)),
  230. critical=False,
  231. )
  232. .add_extension(
  233. x509.ExtendedKeyUsage(
  234. [
  235. ExtendedKeyUsageOID.SERVER_AUTH,
  236. ExtendedKeyUsageOID.CLIENT_AUTH,
  237. ]
  238. ),
  239. critical=False,
  240. )
  241. .add_extension(
  242. x509.KeyUsage(
  243. digital_signature=True,
  244. content_commitment=False,
  245. key_encipherment=True,
  246. data_encipherment=False,
  247. key_agreement=False,
  248. key_cert_sign=False,
  249. crl_sign=False,
  250. encipher_only=False,
  251. decipher_only=False,
  252. ),
  253. critical=True,
  254. )
  255. .sign(ca_key, hashes.SHA256()) # Signed by CA, not self-signed
  256. )
  257. # Write printer private key
  258. self.key_path.write_bytes(
  259. printer_key.private_bytes(
  260. encoding=serialization.Encoding.PEM,
  261. format=serialization.PrivateFormat.TraditionalOpenSSL,
  262. encryption_algorithm=serialization.NoEncryption(),
  263. )
  264. )
  265. self.key_path.chmod(0o600)
  266. # Write printer certificate (include CA cert in chain for full chain)
  267. cert_chain = printer_cert.public_bytes(serialization.Encoding.PEM) + ca_cert.public_bytes(
  268. serialization.Encoding.PEM
  269. )
  270. self.cert_path.write_bytes(cert_chain)
  271. logger.info("Generated certificate chain at %s", self.cert_dir)
  272. logger.info(" CA: CN=Virtual Printer CA")
  273. logger.info(" Printer: CN=%s", self.serial)
  274. return self.cert_path, self.key_path
  275. def delete_printer_certificate(self) -> None:
  276. """Delete only the printer certificate (preserves CA)."""
  277. for path in [self.cert_path, self.key_path]:
  278. if path.exists():
  279. path.unlink()
  280. logger.info("Deleted printer certificate (CA preserved)")
  281. def delete_certificates(self, include_ca: bool = False) -> None:
  282. """Delete existing certificates.
  283. Args:
  284. include_ca: If True, also delete CA certificate and key.
  285. If False (default), only delete printer certificate.
  286. """
  287. # Always delete printer certificate
  288. for path in [self.cert_path, self.key_path]:
  289. if path.exists():
  290. path.unlink()
  291. # Only delete CA if explicitly requested
  292. if include_ca:
  293. for path in [self.ca_cert_path, self.ca_key_path]:
  294. if path.exists():
  295. path.unlink()
  296. logger.info("Deleted all certificates including CA")
  297. else:
  298. logger.info("Deleted printer certificate (CA preserved)")