From fd5ee311c3b90c064bb00c8c9f84eba1c335a6d3 Mon Sep 17 00:00:00 2001 From: Irbe Krumina Date: Tue, 25 Feb 2025 11:32:49 -0800 Subject: [PATCH] initial Signed-off-by: Irbe Krumina --- .../deploy/examples/proxygroup.yaml | 6 +- cmd/k8s-operator/proxygroup_specs.go | 49 +++- ipn/ipnlocal/cert.go | 35 ++- ipn/store/kubestore/store_kube.go | 217 ++++++++++++++++-- 4 files changed, 279 insertions(+), 28 deletions(-) diff --git a/cmd/k8s-operator/deploy/examples/proxygroup.yaml b/cmd/k8s-operator/deploy/examples/proxygroup.yaml index 337d87f0b..ace7d1a99 100644 --- a/cmd/k8s-operator/deploy/examples/proxygroup.yaml +++ b/cmd/k8s-operator/deploy/examples/proxygroup.yaml @@ -1,7 +1,7 @@ apiVersion: tailscale.com/v1alpha1 kind: ProxyGroup metadata: - name: egress-proxies + name: ingress-proxies spec: - type: egress - replicas: 3 + type: ingress + replicas: 5 diff --git a/cmd/k8s-operator/proxygroup_specs.go b/cmd/k8s-operator/proxygroup_specs.go index 1ea91004b..cf76d815c 100644 --- a/cmd/k8s-operator/proxygroup_specs.go +++ b/cmd/k8s-operator/proxygroup_specs.go @@ -25,6 +25,12 @@ import ( // deletionGracePeriodSeconds is set to 6 minutes to ensure that the pre-stop hook of these proxies have enough chance to terminate gracefully. const deletionGracePeriodSeconds int64 = 360 +// Add this constant at the top with other constants +const ( + // ... existing constants ... + certSecretSuffix = "-certs" +) + // Returns the base StatefulSet definition for a ProxyGroup. A ProxyClass may be // applied over the top after. func pgStatefulSet(pg *tsapi.ProxyGroup, namespace, image, tsFirewallMode string, proxyClass *tsapi.ProxyClass) (*appsv1.StatefulSet, error) { @@ -178,7 +184,32 @@ func pgStatefulSet(pg *tsapi.ProxyGroup, namespace, image, tsFirewallMode string corev1.EnvVar{ Name: "TS_SERVE_CONFIG", Value: fmt.Sprintf("/etc/proxies/%s", serveConfigKey), + }, + corev1.EnvVar{ + Name: "TS_KUBE_CERT_SECRET", + Value: pg.Name + certSecretSuffix, + }, + corev1.EnvVar{ + Name: "TS_KUBE_CERT_DIR", + Value: "/var/run/tailscale/certs", }) + + // Add cert secret volume + tmpl.Spec.Volumes = append(tmpl.Spec.Volumes, corev1.Volume{ + Name: "certs", + VolumeSource: corev1.VolumeSource{ + Secret: &corev1.SecretVolumeSource{ + SecretName: pg.Name + certSecretSuffix, + }, + }, + }) + + // Add cert volume mount + c.VolumeMounts = append(c.VolumeMounts, corev1.VolumeMount{ + Name: "certs", + MountPath: "/var/run/tailscale/certs", + ReadOnly: true, + }) } return append(c.Env, envs...) }() @@ -217,7 +248,7 @@ func pgServiceAccount(pg *tsapi.ProxyGroup, namespace string) *corev1.ServiceAcc } func pgRole(pg *tsapi.ProxyGroup, namespace string) *rbacv1.Role { - return &rbacv1.Role{ + role := &rbacv1.Role{ ObjectMeta: metav1.ObjectMeta{ Name: pg.Name, Namespace: namespace, @@ -240,6 +271,9 @@ func pgRole(pg *tsapi.ProxyGroup, namespace string) *rbacv1.Role { fmt.Sprintf("%s-%d", pg.Name, i), // State. ) } + if pg.Spec.Type == tsapi.ProxyGroupTypeIngress { + secrets = append(secrets, pg.Name+certSecretSuffix) // Cert secret + } return secrets }(), }, @@ -254,6 +288,7 @@ func pgRole(pg *tsapi.ProxyGroup, namespace string) *rbacv1.Role { }, }, } + return role } func pgRoleBinding(pg *tsapi.ProxyGroup, namespace string) *rbacv1.RoleBinding { @@ -290,6 +325,18 @@ func pgStateSecrets(pg *tsapi.ProxyGroup, namespace string) (secrets []*corev1.S }) } + // For ingress ProxyGroups, create an additional secret for certificates + if pg.Spec.Type == tsapi.ProxyGroupTypeIngress { + secrets = append(secrets, &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: pg.Name + certSecretSuffix, + Namespace: namespace, + Labels: pgSecretLabels(pg.Name, "cert"), + OwnerReferences: pgOwnerReference(pg), + }, + }) + } + return secrets } diff --git a/ipn/ipnlocal/cert.go b/ipn/ipnlocal/cert.go index cfa4fe1ba..fb7ab931f 100644 --- a/ipn/ipnlocal/cert.go +++ b/ipn/ipnlocal/cert.go @@ -250,15 +250,13 @@ type certStore interface { // for now. If they're expired, it returns errCertExpired. // If they don't exist, it returns ipn.ErrStateNotExist. Read(domain string, now time.Time) (*TLSCertKeyPair, error) - // WriteCert writes the cert for domain. - WriteCert(domain string, cert []byte) error - // WriteKey writes the key for domain. - WriteKey(domain string, key []byte) error // ACMEKey returns the value previously stored via WriteACMEKey. // It is a PEM encoded ECDSA key. ACMEKey() ([]byte, error) // WriteACMEKey stores the provided PEM encoded ECDSA key. WriteACMEKey([]byte) error + // WriteTLSCertAndKey writes the cert and key for domain. + WriteTLSCertAndKey(domain string, cert, key []byte) error } var errCertExpired = errors.New("cert expired") @@ -344,6 +342,13 @@ func (f certFileStore) WriteKey(domain string, key []byte) error { return atomicfile.WriteFile(keyFile(f.dir, domain), key, 0600) } +func (f certFileStore) WriteTLSCertAndKey(domain string, cert, key []byte) error { + if err := f.WriteKey(domain, key); err != nil { + return err + } + return f.WriteCert(domain, cert) +} + // certStateStore implements certStore by storing the cert & key files in an ipn.StateStore. type certStateStore struct { ipn.StateStore @@ -384,6 +389,23 @@ func (s certStateStore) WriteACMEKey(key []byte) error { return ipn.WriteState(s.StateStore, ipn.StateKey(acmePEMName), key) } +// atomicCertWriter is an interface for stores that can write cert and key atomically. +type atomicCertWriter interface { + WriteTLSCertAndKey(domain string, cert, key []byte) error +} + +func (s certStateStore) WriteTLSCertAndKey(domain string, cert, key []byte) error { + // If we're using a store that supports atomic writes, use that + if aw, ok := s.StateStore.(atomicCertWriter); ok { + return aw.WriteTLSCertAndKey(domain, cert, key) + } + // Otherwise fall back to separate writes + if err := s.WriteKey(domain, key); err != nil { + return err + } + return s.WriteCert(domain, cert) +} + // TLSCertKeyPair is a TLS public and private key, and whether they were obtained // from cache or freshly obtained. type TLSCertKeyPair struct { @@ -546,9 +568,6 @@ func (b *LocalBackend) getCertPEM(ctx context.Context, cs certStore, logf logger if err := encodeECDSAKey(&privPEM, certPrivKey); err != nil { return nil, err } - if err := cs.WriteKey(domain, privPEM.Bytes()); err != nil { - return nil, err - } csr, err := certRequest(certPrivKey, domain, nil) if err != nil { @@ -570,7 +589,7 @@ func (b *LocalBackend) getCertPEM(ctx context.Context, cs certStore, logf logger return nil, err } } - if err := cs.WriteCert(domain, certPEM.Bytes()); err != nil { + if err := cs.WriteTLSCertAndKey(domain, certPEM.Bytes(), privPEM.Bytes()); err != nil { return nil, err } b.domainRenewed(domain) diff --git a/ipn/store/kubestore/store_kube.go b/ipn/store/kubestore/store_kube.go index 462e6d434..189902654 100644 --- a/ipn/store/kubestore/store_kube.go +++ b/ipn/store/kubestore/store_kube.go @@ -10,9 +10,11 @@ import ( "log" "net" "os" + "path/filepath" "strings" "time" + "github.com/fsnotify/fsnotify" "tailscale.com/ipn" "tailscale.com/ipn/store/mem" "tailscale.com/kube/kubeapi" @@ -31,13 +33,20 @@ const ( reasonTailscaleStateLoadFailed = "TailscaleStateLoadFailed" eventTypeWarning = "Warning" eventTypeNormal = "Normal" + + // envCertSecretName is the environment variable for specifying a separate Secret for certificates. + envCertSecretName = "TS_KUBE_CERT_SECRET" + // envCertDir is the environment variable for specifying a directory to load certificates from. + envCertDir = "TS_KUBE_CERT_DIR" ) // Store is an ipn.StateStore that uses a Kubernetes Secret for persistence. type Store struct { - client kubeclient.Client - canPatch bool - secretName string + client kubeclient.Client + canPatch bool + stateSecretName string + certSecretName string + certDir string // memory holds the latest tailscale state. Writes write state to a kube Secret and memory, Reads read from // memory. @@ -45,7 +54,7 @@ type Store struct { } // New returns a new Store that persists to the named Secret. -func New(_ logger.Logf, secretName string) (*Store, error) { +func New(logf logger.Logf, secretName string) (*Store, error) { c, err := kubeclient.New("tailscale-state-store") if err != nil { return nil, err @@ -54,19 +63,45 @@ func New(_ logger.Logf, secretName string) (*Store, error) { // Derive the API server address from the environment variables c.SetURL(fmt.Sprintf("https://%s:%s", os.Getenv("KUBERNETES_SERVICE_HOST"), os.Getenv("KUBERNETES_SERVICE_PORT_HTTPS"))) } + canPatch, _, err := c.CheckSecretPermissions(context.Background(), secretName) if err != nil { return nil, err } - s := &Store{ - client: c, - canPatch: canPatch, - secretName: secretName, + + certSecretName := os.Getenv(envCertSecretName) + if certSecretName != "" { + logf("kubestore: using separate secret %q for certificates", certSecretName) + // Also check permissions for cert secret + _, _, err := c.CheckSecretPermissions(context.Background(), certSecretName) + if err != nil { + return nil, fmt.Errorf("checking cert secret permissions: %w", err) + } } - // Load latest state from kube Secret if it already exists. + + s := &Store{ + client: c, + canPatch: canPatch, + stateSecretName: secretName, + certSecretName: certSecretName, + certDir: os.Getenv(envCertDir), + } + + // Load latest state from kube Secret if it already exists if err := s.loadState(); err != nil && err != ipn.ErrStateNotExist { return nil, fmt.Errorf("error loading state from kube Secret: %w", err) } + + // If cert directory is specified, load certs into secret + if s.certDir != "" { + logf("kubestore: loading certificates from directory %q", s.certDir) + if err := s.loadCertsFromDir(); err != nil { + return nil, fmt.Errorf("error loading certs from directory: %w", err) + } + logf("kubestore: starting certificate directory watcher") + go s.watchCertDir(context.Background()) + } + return s, nil } @@ -100,7 +135,7 @@ func (s *Store) WriteState(id ipn.StateKey, bs []byte) (err error) { cancel() }() - secret, err := s.client.GetSecret(ctx, s.secretName) + secret, err := s.client.GetSecret(ctx, s.stateSecretName) if err != nil { if kubeclient.IsNotFoundErr(err) { return s.client.CreateSecret(ctx, &kubeapi.Secret{ @@ -109,7 +144,7 @@ func (s *Store) WriteState(id ipn.StateKey, bs []byte) (err error) { Kind: "Secret", }, ObjectMeta: kubeapi.ObjectMeta{ - Name: s.secretName, + Name: s.stateSecretName, }, Data: map[string][]byte{ sanitizeKey(id): bs, @@ -127,8 +162,8 @@ func (s *Store) WriteState(id ipn.StateKey, bs []byte) (err error) { Value: map[string][]byte{sanitizeKey(id): bs}, }, } - if err := s.client.JSONPatchResource(ctx, s.secretName, kubeclient.TypeSecrets, m); err != nil { - return fmt.Errorf("error patching Secret %s with a /data field: %v", s.secretName, err) + if err := s.client.JSONPatchResource(ctx, s.stateSecretName, kubeclient.TypeSecrets, m); err != nil { + return fmt.Errorf("error patching Secret %s with a /data field: %v", s.stateSecretName, err) } return nil } @@ -139,8 +174,8 @@ func (s *Store) WriteState(id ipn.StateKey, bs []byte) (err error) { Value: bs, }, } - if err := s.client.JSONPatchResource(ctx, s.secretName, kubeclient.TypeSecrets, m); err != nil { - return fmt.Errorf("error patching Secret %s with /data/%s field: %v", s.secretName, sanitizeKey(id), err) + if err := s.client.JSONPatchResource(ctx, s.stateSecretName, kubeclient.TypeSecrets, m); err != nil { + return fmt.Errorf("error patching Secret %s with /data/%s field: %v", s.stateSecretName, sanitizeKey(id), err) } return nil } @@ -155,7 +190,7 @@ func (s *Store) loadState() (err error) { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() - secret, err := s.client.GetSecret(ctx, s.secretName) + secret, err := s.client.GetSecret(ctx, s.stateSecretName) if err != nil { if st, ok := err.(*kubeapi.Status); ok && st.Code == 404 { return ipn.ErrStateNotExist @@ -182,3 +217,153 @@ func sanitizeKey(k ipn.StateKey) string { return '_' }, string(k)) } + +// WriteTLSCertAndKey atomically writes both the certificate and private key for domain. +func (s *Store) WriteTLSCertAndKey(domain string, cert, key []byte) error { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + secretName := s.stateSecretName + if s.certSecretName != "" { + log.Printf("kubestore: writing certificates for %q to separate cert secret %q", domain, s.certSecretName) + secretName = s.certSecretName + } + + secret, err := s.client.GetSecret(ctx, secretName) + if err != nil { + if kubeclient.IsNotFoundErr(err) { + log.Printf("kubestore: creating new secret %q for certificates", secretName) + return s.client.CreateSecret(ctx, &kubeapi.Secret{ + TypeMeta: kubeapi.TypeMeta{ + APIVersion: "v1", + Kind: "Secret", + }, + ObjectMeta: kubeapi.ObjectMeta{ + Name: secretName, + }, + Data: map[string][]byte{ + sanitizeKey(ipn.StateKey(domain + ".crt")): cert, + sanitizeKey(ipn.StateKey(domain + ".key")): key, + }, + }) + } + return fmt.Errorf("getting secret %q: %w", secretName, err) + } + + if s.canPatch { + if len(secret.Data) == 0 { + log.Printf("kubestore: initializing empty secret %q with certificates", secretName) + m := []kubeclient.JSONPatch{ + { + Op: "add", + Path: "/data", + Value: map[string][]byte{ + sanitizeKey(ipn.StateKey(domain + ".crt")): cert, + sanitizeKey(ipn.StateKey(domain + ".key")): key, + }, + }, + } + return s.client.JSONPatchResource(ctx, secretName, kubeclient.TypeSecrets, m) + } + log.Printf("kubestore: patching certificates into secret %q", secretName) + m := []kubeclient.JSONPatch{ + { + Op: "add", + Path: "/data/" + sanitizeKey(ipn.StateKey(domain+".crt")), + Value: cert, + }, + { + Op: "add", + Path: "/data/" + sanitizeKey(ipn.StateKey(domain+".key")), + Value: key, + }, + } + return s.client.JSONPatchResource(ctx, secretName, kubeclient.TypeSecrets, m) + } + + log.Printf("kubestore: updating certificates in secret %q", secretName) + if secret.Data == nil { + secret.Data = make(map[string][]byte) + } + secret.Data[sanitizeKey(ipn.StateKey(domain+".crt"))] = cert + secret.Data[sanitizeKey(ipn.StateKey(domain+".key"))] = key + return s.client.UpdateSecret(ctx, secret) +} + +// loadCertsFromDir reads certificates from the configured directory into memory. +func (s *Store) loadCertsFromDir() error { + if s.certDir == "" { + return nil + } + + entries, err := os.ReadDir(s.certDir) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return err + } + + count := 0 + for _, entry := range entries { + if entry.IsDir() { + continue + } + name := entry.Name() + if !strings.HasSuffix(name, ".crt") && !strings.HasSuffix(name, ".key") { + continue + } + + data, err := os.ReadFile(filepath.Join(s.certDir, name)) + if err != nil { + return fmt.Errorf("reading cert file %q: %w", name, err) + } + + // Store in memory + s.memory.WriteState(ipn.StateKey(name), data) + count++ + } + + log.Printf("kubestore: loaded %d certificate files from %s", count, s.certDir) + return nil +} + +// watchCertDir watches the cert directory for changes and reloads certificates into memory +// when changes are detected. It exits when the context is canceled. +func (s *Store) watchCertDir(ctx context.Context) { + if s.certDir == "" { + return + } + + var tickChan <-chan time.Time + var eventChan <-chan fsnotify.Event + if w, err := fsnotify.NewWatcher(); err != nil { + log.Printf("kubestore: failed to create fsnotify watcher for %q, falling back to timer-only mode: %v", s.certDir, err) + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + tickChan = ticker.C + } else { + defer w.Close() + if err := w.Add(s.certDir); err != nil { + log.Printf("kubestore: failed to add fsnotify watch for %q: %v", s.certDir, err) + return + } + log.Printf("kubestore: watching %q for certificate changes", s.certDir) + eventChan = w.Events + } + + for { + select { + case <-ctx.Done(): + return + case <-tickChan: + case <-eventChan: + // We can't do any reasonable filtering on the event because of how + // k8s handles these mounts. So just re-read the directory and + // update memory if needed. + } + if err := s.loadCertsFromDir(); err != nil { + log.Printf("kubestore: error reloading certs from directory: %v", err) + } + } +}