| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338 |
- """Tailscale integration for virtual printer certificate provisioning.
- When Tailscale is present, provisions a Let's Encrypt certificate via
- `tailscale cert` for the machine's Tailscale FQDN. This cert is trusted
- by slicers without any manual CA installation, unlike the self-signed CA.
- Falls back gracefully when Tailscale is unavailable.
- """
- import asyncio
- import json
- import logging
- import os
- import re
- import shutil
- from dataclasses import dataclass, field
- from datetime import datetime, timezone
- from pathlib import Path
- from cryptography import x509
- logger = logging.getLogger(__name__)
- # Renew when fewer than this many days remain on the LE cert (LE issues 90-day certs;
- # Let's Encrypt recommends renewing at 30 days remaining)
- TS_CERT_EXPIRY_THRESHOLD_DAYS = 30
- # Defensive FQDN validation before passing to subprocess
- _FQDN_RE = re.compile(
- r"^[a-z0-9]([a-z0-9-]*[a-z0-9])?(\.[a-z0-9]([a-z0-9-]*[a-z0-9])?)+$",
- re.IGNORECASE,
- )
- # Detect tailnets where HTTPS cert generation is disabled — common for company/school
- # tailnets where the user is not a Tailscale admin.
- _HTTPS_DISABLED_RE = re.compile(
- r"(https? cert.*disabled|not enabled.*tailnet|cert.*not.*enabled)",
- re.IGNORECASE,
- )
- # Minimal environment for tailscale subprocess — passes OS/shell variables that
- # tailscale needs to locate its socket and config, but strips application secrets
- # (JWT keys, DB URLs, SMTP passwords, etc.) that the subprocess has no need for.
- _SUBPROCESS_ENV: dict[str, str] = {
- k: v
- for k, v in os.environ.items()
- if k
- in {
- "PATH",
- "HOME",
- "USER",
- "USERNAME",
- "LOGNAME",
- # Windows equivalents
- "USERPROFILE",
- "APPDATA",
- "LOCALAPPDATA",
- "PROGRAMFILES",
- "PROGRAMFILES(X86)",
- "SYSTEMROOT",
- "WINDIR",
- "COMPUTERNAME",
- "TEMP",
- "TMP",
- # Linux XDG dirs used by tailscale for socket/config
- "XDG_RUNTIME_DIR",
- "XDG_CONFIG_HOME",
- }
- }
- @dataclass
- class TailscaleStatus:
- """Runtime Tailscale availability and identity."""
- available: bool
- hostname: str # "myhost"
- tailnet_name: str # "tailnetname.ts.net"
- fqdn: str # "myhost.tailnetname.ts.net"
- tailscale_ips: list[str] = field(default_factory=list)
- error: str | None = None
- class TailscaleService:
- """Wraps Tailscale CLI commands for certificate provisioning.
- All methods are safe to call when Tailscale is absent — they return
- sensible defaults and never raise exceptions.
- """
- _docker_hint_logged: bool = False
- @classmethod
- def _log_docker_socket_hint(cls) -> None:
- """Log a one-time hint when running in Docker without the Tailscale socket mounted.
- Fires in both states: (a) tailscale binary missing and (b) binary present
- but the host socket isn't mounted into the container. The binary alone
- can't talk to the daemon — the host's tailscaled socket needs to be
- volume-mounted in docker-compose.yml.
- """
- if cls._docker_hint_logged:
- return
- if Path("/.dockerenv").exists() and not Path("/var/run/tailscale/tailscaled.sock").exists():
- logger.info(
- "Running in Docker but /var/run/tailscale/tailscaled.sock is not mounted. "
- "Add `- /var/run/tailscale/tailscaled.sock:/var/run/tailscale/tailscaled.sock` "
- "to docker-compose.yml (under volumes:) and run Tailscale on the host to enable "
- "Let's Encrypt certs for virtual printers."
- )
- cls._docker_hint_logged = True
- async def _run_tailscale(self, *args: str, timeout: float = 30.0) -> tuple[int | None, bytes, bytes]:
- """Run a tailscale subcommand and return (returncode, stdout, stderr).
- Resolves the binary to an absolute path to guard against PATH hijacking.
- Raises OSError if the binary cannot be found or launched.
- Raises asyncio.TimeoutError if the subprocess exceeds the timeout.
- """
- binary = shutil.which("tailscale")
- if not binary:
- raise OSError("tailscale binary not found")
- process = await asyncio.create_subprocess_exec(
- binary,
- *args,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
- env=_SUBPROCESS_ENV,
- )
- try:
- stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=timeout)
- except asyncio.TimeoutError:
- process.kill()
- await process.wait()
- raise
- return process.returncode, stdout, stderr
- async def get_status(self) -> TailscaleStatus:
- """Query Tailscale status and return machine identity.
- Runs: tailscale status --json
- Returns TailscaleStatus(available=False) if the binary is missing,
- the daemon is not running, or any other error occurs.
- """
- if not shutil.which("tailscale"):
- self._log_docker_socket_hint()
- return TailscaleStatus(
- available=False,
- hostname="",
- tailnet_name="",
- fqdn="",
- error="tailscale binary not found",
- )
- try:
- returncode, stdout, stderr = await self._run_tailscale("status", "--json", timeout=5.0)
- except OSError as e:
- return TailscaleStatus(
- available=False,
- hostname="",
- tailnet_name="",
- fqdn="",
- error=str(e),
- )
- if returncode is None or returncode != 0:
- # If the binary is present but the daemon socket is unreachable (e.g.
- # Docker without the socket mount), log the actionable hint rather than
- # just the opaque CLI stderr.
- self._log_docker_socket_hint()
- return TailscaleStatus(
- available=False,
- hostname="",
- tailnet_name="",
- fqdn="",
- error=stderr.decode(errors="replace").strip(),
- )
- try:
- data = json.loads(stdout)
- except json.JSONDecodeError as e:
- return TailscaleStatus(
- available=False,
- hostname="",
- tailnet_name="",
- fqdn="",
- error=f"JSON parse error: {e}",
- )
- self_info = data.get("Self", {})
- # DNSName includes trailing dot: "myhost.tailnetname.ts.net."
- fqdn = self_info.get("DNSName", "").rstrip(".")
- if not fqdn:
- return TailscaleStatus(
- available=False,
- hostname="",
- tailnet_name="",
- fqdn="",
- error="Tailscale not connected (no DNSName)",
- )
- # Split "myhost.tailnetname.ts.net" into hostname + tailnet_name
- parts = fqdn.split(".", 1)
- hostname = parts[0]
- tailnet_name = parts[1] if len(parts) > 1 else ""
- tailscale_ips = self_info.get("TailscaleIPs", [])
- logger.debug("Tailscale available: fqdn=%s, ips=%s", fqdn, tailscale_ips)
- return TailscaleStatus(
- available=True,
- hostname=hostname,
- tailnet_name=tailnet_name,
- fqdn=fqdn,
- tailscale_ips=tailscale_ips,
- )
- async def provision_cert(self, fqdn: str, cert_path: Path, key_path: Path) -> bool:
- """Request a Let's Encrypt certificate for the given Tailscale FQDN.
- Runs: tailscale cert --cert-file <cert_path> --key-file <key_path> <fqdn>
- Returns True on success, False on any error.
- """
- if not _FQDN_RE.match(fqdn):
- logger.warning("provision_cert: invalid FQDN %r, skipping", fqdn)
- return False
- # Ensure the target directory exists before tailscale cert writes to it
- cert_path.parent.mkdir(parents=True, exist_ok=True)
- logger.info("Provisioning Tailscale cert for %s -> %s", fqdn, cert_path)
- try:
- returncode, _, stderr = await self._run_tailscale(
- "cert",
- "--cert-file",
- str(cert_path),
- "--key-file",
- str(key_path),
- fqdn,
- timeout=60.0,
- )
- except OSError as e:
- logger.warning("tailscale cert failed (OS error): %s", e)
- return False
- if returncode is None or returncode != 0:
- err_text = stderr.decode(errors="replace").strip()
- if _HTTPS_DISABLED_RE.search(err_text):
- logger.warning(
- "Tailscale HTTPS certs are not enabled for this tailnet. "
- "Visit https://login.tailscale.com/admin/dns and enable HTTPS. "
- "Falling back to self-signed cert."
- )
- else:
- logger.warning("tailscale cert failed (exit %s): %s", returncode, err_text)
- return False
- # Restrict private key permissions
- try:
- key_path.chmod(0o600)
- except OSError as e:
- logger.warning("Could not set key permissions on %s: %s", key_path, e)
- # Verify the files are readable by the current process — on bare-metal, the
- # tailscale daemon or a prior sudo invocation may have left them root-owned.
- if not os.access(cert_path, os.R_OK) or not os.access(key_path, os.R_OK):
- logger.error(
- "Tailscale cert files at %s are not readable by this process. "
- "Fix with: sudo chown $(whoami):$(whoami) %s %s",
- cert_path.parent,
- cert_path,
- key_path,
- )
- return False
- logger.info("Tailscale cert provisioned: %s", cert_path)
- return True
- def cert_needs_renewal(self, cert_path: Path, fqdn: str | None = None) -> bool:
- """Check whether the certificate at cert_path needs to be renewed.
- Returns True if the file is absent, unreadable, expires within
- TS_CERT_EXPIRY_THRESHOLD_DAYS days, or if fqdn is given and does not
- appear in the certificate's Subject Alternative Names.
- """
- if not cert_path.exists():
- return True
- try:
- cert_pem = cert_path.read_bytes()
- # The file may contain a full chain; load only the first PEM block
- cert = x509.load_pem_x509_certificate(cert_pem)
- now = datetime.now(timezone.utc)
- days_remaining = (cert.not_valid_after_utc - now).days
- if days_remaining < TS_CERT_EXPIRY_THRESHOLD_DAYS:
- logger.info("Tailscale cert expires in %d days, renewal needed", days_remaining)
- return True
- # Validate that the cert covers the requested FQDN (guards against stale
- # cert after machine rename or tailnet migration). Case-insensitive per RFC 4343.
- if fqdn:
- try:
- san = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
- dns_names = san.value.get_values_for_type(x509.DNSName)
- if fqdn.lower() not in {n.lower() for n in dns_names}:
- logger.info(
- "Tailscale cert SAN mismatch (cert has %s, need %s), renewal needed",
- dns_names,
- fqdn,
- )
- return True
- except x509.ExtensionNotFound:
- logger.info("Tailscale cert has no SAN extension, renewal needed")
- return True
- logger.debug("Tailscale cert valid for %d more days", days_remaining)
- return False
- except (OSError, ValueError) as e:
- logger.warning("Could not read Tailscale cert %s: %s", cert_path, e)
- return True
- async def ensure_cert(self, fqdn: str, cert_path: Path, key_path: Path) -> bool:
- """Ensure a fresh certificate exists at cert_path.
- Skips provisioning if the cert is present, not near expiry, and covers fqdn.
- Returns True if a valid cert is now available.
- """
- if not self.cert_needs_renewal(cert_path, fqdn=fqdn):
- logger.debug("Tailscale cert is fresh, skipping provision")
- return True
- return await self.provision_cert(fqdn, cert_path, key_path)
- # Module-level singleton — import this in other modules
- tailscale_service = TailscaleService()
|