From c154f3676c8544953b36a17d3b32cbd00314ce49 Mon Sep 17 00:00:00 2001 From: Irbe Krumina Date: Thu, 29 Aug 2024 13:25:13 +0300 Subject: [PATCH] egress HA: WIP --- Makefile | 3 + build_docker.sh | 16 + cmd/containerboot/main.go | 135 +++++++- cmd/k8s-fwegress/deploy.yaml | 58 ++++ cmd/k8s-fwegress/eps.yaml | 11 + cmd/k8s-fwegress/fwegress.go | 253 +++++++++++++++ cmd/k8s-operator/connector.go | 126 ++++---- .../deploy/chart/templates/fwegress-rbac.yaml | 45 +++ .../deploy/chart/templates/operator-rbac.yaml | 4 +- .../deploy/chart/templates/proxy-rbac.yaml | 3 + .../deploy/manifests/fwegress-eps.yaml | 5 + .../deploy/manifests/fwegress.yaml | 51 +++ cmd/k8s-operator/egress-cm.go | 155 +++++++++ cmd/k8s-operator/egress-ha.go | 225 +++++++++++++ cmd/k8s-operator/fwegress-pods.go | 127 ++++++++ cmd/k8s-operator/ingress.go | 105 +++--- cmd/k8s-operator/operator.go | 163 +++------- cmd/k8s-operator/sts.go | 299 ++++++++++-------- cmd/k8s-operator/svc.go | 193 ++++++----- ipn/store/kubestore/store_kube.go | 32 +- k8s-operator/egress-services.go | 20 ++ kube/api.go | 7 + kube/client.go | 28 +- kube/fake_client.go | 3 + util/linuxfw/iptables_runner.go | 5 + util/linuxfw/nftables_runner.go | 49 +++ 26 files changed, 1632 insertions(+), 489 deletions(-) create mode 100644 cmd/k8s-fwegress/deploy.yaml create mode 100644 cmd/k8s-fwegress/eps.yaml create mode 100644 cmd/k8s-fwegress/fwegress.go create mode 100644 cmd/k8s-operator/deploy/chart/templates/fwegress-rbac.yaml create mode 100644 cmd/k8s-operator/deploy/manifests/fwegress-eps.yaml create mode 100644 cmd/k8s-operator/deploy/manifests/fwegress.yaml create mode 100644 cmd/k8s-operator/egress-cm.go create mode 100644 cmd/k8s-operator/egress-ha.go create mode 100644 cmd/k8s-operator/fwegress-pods.go create mode 100644 k8s-operator/egress-services.go diff --git a/Makefile b/Makefile index 98c3d36cc..991a4b734 100644 --- a/Makefile +++ b/Makefile @@ -34,6 +34,9 @@ depaware: ## Run depaware checks 
tailscale.com/cmd/k8s-operator \ tailscale.com/cmd/stund +publishfwegress: ## Build and publish k8s-fwegress image to location specified by ${REPO} + TAGS="${TAGS}" REPOS=${REPO} PLATFORM=${PLATFORM} PUSH=true TARGET=k8s-fwegress ./build_docker.sh + buildwindows: ## Build tailscale CLI for windows/amd64 GOOS=windows GOARCH=amd64 ./tool/go install tailscale.com/cmd/tailscale tailscale.com/cmd/tailscaled diff --git a/build_docker.sh b/build_docker.sh index 1cbdc4b9e..3551eec19 100755 --- a/build_docker.sh +++ b/build_docker.sh @@ -77,6 +77,22 @@ case "$TARGET" in --target="${PLATFORM}" \ /usr/local/bin/k8s-nameserver ;; + k8s-fwegress) + DEFAULT_REPOS="tailscale/k8s-fwegress" + REPOS="${REPOS:-${DEFAULT_REPOS}}" + go run github.com/tailscale/mkctr \ + --gopaths="tailscale.com/cmd/k8s-fwegress:/usr/local/bin/k8s-fwegress" \ + --ldflags=" \ + -X tailscale.com/version.longStamp=${VERSION_LONG} \ + -X tailscale.com/version.shortStamp=${VERSION_SHORT} \ + -X tailscale.com/version.gitCommitStamp=${VERSION_GIT_HASH}" \ + --base="${BASE}" \ + --tags="${TAGS}" \ + --repos="${REPOS}" \ + --push="${PUSH}" \ + --target="${PLATFORM}" \ + /usr/local/bin/k8s-fwegress + ;; *) echo "unknown target: $TARGET" exit 1 diff --git a/cmd/containerboot/main.go b/cmd/containerboot/main.go index fdf71c3ea..12ea4c63b 100644 --- a/cmd/containerboot/main.go +++ b/cmd/containerboot/main.go @@ -81,6 +81,10 @@ // cluster using the same hostname (in this case, the MagicDNS name of the ingress proxy) // as a non-cluster workload on tailnet. // This is only meant to be configured by the Kubernetes operator. +// - TS_EGRESS_SERVICES_CONFIG_PATH: can be set to a path to config for how to route HA egress services. +// If set, containerboot will monitor the filepath, update firewall rules on changes and update +// 'egressServicesStatus' in the ConfigMap. +// - TS_EGRESS_SERVICES_CONFIGMAP_NAME // // When running on Kubernetes, containerboot defaults to storing state in the // "tailscale" kube secret. 
To store state on local disk instead, set @@ -123,6 +127,7 @@ "tailscale.com/ipn" "tailscale.com/ipn/conffile" kubeutils "tailscale.com/k8s-operator" + "tailscale.com/kube" "tailscale.com/tailcfg" "tailscale.com/types/logger" "tailscale.com/types/ptr" @@ -137,6 +142,121 @@ func newNetfilterRunner(logf logger.Logf) (linuxfw.NetfilterRunner, error) { return linuxfw.New(logf, "") } +// TODO: for now we assume that tailnet IPs of this node won't change. +// TODO: for now we assume an IPv4 cluster +func ensureEgressServiceConfig(ctx context.Context, path, cmName string, nfr linuxfw.NetfilterRunner, tsIPs []netip.Prefix) { + log.Printf("ensureEgressServiceConfig") + var tickChan <-chan time.Time + var eventChan <-chan fsnotify.Event + if w, err := fsnotify.NewWatcher(); err != nil { + log.Printf("ensureEgressServiceConfig: failed to create fsnotify watcher, timer-only mode: %v", err) + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + tickChan = ticker.C + } else { + defer w.Close() + if err := w.Add(filepath.Dir(path)); err != nil { + log.Fatalf("ensureEgressServiceConfig: failed to add fsnotify watch: %v", err) + } + eventChan = w.Events + } + + // TODO: for now we assume an IPv4 cluster + var local netip.Addr + for _, pfx := range tsIPs { + if !(pfx.IsSingleIP() && pfx.Addr().Is4()) { + continue + } + local = pfx.Addr() + break + } + + var prevESConfig *kubeutils.EgressServices + f := func() { + svcCfg, err := readEgressServiceConfig(path) + if err != nil { + log.Fatalf("error reading egress service config path %s: %v", path, err) + } + if prevESConfig != nil && reflect.DeepEqual(prevESConfig, svcCfg) { + log.Printf("ensureEgressServiceConfig: nothing changed") + return + } + prevESConfig = svcCfg + // TODO: delete any no-longer needed routes, only apply new routes if they don't already exist + // for each egress service + if svcCfg == nil || len(svcCfg.Services) == 0 { + return + } + statusHint := make(map[string][]string, 0) + for svcName, svcConfig 
:= range svcCfg.Services { + log.Printf("setting up routes for egress service %s", svcName) + // parse tailnet target + dst, err := netip.ParseAddr(svcConfig.TailnetTargetIP) + if err != nil { + log.Fatalf("error parsing tailnet target IP: %v", err) + } + if !dst.Is4() { + log.Fatalf("dst %v is not IPv4 address", dst) + } + statusHint[svcName] = make([]string, 0) + if err := nfr.AddSNATRuleForDst(local, dst); err != nil { + log.Fatalf("error adding SNAT rule: %v", err) + } + + log.Printf("cluster sources: %v", svcConfig.ClusterSources) + + for _, clusterSourceS := range svcConfig.ClusterSources { + if clusterSourceS == "" { + break + } + clusterSource, err := netip.ParseAddr(clusterSourceS) + if err != nil { + log.Fatalf("error parsing cluster source %q: %v", clusterSourceS, err) + } + if !clusterSource.Is4() { + log.Fatalf("cluster source %v is not an IPv4 address", clusterSource) + } + if err := nfr.AddDNATRuleWithSrc(clusterSource, dst); err != nil { + log.Fatalf("error adding DNAT rule with src %v dst %v: %v", clusterSource, dst, err) + } + statusHint[svcName] = append(statusHint[svcName], clusterSourceS) + } + } + statusHintBs, err := json.Marshal(statusHint) + if err != nil { + log.Fatalf("error marshalling status hint: %v", err) + } + jsonPatches := []kube.JSONPatch{{Op: "replace", Value: statusHintBs, Path: "/data/statusHint"}} + if err := kc.JSONPatchSecret(ctx, cmName, jsonPatches); err != nil { + log.Fatalf("error updating configmap %s: %v", cmName, err) + } + } + f() + for { + select { + case <-tickChan: + case <-eventChan: + log.Printf("egress service config update event") + } + f() + } +} + +func readEgressServiceConfig(path string) (*kubeutils.EgressServices, error) { + if path == "" { + return nil, nil + } + j, err := os.ReadFile(path) + if err != nil { + return nil, err + } + var sc kubeutils.EgressServices + if err := json.Unmarshal(j, &sc); err != nil { + return nil, err + } + return &sc, nil +} + func main() { log.SetPrefix("boot: ") 
tailscale.I_Acknowledge_This_API_Is_Unstable = true @@ -166,6 +286,8 @@ func main() { PodIP: defaultEnv("POD_IP", ""), EnableForwardingOptimizations: defaultBool("TS_EXPERIMENTAL_ENABLE_FORWARDING_OPTIMIZATIONS", false), HealthCheckAddrPort: defaultEnv("TS_HEALTHCHECK_ADDR_PORT", ""), + EgressServicesConfigPath: defaultEnv("TS_EGRESS_SERVICES_CONFIG_PATH", ""), + EgressServicesConfigMapName: defaultEnv("TS_EGRESS_SERVICES_CONFIGMAP_NAME", ""), } if err := cfg.validate(); err != nil { @@ -585,6 +707,7 @@ func main() { } } if !startupTasksDone { + log.Printf("not startup tasks done") // For containerboot instances that act as TCP // proxies (proxying traffic to an endpoint // passed via one of the env vars that @@ -600,6 +723,10 @@ func main() { // post-auth configuration is done. log.Println("Startup complete, waiting for shutdown signal") startupTasksDone = true + if cfg.EgressServicesConfigMapName != "" && cfg.EgressServicesConfigPath != "" { + log.Println("about to ensure service config") + go ensureEgressServiceConfig(ctx, cfg.EgressServicesConfigPath, cfg.EgressServicesConfigMapName, nfr, addrs) + } // Wait on tailscaled process. It won't // be cleaned up by default when the @@ -1170,8 +1297,10 @@ type settings struct { // PodIP is the IP of the Pod if running in Kubernetes. This is used // when setting up rules to proxy cluster traffic to cluster ingress // target. - PodIP string - HealthCheckAddrPort string + PodIP string + HealthCheckAddrPort string + EgressServicesConfigPath string + EgressServicesConfigMapName string } func (s *settings) validate() error { @@ -1351,7 +1480,7 @@ func isOneStepConfig(cfg *settings) bool { // as an L3 proxy, proxying to an endpoint provided via one of the config env // vars. 
func isL3Proxy(cfg *settings) bool { - return cfg.ProxyTargetIP != "" || cfg.ProxyTargetDNSName != "" || cfg.TailnetTargetIP != "" || cfg.TailnetTargetFQDN != "" || cfg.AllowProxyingClusterTrafficViaIngress + return cfg.EgressServicesConfigPath != "" || cfg.ProxyTargetIP != "" || cfg.ProxyTargetDNSName != "" || cfg.TailnetTargetIP != "" || cfg.TailnetTargetFQDN != "" || cfg.AllowProxyingClusterTrafficViaIngress } // hasKubeStateStore returns true if the state must be stored in a Kubernetes diff --git a/cmd/k8s-fwegress/deploy.yaml b/cmd/k8s-fwegress/deploy.yaml new file mode 100644 index 000000000..e0a8b9ffd --- /dev/null +++ b/cmd/k8s-fwegress/deploy.yaml @@ -0,0 +1,58 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: fwegress + namespace: tailscale +spec: + # TODO: experiment with upgrade strategy + replicas: 2 + selector: + matchLabels: + app: fwegress + template: + metadata: + labels: + app: fwegress + spec: + serviceAccount: fwegress + readinessGates: + - conditionType: TailscaleRoutesReady + initContainers: + - name: sysctler + image: tailscale/alpine-base:3.18 + securityContext: + privileged: true + command: ["/bin/sh", "-c"] + args: [sysctl -w net.ipv4.ip_forward=1 && if sysctl net.ipv6.conf.all.forwarding; then sysctl -w net.ipv6.conf.all.forwarding=1; fi] + containers: + - image: gcr.io/csi-test-290908/k8s-fwegress:v0.0.20 + imagePullPolicy: IfNotPresent + name: fwegress + env: + - name: TS_DEBUG_FIREWALL_MODE + value: "auto" + - name: TS_EGRESS_SVC + value: "kuard" + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: POD_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + securityContext: + capabilities: + add: + - NET_ADMIN +--- +apiVersion: v1 +kind: Service +metadata: + name: fwegress + namespace: tailscale +spec: + selector: + app: fwegress + clusterIP: None + type: ClusterIP \ No newline at end of file diff --git a/cmd/k8s-fwegress/eps.yaml b/cmd/k8s-fwegress/eps.yaml new file mode 100644 index 
000000000..65f4df90b --- /dev/null +++ b/cmd/k8s-fwegress/eps.yaml @@ -0,0 +1,11 @@ +addressType: IPv4 +apiVersion: discovery.k8s.io/v1 +endpoints: +- addresses: + - 10.12.0.27 +kind: EndpointSlice +metadata: + labels: + tailscale.com/fwegress-ip: "kuard" + name: fwegress-2 + namespace: tailscale \ No newline at end of file diff --git a/cmd/k8s-fwegress/fwegress.go b/cmd/k8s-fwegress/fwegress.go new file mode 100644 index 000000000..0f7584284 --- /dev/null +++ b/cmd/k8s-fwegress/fwegress.go @@ -0,0 +1,253 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +//go:build !plan9 + +package main + +import ( + "context" + "errors" + "fmt" + "net/netip" + "os" + "strings" + "sync" + + "github.com/go-logr/zapr" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + xslices "golang.org/x/exp/slices" + corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" + apiequality "k8s.io/apimachinery/pkg/api/equality" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/selection" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/config" + logf "sigs.k8s.io/controller-runtime/pkg/log" + kzap "sigs.k8s.io/controller-runtime/pkg/log/zap" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/manager/signals" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + tsapi "tailscale.com/k8s-operator/apis/v1alpha1" + "tailscale.com/util/linuxfw" +) + +func main() { + var opts []kzap.Opts + opts = append(opts, kzap.UseDevMode(true), kzap.Level(zapcore.DebugLevel)) + zlog := kzap.NewRaw(opts...).Sugar() + logf.SetLogger(zapr.NewLogger(zlog.Desugar())) + restConfig := config.GetConfigOrDie() + // One EndpointSlice marked by egress service name for the Service + egressSvc := 
os.Getenv("TS_EGRESS_SVC") + if egressSvc == "" { + zlog.Fatalf("empty egress service name") + } + + podIP := os.Getenv("POD_IP") + if podIP == "" { + zlog.Fatalf("empty POD_IP") + } + podUID := os.Getenv("POD_UID") + if podUID == "" { + zlog.Fatalf("empty Pod UID") + } + labelReq, err := labels.NewRequirement("tailscale.com/fwegress", selection.Equals, []string{egressSvc}) + if err != nil { + zlog.Fatalf("error creating a label requirement: %v", err) + } + labelFilter := cache.ByObject{ + Label: labels.NewSelector().Add(*labelReq), + } + nsFilter := cache.ByObject{ + Namespaces: map[string]cache.Config{"tailscale": {LabelSelector: labelFilter.Label}}, + } + nsFilter1 := cache.ByObject{ + Field: client.InNamespace("tailscale").AsSelector(), + } + mgr, err := manager.New(restConfig, manager.Options{Scheme: tsapi.GlobalScheme, + Cache: cache.Options{ + ByObject: map[client.Object]cache.ByObject{ + &discoveryv1.EndpointSlice{}: nsFilter, + &corev1.Pod{}: nsFilter1, + }, + }}) + if err != nil { + zlog.Fatalf("could not create manager: %v", err) + } + // TODO: does this result in setting up unnecessary default firewall + // rules for tailscale? + nfRunner, err := linuxfw.New(zlog.Debugf, "") + if err != nil { + zlog.Fatalf("could not create netfilter runner: %v", err) + } + podName := os.Getenv("POD_NAME") + if podName == "" { + zlog.Fatal("empty Pod name") + } + err = builder. + ControllerManagedBy(mgr). + For(&discoveryv1.EndpointSlice{}). 
// label filter + Complete(&FWEgressReconciler{ + Client: mgr.GetClient(), + logger: zlog.Named("FWEgress-reconciler"), + nfRunner: nfRunner, + podIP: netip.MustParseAddr(podIP), + podName: podName, + podUID: podUID, + state: &state{routes: make([]netip.Addr, 0)}, + egressSvc: egressSvc, + }) + if err != nil { + zlog.Fatalf("error creating FWEgress reconciler: %v", err) + } + if err := mgr.Start(signals.SetupSignalHandler()); err != nil { + zlog.Fatalf("error starting controller manager: %v", err) + } +} + +type FWEgressReconciler struct { + client.Client + state *state + logger *zap.SugaredLogger + nfRunner linuxfw.NetfilterRunner + podIP netip.Addr + podName string + podUID string + egressSvc string +} + +// The operator creates the EndpointSlice as that makes it easier to co-ordinate the IP family thing. +func (r *FWEgressReconciler) Reconcile(ctx context.Context, req reconcile.Request) (res reconcile.Result, err error) { + r.logger.Debugf("starting reconcile") + defer r.logger.Debugf("reconcile finished") + newRoutes := make([]netip.Addr, 0) + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: r.podName, + Namespace: "tailscale", + }, + } + if err := r.Get(ctx, client.ObjectKeyFromObject(pod), pod); err != nil { + return res, err + } + ready := corev1.ConditionFalse + defer func() { + r.logger.Debugf("setting new routes %v", newRoutes) + r.state.set(newRoutes) + oldPodStatus := pod.Status.DeepCopy() + podSetTailscaleReady(ready, pod) + if !apiequality.Semantic.DeepEqual(&pod.Status, oldPodStatus) { + r.logger.Debugf("updating Pod status") + if updateErr := r.Status().Update(ctx, pod); updateErr != nil { + err = errors.Join(err, fmt.Errorf("error updating Pod status: %w", updateErr)) + } + } + // custom pod status condition is only used for readiness check, it is not reliable for internal use because it is possible that container restarted and we lost routes, but pod status is still the same + }() + eps := 
new(discoveryv1.EndpointSlice) + err = r.Get(ctx, req.NamespacedName, eps) + if apierrors.IsNotFound(err) { + r.logger.Debugf("EndpointSlice not found, assuming it was deleted") + return reconcile.Result{}, nil + } else if err != nil { + return reconcile.Result{}, fmt.Errorf("failed to get EndpointSlice: %w", err) + } + egressSvc := eps.Labels["tailscale.com/fwegress"] + if !strings.EqualFold(egressSvc, r.egressSvc) { + r.logger.Debugf("got EndpointSlice for service %s interested in %s", egressSvc, r.egressSvc) + return res, nil + } + + // TODO: this is round robin for iptables, but not nftables- must fix + // nftables + addrs := make([]netip.Addr, 0) + for _, ep := range eps.Endpoints { + if !strings.EqualFold(*ep.Hostname, r.podUID) { + r.logger.Debugf("skip endpoint for fwegress Pod %s", *ep.Hostname) + continue + } + for _, addrS := range ep.Addresses { + addr, err := netip.ParseAddr(addrS) + if err != nil { + return res, fmt.Errorf("error parsing EndpointSlice address %s: %v", addrS, err) + } + // duplicates aren't expected + addrs = append(addrs, addr) + } + } + if !r.state.routesNeedUpdate(addrs) { + r.logger.Debugf("routes don't need update") + newRoutes, ready = addrs, corev1.ConditionTrue + return + } + r.logger.Debugf("routes need update, new routes are %v", addrs) + + // TODO: also add a mark + // we could mark packets for this service so don't have to reconfigure as these Pods go up and down + if err := r.nfRunner.DNATWithLoadBalancer(r.podIP, addrs); err != nil { + r.logger.Errorf("error updating routes: %v", err) + return res, fmt.Errorf("error setting up load balancer rules: %v", err) + } + for _, addr := range addrs { + if err := r.nfRunner.AddSNATRuleForDst(r.podIP, addr); err != nil { + return res, fmt.Errorf("error setting up SNAT rules %w", err) + } + } + + newRoutes = addrs + ready = corev1.ConditionTrue + return res, nil +} + +type state struct { + sync.RWMutex + routes []netip.Addr +} + +func (s *state) routesNeedUpdate(newRoutes []netip.Addr) bool { + s.Lock() 
+ defer s.Unlock() + if len(newRoutes) != len(s.routes) { + return true + } + // TODO: bart.Table would be more efficient maybe + // Routes should be sorted + for i, r := range s.routes { + if newRoutes[i].Compare(r) != 0 { + return true + } + } + return false +} + +// we need to store routes internally - they can be lost during container +// restarts and container restarts can happen in a way that cannot be tied to +// resource garbage collection etc +func (s *state) set(routes []netip.Addr) { + s.Lock() + s.routes = routes + s.Unlock() +} + +func podSetTailscaleReady(status corev1.ConditionStatus, pod *corev1.Pod) { + newCondition := corev1.PodCondition{ + Type: corev1.PodConditionType("TailscaleRoutesReady"), + Status: status, + } + + idx := xslices.IndexFunc(pod.Status.Conditions, func(cond corev1.PodCondition) bool { + return cond.Type == corev1.PodConditionType("TailscaleRoutesReady") + }) + if idx == -1 { + pod.Status.Conditions = append(pod.Status.Conditions, newCondition) + return + } + pod.Status.Conditions[idx] = newCondition +} diff --git a/cmd/k8s-operator/connector.go b/cmd/k8s-operator/connector.go index 4586dfdbf..3831f8276 100644 --- a/cmd/k8s-operator/connector.go +++ b/cmd/k8s-operator/connector.go @@ -156,77 +156,77 @@ func (a *ConnectorReconciler) Reconcile(ctx context.Context, req reconcile.Reque // maybeProvisionConnector ensures that any new resources required for this // Connector instance are deployed to the cluster. 
func (a *ConnectorReconciler) maybeProvisionConnector(ctx context.Context, logger *zap.SugaredLogger, cn *tsapi.Connector) error { - hostname := cn.Name + "-connector" - if cn.Spec.Hostname != "" { - hostname = string(cn.Spec.Hostname) - } - crl := childResourceLabels(cn.Name, a.tsnamespace, "connector") + // hostname := cn.Name + "-connector" + // if cn.Spec.Hostname != "" { + // hostname = string(cn.Spec.Hostname) + // } + // crl := childResourceLabels(cn.Name, a.tsnamespace, "connector") - proxyClass := cn.Spec.ProxyClass - if proxyClass != "" { - if ready, err := proxyClassIsReady(ctx, proxyClass, a.Client); err != nil { - return fmt.Errorf("error verifying ProxyClass for Connector: %w", err) - } else if !ready { - logger.Infof("ProxyClass %s specified for the Connector, but is not (yet) Ready, waiting..", proxyClass) - return nil - } - } + // proxyClass := cn.Spec.ProxyClass + // if proxyClass != "" { + // if ready, err := proxyClassIsReady(ctx, proxyClass, a.Client); err != nil { + // return fmt.Errorf("error verifying ProxyClass for Connector: %w", err) + // } else if !ready { + // logger.Infof("ProxyClass %s specified for the Connector, but is not (yet) Ready, waiting..", proxyClass) + // return nil + // } + // } - sts := &tailscaleSTSConfig{ - ParentResourceName: cn.Name, - ParentResourceUID: string(cn.UID), - Hostname: hostname, - ChildResourceLabels: crl, - Tags: cn.Spec.Tags.Stringify(), - Connector: &connector{ - isExitNode: cn.Spec.ExitNode, - }, - ProxyClassName: proxyClass, - } + // sts := &tailscaleSTSConfig{ + // ParentResourceName: cn.Name, + // ParentResourceUID: string(cn.UID), + // Hostname: hostname, + // ChildResourceLabels: crl, + // Tags: cn.Spec.Tags.Stringify(), + // Connector: &connector{ + // isExitNode: cn.Spec.ExitNode, + // }, + // ProxyClassName: proxyClass, + // } - if cn.Spec.SubnetRouter != nil && len(cn.Spec.SubnetRouter.AdvertiseRoutes) > 0 { - sts.Connector.routes = cn.Spec.SubnetRouter.AdvertiseRoutes.Stringify() - } + // if 
cn.Spec.SubnetRouter != nil && len(cn.Spec.SubnetRouter.AdvertiseRoutes) > 0 { + // sts.Connector.routes = cn.Spec.SubnetRouter.AdvertiseRoutes.Stringify() + // } - a.mu.Lock() - if sts.Connector.isExitNode { - a.exitNodes.Add(cn.UID) - } else { - a.exitNodes.Remove(cn.UID) - } - if sts.Connector.routes != "" { - a.subnetRouters.Add(cn.GetUID()) - } else { - a.subnetRouters.Remove(cn.GetUID()) - } - a.mu.Unlock() - gaugeConnectorSubnetRouterResources.Set(int64(a.subnetRouters.Len())) - gaugeConnectorExitNodeResources.Set(int64(a.exitNodes.Len())) - var connectors set.Slice[types.UID] - connectors.AddSlice(a.exitNodes.Slice()) - connectors.AddSlice(a.subnetRouters.Slice()) - gaugeConnectorResources.Set(int64(connectors.Len())) + // a.mu.Lock() + // if sts.Connector.isExitNode { + // a.exitNodes.Add(cn.UID) + // } else { + // a.exitNodes.Remove(cn.UID) + // } + // if sts.Connector.routes != "" { + // a.subnetRouters.Add(cn.GetUID()) + // } else { + // a.subnetRouters.Remove(cn.GetUID()) + // } + // a.mu.Unlock() + // gaugeConnectorSubnetRouterResources.Set(int64(a.subnetRouters.Len())) + // gaugeConnectorExitNodeResources.Set(int64(a.exitNodes.Len())) + // var connectors set.Slice[types.UID] + // connectors.AddSlice(a.exitNodes.Slice()) + // connectors.AddSlice(a.subnetRouters.Slice()) + // gaugeConnectorResources.Set(int64(connectors.Len())) - _, err := a.ssr.Provision(ctx, logger, sts) - if err != nil { - return err - } + // _, err := a.ssr.Provision(ctx, logger, sts) + // if err != nil { + // return err + // } - _, tsHost, ips, err := a.ssr.DeviceInfo(ctx, crl) - if err != nil { - return err - } + // _, tsHost, ips, err := a.ssr.DeviceInfo(ctx, crl) + // if err != nil { + // return err + // } - if tsHost == "" { - logger.Debugf("no Tailscale hostname known yet, waiting for connector pod to finish auth") - // No hostname yet. Wait for the connector pod to auth. 
- cn.Status.TailnetIPs = nil - cn.Status.Hostname = "" - return nil - } + // if tsHost == "" { + // logger.Debugf("no Tailscale hostname known yet, waiting for connector pod to finish auth") + // // No hostname yet. Wait for the connector pod to auth. + // cn.Status.TailnetIPs = nil + // cn.Status.Hostname = "" + // return nil + // } - cn.Status.TailnetIPs = ips - cn.Status.Hostname = tsHost + // cn.Status.TailnetIPs = ips + // cn.Status.Hostname = tsHost return nil } diff --git a/cmd/k8s-operator/deploy/chart/templates/fwegress-rbac.yaml b/cmd/k8s-operator/deploy/chart/templates/fwegress-rbac.yaml new file mode 100644 index 000000000..9be49b4f8 --- /dev/null +++ b/cmd/k8s-operator/deploy/chart/templates/fwegress-rbac.yaml @@ -0,0 +1,45 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: fwegress + namespace: tailscale +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: fwegress + namespace: tailscale +rules: +- apiGroups: + - "discovery.k8s.io" + resources: + - endpointslices + verbs: + - get + - list + - watch +- apiGroups: + - "" + resources: + - pods + - pods/status + verbs: + - get + - list + - watch + - update + - patch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: fwegress + namespace: tailscale +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: fwegress +subjects: + - kind: ServiceAccount + name: fwegress + namespace: tailscale \ No newline at end of file diff --git a/cmd/k8s-operator/deploy/chart/templates/operator-rbac.yaml b/cmd/k8s-operator/deploy/chart/templates/operator-rbac.yaml index 9f2a4c2f0..89eb1f1e7 100644 --- a/cmd/k8s-operator/deploy/chart/templates/operator-rbac.yaml +++ b/cmd/k8s-operator/deploy/chart/templates/operator-rbac.yaml @@ -48,14 +48,14 @@ metadata: namespace: {{ .Release.Namespace }} rules: - apiGroups: [""] - resources: ["secrets", "serviceaccounts", "configmaps"] + resources: ["secrets", "serviceaccounts", "configmaps", "pods"] verbs: 
["create","delete","deletecollection","get","list","patch","update","watch"] - apiGroups: ["apps"] resources: ["statefulsets", "deployments"] verbs: ["create","delete","deletecollection","get","list","patch","update","watch"] - apiGroups: ["discovery.k8s.io"] resources: ["endpointslices"] - verbs: ["get", "list", "watch"] + verbs: ["get", "list", "watch", "patch", "update", "create"] --- apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding diff --git a/cmd/k8s-operator/deploy/chart/templates/proxy-rbac.yaml b/cmd/k8s-operator/deploy/chart/templates/proxy-rbac.yaml index 1c15c9119..428818b76 100644 --- a/cmd/k8s-operator/deploy/chart/templates/proxy-rbac.yaml +++ b/cmd/k8s-operator/deploy/chart/templates/proxy-rbac.yaml @@ -16,6 +16,9 @@ rules: - apiGroups: [""] resources: ["secrets"] verbs: ["create","delete","deletecollection","get","list","patch","update","watch"] +- apiGroups: [""] + resources: ["configmaps"] + verbs: ["create","delete","deletecollection","get","list","patch","update","watch"] --- apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding diff --git a/cmd/k8s-operator/deploy/manifests/fwegress-eps.yaml b/cmd/k8s-operator/deploy/manifests/fwegress-eps.yaml new file mode 100644 index 000000000..b85f1353f --- /dev/null +++ b/cmd/k8s-operator/deploy/manifests/fwegress-eps.yaml @@ -0,0 +1,5 @@ +addressType: IPv4 +apiVersion: discovery.k8s.io/v1 +kind: EndpointSlice +metadata: + namespace: tailscale \ No newline at end of file diff --git a/cmd/k8s-operator/deploy/manifests/fwegress.yaml b/cmd/k8s-operator/deploy/manifests/fwegress.yaml new file mode 100644 index 000000000..632578009 --- /dev/null +++ b/cmd/k8s-operator/deploy/manifests/fwegress.yaml @@ -0,0 +1,51 @@ +# This file is not a complete manifest, it's a skeleton that the operator embeds +# at build time and then uses to construct Tailscale proxy pods. 
+apiVersion: apps/v1 +kind: StatefulSet +metadata: + namespace: tailscale +spec: + replicas: 2 # hardcoded for this prototype + template: + metadata: + deletionGracePeriodSeconds: 10 + spec: + serviceAccountName: fwegress + readinessGates: + - conditionType: TailscaleRoutesReady + initContainers: + - name: sysctler + image: tailscale/alpine-base:3.18 + securityContext: + privileged: true + command: ["/bin/sh", "-c"] + args: [sysctl -w net.ipv4.ip_forward=1 && if sysctl net.ipv6.conf.all.forwarding; then sysctl -w net.ipv6.conf.all.forwarding=1; fi] + resources: + requests: + cpu: 1m + memory: 1Mi + containers: + - name: tailscale + image: gcr.io/csi-test-290908/k8s-fwegress:v0.0.22 + imagePullPolicy: IfNotPresent + env: + - name: TS_DEBUG_FIREWALL_MODE + value: "auto" + - name: TS_EGRESS_SVC + value: "kuard" + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: POD_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + - name: POD_UID + valueFrom: + fieldRef: + fieldPath: metadata.uid + securityContext: + capabilities: + add: + - NET_ADMIN diff --git a/cmd/k8s-operator/egress-cm.go b/cmd/k8s-operator/egress-cm.go new file mode 100644 index 000000000..e1c9a3514 --- /dev/null +++ b/cmd/k8s-operator/egress-cm.go @@ -0,0 +1,155 @@ +//go:build !plan9 + +package main + +import ( + "context" + "encoding/json" + "fmt" + "reflect" + "strings" + + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +// reconciles egress proxy state secret. 
+type EgressSvcConfigReconciler struct { + client.Client + logger *zap.SugaredLogger +} + +// For this prototype only (make it determine what has change more intelligently later): +// on any state Secret change: +// - read the service status hints +// - for each status hint: +// - get all Pods for that Service +// - get EndpointSlice for that Service +// - if the status hint suggests that this proxy routes from fwegress Pods to backend, ensure endpoint addresses contain it +// - apply any changes to EndpointSlice +// - TODO: add finalizer to the proxy Secret to ensure cleanup when proxy deleted +func (er *EgressSvcConfigReconciler) Reconcile(ctx context.Context, req reconcile.Request) (res reconcile.Result, err error) { + logger := er.logger.With("secret-ns", req.Namespace, "secret-name", req.Name) + logger.Debugf("starting reconcile") + defer logger.Debugf("reconcile finished") + + // request should be for a state Secret + s := new(corev1.Secret) + err = er.Get(ctx, req.NamespacedName, s) + if apierrors.IsNotFound(err) { + // Request object not found, could have been deleted after reconcile request. 
+ logger.Debugf("service not found, assuming it was deleted") + return reconcile.Result{}, nil + } else if err != nil { + return reconcile.Result{}, fmt.Errorf("failed to get Secret: %w", err) + } + if !s.DeletionTimestamp.IsZero() { + logger.Debugf("Secret is being deleted") + return + } + + // For now - we reconcile all Secrets in tailscale namespace and ignore those without statusHint + // get status hint + if _, ok := s.Data["statusHint"]; !ok { + logger.Debugf("secret does not have a statusHint field, ignore %+#v", s.Data) + return + } + + statusHint := make(map[string][]string) + if err := json.Unmarshal(s.Data["statusHint"], &statusHint); err != nil { + return res, fmt.Errorf("error unmarshalling status hint: %w\n status hint is %q", err, s.Data["statusHint"]) + } + if len(statusHint) == 0 { + logger.Debugf("no status hint") + return + } + // get the associated Pod + proxyPod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{ + Name: s.Name, + Namespace: "tailscale", + }} + err = er.Get(ctx, client.ObjectKeyFromObject(proxyPod), proxyPod) + if apierrors.IsNotFound(err) { + logger.Debugf("Pod %s does not yet exist", s.Name) + return + } + if err != nil { + return res, fmt.Errorf("error retrieving Pod %s: %w", s.Name, err) + } + // for each of the services in status hint + for svcName, clusterSources := range statusHint { + eps := &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: svcName, + Namespace: "tailscale", + }, + } + err := er.Get(ctx, client.ObjectKeyFromObject(eps), eps) + if apierrors.IsNotFound(err) { + logger.Debugf("EndpointSlice %s not found", svcName) + return res, nil + } + if err != nil { + return res, fmt.Errorf("error retrieving EndpointSlice %s: %w", svcName, err) + } + fweEgressPods := &corev1.PodList{} + if err := er.List(ctx, fweEgressPods, client.InNamespace("tailscale"), client.MatchingLabels(map[string]string{"app": svcName})); err != nil { + return res, fmt.Errorf("error listing fwegress Pods for %s", svcName) + } + if 
len(fweEgressPods.Items) == 0 { + logger.Debugf("no fwegress pods for %s yet", svcName) + } + oldEps := eps.DeepCopy() + for _, pod := range fweEgressPods.Items { + podIP := pod.Status.PodIP + podUID := pod.UID + var ep *discoveryv1.Endpoint + for _, maybeEP := range eps.Endpoints { + if strings.EqualFold(string(podUID), *maybeEP.Hostname) { + ep = &maybeEP + break + } + } + if ep == nil { + logger.Debugf("no endpoint created for Pod with uid %s yet", podUID) + break + } + hasIP := false + for _, ip := range clusterSources { + if strings.EqualFold(ip, podIP) { + hasIP = true + break + } + } + if !hasIP { + logger.Debugf("proxy has NOT set up route from Pod %s, do nothing", podUID) + break + } + logger.Debugf("proxy has set up route from Pod %s, ensuring this is refected in EndpointSlice", podUID) + hasProxyPodIP := false + for _, addr := range ep.Addresses { + if strings.EqualFold(addr, proxyPod.Status.PodIP) { + hasProxyPodIP = true + break + } + } + if hasProxyPodIP { + logger.Debugf("proxy IP already present in EndpointSlice endpoint %s", proxyPod.Status.PodIP, podUID) + break + } + logger.Debugf("proxy IP not yet present in EndpointSlice endpoint %s", proxyPod.Status.PodIP, podUID) + ep.Addresses = append(ep.Addresses, proxyPod.Status.PodIP) + } + if !reflect.DeepEqual(oldEps, eps) { + if err := er.Update(ctx, eps); err != nil { + return res, fmt.Errorf("error updating EndpointSlice") + } + } + } + return +} diff --git a/cmd/k8s-operator/egress-ha.go b/cmd/k8s-operator/egress-ha.go new file mode 100644 index 000000000..4ad0b1b89 --- /dev/null +++ b/cmd/k8s-operator/egress-ha.go @@ -0,0 +1,225 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +//go:build !plan9 + +package main + +import ( + "context" + _ "embed" + "encoding/json" + "fmt" + "os" + "strconv" + "sync" + + "go.uber.org/zap" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" + apierrors 
"k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/yaml" + kube "tailscale.com/k8s-operator" + "tailscale.com/types/ptr" + "tailscale.com/util/mak" +) + +// This reconciler reconciles Tailscale egress Services configured to be exposed +// on a pre-existing ProxyGroup. For each it: +// - sets up a forwarding StatefulSet +// - updates egress service config for the ProxyGroup with a mapping of +// forwarding StatefulSet Pod IPs to tailnet target IP. +type EgressHAReconciler struct { + client.Client + ssr *tailscaleSTSReconciler + logger *zap.SugaredLogger + + mu sync.Mutex // protects following + // Temporary for this prototype - the amount of replicas for a StatefulSet + tempCurrentProxyGroupReplicas int +} + +func (er *EgressHAReconciler) Reconcile(ctx context.Context, req reconcile.Request) (res reconcile.Result, err error) { + logger := er.logger.With("service-ns", req.Namespace, "service-name", req.Name) + logger.Debugf("starting reconcile") + defer logger.Debugf("reconcile finished") + + svc := new(corev1.Service) + err = er.Get(ctx, req.NamespacedName, svc) + if apierrors.IsNotFound(err) { + // Request object not found, could have been deleted after reconcile request. + logger.Debugf("service not found, assuming it was deleted") + return reconcile.Result{}, nil + } else if err != nil { + return reconcile.Result{}, fmt.Errorf("failed to get svc: %w", err) + } + + // For this prototype only IP target is supported + tsIP := tailnetTargetAnnotation(svc) + if tsIP == "" { + return res, nil + } + + // For this prototype only. + // Otherwise services will need tailscale.com/proxy-group label + // ensure a proxy group fronted with a headless Service (?) 
+ if err := er.tempCreateProxyGroup(ctx); err != nil { + return res, fmt.Errorf("error ensuring proxy group: %v", err) + } + egressSvcName := fmt.Sprintf("%s-%s", svc.Name, svc.Namespace) + cm := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "egress-proxies", + Namespace: "tailscale", + }, + } + if err := er.Get(ctx, client.ObjectKeyFromObject(cm), cm); err != nil { + return res, fmt.Errorf("error getting egress-proxies cm: %w", err) + } + svcCfg := &kube.EgressServices{} + if err := json.Unmarshal([]byte(cm.Data["services"]), svcCfg); err != nil { + return res, fmt.Errorf("error unmarshalling config: %w", err) + } + _, ok := svcCfg.Services[egressSvcName] + if !ok { + mak.Set(&svcCfg.Services, egressSvcName, kube.EgressService{TailnetTargetIP: tsIP}) + } + bs, err := json.Marshal(svcCfg) + if err != nil { + return res, fmt.Errorf("error marhalling service config: %w", err) + } + cm.Data["services"] = string(bs) + if err := er.Update(ctx, cm); err != nil { + return res, fmt.Errorf("error updating configmap: %w", err) + } + if _, err := er.fwegressHeadlessSvc(ctx, egressSvcName); err != nil { + return res, fmt.Errorf("error reconciling headless svc:%w", err) + } + if _, err := er.fwegressSTS(ctx, egressSvcName); err != nil { + return res, fmt.Errorf("error reconciling StatefulSet: %w", err) + } + return +} + +//go:embed deploy/manifests/fwegress.yaml +var fwegressDeploy []byte + +func (er *EgressHAReconciler) fwegressSTS(ctx context.Context, name string) (*appsv1.StatefulSet, error) { + ss := new(appsv1.StatefulSet) + if err := yaml.Unmarshal(fwegressDeploy, &ss); err != nil { + return nil, fmt.Errorf("failed to unmarshal fwegress STS: %w", err) + } + ss.ObjectMeta.Name = name + ss.Spec.Selector = &metav1.LabelSelector{ + MatchLabels: map[string]string{"app": name}, + } + pod := &ss.Spec.Template + pod.Labels = map[string]string{"app": name, "name": name, "tailscale.com/fwegress": name} + return createOrUpdate(ctx, er.Client, "tailscale", ss, func(s 
*appsv1.StatefulSet) { s.Spec = ss.Spec }) +} + +func (er *EgressHAReconciler) fwegressHeadlessSvc(ctx context.Context, name string) (*corev1.Service, error) { + hsvc := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "tailscale", + Labels: map[string]string{"app": name}, + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "None", + Selector: map[string]string{ + "app": name, + }, + IPFamilyPolicy: ptr.To(corev1.IPFamilyPolicyPreferDualStack), + }, + } + return createOrUpdate(ctx, er.Client, "tailscale", hsvc, func(svc *corev1.Service) { svc.Spec = hsvc.Spec }) +} + +// fwegressEPS returns the EndpointSlice for the named egress service, creating an empty one if it does not exist yet. +func (er *EgressHAReconciler) fwegressEPS(ctx context.Context, name string) (*discoveryv1.EndpointSlice, error) { + eps := &discoveryv1.EndpointSlice{ + AddressType: discoveryv1.AddressTypeIPv4, // for this prototype + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "tailscale", + Labels: map[string]string{"tailscale.com/fwegress": name}, + }, + } + // only create if not exists as the other reconciler will be updating this; a NotFound from Get is expected and is not an error once Create succeeds + err := er.Get(ctx, client.ObjectKeyFromObject(eps), eps) + if apierrors.IsNotFound(err) { + if err := er.Create(ctx, eps); err != nil { + return nil, fmt.Errorf("error creating Endpointslice: %w", err) + } + } + if err != nil && !apierrors.IsNotFound(err) { + return nil, fmt.Errorf("error getting EndpointSlice: %w", err) + } + return eps, nil +} + +func (er *EgressHAReconciler) tempCreateProxyGroup(ctx context.Context) error { + er.mu.Lock() + defer er.mu.Unlock() + replicas := defaultIntEnv("REPLICAS", 3) + if replicas == er.tempCurrentProxyGroupReplicas { + er.logger.Debugf("Proxy group with %d replicas already exists", replicas) + } + er.logger.Debugf("Wants a proxy group with %d replicas, currently has %d replicas, updating", replicas, er.tempCurrentProxyGroupReplicas) + conf := &tailscaleSTSConfig{ + name: "egress-proxies", + replicas: int32(replicas), + } + if err := er.createConfigMap(ctx, "egress-proxies"); err != nil { + return fmt.Errorf("error 
creating ConfigMap: %w", err) + } + if _, err := er.ssr.Provision(ctx, er.logger, conf); err != nil { + return fmt.Errorf("error provision proxy group: %w", err) + } + er.tempCurrentProxyGroupReplicas = replicas + return nil +} + +// createConfigMap creates the named egress services ConfigMap if it does not exist; an existing one is never updated here as another reconciler updates it. +// TODO: SSA +func (er *EgressHAReconciler) createConfigMap(ctx context.Context, name string) error { + cfg := kube.EgressServices{ + Version: "v1alpha1", + } + cfgBS, err := json.Marshal(cfg) + if err != nil { + return fmt.Errorf("error marshalling config: %w", err) + } + cm := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "tailscale", + Labels: map[string]string{"tailscale.com/proxy-group": name}, + }, + Data: map[string]string{"services": string(cfgBS)}, + } + err = er.Get(ctx, client.ObjectKeyFromObject(cm), cm) + if apierrors.IsNotFound(err) { + return er.Create(ctx, cm) + } + if err != nil { + return fmt.Errorf("error getting ConfigMap: %w", err) + } + return nil +} + +// defaultIntEnv returns the value of the given envvar name parsed as an int, +// or defVal if it is unset or not a valid integer. 
+func defaultIntEnv(name string, defVal int) int { + v := os.Getenv(name) + i, err := strconv.Atoi(v) + if err != nil { + return defVal + } + return i +} diff --git a/cmd/k8s-operator/fwegress-pods.go b/cmd/k8s-operator/fwegress-pods.go new file mode 100644 index 000000000..1d81e31c9 --- /dev/null +++ b/cmd/k8s-operator/fwegress-pods.go @@ -0,0 +1,127 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +//go:build !plan9 + +package main + +import ( + "context" + "encoding/json" + "fmt" + "reflect" + "strings" + + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + kube "tailscale.com/k8s-operator" + "tailscale.com/types/ptr" +) + +// reconciles fwegress Pods +type FWEgressReconciler struct { + client.Client + logger *zap.SugaredLogger +} + +func (er *FWEgressReconciler) Reconcile(ctx context.Context, req reconcile.Request) (res reconcile.Result, err error) { + logger := er.logger.With("service-ns", req.Namespace, "service-name", req.Name) + logger.Debugf("starting reconcile") + defer logger.Debugf("reconcile finished") + + p := new(corev1.Pod) + err = er.Get(ctx, req.NamespacedName, p) + if apierrors.IsNotFound(err) { + // Request object not found, could have been deleted after reconcile request. 
+ logger.Debugf("Pod not found, assuming it was deleted") + return reconcile.Result{}, nil + } else if err != nil { + return reconcile.Result{}, fmt.Errorf("failed to get pod: %w", err) + } + if !p.DeletionTimestamp.IsZero() { + logger.Debugf("Pod is being deleted") + return + } + egressSvcName := p.Labels["tailscale.com/fwegress"] + if egressSvcName == "" { + logger.Debugf("[unexpected] Pod is not for egress service") + } + eps, err := er.fwegressEPS(ctx, egressSvcName) + if err != nil { + return res, fmt.Errorf("error ensuring EndpointSlice: %w", err) + } + oldEndpoints := eps.DeepCopy() + found := false + for _, e := range eps.Endpoints { + if strings.EqualFold(*e.Hostname, string(p.UID)) { + found = true + break + } + } + if !found { + eps.Endpoints = append(eps.Endpoints, discoveryv1.Endpoint{ + Hostname: ptr.To(string(p.UID)), + }) + } + if !reflect.DeepEqual(oldEndpoints, eps) { + if err := er.Update(ctx, eps); err != nil { + return res, fmt.Errorf("error updating EndpointSlice: %w", err) + } + } + cm := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "egress-proxies", + Namespace: "tailscale", + }, + } + if err := er.Get(ctx, client.ObjectKeyFromObject(cm), cm); err != nil { + return res, fmt.Errorf("error getting egress-proxies cm: %w", err) + } + svcCfg := &kube.EgressServices{} + if err := json.Unmarshal([]byte(cm.Data["services"]), svcCfg); err != nil { + return res, fmt.Errorf("error unmarshalling config: %w", err) + } + found = false + egSvc := svcCfg.Services[egressSvcName] + for _, ip := range egSvc.ClusterSources { + if strings.EqualFold(ip, p.Status.PodIP) { + found = true + break + } + } + if !found { + egSvc.ClusterSources = append(egSvc.ClusterSources, p.Status.PodIP) + svcCfg.Services[egressSvcName] = egSvc + if err := er.Update(ctx, cm); err != nil { + return res, fmt.Errorf("error updating ConfigMap: %w", err) + } + } + return res, nil +} + +func (er *FWEgressReconciler) fwegressEPS(ctx context.Context, name string) 
(*discoveryv1.EndpointSlice, error) { + eps := &discoveryv1.EndpointSlice{ + AddressType: discoveryv1.AddressTypeIPv4, // for this prototype + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "tailscale", + Labels: map[string]string{"tailscale.com/fwegress": name}, + }, + } + // only create if not exists as the other reconciler will be updating this; a NotFound from Get is expected and is not an error once Create succeeds + err := er.Get(ctx, client.ObjectKeyFromObject(eps), eps) + if apierrors.IsNotFound(err) { + if err := er.Create(ctx, eps); err != nil { + return nil, fmt.Errorf("error creating Endpointslice: %w", err) + } + } + if err != nil && !apierrors.IsNotFound(err) { + return nil, fmt.Errorf("error getting EndpointSlice: %w", err) + } + return eps, nil +} diff --git a/cmd/k8s-operator/ingress.go b/cmd/k8s-operator/ingress.go index badc1e7a4..a7f804773 100644 --- a/cmd/k8s-operator/ingress.go +++ b/cmd/k8s-operator/ingress.go @@ -9,7 +9,6 @@ "context" "fmt" "slices" - "strings" "sync" "github.com/pkg/errors" @@ -249,63 +248,63 @@ func (a *IngressReconciler) maybeProvision(ctx context.Context, logger *zap.Suga return nil } - crl := childResourceLabels(ing.Name, ing.Namespace, "ingress") - var tags []string - if tstr, ok := ing.Annotations[AnnotationTags]; ok { - tags = strings.Split(tstr, ",") - } - hostname := ing.Namespace + "-" + ing.Name + "-ingress" - if tlsHost != "" { - hostname, _, _ = strings.Cut(tlsHost, ".") - } + // crl := childResourceLabels(ing.Name, ing.Namespace, "ingress") + // var tags []string + // if tstr, ok := ing.Annotations[AnnotationTags]; ok { + // tags = strings.Split(tstr, ",") + // } + // hostname := ing.Namespace + "-" + ing.Name + "-ingress" + // if tlsHost != "" { + // hostname, _, _ = strings.Cut(tlsHost, ".") + // } - sts := &tailscaleSTSConfig{ - Hostname: hostname, - ParentResourceName: ing.Name, - ParentResourceUID: string(ing.UID), - ServeConfig: sc, - Tags: tags, - ChildResourceLabels: crl, - ProxyClassName: proxyClass, - } + // sts := &tailscaleSTSConfig{ + // Hostname: hostname, + // ParentResourceName: 
ing.Name, + // ParentResourceUID: string(ing.UID), + // ServeConfig: sc, + // Tags: tags, + // ChildResourceLabels: crl, + // ProxyClassName: proxyClass, + // } - if val := ing.GetAnnotations()[AnnotationExperimentalForwardClusterTrafficViaL7IngresProxy]; val == "true" { - sts.ForwardClusterTrafficViaL7IngressProxy = true - } + // if val := ing.GetAnnotations()[AnnotationExperimentalForwardClusterTrafficViaL7IngresProxy]; val == "true" { + // sts.ForwardClusterTrafficViaL7IngressProxy = true + // } - if _, err := a.ssr.Provision(ctx, logger, sts); err != nil { - return fmt.Errorf("failed to provision: %w", err) - } + // if _, err := a.ssr.Provision(ctx, logger, sts); err != nil { + // return fmt.Errorf("failed to provision: %w", err) + // } - _, tsHost, _, err := a.ssr.DeviceInfo(ctx, crl) - if err != nil { - return fmt.Errorf("failed to get device ID: %w", err) - } - if tsHost == "" { - logger.Debugf("no Tailscale hostname known yet, waiting for proxy pod to finish auth") - // No hostname yet. Wait for the proxy pod to auth. - ing.Status.LoadBalancer.Ingress = nil - if err := a.Status().Update(ctx, ing); err != nil { - return fmt.Errorf("failed to update ingress status: %w", err) - } - return nil - } + // _, tsHost, _, err := a.ssr.DeviceInfo(ctx, crl) + // if err != nil { + // return fmt.Errorf("failed to get device ID: %w", err) + // } + // if tsHost == "" { + // logger.Debugf("no Tailscale hostname known yet, waiting for proxy pod to finish auth") + // // No hostname yet. Wait for the proxy pod to auth. 
+ // ing.Status.LoadBalancer.Ingress = nil + // if err := a.Status().Update(ctx, ing); err != nil { + // return fmt.Errorf("failed to update ingress status: %w", err) + // } + // return nil + // } - logger.Debugf("setting ingress hostname to %q", tsHost) - ing.Status.LoadBalancer.Ingress = []networkingv1.IngressLoadBalancerIngress{ - { - Hostname: tsHost, - Ports: []networkingv1.IngressPortStatus{ - { - Protocol: "TCP", - Port: 443, - }, - }, - }, - } - if err := a.Status().Update(ctx, ing); err != nil { - return fmt.Errorf("failed to update ingress status: %w", err) - } + // logger.Debugf("setting ingress hostname to %q", tsHost) + // ing.Status.LoadBalancer.Ingress = []networkingv1.IngressLoadBalancerIngress{ + // { + // Hostname: tsHost, + // Ports: []networkingv1.IngressPortStatus{ + // { + // Protocol: "TCP", + // Port: 443, + // }, + // }, + // }, + // } + // if err := a.Status().Update(ctx, ing); err != nil { + // return fmt.Errorf("failed to update ingress status: %w", err) + // } return nil } diff --git a/cmd/k8s-operator/operator.go b/cmd/k8s-operator/operator.go index 18665bd8f..163ac3676 100644 --- a/cmd/k8s-operator/operator.go +++ b/cmd/k8s-operator/operator.go @@ -40,7 +40,6 @@ "tailscale.com/ipn/store/kubestore" tsapi "tailscale.com/k8s-operator/apis/v1alpha1" "tailscale.com/tsnet" - "tailscale.com/tstime" "tailscale.com/types/logger" "tailscale.com/version" ) @@ -240,6 +239,7 @@ func runReconcilers(opts reconcilerOpts) { &appsv1.StatefulSet{}: nsFilter, &appsv1.Deployment{}: nsFilter, &discoveryv1.EndpointSlice{}: nsFilter, + &corev1.Pod{}: nsFilter, }, }, Scheme: tsapi.GlobalScheme, @@ -249,13 +249,13 @@ func runReconcilers(opts reconcilerOpts) { startlog.Fatalf("could not create manager: %v", err) } - svcFilter := handler.EnqueueRequestsFromMapFunc(serviceHandler) - svcChildFilter := handler.EnqueueRequestsFromMapFunc(managedResourceHandlerForType("svc")) - // If a ProxyClass changes, enqueue all Services labeled with that - // ProxyClass's name. 
- proxyClassFilterForSvc := handler.EnqueueRequestsFromMapFunc(proxyClassHandlerForSvc(mgr.GetClient(), startlog)) + // svcFilter := handler.EnqueueRequestsFromMapFunc(serviceHandler) + // svcChildFilter := handler.EnqueueRequestsFromMapFunc(managedResourceHandlerForType("svc")) + // // If a ProxyClass changes, enqueue all Services labeled with that + // // ProxyClass's name. + // proxyClassFilterForSvc := handler.EnqueueRequestsFromMapFunc(proxyClassHandlerForSvc(mgr.GetClient(), startlog)) - eventRecorder := mgr.GetEventRecorderFor("tailscale-operator") + // eventRecorder := mgr.GetEventRecorderFor("tailscale-operator") ssr := &tailscaleSTSReconciler{ Client: mgr.GetClient(), tsnetServer: opts.tsServer, @@ -268,125 +268,37 @@ func runReconcilers(opts reconcilerOpts) { } err = builder. ControllerManagedBy(mgr). - Named("service-reconciler"). - Watches(&corev1.Service{}, svcFilter). - Watches(&appsv1.StatefulSet{}, svcChildFilter). - Watches(&corev1.Secret{}, svcChildFilter). - Watches(&tsapi.ProxyClass{}, proxyClassFilterForSvc). - Complete(&ServiceReconciler{ - ssr: ssr, - Client: mgr.GetClient(), - logger: opts.log.Named("service-reconciler"), - isDefaultLoadBalancer: opts.proxyActAsDefaultLoadBalancer, - recorder: eventRecorder, - tsNamespace: opts.tailscaleNamespace, - clock: tstime.DefaultClock{}, - proxyDefaultClass: opts.proxyDefaultClass, + Named("ha-egress-svc-reconciler"). + Watches(&corev1.Service{}, handler.EnqueueRequestsFromMapFunc(egressHAServiceFilter)). + Complete(&EgressHAReconciler{ + Client: mgr.GetClient(), + ssr: ssr, + logger: opts.log.Named("egress-ha-svc-reconciler"), }) if err != nil { - startlog.Fatalf("could not create service reconciler: %v", err) + startlog.Fatalf("could not create egress-ha service reconciler: %v", err) } - ingressChildFilter := handler.EnqueueRequestsFromMapFunc(managedResourceHandlerForType("ingress")) - // If a ProxyClassChanges, enqueue all Ingresses labeled with that - // ProxyClass's name. 
- proxyClassFilterForIngress := handler.EnqueueRequestsFromMapFunc(proxyClassHandlerForIngress(mgr.GetClient(), startlog)) - // Enque Ingress if a managed Service or backend Service associated with a tailscale Ingress changes. - svcHandlerForIngress := handler.EnqueueRequestsFromMapFunc(serviceHandlerForIngress(mgr.GetClient(), startlog)) err = builder. ControllerManagedBy(mgr). - For(&networkingv1.Ingress{}). - Watches(&appsv1.StatefulSet{}, ingressChildFilter). - Watches(&corev1.Secret{}, ingressChildFilter). - Watches(&corev1.Service{}, svcHandlerForIngress). - Watches(&tsapi.ProxyClass{}, proxyClassFilterForIngress). - Complete(&IngressReconciler{ - ssr: ssr, - recorder: eventRecorder, - Client: mgr.GetClient(), - logger: opts.log.Named("ingress-reconciler"), - proxyDefaultClass: opts.proxyDefaultClass, + Named("egress-ha-state-reconciler"). + For(&corev1.Secret{}). + Complete(&EgressSvcConfigReconciler{ + Client: mgr.GetClient(), + logger: opts.log.Named("egress-ha-state-reconciler"), }) if err != nil { - startlog.Fatalf("could not create ingress reconciler: %v", err) + startlog.Fatalf("could not create egress-ha-state service reconciler: %v", err) } - - connectorFilter := handler.EnqueueRequestsFromMapFunc(managedResourceHandlerForType("connector")) - // If a ProxyClassChanges, enqueue all Connectors that have - // .spec.proxyClass set to the name of this ProxyClass. - proxyClassFilterForConnector := handler.EnqueueRequestsFromMapFunc(proxyClassHandlerForConnector(mgr.GetClient(), startlog)) - err = builder.ControllerManagedBy(mgr). - For(&tsapi.Connector{}). - Watches(&appsv1.StatefulSet{}, connectorFilter). - Watches(&corev1.Secret{}, connectorFilter). - Watches(&tsapi.ProxyClass{}, proxyClassFilterForConnector). - Complete(&ConnectorReconciler{ - ssr: ssr, - recorder: eventRecorder, - Client: mgr.GetClient(), - logger: opts.log.Named("connector-reconciler"), - clock: tstime.DefaultClock{}, + err = builder. + ControllerManagedBy(mgr). 
+ Named("egress-fw-pods-reconciler"). + Watches(&corev1.Pod{}, handler.EnqueueRequestsFromMapFunc(fwegressPodFilter)). + Complete(&FWEgressReconciler{ + Client: mgr.GetClient(), + logger: opts.log.Named("fwegress-pod-reconciler"), }) if err != nil { - startlog.Fatalf("could not create connector reconciler: %v", err) - } - // TODO (irbekrm): switch to metadata-only watches for resources whose - // spec we don't need to inspect to reduce memory consumption. - // https://github.com/kubernetes-sigs/controller-runtime/issues/1159 - nameserverFilter := handler.EnqueueRequestsFromMapFunc(managedResourceHandlerForType("nameserver")) - err = builder.ControllerManagedBy(mgr). - For(&tsapi.DNSConfig{}). - Watches(&appsv1.Deployment{}, nameserverFilter). - Watches(&corev1.ConfigMap{}, nameserverFilter). - Watches(&corev1.Service{}, nameserverFilter). - Watches(&corev1.ServiceAccount{}, nameserverFilter). - Complete(&NameserverReconciler{ - recorder: eventRecorder, - tsNamespace: opts.tailscaleNamespace, - Client: mgr.GetClient(), - logger: opts.log.Named("nameserver-reconciler"), - clock: tstime.DefaultClock{}, - }) - if err != nil { - startlog.Fatalf("could not create nameserver reconciler: %v", err) - } - err = builder.ControllerManagedBy(mgr). - For(&tsapi.ProxyClass{}). - Complete(&ProxyClassReconciler{ - Client: mgr.GetClient(), - recorder: eventRecorder, - logger: opts.log.Named("proxyclass-reconciler"), - clock: tstime.DefaultClock{}, - }) - if err != nil { - startlog.Fatal("could not create proxyclass reconciler: %v", err) - } - logger := startlog.Named("dns-records-reconciler-event-handlers") - // On EndpointSlice events, if it is an EndpointSlice for an - // ingress/egress proxy headless Service, reconcile the headless - // Service. - dnsRREpsOpts := handler.EnqueueRequestsFromMapFunc(dnsRecordsReconcilerEndpointSliceHandler) - // On DNSConfig changes, reconcile all headless Services for - // ingress/egress proxies in operator namespace. 
- dnsRRDNSConfigOpts := handler.EnqueueRequestsFromMapFunc(enqueueAllIngressEgressProxySvcsInNS(opts.tailscaleNamespace, mgr.GetClient(), logger)) - // On Service events, if it is an ingress/egress proxy headless Service, reconcile it. - dnsRRServiceOpts := handler.EnqueueRequestsFromMapFunc(dnsRecordsReconcilerServiceHandler) - // On Ingress events, if it is a tailscale Ingress or if tailscale is the default ingress controller, reconcile the proxy - // headless Service. - dnsRRIngressOpts := handler.EnqueueRequestsFromMapFunc(dnsRecordsReconcilerIngressHandler(opts.tailscaleNamespace, opts.proxyActAsDefaultLoadBalancer, mgr.GetClient(), logger)) - err = builder.ControllerManagedBy(mgr). - Named("dns-records-reconciler"). - Watches(&corev1.Service{}, dnsRRServiceOpts). - Watches(&networkingv1.Ingress{}, dnsRRIngressOpts). - Watches(&discoveryv1.EndpointSlice{}, dnsRREpsOpts). - Watches(&tsapi.DNSConfig{}, dnsRRDNSConfigOpts). - Complete(&dnsRecordsReconciler{ - Client: mgr.GetClient(), - tsNamespace: opts.tailscaleNamespace, - logger: opts.log.Named("dns-records-reconciler"), - isDefaultLoadBalancer: opts.proxyActAsDefaultLoadBalancer, - }) - if err != nil { - startlog.Fatalf("could not create DNS records reconciler: %v", err) + startlog.Fatalf("could not create fwegress-pod reconciler: %v", err) } startlog.Infof("Startup complete, operator running, version: %s", version.Long()) if err := mgr.Start(signals.SetupSignalHandler()); err != nil { @@ -659,6 +571,25 @@ func serviceHandlerForIngress(cl client.Client, logger *zap.SugaredLogger) handl } } +func egressHAServiceFilter(_ context.Context, o client.Object) []reconcile.Request { + if o.GetAnnotations()["tailscale.com/tailnet-ip"] != "" { + return []reconcile.Request{{NamespacedName: client.ObjectKeyFromObject(o)}} + } + return nil +} +func fwegressPodFilter(_ context.Context, o client.Object) []reconcile.Request { + if o.GetLabels()["tailscale.com/fwegress"] != "" { + return []reconcile.Request{{NamespacedName: 
client.ObjectKeyFromObject(o)}} + } + return nil +} +func egressProxyStateSecretFilter(_ context.Context, o client.Object) []reconcile.Request { + if o.GetLabels()["tailscale.com/proxy-group-egress"] != "" { + return []reconcile.Request{{NamespacedName: client.ObjectKeyFromObject(o)}} + } + return nil +} + func serviceHandler(_ context.Context, o client.Object) []reconcile.Request { if isManagedByType(o, "svc") { // If this is a Service managed by a Service we want to enqueue its parent diff --git a/cmd/k8s-operator/sts.go b/cmd/k8s-operator/sts.go index 17cc047d0..4a6a42c52 100644 --- a/cmd/k8s-operator/sts.go +++ b/cmd/k8s-operator/sts.go @@ -31,7 +31,6 @@ "tailscale.com/ipn" tsoperator "tailscale.com/k8s-operator" tsapi "tailscale.com/k8s-operator/apis/v1alpha1" - "tailscale.com/net/netutil" "tailscale.com/tailcfg" "tailscale.com/types/opt" "tailscale.com/types/ptr" @@ -101,27 +100,29 @@ ) type tailscaleSTSConfig struct { - ParentResourceName string - ParentResourceUID string - ChildResourceLabels map[string]string + // ParentResourceName string + // ParentResourceUID string + // ChildResourceLabels map[string]string - ServeConfig *ipn.ServeConfig // if serve config is set, this is a proxy for Ingress - ClusterTargetIP string // ingress target IP - ClusterTargetDNSName string // ingress target DNS name - // If set to true, operator should configure containerboot to forward - // cluster traffic via the proxy set up for Kubernetes Ingress. - ForwardClusterTrafficViaL7IngressProxy bool + // ServeConfig *ipn.ServeConfig // if serve config is set, this is a proxy for Ingress + // ClusterTargetIP string // ingress target IP + // ClusterTargetDNSName string // ingress target DNS name + // // If set to true, operator should configure containerboot to forward + // // cluster traffic via the proxy set up for Kubernetes Ingress. 
+ // ForwardClusterTrafficViaL7IngressProxy bool - TailnetTargetIP string // egress target IP + // TailnetTargetIP string // egress target IP - TailnetTargetFQDN string // egress target FQDN + // TailnetTargetFQDN string // egress target FQDN + replicas int32 + name string - Hostname string - Tags []string // if empty, use defaultTags + // Hostname string + Tags []string // if empty, use defaultTags // Connector specifies a configuration of a Connector instance if that's // what this StatefulSet should be created for. - Connector *connector + // Connector *connector ProxyClassName string // name of ProxyClass if one needs to be applied to the proxy @@ -282,17 +283,16 @@ func statefulSetNameBase(parent string) string { } func (a *tailscaleSTSReconciler) reconcileHeadlessService(ctx context.Context, logger *zap.SugaredLogger, sts *tailscaleSTSConfig) (*corev1.Service, error) { - nameBase := statefulSetNameBase(sts.ParentResourceName) hsvc := &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ - GenerateName: nameBase, - Namespace: a.operatorNamespace, - Labels: sts.ChildResourceLabels, + Name: sts.name, + Namespace: a.operatorNamespace, + Labels: map[string]string{"app": sts.name}, }, Spec: corev1.ServiceSpec{ ClusterIP: "None", Selector: map[string]string{ - "app": sts.ParentResourceUID, + "app": sts.name, }, IPFamilyPolicy: ptr.To(corev1.IPFamilyPolicyPreferDualStack), }, @@ -307,9 +307,9 @@ func (a *tailscaleSTSReconciler) createOrGetSecret(ctx context.Context, logger * // Hardcode a -0 suffix so that in future, if we support // multiple StatefulSet replicas, we can provision -N for // those. 
- Name: hsvc.Name + "-0", + Name: hsvc.Name, Namespace: a.operatorNamespace, - Labels: stsC.ChildResourceLabels, + Labels: map[string]string{"name": stsC.name}, }, } var orig *corev1.Secret // unmodified copy of secret @@ -325,7 +325,7 @@ func (a *tailscaleSTSReconciler) createOrGetSecret(ctx context.Context, logger * // Initially it contains only tailscaled config, but when the // proxy starts, it will also store there the state, certs and // ACME account key. - sts, err := getSingleObject[appsv1.StatefulSet](ctx, a.Client, a.operatorNamespace, stsC.ChildResourceLabels) + sts, err := getSingleObject[appsv1.StatefulSet](ctx, a.Client, a.operatorNamespace, map[string]string{"name": stsC.name}) if err != nil { return "", "", nil, err } @@ -371,13 +371,13 @@ func (a *tailscaleSTSReconciler) createOrGetSecret(ctx context.Context, logger * } } - if stsC.ServeConfig != nil { - j, err := json.Marshal(stsC.ServeConfig) - if err != nil { - return "", "", nil, err - } - mak.Set(&secret.StringData, "serve-config", string(j)) - } + // if stsC.ServeConfig != nil { + // j, err := json.Marshal(stsC.ServeConfig) + // if err != nil { + // return "", "", nil, err + // } + // mak.Set(&secret.StringData, "serve-config", string(j)) + // } if orig != nil { logger.Debugf("patching the existing proxy Secret with tailscaled config %s", sanitizeConfigBytes(latestConfig)) @@ -445,7 +445,7 @@ func (a *tailscaleSTSReconciler) newAuthKey(ctx context.Context, tags []string) caps := tailscale.KeyCapabilities{ Devices: tailscale.KeyDeviceCapabilities{ Create: tailscale.KeyDeviceCreateCapabilities{ - Reusable: false, + Reusable: true, Preauthorized: true, Tags: tags, }, @@ -467,22 +467,24 @@ func (a *tailscaleSTSReconciler) newAuthKey(ctx context.Context, tags []string) func (a *tailscaleSTSReconciler) reconcileSTS(ctx context.Context, logger *zap.SugaredLogger, sts *tailscaleSTSConfig, headlessSvc *corev1.Service, proxySecret, tsConfigHash string, configs 
map[tailcfg.CapabilityVersion]ipn.ConfigVAlpha) (*appsv1.StatefulSet, error) { ss := new(appsv1.StatefulSet) - if sts.ServeConfig != nil && sts.ForwardClusterTrafficViaL7IngressProxy != true { // If forwarding cluster traffic via is required we need non-userspace + NET_ADMIN + forwarding - if err := yaml.Unmarshal(userspaceProxyYaml, &ss); err != nil { - return nil, fmt.Errorf("failed to unmarshal userspace proxy spec: %v", err) - } - } else { - if err := yaml.Unmarshal(proxyYaml, &ss); err != nil { - return nil, fmt.Errorf("failed to unmarshal proxy spec: %w", err) - } - for i := range ss.Spec.Template.Spec.InitContainers { - c := &ss.Spec.Template.Spec.InitContainers[i] - if c.Name == "sysctler" { - c.Image = a.proxyImage - break - } + // if sts.ServeConfig != nil && sts.ForwardClusterTrafficViaL7IngressProxy != true { // If forwarding cluster traffic via is required we need non-userspace + NET_ADMIN + forwarding + // if err := yaml.Unmarshal(userspaceProxyYaml, &ss); err != nil { + // return nil, fmt.Errorf("failed to unmarshal userspace proxy spec: %v", err) + // } + // } else { + if err := yaml.Unmarshal(proxyYaml, &ss); err != nil { + return nil, fmt.Errorf("failed to unmarshal proxy spec: %w", err) + } + for i := range ss.Spec.Template.Spec.InitContainers { + c := &ss.Spec.Template.Spec.InitContainers[i] + if c.Name == "sysctler" { + c.Image = a.proxyImage + break } } + // } + + ss.Spec.Replicas = ptr.To(sts.replicas) pod := &ss.Spec.Template container := &pod.Spec.Containers[0] container.Image = a.proxyImage @@ -490,25 +492,29 @@ func (a *tailscaleSTSReconciler) reconcileSTS(ctx context.Context, logger *zap.S Name: headlessSvc.Name, Namespace: a.operatorNamespace, } - for key, val := range sts.ChildResourceLabels { - mak.Set(&ss.ObjectMeta.Labels, key, val) - } + mak.Set(&ss.ObjectMeta.Labels, "tailscale.com/proxy-group", "egress-ha") ss.Spec.ServiceName = headlessSvc.Name ss.Spec.Selector = &metav1.LabelSelector{ MatchLabels: map[string]string{ - "app": 
sts.ParentResourceUID, + "app": sts.name, }, } - mak.Set(&pod.Labels, "app", sts.ParentResourceUID) - for key, val := range sts.ChildResourceLabels { - pod.Labels[key] = val // sync StatefulSet labels to Pod to make it easier for users to select the Pod - } + mak.Set(&pod.Labels, "app", sts.name) + mak.Set(&pod.Labels, "name", sts.name) + mak.Set(&pod.Labels, "tailscale.com/proxy-group", "egress-ha") + // for key, val := range sts.ChildResourceLabels { + // pod.Labels[key] = val // sync StatefulSet labels to Pod to make it easier for users to select the Pod + // } // Generic containerboot configuration options. container.Env = append(container.Env, corev1.EnvVar{ - Name: "TS_KUBE_SECRET", - Value: proxySecret, + Name: "TS_KUBE_SECRET", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "metadata.name", + }, + }, }, corev1.EnvVar{ // Old tailscaled config key is still used for backwards compatibility. @@ -520,16 +526,29 @@ func (a *tailscaleSTSReconciler) reconcileSTS(ctx context.Context, logger *zap.S Name: "TS_EXPERIMENTAL_VERSIONED_CONFIG_DIR", Value: "/etc/tsconfig", }, + corev1.EnvVar{ + Name: "TS_EGRESS_SERVICES_CONFIGMAP_NAME", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "metadata.name", + }, + }, + }, + corev1.EnvVar{ + Name: "TS_EGRESS_SERVICES_CONFIG_PATH", + Value: "/etc/egress-proxies/services", + }, ) - if sts.ForwardClusterTrafficViaL7IngressProxy { - container.Env = append(container.Env, corev1.EnvVar{ - Name: "EXPERIMENTAL_ALLOW_PROXYING_CLUSTER_TRAFFIC_VIA_INGRESS", - Value: "true", - }) - } + // if sts.ForwardClusterTrafficViaL7IngressProxy { + // container.Env = append(container.Env, corev1.EnvVar{ + // Name: "EXPERIMENTAL_ALLOW_PROXYING_CLUSTER_TRAFFIC_VIA_INGRESS", + // Value: "true", + // }) + // } // Configure containeboot to run tailscaled with a configfile read from the state Secret. 
mak.Set(&ss.Spec.Template.Annotations, podAnnotationLastSetConfigFileHash, tsConfigHash) + // Tailscaled config configVolume := corev1.Volume{ Name: "tailscaledconfig", VolumeSource: corev1.VolumeSource{ @@ -545,6 +564,22 @@ func (a *tailscaleSTSReconciler) reconcileSTS(ctx context.Context, logger *zap.S MountPath: "/etc/tsconfig", }) + // Egress services config + egressSvcCfg := corev1.Volume{ + Name: "egress-proxies", + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{Name: "egress-proxies"}, + }, + }, + } + pod.Spec.Volumes = append(ss.Spec.Template.Spec.Volumes, egressSvcCfg) + container.VolumeMounts = append(container.VolumeMounts, corev1.VolumeMount{ + Name: "egress-proxies", + ReadOnly: true, + MountPath: "/etc/egress-proxies", + }) + if a.tsFirewallMode != "" { container.Env = append(container.Env, corev1.EnvVar{ Name: "TS_DEBUG_FIREWALL_MODE", @@ -554,50 +589,50 @@ func (a *tailscaleSTSReconciler) reconcileSTS(ctx context.Context, logger *zap.S pod.Spec.PriorityClassName = a.proxyPriorityClassName // Ingress/egress proxy configuration options. 
- if sts.ClusterTargetIP != "" { - container.Env = append(container.Env, corev1.EnvVar{ - Name: "TS_DEST_IP", - Value: sts.ClusterTargetIP, - }) - mak.Set(&ss.Spec.Template.Annotations, podAnnotationLastSetClusterIP, sts.ClusterTargetIP) - } else if sts.ClusterTargetDNSName != "" { - container.Env = append(container.Env, corev1.EnvVar{ - Name: "TS_EXPERIMENTAL_DEST_DNS_NAME", - Value: sts.ClusterTargetDNSName, - }) - mak.Set(&ss.Spec.Template.Annotations, podAnnotationLastSetClusterDNSName, sts.ClusterTargetDNSName) - } else if sts.TailnetTargetIP != "" { - container.Env = append(container.Env, corev1.EnvVar{ - Name: "TS_TAILNET_TARGET_IP", - Value: sts.TailnetTargetIP, - }) - mak.Set(&ss.Spec.Template.Annotations, podAnnotationLastSetTailnetTargetIP, sts.TailnetTargetIP) - } else if sts.TailnetTargetFQDN != "" { - container.Env = append(container.Env, corev1.EnvVar{ - Name: "TS_TAILNET_TARGET_FQDN", - Value: sts.TailnetTargetFQDN, - }) - mak.Set(&ss.Spec.Template.Annotations, podAnnotationLastSetTailnetTargetFQDN, sts.TailnetTargetFQDN) - } else if sts.ServeConfig != nil { - container.Env = append(container.Env, corev1.EnvVar{ - Name: "TS_SERVE_CONFIG", - Value: "/etc/tailscaled/serve-config", - }) - container.VolumeMounts = append(container.VolumeMounts, corev1.VolumeMount{ - Name: "serve-config", - ReadOnly: true, - MountPath: "/etc/tailscaled", - }) - pod.Spec.Volumes = append(ss.Spec.Template.Spec.Volumes, corev1.Volume{ - Name: "serve-config", - VolumeSource: corev1.VolumeSource{ - Secret: &corev1.SecretVolumeSource{ - SecretName: proxySecret, - Items: []corev1.KeyToPath{{Key: "serve-config", Path: "serve-config"}}, - }, - }, - }) - } + // if sts.ClusterTargetIP != "" { + // container.Env = append(container.Env, corev1.EnvVar{ + // Name: "TS_DEST_IP", + // Value: sts.ClusterTargetIP, + // }) + // mak.Set(&ss.Spec.Template.Annotations, podAnnotationLastSetClusterIP, sts.ClusterTargetIP) + // } else if sts.ClusterTargetDNSName != "" { + // container.Env = 
append(container.Env, corev1.EnvVar{ + // Name: "TS_EXPERIMENTAL_DEST_DNS_NAME", + // Value: sts.ClusterTargetDNSName, + // }) + // mak.Set(&ss.Spec.Template.Annotations, podAnnotationLastSetClusterDNSName, sts.ClusterTargetDNSName) + // } else if sts.TailnetTargetIP != "" { + // container.Env = append(container.Env, corev1.EnvVar{ + // Name: "TS_TAILNET_TARGET_IP", + // Value: sts.TailnetTargetIP, + // }) + // mak.Set(&ss.Spec.Template.Annotations, podAnnotationLastSetTailnetTargetIP, sts.TailnetTargetIP) + // } else if sts.TailnetTargetFQDN != "" { + // container.Env = append(container.Env, corev1.EnvVar{ + // Name: "TS_TAILNET_TARGET_FQDN", + // Value: sts.TailnetTargetFQDN, + // }) + // mak.Set(&ss.Spec.Template.Annotations, podAnnotationLastSetTailnetTargetFQDN, sts.TailnetTargetFQDN) + // } else if sts.ServeConfig != nil { + // container.Env = append(container.Env, corev1.EnvVar{ + // Name: "TS_SERVE_CONFIG", + // Value: "/etc/tailscaled/serve-config", + // }) + // container.VolumeMounts = append(container.VolumeMounts, corev1.VolumeMount{ + // Name: "serve-config", + // ReadOnly: true, + // MountPath: "/etc/tailscaled", + // }) + // pod.Spec.Volumes = append(ss.Spec.Template.Spec.Volumes, corev1.Volume{ + // Name: "serve-config", + // VolumeSource: corev1.VolumeSource{ + // Secret: &corev1.SecretVolumeSource{ + // SecretName: proxySecret, + // Items: []corev1.KeyToPath{{Key: "serve-config", Path: "serve-config"}}, + // }, + // }, + // }) + // } logger.Debugf("reconciling statefulset %s/%s", ss.GetNamespace(), ss.GetName()) if sts.ProxyClassName != "" { logger.Debugf("configuring proxy resources with ProxyClass %s", sts.ProxyClassName) @@ -636,22 +671,22 @@ func applyProxyClassToStatefulSet(pc *tsapi.ProxyClass, ss *appsv1.StatefulSet, if pc == nil || ss == nil { return ss } - if pc.Spec.Metrics != nil && pc.Spec.Metrics.Enable { - if stsCfg.TailnetTargetFQDN == "" && stsCfg.TailnetTargetIP == "" && !stsCfg.ForwardClusterTrafficViaL7IngressProxy { - 
enableMetrics(ss, pc) - } else if stsCfg.ForwardClusterTrafficViaL7IngressProxy { - // TODO (irbekrm): fix this - // For Ingress proxies that have been configured with - // tailscale.com/experimental-forward-cluster-traffic-via-ingress - // annotation, all cluster traffic is forwarded to the - // Ingress backend(s). - logger.Info("ProxyClass specifies that metrics should be enabled, but this is currently not supported for Ingress proxies that accept cluster traffic.") - } else { - // TODO (irbekrm): fix this - // For egress proxies, currently all cluster traffic is forwarded to the tailnet target. - logger.Info("ProxyClass specifies that metrics should be enabled, but this is currently not supported for Ingress proxies that accept cluster traffic.") - } - } + // if pc.Spec.Metrics != nil && pc.Spec.Metrics.Enable { + // if stsCfg.TailnetTargetFQDN == "" && stsCfg.TailnetTargetIP == "" && !stsCfg.ForwardClusterTrafficViaL7IngressProxy { + // enableMetrics(ss, pc) + // } else if stsCfg.ForwardClusterTrafficViaL7IngressProxy { + // // TODO (irbekrm): fix this + // // For Ingress proxies that have been configured with + // // tailscale.com/experimental-forward-cluster-traffic-via-ingress + // // annotation, all cluster traffic is forwarded to the + // // Ingress backend(s). + // logger.Info("ProxyClass specifies that metrics should be enabled, but this is currently not supported for Ingress proxies that accept cluster traffic.") + // } else { + // // TODO (irbekrm): fix this + // // For egress proxies, currently all cluster traffic is forwarded to the tailnet target. 
+ // logger.Info("ProxyClass specifies that metrics should be enabled, but this is currently not supported for Ingress proxies that accept cluster traffic.") + // } + // } if pc.Spec.StatefulSet == nil { return ss @@ -764,23 +799,23 @@ func tailscaledConfig(stsC *tailscaleSTSConfig, newAuthkey string, oldSecret *co AcceptDNS: "false", AcceptRoutes: "false", // AcceptRoutes defaults to true Locked: "false", - Hostname: &stsC.Hostname, + Hostname: &stsC.name, NoStatefulFiltering: "false", } // For egress proxies only, we need to ensure that stateful filtering is // not in place so that traffic from cluster can be forwarded via // Tailscale IPs. - if stsC.TailnetTargetFQDN != "" || stsC.TailnetTargetIP != "" { - conf.NoStatefulFiltering = "true" - } - if stsC.Connector != nil { - routes, err := netutil.CalcAdvertiseRoutes(stsC.Connector.routes, stsC.Connector.isExitNode) - if err != nil { - return nil, fmt.Errorf("error calculating routes: %w", err) - } - conf.AdvertiseRoutes = routes - } + // if stsC.TailnetTargetFQDN != "" || stsC.TailnetTargetIP != "" { + // conf.NoStatefulFiltering = "true" + // } + // if stsC.Connector != nil { + // routes, err := netutil.CalcAdvertiseRoutes(stsC.Connector.routes, stsC.Connector.isExitNode) + // if err != nil { + // return nil, fmt.Errorf("error calculating routes: %w", err) + // } + // conf.AdvertiseRoutes = routes + // } if shouldAcceptRoutes(stsC.ProxyClass) { conf.AcceptRoutes = "true" } diff --git a/cmd/k8s-operator/svc.go b/cmd/k8s-operator/svc.go index 9e7c6f4f3..d1a153c97 100644 --- a/cmd/k8s-operator/svc.go +++ b/cmd/k8s-operator/svc.go @@ -9,7 +9,6 @@ "context" "errors" "fmt" - "net/netip" "slices" "strings" "sync" @@ -237,109 +236,109 @@ func (a *ServiceReconciler) maybeProvision(ctx context.Context, logger *zap.Suga return errMsg } } - crl := childResourceLabels(svc.Name, svc.Namespace, "svc") - var tags []string - if tstr, ok := svc.Annotations[AnnotationTags]; ok { - tags = strings.Split(tstr, ",") - } + // crl := 
childResourceLabels(svc.Name, svc.Namespace, "svc") + // var tags []string + // if tstr, ok := svc.Annotations[AnnotationTags]; ok { + // tags = strings.Split(tstr, ",") + // } - sts := &tailscaleSTSConfig{ - ParentResourceName: svc.Name, - ParentResourceUID: string(svc.UID), - Hostname: nameForService(svc), - Tags: tags, - ChildResourceLabels: crl, - ProxyClassName: proxyClass, - } + // sts := &tailscaleSTSConfig{ + // ParentResourceName: svc.Name, + // ParentResourceUID: string(svc.UID), + // Hostname: nameForService(svc), + // Tags: tags, + // ChildResourceLabels: crl, + // ProxyClassName: proxyClass, + // } - a.mu.Lock() - if a.shouldExposeClusterIP(svc) { - sts.ClusterTargetIP = svc.Spec.ClusterIP - a.managedIngressProxies.Add(svc.UID) - gaugeIngressProxies.Set(int64(a.managedIngressProxies.Len())) - } else if a.shouldExposeDNSName(svc) { - sts.ClusterTargetDNSName = svc.Spec.ExternalName - a.managedIngressProxies.Add(svc.UID) - gaugeIngressProxies.Set(int64(a.managedIngressProxies.Len())) - } else if ip := tailnetTargetAnnotation(svc); ip != "" { - sts.TailnetTargetIP = ip - a.managedEgressProxies.Add(svc.UID) - gaugeEgressProxies.Set(int64(a.managedEgressProxies.Len())) - } else if fqdn := svc.Annotations[AnnotationTailnetTargetFQDN]; fqdn != "" { - fqdn := svc.Annotations[AnnotationTailnetTargetFQDN] - if !strings.HasSuffix(fqdn, ".") { - fqdn = fqdn + "." 
- } - sts.TailnetTargetFQDN = fqdn - a.managedEgressProxies.Add(svc.UID) - gaugeEgressProxies.Set(int64(a.managedEgressProxies.Len())) - } - a.mu.Unlock() + // a.mu.Lock() + // if a.shouldExposeClusterIP(svc) { + // sts.ClusterTargetIP = svc.Spec.ClusterIP + // a.managedIngressProxies.Add(svc.UID) + // gaugeIngressProxies.Set(int64(a.managedIngressProxies.Len())) + // } else if a.shouldExposeDNSName(svc) { + // sts.ClusterTargetDNSName = svc.Spec.ExternalName + // a.managedIngressProxies.Add(svc.UID) + // gaugeIngressProxies.Set(int64(a.managedIngressProxies.Len())) + // } else if ip := tailnetTargetAnnotation(svc); ip != "" { + // sts.TailnetTargetIP = ip + // a.managedEgressProxies.Add(svc.UID) + // gaugeEgressProxies.Set(int64(a.managedEgressProxies.Len())) + // } else if fqdn := svc.Annotations[AnnotationTailnetTargetFQDN]; fqdn != "" { + // fqdn := svc.Annotations[AnnotationTailnetTargetFQDN] + // if !strings.HasSuffix(fqdn, ".") { + // fqdn = fqdn + "." + // } + // sts.TailnetTargetFQDN = fqdn + // a.managedEgressProxies.Add(svc.UID) + // gaugeEgressProxies.Set(int64(a.managedEgressProxies.Len())) + // } + // a.mu.Unlock() - var hsvc *corev1.Service - if hsvc, err = a.ssr.Provision(ctx, logger, sts); err != nil { - errMsg := fmt.Errorf("failed to provision: %w", err) - tsoperator.SetServiceCondition(svc, tsapi.ProxyReady, metav1.ConditionFalse, reasonProxyFailed, errMsg.Error(), a.clock, logger) - return errMsg - } + // var hsvc *corev1.Service + // if hsvc, err = a.ssr.Provision(ctx, logger, sts); err != nil { + // errMsg := fmt.Errorf("failed to provision: %w", err) + // tsoperator.SetServiceCondition(svc, tsapi.ProxyReady, metav1.ConditionFalse, reasonProxyFailed, errMsg.Error(), a.clock, logger) + // return errMsg + // } - if sts.TailnetTargetIP != "" || sts.TailnetTargetFQDN != "" { // if an egress proxy - clusterDomain := retrieveClusterDomain(a.tsNamespace, logger) - headlessSvcName := hsvc.Name + "." + hsvc.Namespace + ".svc." 
+ clusterDomain - if svc.Spec.ExternalName != headlessSvcName || svc.Spec.Type != corev1.ServiceTypeExternalName { - svc.Spec.ExternalName = headlessSvcName - svc.Spec.Selector = nil - svc.Spec.Type = corev1.ServiceTypeExternalName - if err := a.Update(ctx, svc); err != nil { - errMsg := fmt.Errorf("failed to update service: %w", err) - tsoperator.SetServiceCondition(svc, tsapi.ProxyReady, metav1.ConditionFalse, reasonProxyFailed, errMsg.Error(), a.clock, logger) - return errMsg - } - } - tsoperator.SetServiceCondition(svc, tsapi.ProxyReady, metav1.ConditionTrue, reasonProxyCreated, reasonProxyCreated, a.clock, logger) - return nil - } + // if sts.TailnetTargetIP != "" || sts.TailnetTargetFQDN != "" { // if an egress proxy + // clusterDomain := retrieveClusterDomain(a.tsNamespace, logger) + // headlessSvcName := hsvc.Name + "." + hsvc.Namespace + ".svc." + clusterDomain + // if svc.Spec.ExternalName != headlessSvcName || svc.Spec.Type != corev1.ServiceTypeExternalName { + // svc.Spec.ExternalName = headlessSvcName + // svc.Spec.Selector = nil + // svc.Spec.Type = corev1.ServiceTypeExternalName + // if err := a.Update(ctx, svc); err != nil { + // errMsg := fmt.Errorf("failed to update service: %w", err) + // tsoperator.SetServiceCondition(svc, tsapi.ProxyReady, metav1.ConditionFalse, reasonProxyFailed, errMsg.Error(), a.clock, logger) + // return errMsg + // } + // } + // tsoperator.SetServiceCondition(svc, tsapi.ProxyReady, metav1.ConditionTrue, reasonProxyCreated, reasonProxyCreated, a.clock, logger) + // return nil + // } - if !isTailscaleLoadBalancerService(svc, a.isDefaultLoadBalancer) { - logger.Debugf("service is not a LoadBalancer, so not updating ingress") - tsoperator.SetServiceCondition(svc, tsapi.ProxyReady, metav1.ConditionTrue, reasonProxyCreated, reasonProxyCreated, a.clock, logger) - return nil - } + // if !isTailscaleLoadBalancerService(svc, a.isDefaultLoadBalancer) { + // logger.Debugf("service is not a LoadBalancer, so not updating ingress") + // 
tsoperator.SetServiceCondition(svc, tsapi.ProxyReady, metav1.ConditionTrue, reasonProxyCreated, reasonProxyCreated, a.clock, logger) + // return nil + // } - _, tsHost, tsIPs, err := a.ssr.DeviceInfo(ctx, crl) - if err != nil { - return fmt.Errorf("failed to get device ID: %w", err) - } - if tsHost == "" { - msg := "no Tailscale hostname known yet, waiting for proxy pod to finish auth" - logger.Debug(msg) - // No hostname yet. Wait for the proxy pod to auth. - svc.Status.LoadBalancer.Ingress = nil - tsoperator.SetServiceCondition(svc, tsapi.ProxyReady, metav1.ConditionFalse, reasonProxyPending, msg, a.clock, logger) - return nil - } + // _, tsHost, tsIPs, err := a.ssr.DeviceInfo(ctx, crl) + // if err != nil { + // return fmt.Errorf("failed to get device ID: %w", err) + // } + // if tsHost == "" { + // msg := "no Tailscale hostname known yet, waiting for proxy pod to finish auth" + // logger.Debug(msg) + // // No hostname yet. Wait for the proxy pod to auth. + // svc.Status.LoadBalancer.Ingress = nil + // tsoperator.SetServiceCondition(svc, tsapi.ProxyReady, metav1.ConditionFalse, reasonProxyPending, msg, a.clock, logger) + // return nil + // } - logger.Debugf("setting Service LoadBalancer status to %q, %s", tsHost, strings.Join(tsIPs, ", ")) - ingress := []corev1.LoadBalancerIngress{ - {Hostname: tsHost}, - } - clusterIPAddr, err := netip.ParseAddr(svc.Spec.ClusterIP) - if err != nil { - msg := fmt.Sprintf("failed to parse cluster IP: %v", err) - tsoperator.SetServiceCondition(svc, tsapi.ProxyReady, metav1.ConditionFalse, reasonProxyFailed, msg, a.clock, logger) - return errors.New(msg) - } - for _, ip := range tsIPs { - addr, err := netip.ParseAddr(ip) - if err != nil { - continue - } - if addr.Is4() == clusterIPAddr.Is4() { // only add addresses of the same family - ingress = append(ingress, corev1.LoadBalancerIngress{IP: ip}) - } - } - svc.Status.LoadBalancer.Ingress = ingress - tsoperator.SetServiceCondition(svc, tsapi.ProxyReady, metav1.ConditionTrue, 
reasonProxyCreated, reasonProxyCreated, a.clock, logger) + // logger.Debugf("setting Service LoadBalancer status to %q, %s", tsHost, strings.Join(tsIPs, ", ")) + // ingress := []corev1.LoadBalancerIngress{ + // {Hostname: tsHost}, + // } + // clusterIPAddr, err := netip.ParseAddr(svc.Spec.ClusterIP) + // if err != nil { + // msg := fmt.Sprintf("failed to parse cluster IP: %v", err) + // tsoperator.SetServiceCondition(svc, tsapi.ProxyReady, metav1.ConditionFalse, reasonProxyFailed, msg, a.clock, logger) + // return errors.New(msg) + // } + // for _, ip := range tsIPs { + // addr, err := netip.ParseAddr(ip) + // if err != nil { + // continue + // } + // if addr.Is4() == clusterIPAddr.Is4() { // only add addresses of the same family + // ingress = append(ingress, corev1.LoadBalancerIngress{IP: ip}) + // } + // } + // svc.Status.LoadBalancer.Ingress = ingress + // tsoperator.SetServiceCondition(svc, tsapi.ProxyReady, metav1.ConditionTrue, reasonProxyCreated, reasonProxyCreated, a.clock, logger) return nil } diff --git a/ipn/store/kubestore/store_kube.go b/ipn/store/kubestore/store_kube.go index 0c90d06b3..582f962a9 100644 --- a/ipn/store/kubestore/store_kube.go +++ b/ipn/store/kubestore/store_kube.go @@ -58,10 +58,7 @@ func (s *Store) ReadState(id ipn.StateKey) ([]byte, error) { secret, err := s.client.GetSecret(ctx, s.secretName) if err != nil { - if st, ok := err.(*kube.Status); ok && st.Code == 404 { - return nil, ipn.ErrStateNotExist - } - return nil, err + return nil, ipn.ErrStateNotExist } b, ok := secret.Data[sanitizeKey(id)] if !ok { @@ -88,21 +85,18 @@ func (s *Store) WriteState(id ipn.StateKey, bs []byte) error { secret, err := s.client.GetSecret(ctx, s.secretName) if err != nil { - if kube.IsNotFoundErr(err) { - return s.client.CreateSecret(ctx, &kube.Secret{ - TypeMeta: kube.TypeMeta{ - APIVersion: "v1", - Kind: "Secret", - }, - ObjectMeta: kube.ObjectMeta{ - Name: s.secretName, - }, - Data: map[string][]byte{ - sanitizeKey(id): bs, - }, - }) - } - return 
err + return s.client.CreateSecret(ctx, &kube.Secret{ + TypeMeta: kube.TypeMeta{ + APIVersion: "v1", + Kind: "Secret", + }, + ObjectMeta: kube.ObjectMeta{ + Name: s.secretName, + }, + Data: map[string][]byte{ + sanitizeKey(id): bs, + }, + }) } if s.canPatch { if len(secret.Data) == 0 { // if user has pre-created a blank Secret diff --git a/k8s-operator/egress-services.go b/k8s-operator/egress-services.go new file mode 100644 index 000000000..ca0ac8182 --- /dev/null +++ b/k8s-operator/egress-services.go @@ -0,0 +1,20 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +//go:build !plan9 + +// Package kube contains types and utilities for the Tailscale Kubernetes Operator. +package kube + +const EgressServiceAlphaV = "v1alpha1" + +type EgressServices struct { + Version string `json:"version"` + Services map[string]EgressService `json:"services"` +} + +type EgressService struct { + // fwegress pod IPs + ClusterSources []string `json:"clusterSources"` + TailnetTargetIP string `json:"tailnetTargetIP"` +} diff --git a/kube/api.go b/kube/api.go index b49b76c34..058f7d30e 100644 --- a/kube/api.go +++ b/kube/api.go @@ -146,6 +146,13 @@ type Secret struct { // +optional Data map[string][]byte `json:"data,omitempty"` } +type ConfigMap struct { + TypeMeta `json:",inline"` + ObjectMeta `json:"metadata"` + + // +optional + Data map[string][]byte `json:"data,omitempty"` +} // Status is a return value for calls that don't return other objects. 
type Status struct { diff --git a/kube/client.go b/kube/client.go index 62daa366e..0f155b6fc 100644 --- a/kube/client.go +++ b/kube/client.go @@ -20,6 +20,7 @@ "net/url" "os" "path/filepath" + "strings" "sync" "time" @@ -55,6 +56,7 @@ type Client interface { CreateSecret(context.Context, *Secret) error StrategicMergePatchSecret(context.Context, string, *Secret, string) error JSONPatchSecret(context.Context, string, []JSONPatch) error + JSONPatchConfigMap(context.Context, string, []JSONPatch) error CheckSecretPermissions(context.Context, string) (bool, bool, error) SetDialer(dialer func(context.Context, string, string) (net.Conn, error)) SetURL(string) @@ -138,6 +140,10 @@ func (c *client) secretURL(name string) string { return fmt.Sprintf("%s/api/v1/namespaces/%s/secrets/%s", c.url, c.ns, name) } +func (c *client) configMapURL(name string) string { + return fmt.Sprintf("%s/api/v1/namespaces/%s/configmaps/%s", c.url, c.ns, name) +} + func getError(resp *http.Response) error { if resp.StatusCode == 200 || resp.StatusCode == 201 { // These are the only success codes returned by the Kubernetes API. 
@@ -167,21 +173,21 @@ func setHeader(key, value string) func(*http.Request) { func (c *client) doRequest(ctx context.Context, method, url string, in, out any, opts ...func(*http.Request)) error { req, err := c.newRequest(ctx, method, url, in) if err != nil { - return err + return fmt.Errorf("error making new request url %s in %v out %v method %v: %v", url, in, out, method, err) } for _, opt := range opts { opt(req) } resp, err := c.client.Do(req) if err != nil { - return err + return fmt.Errorf("error sending request: %+#v: %v", req, err) } defer resp.Body.Close() if err := getError(resp); err != nil { if st, ok := err.(*Status); ok && st.Code == 401 { c.expireToken() } - return err + return fmt.Errorf("error in response: url %s in %s req \n%+#v\n %v", url, in, req, err) } if out != nil { return json.NewDecoder(resp.Body).Decode(out) @@ -253,13 +259,24 @@ type JSONPatch struct { // It currently (2023-03-02) only supports "add" and "remove" operations. func (c *client) JSONPatchSecret(ctx context.Context, name string, patch []JSONPatch) error { for _, p := range patch { - if p.Op != "remove" && p.Op != "add" { + if p.Op != "remove" && p.Op != "add" && p.Op != "replace" { panic(fmt.Errorf("unsupported JSON patch operation: %q", p.Op)) } } return c.doRequest(ctx, "PATCH", c.secretURL(name), patch, nil, setHeader("Content-Type", "application/json-patch+json")) } +// JSONPatchConfigMap updates a configmap in the Kubernetes API using a JSON patch. +// It currently (2023-03-02) only supports "add" and "remove" operations. +func (c *client) JSONPatchConfigMap(ctx context.Context, name string, patch []JSONPatch) error { + for _, p := range patch { + if p.Op != "replace" { + panic(fmt.Errorf("unsupported JSON patch operation: %q", p.Op)) + } + } + return c.doRequest(ctx, "PATCH", c.configMapURL(name), patch, nil, setHeader("Content-Type", "application/json-patch+json")) +} + // StrategicMergePatchSecret updates a secret in the Kubernetes API using a // strategic merge patch. 
// If a fieldManager is provided, it will be used to track the patch. @@ -343,5 +360,8 @@ func IsNotFoundErr(err error) bool { if st, ok := err.(*Status); ok && st.Code == 404 { return true } + if strings.Contains(err.Error(), "not found") { + return true + } return false } diff --git a/kube/fake_client.go b/kube/fake_client.go index ad5e8201d..4c4c63faa 100644 --- a/kube/fake_client.go +++ b/kube/fake_client.go @@ -30,5 +30,8 @@ func (fc *FakeClient) StrategicMergePatchSecret(context.Context, string, *Secret func (fc *FakeClient) JSONPatchSecret(context.Context, string, []JSONPatch) error { return nil } +func (fc *FakeClient) JSONPatchConfigMap(context.Context, string, []JSONPatch) error { + return nil +} func (fc *FakeClient) UpdateSecret(context.Context, *Secret) error { return nil } func (fc *FakeClient) CreateSecret(context.Context, *Secret) error { return nil } diff --git a/util/linuxfw/iptables_runner.go b/util/linuxfw/iptables_runner.go index 507f6cd48..77e388c59 100644 --- a/util/linuxfw/iptables_runner.go +++ b/util/linuxfw/iptables_runner.go @@ -371,6 +371,11 @@ func (i *iptablesRunner) AddDNATRule(origDst, dst netip.Addr) error { return table.Insert("nat", "PREROUTING", 1, "--destination", origDst.String(), "-j", "DNAT", "--to-destination", dst.String()) } +func (i *iptablesRunner) AddDNATRuleWithSrc(src, dst netip.Addr) error { + table := i.getIPTByAddr(dst) + return table.Insert("nat", "PREROUTING", 1, "--source", src.String(), "-j", "DNAT", "--to-destination", dst.String()) +} + func (i *iptablesRunner) AddSNATRuleForDst(src, dst netip.Addr) error { table := i.getIPTByAddr(dst) return table.Insert("nat", "POSTROUTING", 1, "--destination", dst.String(), "-j", "SNAT", "--to-source", src.String()) diff --git a/util/linuxfw/nftables_runner.go b/util/linuxfw/nftables_runner.go index 317d84c12..e6095034f 100644 --- a/util/linuxfw/nftables_runner.go +++ b/util/linuxfw/nftables_runner.go @@ -102,6 +102,52 @@ func (n *nftablesRunner) ensurePreroutingChain(dst 
netip.Addr) (*nftables.Table, return nat, preroutingCh, nil } +// DNAT traffic from src to dst +func (n *nftablesRunner) AddDNATRuleWithSrc(src, dst netip.Addr) error { + nat, preroutingCh, err := n.ensurePreroutingChain(dst) + if err != nil { + return err + } + var saddrOffset, fam, saddrLen uint32 + if src.Is4() { + saddrOffset = 8 + saddrLen = 4 + fam = unix.NFPROTO_IPV4 + } else { + saddrOffset = 8 + saddrLen = 16 + fam = unix.NFPROTO_IPV6 + } + dnatRule := &nftables.Rule{ + Table: nat, + Chain: preroutingCh, + Exprs: []expr.Any{ + &expr.Payload{ + DestRegister: 1, + Base: expr.PayloadBaseNetworkHeader, + Offset: saddrOffset, + Len: saddrLen, + }, + &expr.Cmp{ + Op: expr.CmpOpEq, + Register: 1, + Data: src.AsSlice(), + }, + &expr.Immediate{ + Register: 1, + Data: dst.AsSlice(), + }, + &expr.NAT{ + Type: expr.NATTypeDestNAT, + Family: fam, + RegAddrMin: 1, + }, + }, + } + n.conn.InsertRule(dnatRule) + return n.conn.Flush() +} + func (n *nftablesRunner) AddDNATRule(origDst netip.Addr, dst netip.Addr) error { nat, preroutingCh, err := n.ensurePreroutingChain(dst) if err != nil { @@ -563,6 +609,9 @@ type NetfilterRunner interface { // the Tailscale interface, as used in the Kubernetes egress proxies. AddSNATRuleForDst(src, dst netip.Addr) error + // DNAT traffic from src to dst + AddDNATRuleWithSrc(src, dst netip.Addr) error + // DNATNonTailscaleTraffic adds a rule to the nat/PREROUTING chain to DNAT // all traffic inbound from any interface except exemptInterface to dst. // This is used to forward traffic destined for the local machine over