WIP: Connector multi-replica

Signed-off-by: Irbe Krumina <irbe@tailscale.com>
This commit is contained in:
Irbe Krumina 2024-11-06 07:22:02 +00:00
parent 7de684e71a
commit 11532daad9
14 changed files with 339 additions and 194 deletions

View File

@@ -0,0 +1,52 @@
# HA failover
This is an experimental prototype of fast failover for subnet routers via the Kubernetes operator.

Problem: how can we ensure that, when multiple subnet router replicas are running and one of them is about to be deleted (e.g. during a StatefulSet upgrade), peers that currently route via that subnet router switch to another subnet router instance _before_ the first one is deleted?

This code change:
- adds a lameduck LocalAPI endpoint that can be called to shut down the control client and thus force control to consider this node inactive
- adds a preStop hook definition to the Connector StatefulSet that calls the /terminate endpoint
- bumps terminationGracePeriodSeconds on the Connector Pod spec from 30s to 120s to ensure that the /terminate endpoint gets a chance to finish

This change also includes WIP work to run the Connector in multi-replica mode. A sketch of how to exercise the /terminate endpoint by hand is shown below.
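A rough way to see what the preStop flow does without deleting a Pod is to call the /terminate endpoint by hand. This is only a sketch: it assumes the proxy Pod runs in the `tailscale` namespace, that the proxy image ships busybox `wget`, and `<proxy-pod>` is a placeholder for an actual proxy Pod name.
```
# Port 8081 is the health/terminate listener the proxy container is configured with on this branch.
$ kubectl exec -n tailscale <proxy-pod> -c tailscale -- \
    wget -q -O - http://127.0.0.1:8081/terminate
# The handler puts the node into lameduck mode and then blocks for ~100s so that control
# has time to mark this replica inactive and move peers over to another subnet router replica.
```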
### How to try it:
```
$ helm upgrade --install operator tailscale-dev/tailscale-operator -n tailscale --create-namespace --set operatorConfig.image.repo=gcr.io/csi-test-290908/operator --set operatorConfig.image.tag=0.12connmultir --set proxyConfig.image.repo=gcr.io/csi-test-290908/proxy --set proxyConfig.image.tag=v0.0.15connmultir --set oauth.clientId=<id> --set oauth.clientSecret=<secret>
```
```
$ kubectl delete crd connectors.tailscale.com # need to re-apply the CRD from this branch
```
(from this branch)
```
$ kubectl apply -f cmd/k8s-operator/deploy/crds/tailscale.com_connectors.yaml
```
Apply a multi-replica Connector with some route:
```
apiVersion: tailscale.com/v1alpha1
kind: Connector
metadata:
  name: prod
spec:
  tags:
    - "tag:prod"
  hostname: ts-prod
  subnetRouter:
    advertiseRoutes:
      - <route>
  replicas: 3
```
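Once this is applied, the operator should create a proxy StatefulSet with three replicas. A rough way to check (assuming everything runs in the `tailscale` namespace):
```
$ kubectl get statefulset,pods -n tailscale
# Expect the Connector's proxy StatefulSet to report READY 3/3 and three proxy Pods Running.
```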
Test failover during deletion: curl the backend in a tight-ish loop (see the sketch below) and delete the primary Pod. You should observe that within about a minute traffic switches over to the second Pod, and meanwhile the connection keeps working without an obvious hitch.
(I was curl-ing at a 1s interval and saw a single RST before it switched over.)
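A rough sketch of that loop; `<backend-ip>` and `<primary-pod>` are placeholders for an address behind the advertised route and for whichever proxy Pod is currently acting as the active subnet router:
```
$ while true; do curl -sS -m 2 -o /dev/null -w '%{http_code}\n' http://<backend-ip>; sleep 1; done
# In another terminal, delete the Pod that currently routes the traffic:
$ kubectl delete pod -n tailscale <primary-pod>
```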

View File

@@ -6,10 +6,14 @@
 package main
 
 import (
+    "context"
     "log"
     "net"
     "net/http"
     "sync"
+    "time"
+
+    "tailscale.com/client/tailscale"
 )
 
 // healthz is a simple health check server, if enabled it returns 200 OK if
@@ -33,14 +37,20 @@ func (h *healthz) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 // runHealthz runs a simple HTTP health endpoint on /healthz, listening on the
 // provided address. A containerized tailscale instance is considered healthy if
 // it has at least one tailnet IP address.
-func runHealthz(addr string, h *healthz) {
+func run(addr string, h *healthz, lc *tailscale.LocalClient) {
     lis, err := net.Listen("tcp", addr)
     if err != nil {
         log.Fatalf("error listening on the provided health endpoint address %q: %v", addr, err)
     }
     mux := http.NewServeMux()
     mux.Handle("/healthz", h)
-    log.Printf("Running healthcheck endpoint at %s/healthz", addr)
+    t := terminator{lc: lc}
+    // /terminate is an endpoint that can be called from a prestop hook of this containerboot instance.
+    // https://kubernetes.io/docs/concepts/containers/container-lifecycle-hooks/#container-hooks It drops all
+    // connections to and from Tailscale control plane. This can be used for containerboot instances that are HA
+    // subnet routers. Control plane will consider the instance that is not responding as 'inactive' and prompt
+    // peers to switch to another subnet router. Whilst this happens the existing connections will remain functional.
+    mux.Handle("/terminate", t)
     hs := &http.Server{Handler: mux}
 
     go func() {
@@ -49,3 +59,38 @@ func runHealthz(addr string, h *healthz) {
         }
     }()
 }
+
+type terminator struct {
+    // nfr linuxfw.NetfilterRunner
+    lc *tailscale.LocalClient
+}
+
+func (t terminator) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+    log.Printf("prestopBlockNetmapUpdates triggered")
+    if err := t.lc.LameDuck(context.Background()); err != nil {
+        log.Fatalf("error enabling lameduck: %v", err)
+    }
+    // tailscaleIPs, err := resolveDNS(context.Background(), "controlplane.tailscale.com")
+    // if err != nil {
+    //     log.Printf("prestopBlockNetmapUpdates errored: %v", err)
+    //     return
+    // }
+    // var (
+    //     addrs []netip.Addr
+    // )
+    // for _, ip := range tailscaleIPs {
+    //     if ip.To4() != nil {
+    //         addrs = append(addrs, netip.AddrFrom4([4]byte(ip.To4())))
+    //     }
+    //     // just v4 for this prototype
+    // }
+    // for _, addr := range addrs {
+    //     log.Printf("dropping traffic to %v", addr)
+    //     if err := t.nfr.AddDropRule(addr); err != nil {
+    //         log.Printf("error adding drop rule for %v: %v", addr, err)
+    //     }
+    // }
+    log.Printf("sleeping to give control plane a chance to update...")
+    time.Sleep(time.Second * 100)
+    log.Printf("finished sleeping")
+}

View File

@@ -307,6 +307,12 @@ authLoop:
     if err != nil {
         log.Fatalf("rewatching tailscaled for updates after auth: %v", err)
     }
+    var nfr linuxfw.NetfilterRunner
+    // for this prototype
+    nfr, err = newNetfilterRunner(log.Printf)
+    if err != nil {
+        log.Fatalf("error creating new netfilter runner: %v", err)
+    }
 
     var (
         startupTasksDone = false
@@ -323,19 +329,11 @@ authLoop:
         certDomainChanged = make(chan bool, 1)
 
         h             = &healthz{} // http server for the healthz endpoint
-        healthzRunner = sync.OnceFunc(func() { runHealthz(cfg.HealthCheckAddrPort, h) })
+        healthzRunner = sync.OnceFunc(func() { run(cfg.HealthCheckAddrPort, h, client) })
     )
     if cfg.ServeConfigPath != "" {
         go watchServeConfigChanges(ctx, cfg.ServeConfigPath, certDomainChanged, certDomain, client)
     }
-    var nfr linuxfw.NetfilterRunner
-    if isL3Proxy(cfg) {
-        nfr, err = newNetfilterRunner(log.Printf)
-        if err != nil {
-            log.Fatalf("error creating new netfilter runner: %v", err)
-        }
-    }
 
     // Setup for proxies that are configured to proxy to a target specified
     // by a DNS name (TS_EXPERIMENTAL_DEST_DNS_NAME).
     const defaultCheckPeriod = time.Minute * 10 // how often to check what IPs the DNS name resolves to
@@ -744,3 +742,7 @@ func tailscaledConfigFilePath() string {
     log.Printf("Using tailscaled config file %q for capability version %q", maxCompatVer, tailcfg.CurrentCapabilityVersion)
     return path.Join(dir, kubeutils.TailscaledConfigFileName(maxCompatVer))
 }
+
+func preStopBlockNetmapUpdates(ctx context.Context, nfr linuxfw.NetfilterRunner) {
+    // figure out if we are a subnet router in HA mode
+}

View File

@@ -170,11 +170,6 @@ func (s *settings) validate() error {
     if s.EnableForwardingOptimizations && s.UserspaceMode {
         return errors.New("TS_EXPERIMENTAL_ENABLE_FORWARDING_OPTIMIZATIONS is not supported in userspace mode")
     }
-    if s.HealthCheckAddrPort != "" {
-        if _, err := netip.ParseAddrPort(s.HealthCheckAddrPort); err != nil {
-            return fmt.Errorf("error parsing TS_HEALTH_CHECK_ADDR_PORT value %q: %w", s.HealthCheckAddrPort, err)
-        }
-    }
     return nil
 }

View File

@@ -183,6 +183,10 @@ func (a *ConnectorReconciler) maybeProvisionConnector(ctx context.Context, logge
             isExitNode: cn.Spec.ExitNode,
         },
         ProxyClassName: proxyClass,
+        Replicas:       1,
+    }
+    if cn.Spec.Replicas != nil {
+        sts.Replicas = int32(*cn.Spec.Replicas)
     }
 
     if cn.Spec.SubnetRouter != nil && len(cn.Spec.SubnetRouter.AdvertiseRoutes) > 0 {
@@ -213,21 +217,21 @@ func (a *ConnectorReconciler) maybeProvisionConnector(ctx context.Context, logge
         return err
     }
 
-    _, tsHost, ips, err := a.ssr.DeviceInfo(ctx, crl)
-    if err != nil {
-        return err
-    }
-    if tsHost == "" {
-        logger.Debugf("no Tailscale hostname known yet, waiting for connector pod to finish auth")
-        // No hostname yet. Wait for the connector pod to auth.
-        cn.Status.TailnetIPs = nil
-        cn.Status.Hostname = ""
-        return nil
-    }
-    cn.Status.TailnetIPs = ips
-    cn.Status.Hostname = tsHost
+    // _, tsHost, ips, err := a.ssr.DeviceInfo(ctx, crl)
+    // if err != nil {
+    //     return err
+    // }
+    // if tsHost == "" {
+    //     logger.Debugf("no Tailscale hostname known yet, waiting for connector pod to finish auth")
+    //     // No hostname yet. Wait for the connector pod to auth.
+    //     cn.Status.TailnetIPs = nil
+    //     cn.Status.Hostname = ""
+    //     return nil
+    // }
+    // cn.Status.TailnetIPs = ips
+    // cn.Status.Hostname = tsHost
     return nil
 }

View File

@@ -65,6 +65,8 @@ spec:
               More info:
               https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#spec-and-status
             type: object
+            required:
+              - replicas
             properties:
               exitNode:
                 description: |-
@@ -88,6 +90,8 @@ spec:
                   resources created for this Connector. If unset, the operator will
                   create resources with the default configuration.
                 type: string
+              replicas:
+                type: integer
               subnetRouter:
                 description: |-
                   SubnetRouter defines subnet routes that the Connector node should

View File

@@ -113,6 +113,8 @@ spec:
                   resources created for this Connector. If unset, the operator will
                   create resources with the default configuration.
                 type: string
+              replicas:
+                type: integer
               subnetRouter:
                 description: |-
                   SubnetRouter defines subnet routes that the Connector node should
@@ -149,6 +151,8 @@ spec:
                   pattern: ^tag:[a-zA-Z][a-zA-Z0-9-]*$
                   type: string
                 type: array
+            required:
+              - replicas
             type: object
             x-kubernetes-validations:
               - message: A Connector needs to be either an exit node or a subnet router, or both.

View File

@@ -9,6 +9,7 @@ spec:
     metadata:
       deletionGracePeriodSeconds: 10
     spec:
+      terminationGracePeriodSeconds: 120
       serviceAccountName: proxies
       initContainers:
         - name: sysctler
@@ -22,6 +23,11 @@ spec:
               memory: 1Mi
       containers:
         - name: tailscale
+          lifecycle:
+            preStop:
+              httpGet:
+                path: /terminate
+                port: 8081
           imagePullPolicy: Always
           env:
             - name: TS_USERSPACE

View File

@@ -129,6 +129,7 @@ type tailscaleSTSConfig struct {
     ProxyClassName string            // name of ProxyClass if one needs to be applied to the proxy
     ProxyClass     *tsapi.ProxyClass // ProxyClass that needs to be applied to the proxy (if there is one)
+    Replicas       int32
 }
 
 type connector struct {
@@ -186,11 +187,11 @@ func (a *tailscaleSTSReconciler) Provision(ctx context.Context, logger *zap.Suga
     }
     sts.ProxyClass = proxyClass
 
-    secretName, tsConfigHash, configs, err := a.createOrGetSecret(ctx, logger, sts, hsvc)
+    tsConfigHash, configs, err := a.createOrGetSecrets(ctx, logger, sts, hsvc)
     if err != nil {
         return nil, fmt.Errorf("failed to create or get API key secret: %w", err)
     }
-    _, err = a.reconcileSTS(ctx, logger, sts, hsvc, secretName, tsConfigHash, configs)
+    _, err = a.reconcileSTS(ctx, logger, sts, hsvc, tsConfigHash, configs)
     if err != nil {
         return nil, fmt.Errorf("failed to reconcile statefulset: %w", err)
     }
@@ -226,22 +227,27 @@ func (a *tailscaleSTSReconciler) Cleanup(ctx context.Context, logger *zap.Sugare
         logger.Debugf("started deletion of statefulset %s/%s", sts.GetNamespace(), sts.GetName())
         return false, nil
     }
-
-    id, _, _, err := a.DeviceInfo(ctx, labels)
-    if err != nil {
-        return false, fmt.Errorf("getting device info: %w", err)
-    }
-    if id != "" {
-        logger.Debugf("deleting device %s from control", string(id))
-        if err := a.tsClient.DeleteDevice(ctx, string(id)); err != nil {
-            errResp := &tailscale.ErrResponse{}
-            if ok := errors.As(err, errResp); ok && errResp.Status == http.StatusNotFound {
-                logger.Debugf("device %s not found, likely because it has already been deleted from control", string(id))
-            } else {
-                return false, fmt.Errorf("deleting device: %w", err)
-            }
-        } else {
-            logger.Debugf("device %s deleted from control", string(id))
+    stateSecrets := &corev1.SecretList{}
+    if err := a.List(ctx, stateSecrets); err != nil {
+        return false, err
+    }
+    for _, sec := range stateSecrets.Items {
+        id, _, _, err := deviceInfo(&sec)
+        if err != nil {
+            return false, fmt.Errorf("error cleaning up state: %v", err)
+        }
+        if id != "" {
+            logger.Debugf("deleting device %s from control", string(id))
+            if err := a.tsClient.DeleteDevice(ctx, string(id)); err != nil {
+                errResp := &tailscale.ErrResponse{}
+                if ok := errors.As(err, errResp); ok && errResp.Status == http.StatusNotFound {
+                    logger.Debugf("device %s not found, likely because it has already been deleted from control", string(id))
+                } else {
+                    return false, fmt.Errorf("deleting device: %w", err)
+                }
+            } else {
+                logger.Debugf("device %s deleted from control", string(id))
+            }
         }
     }
@@ -304,96 +310,96 @@ func (a *tailscaleSTSReconciler) reconcileHeadlessService(ctx context.Context, l
     return createOrUpdate(ctx, a.Client, a.operatorNamespace, hsvc, func(svc *corev1.Service) { svc.Spec = hsvc.Spec })
 }
 
-func (a *tailscaleSTSReconciler) createOrGetSecret(ctx context.Context, logger *zap.SugaredLogger, stsC *tailscaleSTSConfig, hsvc *corev1.Service) (secretName, hash string, configs tailscaledConfigs, _ error) {
-    secret := &corev1.Secret{
-        ObjectMeta: metav1.ObjectMeta{
-            // Hardcode a -0 suffix so that in future, if we support
-            // multiple StatefulSet replicas, we can provision -N for
-            // those.
-            Name:      hsvc.Name + "-0",
-            Namespace: a.operatorNamespace,
-            Labels:    stsC.ChildResourceLabels,
-        },
-    }
-    var orig *corev1.Secret // unmodified copy of secret
-    if err := a.Get(ctx, client.ObjectKeyFromObject(secret), secret); err == nil {
-        logger.Debugf("secret %s/%s already exists", secret.GetNamespace(), secret.GetName())
-        orig = secret.DeepCopy()
-    } else if !apierrors.IsNotFound(err) {
-        return "", "", nil, err
-    }
-    var authKey string
-    if orig == nil {
-        // Initially it contains only tailscaled config, but when the
-        // proxy starts, it will also store there the state, certs and
-        // ACME account key.
-        sts, err := getSingleObject[appsv1.StatefulSet](ctx, a.Client, a.operatorNamespace, stsC.ChildResourceLabels)
-        if err != nil {
-            return "", "", nil, err
-        }
-        if sts != nil {
-            // StatefulSet exists, so we have already created the secret.
-            // If the secret is missing, they should delete the StatefulSet.
-            logger.Errorf("Tailscale proxy secret doesn't exist, but the corresponding StatefulSet %s/%s already does. Something is wrong, please delete the StatefulSet.", sts.GetNamespace(), sts.GetName())
-            return "", "", nil, nil
-        }
-        // Create API Key secret which is going to be used by the statefulset
-        // to authenticate with Tailscale.
-        logger.Debugf("creating authkey for new tailscale proxy")
-        tags := stsC.Tags
-        if len(tags) == 0 {
-            tags = a.defaultTags
-        }
-        authKey, err = newAuthKey(ctx, a.tsClient, tags)
-        if err != nil {
-            return "", "", nil, err
-        }
-    }
-    configs, err := tailscaledConfig(stsC, authKey, orig)
-    if err != nil {
-        return "", "", nil, fmt.Errorf("error creating tailscaled config: %w", err)
-    }
-    hash, err = tailscaledConfigHash(configs)
-    if err != nil {
-        return "", "", nil, fmt.Errorf("error calculating hash of tailscaled configs: %w", err)
-    }
-    latest := tailcfg.CapabilityVersion(-1)
-    var latestConfig ipn.ConfigVAlpha
-    for key, val := range configs {
-        fn := tsoperator.TailscaledConfigFileName(key)
-        b, err := json.Marshal(val)
-        if err != nil {
-            return "", "", nil, fmt.Errorf("error marshalling tailscaled config: %w", err)
-        }
-        mak.Set(&secret.StringData, fn, string(b))
-        if key > latest {
-            latest = key
-            latestConfig = val
-        }
-    }
-    if stsC.ServeConfig != nil {
-        j, err := json.Marshal(stsC.ServeConfig)
-        if err != nil {
-            return "", "", nil, err
-        }
-        mak.Set(&secret.StringData, "serve-config", string(j))
-    }
-    if orig != nil {
-        logger.Debugf("patching the existing proxy Secret with tailscaled config %s", sanitizeConfigBytes(latestConfig))
-        if err := a.Patch(ctx, secret, client.MergeFrom(orig)); err != nil {
-            return "", "", nil, err
-        }
-    } else {
-        logger.Debugf("creating a new Secret for the proxy with tailscaled config %s", sanitizeConfigBytes(latestConfig))
-        if err := a.Create(ctx, secret); err != nil {
-            return "", "", nil, err
-        }
-    }
-    return secret.Name, hash, configs, nil
+// tailscaled config secrets
+func (a *tailscaleSTSReconciler) createOrGetSecrets(ctx context.Context, logger *zap.SugaredLogger, stsC *tailscaleSTSConfig, hsvc *corev1.Service) (hash string, configs tailscaledConfigs, _ error) {
+    var allConfigs []tailscaledConfigs
+
+    // TODO: deal with pre-existing secrets so we don't recreate _all_ auth keys on upgrade to this version.
+    for i := range stsC.Replicas {
+        secret := &corev1.Secret{
+            ObjectMeta: metav1.ObjectMeta{
+                Name:      fmt.Sprintf("%s-%d-config", hsvc.Name, i),
+                Namespace: a.operatorNamespace,
+                Labels:    stsC.ChildResourceLabels,
+            },
+        }
+        var orig *corev1.Secret // unmodified copy of secret
+        if err := a.Get(ctx, client.ObjectKeyFromObject(secret), secret); err == nil {
+            logger.Debugf("secret %s/%s already exists", secret.GetNamespace(), secret.GetName())
+            orig = secret.DeepCopy()
+        } else if !apierrors.IsNotFound(err) {
+            return "", nil, err
+        }
+        var authKey string
+        if orig == nil {
+            sts, err := getSingleObject[appsv1.StatefulSet](ctx, a.Client, a.operatorNamespace, stsC.ChildResourceLabels)
+            if err != nil {
+                return "", nil, err
+            }
+            if sts != nil {
+                // StatefulSet exists, so we have already created the secret.
+                // If the secret is missing, they should delete the StatefulSet.
+                logger.Errorf("Tailscale proxy secret doesn't exist, but the corresponding StatefulSet %s/%s already does. Something is wrong, please delete the StatefulSet.", sts.GetNamespace(), sts.GetName())
+                return "", nil, nil
+            }
+            // Create auth key Secret which is going to be used by the Statefulset to authenticate with Tailscale.
+            logger.Debugf("creating authkey for new tailscale proxy")
+            tags := stsC.Tags
+            if len(tags) == 0 {
+                tags = a.defaultTags
+            }
+            authKey, err = newAuthKey(ctx, a.tsClient, tags)
+            if err != nil {
+                return "", nil, err
+            }
+        }
+        configs, err := tailscaledConfig(stsC, authKey, orig, i)
+        if err != nil {
+            return "", nil, fmt.Errorf("error creating tailscaled config: %w", err)
+        }
+        allConfigs = append(allConfigs, configs)
+        latest := tailcfg.CapabilityVersion(-1)
+        var latestConfig ipn.ConfigVAlpha
+        for key, val := range configs {
+            fn := tsoperator.TailscaledConfigFileName(key)
+            b, err := json.Marshal(val)
+            if err != nil {
+                return "", nil, fmt.Errorf("error marshalling tailscaled config: %w", err)
+            }
+            mak.Set(&secret.StringData, fn, string(b))
+            if key > latest {
+                latest = key
+                latestConfig = val
+            }
+        }
+        if stsC.ServeConfig != nil {
+            j, err := json.Marshal(stsC.ServeConfig)
+            if err != nil {
+                return "", nil, err
+            }
+            mak.Set(&secret.StringData, "serve-config", string(j))
+        }
+        if orig != nil {
+            logger.Debugf("patching the existing proxy Secret with tailscaled config %s", sanitizeConfigBytes(latestConfig))
+            if err := a.Patch(ctx, secret, client.MergeFrom(orig)); err != nil {
+                return "", nil, err
+            }
+        } else {
+            logger.Debugf("creating a new Secret for the proxy with tailscaled config %s", sanitizeConfigBytes(latestConfig))
+            if err := a.Create(ctx, secret); err != nil {
+                return "", nil, err
+            }
+        }
+    }
+    hash, err := tailscaledConfigHash(allConfigs)
+    if err != nil {
+        return "", nil, fmt.Errorf("error calculating hash of tailscaled configs: %w", err)
+    }
+    return hash, configs, nil
 }
 
 // sanitizeConfigBytes returns ipn.ConfigVAlpha in string form with redacted
@@ -473,7 +479,7 @@ var proxyYaml []byte
 //go:embed deploy/manifests/userspace-proxy.yaml
 var userspaceProxyYaml []byte
 
-func (a *tailscaleSTSReconciler) reconcileSTS(ctx context.Context, logger *zap.SugaredLogger, sts *tailscaleSTSConfig, headlessSvc *corev1.Service, proxySecret, tsConfigHash string, configs map[tailcfg.CapabilityVersion]ipn.ConfigVAlpha) (*appsv1.StatefulSet, error) {
+func (a *tailscaleSTSReconciler) reconcileSTS(ctx context.Context, logger *zap.SugaredLogger, sts *tailscaleSTSConfig, headlessSvc *corev1.Service, tsConfigHash string, configs map[tailcfg.CapabilityVersion]ipn.ConfigVAlpha) (*appsv1.StatefulSet, error) {
     ss := new(appsv1.StatefulSet)
     if sts.ServeConfig != nil && sts.ForwardClusterTrafficViaL7IngressProxy != true { // If forwarding cluster traffic via is required we need non-userspace + NET_ADMIN + forwarding
         if err := yaml.Unmarshal(userspaceProxyYaml, &ss); err != nil {
@@ -507,6 +513,7 @@ func (a *tailscaleSTSReconciler) reconcileSTS(ctx context.Context, logger *zap.S
             "app": sts.ParentResourceUID,
         },
     }
+    ss.Spec.Replicas = &sts.Replicas
     mak.Set(&pod.Labels, "app", sts.ParentResourceUID)
     for key, val := range sts.ChildResourceLabels {
         pod.Labels[key] = val // sync StatefulSet labels to Pod to make it easier for users to select the Pod
@@ -514,21 +521,34 @@ func (a *tailscaleSTSReconciler) reconcileSTS(ctx context.Context, logger *zap.S
     // Generic containerboot configuration options.
     container.Env = append(container.Env,
+        corev1.EnvVar{
+            Name: "POD_NAME",
+            ValueFrom: &corev1.EnvVarSource{
+                FieldRef: &corev1.ObjectFieldSelector{
+                    // Secret is named after the pod.
+                    FieldPath: "metadata.name",
+                },
+            },
+        },
         corev1.EnvVar{
             Name:  "TS_KUBE_SECRET",
-            Value: proxySecret,
+            Value: "$(POD_NAME)",
         },
         corev1.EnvVar{
-            // Old tailscaled config key is still used for backwards compatibility.
-            Name:  "EXPERIMENTAL_TS_CONFIGFILE_PATH",
-            Value: "/etc/tsconfig/tailscaled",
+            Name:  "TS_STATE",
+            Value: "kube:$(POD_NAME)",
         },
         corev1.EnvVar{
-            // New style is in the form of cap-<capability-version>.hujson.
             Name:  "TS_EXPERIMENTAL_VERSIONED_CONFIG_DIR",
-            Value: "/etc/tsconfig",
+            Value: "/etc/tsconfig/$(POD_NAME)",
         },
     )
+    if sts.ServeConfig != nil {
+        container.Env = append(container.Env, corev1.EnvVar{
+            Name:  "TS_SERVE_CONFIG",
+            Value: "/etc/tsconfig/$(POD_NAME)/serve-config",
+        })
+    }
     if sts.ForwardClusterTrafficViaL7IngressProxy {
         container.Env = append(container.Env, corev1.EnvVar{
             Name: "EXPERIMENTAL_ALLOW_PROXYING_CLUSTER_TRAFFIC_VIA_INGRESS",
@@ -538,27 +558,31 @@ func (a *tailscaleSTSReconciler) reconcileSTS(ctx context.Context, logger *zap.S
     // Configure containeboot to run tailscaled with a configfile read from the state Secret.
     mak.Set(&ss.Spec.Template.Annotations, podAnnotationLastSetConfigFileHash, tsConfigHash)
 
-    configVolume := corev1.Volume{
-        Name: "tailscaledconfig",
-        VolumeSource: corev1.VolumeSource{
-            Secret: &corev1.SecretVolumeSource{
-                SecretName: proxySecret,
-            },
-        },
-    }
-    pod.Spec.Volumes = append(ss.Spec.Template.Spec.Volumes, configVolume)
-    container.VolumeMounts = append(container.VolumeMounts, corev1.VolumeMount{
-        Name:      "tailscaledconfig",
-        ReadOnly:  true,
-        MountPath: "/etc/tsconfig",
-    })
-
-    if a.tsFirewallMode != "" {
-        container.Env = append(container.Env, corev1.EnvVar{
-            Name:  "TS_DEBUG_FIREWALL_MODE",
-            Value: a.tsFirewallMode,
-        })
+    for i := range sts.Replicas {
+        ss.Spec.Template.Spec.Volumes = append(ss.Spec.Template.Spec.Volumes, corev1.Volume{
+            Name: fmt.Sprintf("tailscaledconfig-%d", i),
+            VolumeSource: corev1.VolumeSource{
+                Secret: &corev1.SecretVolumeSource{
+                    SecretName: fmt.Sprintf("%s-%d-config", ss.Name, i),
+                },
+            },
+        })
+        container.VolumeMounts = append(container.VolumeMounts, corev1.VolumeMount{
+            Name:      fmt.Sprintf("tailscaledconfig-%d", i),
+            ReadOnly:  true,
+            MountPath: fmt.Sprintf("/etc/tsconfig/%s-%d", ss.Name, i),
+        })
     }
+    // for this prototype
+    container.Env = append(container.Env, corev1.EnvVar{
+        Name:  "TS_DEBUG_FIREWALL_MODE",
+        Value: "iptables",
+    },
+        corev1.EnvVar{Name: "TS_HEALTHCHECK_ADDR_PORT",
+            Value: ":8081",
+        },
+    )
 
     pod.Spec.PriorityClassName = a.proxyPriorityClassName
 
     // Ingress/egress proxy configuration options.
@@ -586,25 +610,6 @@ func (a *tailscaleSTSReconciler) reconcileSTS(ctx context.Context, logger *zap.S
             Value: sts.TailnetTargetFQDN,
         })
         mak.Set(&ss.Spec.Template.Annotations, podAnnotationLastSetTailnetTargetFQDN, sts.TailnetTargetFQDN)
-    } else if sts.ServeConfig != nil {
-        container.Env = append(container.Env, corev1.EnvVar{
-            Name:  "TS_SERVE_CONFIG",
-            Value: "/etc/tailscaled/serve-config",
-        })
-        container.VolumeMounts = append(container.VolumeMounts, corev1.VolumeMount{
-            Name:      "serve-config",
-            ReadOnly:  true,
-            MountPath: "/etc/tailscaled",
-        })
-        pod.Spec.Volumes = append(ss.Spec.Template.Spec.Volumes, corev1.Volume{
-            Name: "serve-config",
-            VolumeSource: corev1.VolumeSource{
-                Secret: &corev1.SecretVolumeSource{
-                    SecretName: proxySecret,
-                    Items:      []corev1.KeyToPath{{Key: "serve-config", Path: "serve-config"}},
-                },
-            },
-        })
     }
 
     app, err := appInfoForProxy(sts)
     if err != nil {
@@ -618,7 +623,7 @@ func (a *tailscaleSTSReconciler) reconcileSTS(ctx context.Context, logger *zap.S
             Value: app,
         })
     }
-    logger.Debugf("reconciling statefulset %s/%s", ss.GetNamespace(), ss.GetName())
+    logger.Debugf("reconciling Statefulset %s/%s", ss.GetNamespace(), ss.GetName())
     if sts.ProxyClassName != "" {
         logger.Debugf("configuring proxy resources with ProxyClass %s", sts.ProxyClassName)
         ss = applyProxyClassToStatefulSet(sts.ProxyClass, ss, sts, logger)
@@ -786,6 +791,14 @@ func readAuthKey(secret *corev1.Secret, key string) (*string, error) {
     return origConf.AuthKey, nil
 }
 
+func hostNameForReplica(hostNamePrefix string, idx int32) *string {
+    if idx == 0 {
+        return &hostNamePrefix
+    }
+    s := fmt.Sprintf("%s-%d", hostNamePrefix, idx)
+    return &s
+}
+
 // tailscaledConfig takes a proxy config, a newly generated auth key if
 // generated and a Secret with the previous proxy state and auth key and
 // returns tailscaled configuration and a hash of that configuration.
@@ -795,13 +808,13 @@ func readAuthKey(secret *corev1.Secret, key string) (*string, error) {
 // TODO (irbekrm): remove the legacy config once we no longer need to support
 // versions older than cap94,
 // https://tailscale.com/kb/1236/kubernetes-operator#operator-and-proxies
-func tailscaledConfig(stsC *tailscaleSTSConfig, newAuthkey string, oldSecret *corev1.Secret) (tailscaledConfigs, error) {
+func tailscaledConfig(stsC *tailscaleSTSConfig, newAuthkey string, oldSecret *corev1.Secret, idx int32) (tailscaledConfigs, error) {
     conf := &ipn.ConfigVAlpha{
         Version:             "alpha0",
         AcceptDNS:           "false",
         AcceptRoutes:        "false", // AcceptRoutes defaults to true
         Locked:              "false",
-        Hostname:            &stsC.Hostname,
+        Hostname:            hostNameForReplica(stsC.Hostname, idx),
         NoStatefulFiltering: "false",
     }
@@ -896,7 +909,7 @@ type tailscaledConfigs map[tailcfg.CapabilityVersion]ipn.ConfigVAlpha
 // thing that changed is operator version (the hash is also exposed to users via
 // an annotation and might be confusing if it changes without the config having
 // changed).
-func tailscaledConfigHash(c tailscaledConfigs) (string, error) {
+func tailscaledConfigHash(c []tailscaledConfigs) (string, error) {
     b, err := json.Marshal(c)
     if err != nil {
         return "", fmt.Errorf("error marshalling tailscaled configs: %w", err)

View File

@@ -83,11 +83,12 @@ _Appears in:_
 | Field | Description | Default | Validation |
 | --- | --- | --- | --- |
+| `subnetRouter` _[SubnetRouter](#subnetrouter)_ | SubnetRouter defines subnet routes that the Connector node should<br />expose to tailnet. If unset, none are exposed.<br />https://tailscale.com/kb/1019/subnets/ | | |
+| `exitNode` _boolean_ | ExitNode defines whether the Connector node should act as a<br />Tailscale exit node. Defaults to false.<br />https://tailscale.com/kb/1103/exit-nodes | | |
+| `replicas` _integer_ | | | |
 | `tags` _[Tags](#tags)_ | Tags that the Tailscale node will be tagged with.<br />Defaults to [tag:k8s].<br />To autoapprove the subnet routes or exit node defined by a Connector,<br />you can configure Tailscale ACLs to give these tags the necessary<br />permissions.<br />See https://tailscale.com/kb/1337/acl-syntax#autoapprovers.<br />If you specify custom tags here, you must also make the operator an owner of these tags.<br />See https://tailscale.com/kb/1236/kubernetes-operator/#setting-up-the-kubernetes-operator.<br />Tags cannot be changed once a Connector node has been created.<br />Tag values must be in form ^tag:[a-zA-Z][a-zA-Z0-9-]*$. | | Pattern: `^tag:[a-zA-Z][a-zA-Z0-9-]*$` <br />Type: string <br /> |
 | `hostname` _[Hostname](#hostname)_ | Hostname is the tailnet hostname that should be assigned to the<br />Connector node. If unset, hostname defaults to <connector<br />name>-connector. Hostname can contain lower case letters, numbers and<br />dashes, it must not start or end with a dash and must be between 2<br />and 63 characters long. | | Pattern: `^[a-z0-9][a-z0-9-]{0,61}[a-z0-9]$` <br />Type: string <br /> |
 | `proxyClass` _string_ | ProxyClass is the name of the ProxyClass custom resource that<br />contains configuration options that should be applied to the<br />resources created for this Connector. If unset, the operator will<br />create resources with the default configuration. | | |
-| `subnetRouter` _[SubnetRouter](#subnetrouter)_ | SubnetRouter defines subnet routes that the Connector node should<br />expose to tailnet. If unset, none are exposed.<br />https://tailscale.com/kb/1019/subnets/ | | |
-| `exitNode` _boolean_ | ExitNode defines whether the Connector node should act as a<br />Tailscale exit node. Defaults to false.<br />https://tailscale.com/kb/1103/exit-nodes | | |
 
 #### ConnectorStatus

View File

@@ -57,6 +57,17 @@ type ConnectorList struct {
 // ConnectorSpec describes a Tailscale node to be deployed in the cluster.
 // +kubebuilder:validation:XValidation:rule="has(self.subnetRouter) || self.exitNode == true",message="A Connector needs to be either an exit node or a subnet router, or both."
 type ConnectorSpec struct {
+    // SubnetRouter defines subnet routes that the Connector node should
+    // expose to tailnet. If unset, none are exposed.
+    // https://tailscale.com/kb/1019/subnets/
+    // +optional
+    SubnetRouter *SubnetRouter `json:"subnetRouter"`
+    // ExitNode defines whether the Connector node should act as a
+    // Tailscale exit node. Defaults to false.
+    // https://tailscale.com/kb/1103/exit-nodes
+    // +optional
+    ExitNode bool `json:"exitNode"`
+    Replicas *int `json:"replicas"`
     // Tags that the Tailscale node will be tagged with.
     // Defaults to [tag:k8s].
     // To autoapprove the subnet routes or exit node defined by a Connector,
@@ -82,16 +93,6 @@ type ConnectorSpec struct {
     // create resources with the default configuration.
     // +optional
     ProxyClass string `json:"proxyClass,omitempty"`
-    // SubnetRouter defines subnet routes that the Connector node should
-    // expose to tailnet. If unset, none are exposed.
-    // https://tailscale.com/kb/1019/subnets/
-    // +optional
-    SubnetRouter *SubnetRouter `json:"subnetRouter"`
-    // ExitNode defines whether the Connector node should act as a
-    // Tailscale exit node. Defaults to false.
-    // https://tailscale.com/kb/1103/exit-nodes
-    // +optional
-    ExitNode bool `json:"exitNode"`
 }
 
 // SubnetRouter defines subnet routes that should be exposed to tailnet via a
View File

@@ -75,16 +75,21 @@ func (in *ConnectorList) DeepCopyObject() runtime.Object {
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *ConnectorSpec) DeepCopyInto(out *ConnectorSpec) {
     *out = *in
-    if in.Tags != nil {
-        in, out := &in.Tags, &out.Tags
-        *out = make(Tags, len(*in))
-        copy(*out, *in)
-    }
     if in.SubnetRouter != nil {
         in, out := &in.SubnetRouter, &out.SubnetRouter
         *out = new(SubnetRouter)
         (*in).DeepCopyInto(*out)
     }
+    if in.Replicas != nil {
+        in, out := &in.Replicas, &out.Replicas
+        *out = new(int)
+        **out = **in
+    }
+    if in.Tags != nil {
+        in, out := &in.Tags, &out.Tags
+        *out = make(Tags, len(*in))
+        copy(*out, *in)
+    }
 }
 
 // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ConnectorSpec.

View File

@@ -372,6 +372,14 @@ func (i *iptablesRunner) AddDNATRule(origDst, dst netip.Addr) error {
     return table.Insert("nat", "PREROUTING", 1, "--destination", origDst.String(), "-j", "DNAT", "--to-destination", dst.String())
 }
 
+func (i *iptablesRunner) AddDropRule(dst netip.Addr) error {
+    table := i.getIPTByAddr(dst)
+    if err := table.Insert("filter", "OUTPUT", 1, "--destination", dst.String(), "-j", "DROP"); err != nil {
+        return err
+    }
+    return table.Insert("filter", "INPUT", 1, "--source", dst.String(), "-j", "DROP")
+}
+
 // EnsureSNATForDst sets up firewall to ensure that all traffic aimed for dst, has its source ip set to src:
 //   - creates a SNAT rule if not already present
 //   - ensures that any no longer valid SNAT rules for the same dst are removed

View File

@@ -570,6 +570,8 @@ type NetfilterRunner interface {
     // DelMagicsockPortRule removes the rule created by AddMagicsockPortRule,
     // if it exists.
     DelMagicsockPortRule(port uint16, network string) error
+
+    AddDropRule(dst netip.Addr) error
 }
 
 // New creates a NetfilterRunner, auto-detecting whether to use
@@ -692,6 +694,9 @@ func (n *nftablesRunner) HasIPV6NAT() bool {
 func (n *nftablesRunner) HasIPV6Filter() bool {
     return n.v6Available
 }
+func (n *nftablesRunner) AddDropRule(addr netip.Addr) error {
+    return nil
+}
 
 // findRule iterates through the rules to find the rule with matching expressions.
 func findRule(conn *nftables.Conn, rule *nftables.Rule) (*nftables.Rule, error) {