Signed-off-by: Irbe Krumina <irbe@tailscale.com>
This commit is contained in:
Irbe Krumina 2025-02-25 11:32:49 -08:00
parent d7508b24c6
commit fd5ee311c3
4 changed files with 279 additions and 28 deletions

View File

@ -1,7 +1,7 @@
apiVersion: tailscale.com/v1alpha1
kind: ProxyGroup
metadata:
name: egress-proxies
name: ingress-proxies
spec:
type: egress
replicas: 3
type: ingress
replicas: 5

View File

@ -25,6 +25,12 @@ import (
// deletionGracePeriodSeconds is set to 6 minutes to ensure that the pre-stop hook of these proxies have enough chance to terminate gracefully.
const deletionGracePeriodSeconds int64 = 360
// Add this constant at the top with other constants
const (
// ... existing constants ...
certSecretSuffix = "-certs"
)
// Returns the base StatefulSet definition for a ProxyGroup. A ProxyClass may be
// applied over the top after.
func pgStatefulSet(pg *tsapi.ProxyGroup, namespace, image, tsFirewallMode string, proxyClass *tsapi.ProxyClass) (*appsv1.StatefulSet, error) {
@ -178,7 +184,32 @@ func pgStatefulSet(pg *tsapi.ProxyGroup, namespace, image, tsFirewallMode string
corev1.EnvVar{
Name: "TS_SERVE_CONFIG",
Value: fmt.Sprintf("/etc/proxies/%s", serveConfigKey),
},
corev1.EnvVar{
Name: "TS_KUBE_CERT_SECRET",
Value: pg.Name + certSecretSuffix,
},
corev1.EnvVar{
Name: "TS_KUBE_CERT_DIR",
Value: "/var/run/tailscale/certs",
})
// Add cert secret volume
tmpl.Spec.Volumes = append(tmpl.Spec.Volumes, corev1.Volume{
Name: "certs",
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: pg.Name + certSecretSuffix,
},
},
})
// Add cert volume mount
c.VolumeMounts = append(c.VolumeMounts, corev1.VolumeMount{
Name: "certs",
MountPath: "/var/run/tailscale/certs",
ReadOnly: true,
})
}
return append(c.Env, envs...)
}()
@ -217,7 +248,7 @@ func pgServiceAccount(pg *tsapi.ProxyGroup, namespace string) *corev1.ServiceAcc
}
func pgRole(pg *tsapi.ProxyGroup, namespace string) *rbacv1.Role {
return &rbacv1.Role{
role := &rbacv1.Role{
ObjectMeta: metav1.ObjectMeta{
Name: pg.Name,
Namespace: namespace,
@ -240,6 +271,9 @@ func pgRole(pg *tsapi.ProxyGroup, namespace string) *rbacv1.Role {
fmt.Sprintf("%s-%d", pg.Name, i), // State.
)
}
if pg.Spec.Type == tsapi.ProxyGroupTypeIngress {
secrets = append(secrets, pg.Name+certSecretSuffix) // Cert secret
}
return secrets
}(),
},
@ -254,6 +288,7 @@ func pgRole(pg *tsapi.ProxyGroup, namespace string) *rbacv1.Role {
},
},
}
return role
}
func pgRoleBinding(pg *tsapi.ProxyGroup, namespace string) *rbacv1.RoleBinding {
@ -290,6 +325,18 @@ func pgStateSecrets(pg *tsapi.ProxyGroup, namespace string) (secrets []*corev1.S
})
}
// For ingress ProxyGroups, create an additional secret for certificates
if pg.Spec.Type == tsapi.ProxyGroupTypeIngress {
secrets = append(secrets, &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: pg.Name + certSecretSuffix,
Namespace: namespace,
Labels: pgSecretLabels(pg.Name, "cert"),
OwnerReferences: pgOwnerReference(pg),
},
})
}
return secrets
}

View File

@ -250,15 +250,13 @@ type certStore interface {
// for now. If they're expired, it returns errCertExpired.
// If they don't exist, it returns ipn.ErrStateNotExist.
Read(domain string, now time.Time) (*TLSCertKeyPair, error)
// WriteCert writes the cert for domain.
WriteCert(domain string, cert []byte) error
// WriteKey writes the key for domain.
WriteKey(domain string, key []byte) error
// ACMEKey returns the value previously stored via WriteACMEKey.
// It is a PEM encoded ECDSA key.
ACMEKey() ([]byte, error)
// WriteACMEKey stores the provided PEM encoded ECDSA key.
WriteACMEKey([]byte) error
// WriteTLSCertAndKey writes the cert and key for domain.
WriteTLSCertAndKey(domain string, cert, key []byte) error
}
var errCertExpired = errors.New("cert expired")
@ -344,6 +342,13 @@ func (f certFileStore) WriteKey(domain string, key []byte) error {
return atomicfile.WriteFile(keyFile(f.dir, domain), key, 0600)
}
func (f certFileStore) WriteTLSCertAndKey(domain string, cert, key []byte) error {
if err := f.WriteKey(domain, key); err != nil {
return err
}
return f.WriteCert(domain, cert)
}
// certStateStore implements certStore by storing the cert & key files in an ipn.StateStore.
type certStateStore struct {
ipn.StateStore
@ -384,6 +389,23 @@ func (s certStateStore) WriteACMEKey(key []byte) error {
return ipn.WriteState(s.StateStore, ipn.StateKey(acmePEMName), key)
}
// atomicCertWriter is an interface for stores that can write cert and key atomically.
type atomicCertWriter interface {
WriteTLSCertAndKey(domain string, cert, key []byte) error
}
func (s certStateStore) WriteTLSCertAndKey(domain string, cert, key []byte) error {
// If we're using a store that supports atomic writes, use that
if aw, ok := s.StateStore.(atomicCertWriter); ok {
return aw.WriteTLSCertAndKey(domain, cert, key)
}
// Otherwise fall back to separate writes
if err := s.WriteKey(domain, key); err != nil {
return err
}
return s.WriteCert(domain, cert)
}
// TLSCertKeyPair is a TLS public and private key, and whether they were obtained
// from cache or freshly obtained.
type TLSCertKeyPair struct {
@ -546,9 +568,6 @@ func (b *LocalBackend) getCertPEM(ctx context.Context, cs certStore, logf logger
if err := encodeECDSAKey(&privPEM, certPrivKey); err != nil {
return nil, err
}
if err := cs.WriteKey(domain, privPEM.Bytes()); err != nil {
return nil, err
}
csr, err := certRequest(certPrivKey, domain, nil)
if err != nil {
@ -570,7 +589,7 @@ func (b *LocalBackend) getCertPEM(ctx context.Context, cs certStore, logf logger
return nil, err
}
}
if err := cs.WriteCert(domain, certPEM.Bytes()); err != nil {
if err := cs.WriteTLSCertAndKey(domain, certPEM.Bytes(), privPEM.Bytes()); err != nil {
return nil, err
}
b.domainRenewed(domain)

View File

@ -10,9 +10,11 @@ import (
"log"
"net"
"os"
"path/filepath"
"strings"
"time"
"github.com/fsnotify/fsnotify"
"tailscale.com/ipn"
"tailscale.com/ipn/store/mem"
"tailscale.com/kube/kubeapi"
@ -31,13 +33,20 @@ const (
reasonTailscaleStateLoadFailed = "TailscaleStateLoadFailed"
eventTypeWarning = "Warning"
eventTypeNormal = "Normal"
// envCertSecretName is the environment variable for specifying a separate Secret for certificates.
envCertSecretName = "TS_KUBE_CERT_SECRET"
// envCertDir is the environment variable for specifying a directory to load certificates from.
envCertDir = "TS_KUBE_CERT_DIR"
)
// Store is an ipn.StateStore that uses a Kubernetes Secret for persistence.
type Store struct {
client kubeclient.Client
canPatch bool
secretName string
client kubeclient.Client
canPatch bool
stateSecretName string
certSecretName string
certDir string
// memory holds the latest tailscale state. Writes write state to a kube Secret and memory, Reads read from
// memory.
@ -45,7 +54,7 @@ type Store struct {
}
// New returns a new Store that persists to the named Secret.
func New(_ logger.Logf, secretName string) (*Store, error) {
func New(logf logger.Logf, secretName string) (*Store, error) {
c, err := kubeclient.New("tailscale-state-store")
if err != nil {
return nil, err
@ -54,19 +63,45 @@ func New(_ logger.Logf, secretName string) (*Store, error) {
// Derive the API server address from the environment variables
c.SetURL(fmt.Sprintf("https://%s:%s", os.Getenv("KUBERNETES_SERVICE_HOST"), os.Getenv("KUBERNETES_SERVICE_PORT_HTTPS")))
}
canPatch, _, err := c.CheckSecretPermissions(context.Background(), secretName)
if err != nil {
return nil, err
}
s := &Store{
client: c,
canPatch: canPatch,
secretName: secretName,
certSecretName := os.Getenv(envCertSecretName)
if certSecretName != "" {
logf("kubestore: using separate secret %q for certificates", certSecretName)
// Also check permissions for cert secret
_, _, err := c.CheckSecretPermissions(context.Background(), certSecretName)
if err != nil {
return nil, fmt.Errorf("checking cert secret permissions: %w", err)
}
}
// Load latest state from kube Secret if it already exists.
s := &Store{
client: c,
canPatch: canPatch,
stateSecretName: secretName,
certSecretName: certSecretName,
certDir: os.Getenv(envCertDir),
}
// Load latest state from kube Secret if it already exists
if err := s.loadState(); err != nil && err != ipn.ErrStateNotExist {
return nil, fmt.Errorf("error loading state from kube Secret: %w", err)
}
// If cert directory is specified, load certs into secret
if s.certDir != "" {
logf("kubestore: loading certificates from directory %q", s.certDir)
if err := s.loadCertsFromDir(); err != nil {
return nil, fmt.Errorf("error loading certs from directory: %w", err)
}
logf("kubestore: starting certificate directory watcher")
go s.watchCertDir(context.Background())
}
return s, nil
}
@ -100,7 +135,7 @@ func (s *Store) WriteState(id ipn.StateKey, bs []byte) (err error) {
cancel()
}()
secret, err := s.client.GetSecret(ctx, s.secretName)
secret, err := s.client.GetSecret(ctx, s.stateSecretName)
if err != nil {
if kubeclient.IsNotFoundErr(err) {
return s.client.CreateSecret(ctx, &kubeapi.Secret{
@ -109,7 +144,7 @@ func (s *Store) WriteState(id ipn.StateKey, bs []byte) (err error) {
Kind: "Secret",
},
ObjectMeta: kubeapi.ObjectMeta{
Name: s.secretName,
Name: s.stateSecretName,
},
Data: map[string][]byte{
sanitizeKey(id): bs,
@ -127,8 +162,8 @@ func (s *Store) WriteState(id ipn.StateKey, bs []byte) (err error) {
Value: map[string][]byte{sanitizeKey(id): bs},
},
}
if err := s.client.JSONPatchResource(ctx, s.secretName, kubeclient.TypeSecrets, m); err != nil {
return fmt.Errorf("error patching Secret %s with a /data field: %v", s.secretName, err)
if err := s.client.JSONPatchResource(ctx, s.stateSecretName, kubeclient.TypeSecrets, m); err != nil {
return fmt.Errorf("error patching Secret %s with a /data field: %v", s.stateSecretName, err)
}
return nil
}
@ -139,8 +174,8 @@ func (s *Store) WriteState(id ipn.StateKey, bs []byte) (err error) {
Value: bs,
},
}
if err := s.client.JSONPatchResource(ctx, s.secretName, kubeclient.TypeSecrets, m); err != nil {
return fmt.Errorf("error patching Secret %s with /data/%s field: %v", s.secretName, sanitizeKey(id), err)
if err := s.client.JSONPatchResource(ctx, s.stateSecretName, kubeclient.TypeSecrets, m); err != nil {
return fmt.Errorf("error patching Secret %s with /data/%s field: %v", s.stateSecretName, sanitizeKey(id), err)
}
return nil
}
@ -155,7 +190,7 @@ func (s *Store) loadState() (err error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
secret, err := s.client.GetSecret(ctx, s.secretName)
secret, err := s.client.GetSecret(ctx, s.stateSecretName)
if err != nil {
if st, ok := err.(*kubeapi.Status); ok && st.Code == 404 {
return ipn.ErrStateNotExist
@ -182,3 +217,153 @@ func sanitizeKey(k ipn.StateKey) string {
return '_'
}, string(k))
}
// WriteTLSCertAndKey atomically writes both the certificate and private key for domain.
func (s *Store) WriteTLSCertAndKey(domain string, cert, key []byte) error {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
secretName := s.stateSecretName
if s.certSecretName != "" {
log.Printf("kubestore: writing certificates for %q to separate cert secret %q", domain, s.certSecretName)
secretName = s.certSecretName
}
secret, err := s.client.GetSecret(ctx, secretName)
if err != nil {
if kubeclient.IsNotFoundErr(err) {
log.Printf("kubestore: creating new secret %q for certificates", secretName)
return s.client.CreateSecret(ctx, &kubeapi.Secret{
TypeMeta: kubeapi.TypeMeta{
APIVersion: "v1",
Kind: "Secret",
},
ObjectMeta: kubeapi.ObjectMeta{
Name: secretName,
},
Data: map[string][]byte{
sanitizeKey(ipn.StateKey(domain + ".crt")): cert,
sanitizeKey(ipn.StateKey(domain + ".key")): key,
},
})
}
return fmt.Errorf("getting secret %q: %w", secretName, err)
}
if s.canPatch {
if len(secret.Data) == 0 {
log.Printf("kubestore: initializing empty secret %q with certificates", secretName)
m := []kubeclient.JSONPatch{
{
Op: "add",
Path: "/data",
Value: map[string][]byte{
sanitizeKey(ipn.StateKey(domain + ".crt")): cert,
sanitizeKey(ipn.StateKey(domain + ".key")): key,
},
},
}
return s.client.JSONPatchResource(ctx, secretName, kubeclient.TypeSecrets, m)
}
log.Printf("kubestore: patching certificates into secret %q", secretName)
m := []kubeclient.JSONPatch{
{
Op: "add",
Path: "/data/" + sanitizeKey(ipn.StateKey(domain+".crt")),
Value: cert,
},
{
Op: "add",
Path: "/data/" + sanitizeKey(ipn.StateKey(domain+".key")),
Value: key,
},
}
return s.client.JSONPatchResource(ctx, secretName, kubeclient.TypeSecrets, m)
}
log.Printf("kubestore: updating certificates in secret %q", secretName)
if secret.Data == nil {
secret.Data = make(map[string][]byte)
}
secret.Data[sanitizeKey(ipn.StateKey(domain+".crt"))] = cert
secret.Data[sanitizeKey(ipn.StateKey(domain+".key"))] = key
return s.client.UpdateSecret(ctx, secret)
}
// loadCertsFromDir reads certificates from the configured directory into memory.
func (s *Store) loadCertsFromDir() error {
if s.certDir == "" {
return nil
}
entries, err := os.ReadDir(s.certDir)
if err != nil {
if os.IsNotExist(err) {
return nil
}
return err
}
count := 0
for _, entry := range entries {
if entry.IsDir() {
continue
}
name := entry.Name()
if !strings.HasSuffix(name, ".crt") && !strings.HasSuffix(name, ".key") {
continue
}
data, err := os.ReadFile(filepath.Join(s.certDir, name))
if err != nil {
return fmt.Errorf("reading cert file %q: %w", name, err)
}
// Store in memory
s.memory.WriteState(ipn.StateKey(name), data)
count++
}
log.Printf("kubestore: loaded %d certificate files from %s", count, s.certDir)
return nil
}
// watchCertDir watches the cert directory for changes and reloads certificates into memory
// when changes are detected. It exits when the context is canceled.
func (s *Store) watchCertDir(ctx context.Context) {
if s.certDir == "" {
return
}
var tickChan <-chan time.Time
var eventChan <-chan fsnotify.Event
if w, err := fsnotify.NewWatcher(); err != nil {
log.Printf("kubestore: failed to create fsnotify watcher for %q, falling back to timer-only mode: %v", s.certDir, err)
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
tickChan = ticker.C
} else {
defer w.Close()
if err := w.Add(s.certDir); err != nil {
log.Printf("kubestore: failed to add fsnotify watch for %q: %v", s.certDir, err)
return
}
log.Printf("kubestore: watching %q for certificate changes", s.certDir)
eventChan = w.Events
}
for {
select {
case <-ctx.Done():
return
case <-tickChan:
case <-eventChan:
// We can't do any reasonable filtering on the event because of how
// k8s handles these mounts. So just re-read the directory and
// update memory if needed.
}
if err := s.loadCertsFromDir(); err != nil {
log.Printf("kubestore: error reloading certs from directory: %v", err)
}
}
}