certificate.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435
  1. """TLS certificate generation for virtual printer services.
  2. Generates certificates that mimic real Bambu printer certificate format:
  3. - CA certificate mimics "BBL CA" from "BBL Technologies Co., Ltd"
  4. - Printer certificate has CN = serial number, signed by the CA
  5. The CA certificate is persistent and only regenerated if missing or expired.
  6. This allows users to add the CA to their slicer's trust store once.
  7. """
  8. import logging
  9. import socket
  10. from datetime import datetime, timedelta, timezone
  11. from ipaddress import IPv4Address
  12. from pathlib import Path
  13. from cryptography import x509
  14. from cryptography.hazmat.primitives import hashes, serialization
  15. from cryptography.hazmat.primitives.asymmetric import rsa
  16. from cryptography.x509.oid import ExtendedKeyUsageOID, NameOID
  17. logger = logging.getLogger(__name__)
  18. # Default serial number for virtual printer (matches SSDP/MQTT config)
  19. DEFAULT_SERIAL = "00M09A391800001"
  20. # Minimum days remaining before CA is considered expired and needs regeneration
  21. CA_EXPIRY_THRESHOLD_DAYS = 30
  22. def _get_local_ip() -> str:
  23. """Get the local IP address."""
  24. try:
  25. s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  26. s.connect(("8.8.8.8", 80))
  27. ip = s.getsockname()[0]
  28. s.close()
  29. return ip
  30. except OSError:
  31. return "127.0.0.1"
  32. class CertificateService:
  33. """Generate and manage TLS certificates for virtual printer.
  34. Creates a certificate chain mimicking real Bambu printers:
  35. - Root CA with CN="BBL CA", O="BBL Technologies Co., Ltd", C="CN"
  36. - Printer cert with CN=serial_number, signed by the CA
  37. """
  38. def __init__(self, cert_dir: Path, serial: str = DEFAULT_SERIAL, shared_ca_dir: Path | None = None):
  39. """Initialize the certificate service.
  40. Args:
  41. cert_dir: Directory to store per-instance certificates
  42. serial: Serial number to use as CN in printer certificate
  43. shared_ca_dir: If set, CA cert/key are read from this directory
  44. instead of cert_dir (for multi-instance shared CA)
  45. """
  46. self.cert_dir = cert_dir
  47. self.serial = serial
  48. ca_dir = shared_ca_dir or cert_dir
  49. self.ca_cert_path = ca_dir / "bbl_ca.crt"
  50. self.ca_key_path = ca_dir / "bbl_ca.key"
  51. self.cert_path = cert_dir / "virtual_printer.crt"
  52. self.key_path = cert_dir / "virtual_printer.key"
  53. def ensure_certificates(self) -> tuple[Path, Path]:
  54. """Ensure certificates exist, generate if needed.
  55. Returns:
  56. Tuple of (cert_path, key_path)
  57. """
  58. if self.cert_path.exists() and self.key_path.exists():
  59. if self._cert_matches_current_ca():
  60. logger.debug("Using existing virtual printer certificates")
  61. return self.cert_path, self.key_path
  62. logger.warning(
  63. "Existing per-VP certificate's issuer doesn't match the current CA "
  64. "(likely a CA rotation since the cert was signed). Regenerating "
  65. "to keep the slicer's imported CA in sync with the served chain."
  66. )
  67. return self.generate_certificates()
  68. def _cert_matches_current_ca(self) -> bool:
  69. """Check whether the on-disk per-VP cert was signed by the current CA.
  70. Slicers that import the shared CA validate the per-VP cert against it.
  71. If the CA has been rotated since the per-VP cert was signed, the chain
  72. is broken even though both files exist on disk. ``ensure_certificates``
  73. uses this to decide whether to regenerate.
  74. Uses real signature verification — Bambuddy's auto-generated CAs all
  75. share the same Subject DN ("Virtual Printer CA"), so a DN-only compare
  76. would incorrectly return True even after rotation.
  77. """
  78. try:
  79. if not self.ca_cert_path.exists():
  80. # No CA yet — let generate_certificates create one and the
  81. # matching per-VP chain.
  82. return False
  83. cert_pem = self.cert_path.read_bytes()
  84. cert = x509.load_pem_x509_certificate(cert_pem)
  85. ca_pem = self.ca_cert_path.read_bytes()
  86. ca_cert = x509.load_pem_x509_certificate(ca_pem)
  87. from cryptography.exceptions import InvalidSignature
  88. from cryptography.hazmat.primitives.asymmetric import padding
  89. try:
  90. ca_cert.public_key().verify(
  91. cert.signature,
  92. cert.tbs_certificate_bytes,
  93. padding.PKCS1v15(),
  94. cert.signature_hash_algorithm,
  95. )
  96. return True
  97. except InvalidSignature:
  98. return False
  99. except (OSError, ValueError) as e:
  100. logger.debug("CA-match probe failed for %s: %s", self.cert_path, e)
  101. return False
  102. except Exception as e:
  103. # Any unexpected exception during verification → treat as mismatch
  104. # and regenerate. Safer than reusing a cert we can't validate.
  105. logger.debug("CA-match verification failed for %s: %s", self.cert_path, e)
  106. return False
  107. def _load_existing_ca(self) -> tuple[rsa.RSAPrivateKey, x509.Certificate] | None:
  108. """Try to load existing CA certificate and key.
  109. Returns:
  110. Tuple of (ca_private_key, ca_certificate) if valid CA exists, None otherwise
  111. """
  112. if not self.ca_cert_path.exists() or not self.ca_key_path.exists():
  113. logger.debug("CA certificate or key not found")
  114. return None
  115. try:
  116. # Load CA certificate
  117. ca_cert_pem = self.ca_cert_path.read_bytes()
  118. ca_cert = x509.load_pem_x509_certificate(ca_cert_pem)
  119. # Check if CA is expired or about to expire
  120. now = datetime.now(timezone.utc)
  121. days_remaining = (ca_cert.not_valid_after_utc - now).days
  122. if days_remaining < CA_EXPIRY_THRESHOLD_DAYS:
  123. logger.warning("CA certificate expires in %s days, will regenerate", days_remaining)
  124. return None
  125. # Load CA private key
  126. ca_key_pem = self.ca_key_path.read_bytes()
  127. ca_key = serialization.load_pem_private_key(ca_key_pem, password=None)
  128. logger.info("Using existing CA certificate (expires in %s days)", days_remaining)
  129. return ca_key, ca_cert
  130. except (OSError, ValueError) as e:
  131. logger.warning("Failed to load existing CA: %s", e)
  132. return None
  133. def _get_or_create_ca(self) -> tuple[rsa.RSAPrivateKey, x509.Certificate]:
  134. """Get existing CA or create a new one.
  135. Returns:
  136. Tuple of (ca_private_key, ca_certificate)
  137. """
  138. # Try to load existing CA first
  139. existing = self._load_existing_ca()
  140. if existing:
  141. return existing
  142. # Generate new CA
  143. ca_key, ca_cert = self._generate_ca_certificate()
  144. # Save CA certificate and key. ``ca_key_path`` and ``ca_cert_path``
  145. # resolve under ``shared_ca_dir`` (which may differ from cert_dir),
  146. # so the parent we need to mkdir is the CA file's parent — not
  147. # cert_dir. Previously this created the per-VP subdirectory while
  148. # the writes targeted the parent CA dir, which works only because
  149. # the manager pre-creates both — the method itself was latent.
  150. self.ca_key_path.parent.mkdir(parents=True, exist_ok=True)
  151. self.ca_key_path.write_bytes(
  152. ca_key.private_bytes(
  153. encoding=serialization.Encoding.PEM,
  154. format=serialization.PrivateFormat.TraditionalOpenSSL,
  155. encryption_algorithm=serialization.NoEncryption(),
  156. )
  157. )
  158. try:
  159. self.ca_key_path.chmod(0o600)
  160. except OSError as e:
  161. logger.warning("Could not set CA key permissions on %s: %s", self.ca_key_path, e)
  162. self.ca_cert_path.write_bytes(ca_cert.public_bytes(serialization.Encoding.PEM))
  163. logger.info("Saved new CA certificate")
  164. return ca_key, ca_cert
  165. def _generate_ca_certificate(self) -> tuple[rsa.RSAPrivateKey, x509.Certificate]:
  166. """Generate a new CA certificate for the virtual printer.
  167. We use a generic name instead of mimicking BBL CA, since the slicer
  168. may specifically reject certificates claiming to be from BBL but
  169. with a different public key.
  170. Returns:
  171. Tuple of (ca_private_key, ca_certificate)
  172. """
  173. logger.info("Generating new Virtual Printer CA certificate...")
  174. # Generate CA private key
  175. ca_key = rsa.generate_private_key(
  176. public_exponent=65537,
  177. key_size=2048,
  178. )
  179. # Use a generic CA name - NOT BBL to avoid being rejected as fake
  180. ca_name = x509.Name(
  181. [
  182. x509.NameAttribute(NameOID.COMMON_NAME, "Virtual Printer CA"),
  183. ]
  184. )
  185. now = datetime.now(timezone.utc)
  186. ca_cert = (
  187. x509.CertificateBuilder()
  188. .subject_name(ca_name)
  189. .issuer_name(ca_name)
  190. .public_key(ca_key.public_key())
  191. .serial_number(x509.random_serial_number())
  192. .not_valid_before(now)
  193. .not_valid_after(now + timedelta(days=7300)) # 20 years
  194. .add_extension(
  195. x509.BasicConstraints(ca=True, path_length=0),
  196. critical=True,
  197. )
  198. .add_extension(
  199. x509.KeyUsage(
  200. digital_signature=True,
  201. content_commitment=False,
  202. key_encipherment=False,
  203. data_encipherment=False,
  204. key_agreement=False,
  205. key_cert_sign=True,
  206. crl_sign=True,
  207. encipher_only=False,
  208. decipher_only=False,
  209. ),
  210. critical=True,
  211. )
  212. .sign(ca_key, hashes.SHA256())
  213. )
  214. return ca_key, ca_cert
  215. def _build_san_entries(self, local_ip: str, additional_ips: list[str] | None) -> list[x509.GeneralName]:
  216. """Build Subject Alternative Name entries for the printer certificate."""
  217. entries: list[x509.GeneralName] = [
  218. x509.DNSName("localhost"),
  219. x509.DNSName("bambuddy"),
  220. x509.DNSName(self.serial),
  221. x509.IPAddress(IPv4Address(local_ip)),
  222. x509.IPAddress(IPv4Address("127.0.0.1")),
  223. ]
  224. seen_ips = {local_ip, "127.0.0.1"}
  225. if additional_ips:
  226. for ip in additional_ips:
  227. if ip and ip not in seen_ips:
  228. try:
  229. entries.append(x509.IPAddress(IPv4Address(ip)))
  230. seen_ips.add(ip)
  231. logger.info("Added additional SAN IP: %s", ip)
  232. except ValueError:
  233. logger.warning("Skipping invalid additional SAN IP: %s", ip)
  234. return entries
  235. def generate_certificates(self, additional_ips: list[str] | None = None) -> tuple[Path, Path]:
  236. """Generate printer certificate (reusing existing CA if available).
  237. Creates a certificate chain mimicking real Bambu printers:
  238. - CA certificate (reused if exists and valid, otherwise generated)
  239. - Printer certificate (CN=serial, signed by CA)
  240. Args:
  241. additional_ips: Extra IP addresses to include in certificate SAN.
  242. Used in proxy mode to include the remote interface IP so the
  243. slicer's TLS handshake succeeds when connecting to the proxy.
  244. Returns:
  245. Tuple of (cert_path, key_path)
  246. """
  247. logger.info("Generating certificates for virtual printer (serial: %s)...", self.serial)
  248. # Ensure directory exists
  249. self.cert_dir.mkdir(parents=True, exist_ok=True)
  250. # Get or create CA (reuses existing if valid)
  251. ca_key, ca_cert = self._get_or_create_ca()
  252. # Generate printer private key
  253. printer_key = rsa.generate_private_key(
  254. public_exponent=65537,
  255. key_size=2048,
  256. )
  257. # Printer certificate subject - CN is the serial number (like real Bambu printers)
  258. printer_subject = x509.Name(
  259. [
  260. x509.NameAttribute(NameOID.COMMON_NAME, self.serial),
  261. ]
  262. )
  263. # Issuer is the CA
  264. issuer = ca_cert.subject
  265. now = datetime.now(timezone.utc)
  266. local_ip = _get_local_ip()
  267. logger.info("Generating printer certificate with CN=%s, local IP: %s", self.serial, local_ip)
  268. # Build printer certificate signed by CA
  269. printer_cert = (
  270. x509.CertificateBuilder()
  271. .subject_name(printer_subject)
  272. .issuer_name(issuer)
  273. .public_key(printer_key.public_key())
  274. .serial_number(x509.random_serial_number())
  275. .not_valid_before(now)
  276. .not_valid_after(now + timedelta(days=3650)) # 10 years
  277. .add_extension(
  278. x509.BasicConstraints(ca=False, path_length=None),
  279. critical=True,
  280. )
  281. .add_extension(
  282. x509.SubjectAlternativeName(self._build_san_entries(local_ip, additional_ips)),
  283. critical=False,
  284. )
  285. .add_extension(
  286. x509.ExtendedKeyUsage(
  287. [
  288. ExtendedKeyUsageOID.SERVER_AUTH,
  289. ExtendedKeyUsageOID.CLIENT_AUTH,
  290. ]
  291. ),
  292. critical=False,
  293. )
  294. .add_extension(
  295. x509.KeyUsage(
  296. digital_signature=True,
  297. content_commitment=False,
  298. key_encipherment=True,
  299. data_encipherment=False,
  300. key_agreement=False,
  301. key_cert_sign=False,
  302. crl_sign=False,
  303. encipher_only=False,
  304. decipher_only=False,
  305. ),
  306. critical=True,
  307. )
  308. .sign(ca_key, hashes.SHA256()) # Signed by CA, not self-signed
  309. )
  310. # Write printer private key
  311. self.key_path.write_bytes(
  312. printer_key.private_bytes(
  313. encoding=serialization.Encoding.PEM,
  314. format=serialization.PrivateFormat.TraditionalOpenSSL,
  315. encryption_algorithm=serialization.NoEncryption(),
  316. )
  317. )
  318. try:
  319. self.key_path.chmod(0o600)
  320. except OSError as e:
  321. logger.warning("Could not set printer key permissions on %s: %s", self.key_path, e)
  322. # Write printer certificate (include CA cert in chain for full chain)
  323. cert_chain = printer_cert.public_bytes(serialization.Encoding.PEM) + ca_cert.public_bytes(
  324. serialization.Encoding.PEM
  325. )
  326. self.cert_path.write_bytes(cert_chain)
  327. logger.info("Generated certificate chain at %s", self.cert_dir)
  328. logger.info(" CA: CN=Virtual Printer CA")
  329. logger.info(" Printer: CN=%s", self.serial)
  330. return self.cert_path, self.key_path
  331. def get_ca_certificate_info(self) -> dict:
  332. """Return the shared CA certificate as PEM text plus identifying metadata.
  333. Generates the CA if it does not exist yet. Safe to expose over the
  334. API: this is the *public* CA certificate users import into their
  335. slicer's trust store. The CA private key (``bbl_ca.key``) is never
  336. included and never leaves the backend.
  337. Returns:
  338. Dict with ``pem`` (PEM-encoded certificate), ``fingerprint_sha256``
  339. (colon-separated uppercase hex) and ``not_valid_after`` (ISO 8601).
  340. """
  341. _ca_key, ca_cert = self._get_or_create_ca()
  342. pem = ca_cert.public_bytes(serialization.Encoding.PEM).decode("ascii")
  343. digest = ca_cert.fingerprint(hashes.SHA256()).hex().upper()
  344. fingerprint = ":".join(digest[i : i + 2] for i in range(0, len(digest), 2))
  345. return {
  346. "pem": pem,
  347. "fingerprint_sha256": fingerprint,
  348. "not_valid_after": ca_cert.not_valid_after_utc.isoformat(),
  349. }
  350. def delete_printer_certificate(self) -> None:
  351. """Delete only the printer certificate (preserves CA)."""
  352. for path in [self.cert_path, self.key_path]:
  353. if path.exists():
  354. path.unlink()
  355. logger.info("Deleted printer certificate (CA preserved)")
  356. def delete_certificates(self, include_ca: bool = False) -> None:
  357. """Delete existing certificates.
  358. Args:
  359. include_ca: If True, also delete CA certificate and key.
  360. If False (default), only delete printer certificate.
  361. """
  362. # Always delete printer certificate
  363. for path in [self.cert_path, self.key_path]:
  364. if path.exists():
  365. path.unlink()
  366. # Only delete CA if explicitly requested
  367. if include_ca:
  368. for path in [self.ca_cert_path, self.ca_key_path]:
  369. if path.exists():
  370. path.unlink()
  371. logger.info("Deleted all certificates including CA")
  372. else:
  373. logger.info("Deleted printer certificate (CA preserved)")