diff --git a/cmd/containerboot/main.go b/cmd/containerboot/main.go index fdf71c3ea..99ddfb0eb 100644 --- a/cmd/containerboot/main.go +++ b/cmd/containerboot/main.go @@ -81,6 +81,7 @@ // cluster using the same hostname (in this case, the MagicDNS name of the ingress proxy) // as a non-cluster workload on tailnet. // This is only meant to be configured by the Kubernetes operator. +// - TS_EGRESS_SERVICES_PATH: mounted json formatted egress service config // // When running on Kubernetes, containerboot defaults to storing state in the // "tailscale" kube secret. To store state on local disk instead, set @@ -123,11 +124,13 @@ "tailscale.com/ipn" "tailscale.com/ipn/conffile" kubeutils "tailscale.com/k8s-operator" + "tailscale.com/kube" "tailscale.com/tailcfg" "tailscale.com/types/logger" "tailscale.com/types/ptr" "tailscale.com/util/deephash" "tailscale.com/util/linuxfw" + "tailscale.com/util/mak" ) func newNetfilterRunner(logf logger.Logf) (linuxfw.NetfilterRunner, error) { @@ -166,6 +169,7 @@ func main() { PodIP: defaultEnv("POD_IP", ""), EnableForwardingOptimizations: defaultBool("TS_EXPERIMENTAL_ENABLE_FORWARDING_OPTIMIZATIONS", false), HealthCheckAddrPort: defaultEnv("TS_HEALTHCHECK_ADDR_PORT", ""), + EgressServicesPath: defaultEnv("TS_EGRESS_SERVICES_PATH", ""), } if err := cfg.validate(); err != nil { @@ -576,7 +580,6 @@ func main() { log.Fatalf("storing device IPs and FQDN in Kubernetes Secret: %v", err) } } - if cfg.HealthCheckAddrPort != "" { h.Lock() h.hasAddrs = len(addrs) != 0 @@ -601,6 +604,11 @@ func main() { log.Println("Startup complete, waiting for shutdown signal") startupTasksDone = true + if cfg.EgressServicesPath != "" { + log.Printf("ensure egress services are configured") + go ensureEgressServicePortMap(ctx, cfg.EgressServicesPath, nfr, addrs, cfg.PodIP, cfg.KubeSecret) + } + // Wait on tailscaled process. It won't // be cleaned up by default when the // container exits as it is not PID1. @@ -1172,6 +1180,183 @@ type settings struct { // target. 
PodIP string HealthCheckAddrPort string + EgressServicesPath string +} + +func ensureEgressServicePortMap(ctx context.Context, path string, nfr linuxfw.NetfilterRunner, addrs []netip.Prefix, podIPS, stateSecretName string) { + if path == "" { + panic("egress services config path is empty") + } + // TODO: also reconfigure if tailnet IPs have changed + var tickChan <-chan time.Time + var eventChan <-chan fsnotify.Event + if w, err := fsnotify.NewWatcher(); err != nil { + log.Printf("failed to create fsnotify watcher, timer-only mode: %v", err) + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + tickChan = ticker.C + } else { + defer w.Close() + if err := w.Add(filepath.Dir(path)); err != nil { + log.Fatalf("failed to add fsnotify watch: %v", err) + } + eventChan = w.Events + } + f := func() { + log.Printf("running egress service reconfigure") + cfg, err := readEgressSvcConfig(path) + if err != nil { + log.Fatalf("error reading egress svc config: %v", err) + } + if cfg == nil || len(*cfg) == 0 { + log.Printf("no services configured yet") + return + } + // get the config from state secret + stateS, err := kc.GetSecret(ctx, stateSecretName) + if err != nil { + log.Fatalf("error retrieving state secret: %v", err) + } + prevCfg := &kubeutils.EgressServicesStatus{} + prevCfgS, ok := stateS.Data["egress-services"] + if ok { + if err := json.Unmarshal([]byte(prevCfgS), prevCfg); err != nil { + log.Fatalf("error unmarshalling previous config: %v", err) + } + } + + hasChanges := false + for svcName, svc := range *cfg { + // produce wanted config + if egressSvcUpToDate(prevCfg, svcName, svc, podIPS) { + log.Printf("%s up to date", svcName) + continue + } + log.Printf("svc %s changes detected", svcName) + hasChanges = true + // only IP is supported for this prototype + tailnetTarget, err := netip.ParseAddr(svc.TailnetTarget.IP) + if err != nil { + log.Fatalf("error parsing tailnet ip: %v", err) + } + var local netip.Addr + for _, pfx := range addrs { + if !pfx.IsSingleIP() { + continue + } + if pfx.Addr().Is4() != tailnetTarget.Is4() { + continue + } + local = pfx.Addr() + break + } + if !local.IsValid() { + // TODO: watch tailnet IPs and retry when a new one gets allocated + log.Fatalf("no valid local IP: %v", local) + } + // add snat + if err := nfr.AddSNATRuleForDst(local, tailnetTarget); err != nil { + log.Fatalf("error setting up SNAT: %v", err) + } + podIP, err := netip.ParseAddr(podIPS) + if err != nil { + log.Fatalf("provided Pod IP %s cannot be parsed as IP: %v", podIPS, err) + } + for _, mapping := range svc.Ports { + p, err := proto(mapping.Protocol) + if err != nil { + log.Fatalf("unable to parse protocol: %v", err) + } + if err := nfr.AddDNATRuleForSrcPrt(podIP, tailnetTarget, mapping.Dst, mapping.Src, p); err != nil { + log.Fatalf("error setting up DNAT rule for tailnet target %v port map %v:%v : %v", tailnetTarget, mapping.Src, mapping.Dst, err) + } + } + svcCfg := kubeutils.EgressServiceStatus{ + TailnetTarget: svc.TailnetTarget, + Ports: svc.Ports, + PodIP: podIPS, + } + mak.Set(prevCfg, svcName, svcCfg) + } + log.Printf("state Secret has changes: %t", hasChanges) + if hasChanges { + bs, err := json.Marshal(prevCfg) + if err != nil { + log.Fatalf("error marshalling service config: %v", err) + } + stateS.Data["egress-services"] = bs + patch := kube.JSONPatch{ + Op: "replace", + Path: "/data/egress-services", + Value: bs, + } + if err := kc.JSONPatchSecret(ctx, stateSecretName, []kube.JSONPatch{patch}); err != nil { + log.Fatalf("error patching state Secret: %v", err) + } 
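+			// The egress-services status patched into the state Secret above is what the operator's
+			// EndpointSlice reconciler compares against the desired config from the ConfigMap before
+			// it adds this Pod's IP to the egress service's EndpointSlice.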
+ } + } + f() + for { + select { + case <-ctx.Done(): + return + case <-tickChan: + case <-eventChan: + f() + } + } +} + +func proto(s string) (uint8, error) { + switch strings.ToLower(s) { + case "tcp": + return 6, nil + case "udp": + return 11, nil + default: + return 0, fmt.Errorf("unrecognized protocol: %s", s) + } +} + +func egressSvcUpToDate(prevCfg *kubeutils.EgressServicesStatus, svcName string, svcCfg kubeutils.EgressService, podIP string) bool { + if prevCfg == nil { + return false + } + prev, ok := (*prevCfg)[svcName] + if !ok { + return false + } + if !strings.EqualFold(prev.TailnetTarget.IP, svcCfg.TailnetTarget.IP) { + return false + } + if !reflect.DeepEqual(prev.Ports, svcCfg.Ports) { + return false + } + if !strings.EqualFold(podIP, prev.PodIP) { + return false + } + return true +} + +func readEgressSvcConfig(path string) (*kubeutils.EgressServices, error) { + if path == "" { + return nil, nil + } + j, err := os.ReadFile(path) + if os.IsNotExist(err) { + return nil, nil + } + if err != nil { + return nil, err + } + if len(j) == 0 || string(j) == "" { + return nil, nil + } + var cfg *kubeutils.EgressServices + if err := json.Unmarshal(j, &cfg); err != nil { + return nil, err + } + return cfg, nil } func (s *settings) validate() error { @@ -1351,7 +1536,7 @@ func isOneStepConfig(cfg *settings) bool { // as an L3 proxy, proxying to an endpoint provided via one of the config env // vars. func isL3Proxy(cfg *settings) bool { - return cfg.ProxyTargetIP != "" || cfg.ProxyTargetDNSName != "" || cfg.TailnetTargetIP != "" || cfg.TailnetTargetFQDN != "" || cfg.AllowProxyingClusterTrafficViaIngress + return cfg.EgressServicesPath != "" || cfg.ProxyTargetIP != "" || cfg.ProxyTargetDNSName != "" || cfg.TailnetTargetIP != "" || cfg.TailnetTargetFQDN != "" || cfg.AllowProxyingClusterTrafficViaIngress } // hasKubeStateStore returns true if the state must be stored in a Kubernetes diff --git a/cmd/k8s-operator/deploy/chart/templates/operator-rbac.yaml b/cmd/k8s-operator/deploy/chart/templates/operator-rbac.yaml index 9f2a4c2f0..f10bff4c5 100644 --- a/cmd/k8s-operator/deploy/chart/templates/operator-rbac.yaml +++ b/cmd/k8s-operator/deploy/chart/templates/operator-rbac.yaml @@ -48,14 +48,14 @@ metadata: namespace: {{ .Release.Namespace }} rules: - apiGroups: [""] - resources: ["secrets", "serviceaccounts", "configmaps"] + resources: ["secrets", "serviceaccounts", "configmaps", "pods"] verbs: ["create","delete","deletecollection","get","list","patch","update","watch"] - apiGroups: ["apps"] resources: ["statefulsets", "deployments"] verbs: ["create","delete","deletecollection","get","list","patch","update","watch"] - apiGroups: ["discovery.k8s.io"] resources: ["endpointslices"] - verbs: ["get", "list", "watch"] + verbs: ["get", "list", "watch", "create", "update", "patch"] --- apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding diff --git a/cmd/k8s-operator/deploy/manifests/operator.yaml b/cmd/k8s-operator/deploy/manifests/operator.yaml index 894ec1d69..166c3a65e 100644 --- a/cmd/k8s-operator/deploy/manifests/operator.yaml +++ b/cmd/k8s-operator/deploy/manifests/operator.yaml @@ -2506,6 +2506,7 @@ rules: - secrets - serviceaccounts - configmaps + - pods verbs: - create - delete @@ -2537,6 +2538,9 @@ rules: - get - list - watch + - create + - update + - patch --- apiVersion: rbac.authorization.k8s.io/v1 kind: Role diff --git a/cmd/k8s-operator/egress-ha.md b/cmd/k8s-operator/egress-ha.md new file mode 100644 index 000000000..49c3a9e9c --- /dev/null +++ b/cmd/k8s-operator/egress-ha.md @@ 
-0,0 +1,214 @@ +# This is a Kubernetes Operator egress HA prototype based on portmapping + +It contains: +- containerboot/netfilter runner changes to parse egress services config and set up portmapping based DNAT +- two new operator reconcile loops to parse HA egress resources +- static manifests that imitate having a ProxyGroup deployed to cluster +- some other changes, additional RBAC etc + +## To try this out + +### Setup + +(The setup steps use images built from this branch available from a public GCR that I own) + +- set up a cluster WITHOUT the operator + +From this branch: +- `$ kubectl apply -f cmd/k8s-operator/crds` +- install operator: +``` +$ helm upgrade --install operator ./cmd/k8s-operator/deploy/chart/ --set operatorConfig.image.repo=gcr.io/csi-test-290908/operator --set operatorConfig.image.tag=v0.0.14egresshapm -n tailscale --set oauth.clientId= --set oauth.clientSecret= --set operatorConfig.logging=debug --create-namespace --set operatorConfig.image.pullPolicy=IfNotPresent +``` +- apply static manifests that imitate having a ProxyGroup: + +Create a REUSABLE Tailscale auth key and update ./cmd/k8s-operator/egress-ha.yaml with it. + +Run: + +``` +$ kubectl apply -f ./cmd/k8s-operator/egress-ha.yaml +``` +- observe that the 'proxy group' `Pods` have come up: +``` +$ kubectl get pods -n tailscale +NAME READY STATUS RESTARTS AGE +egress-proxies-0 1/1 Running 0 6m23s +egress-proxies-1 1/1 Running 0 6m22s +egress-proxies-2 1/1 Running 0 6m21s +... +``` + +### Test it out + +- ensure you have some service on your tailnet that you can access via egress + +#### Expose a tailnet service(s) on the ProxyGroup proxies + +- Apply some egress `Services` with `tailscale.com/proxy-group` label, and a `tailscale.com/tailnet-ip` annotation pointing at the tailnet service i.e: + +``` +apiVersion: v1 +kind: Service +metadata: + annotations: + tailscale.com/tailnet-ip: 100.64.1.230 + labels: + tailscale.com/proxy-group: egress-proxies + name: kuard-egress +spec: + externalName: placeholder + type: ExternalName + ports: + - port: 80 + protocol: TCP + name: http +--- +apiVersion: v1 +kind: Service +metadata: + annotations: + tailscale.com/tailnet-ip: 100.64.1.196 + labels: + tailscale.com/proxy-group: egress-proxies + name: dns-server +spec: + externalName: placeholder + type: ExternalName + ports: + - port: 53 + protocol: UDP + name: udp + - port: 53 + protocol: TCP + name: tcp +``` + +- Note- it will take a little while for the mounted ConfigMap to be updated. +To follow, you can take a look at whether the mounted config has been updated: +``` +$ kubectl exec -it egress-proxies-0 -n tailscale -- cat /etc/egress-services/cfg + +``` +.. as well as check proxy logs +``` +$ kubectl logs egress-proxies-0 -n tailscale +... +boot: 2024/09/04 07:35:48 running egress service reconfigure +boot: 2024/09/04 07:35:48 svc dns-server-default changes detected +boot: 2024/09/04 07:35:48 svc kuard-egress-default changes detected +... + +``` + +- Once the config has been updated, test that any cluster workload can access the egress service(s) +via the ExternalName Service(s): + +``` +$ kubectl exec -it proxy -- sh +/ # curl -vv kuard-egress +* Host kuard-egress:80 was resolved. +... +/ # dig @dns-server + +; <<>> DiG 9.18.24 <<>> @dns-server +; (1 server found) +... 
+``` + +- Verify that the EndpointSlice created for each egress service contains all ProxyGroup Pod IPs: + +``` +$ kubectl get po -n tailscale -owide +NAME READY STATUS RESTARTS AGE IP +egress-proxies-0 1/1 Running 0 31m 10.80.0.51 +egress-proxies-1 1/1 Running 0 31m 10.80.2.54 +egress-proxies-2 1/1 Running 0 31m 10.80.0.52 +... +$ kubectl get endpointslice -n tailscale +NAME ADDRESSTYPE PORTS ENDPOINTS AGE +dns-server-default IPv4 3160,2181 10.80.0.52,10.80.0.51,10.80.2.54 30m +kuard-egress-default IPv4 2688 10.80.0.51,10.80.2.54,10.80.0.52 30m +... +``` + +#### Add another Pod to 'proxy group' + +Scale replicas 3 -> 4: + +- `$ kubectl scale sts/egress-proxies -n tailscale --replicas=4` + +This change should be processed a lot faster as the proxy will read its config on start + +- Once the additional `Pod` is up, observe that it's IP address has been added to the EndpointSlice: + +``` +$ kubectl get po -n tailscale -owide +NAME READY STATUS RESTARTS AGE IP +egress-proxies-0 1/1 Running 0 41m 10.80.0.51 +egress-proxies-1 1/1 Running 0 41m 10.80.2.54 +egress-proxies-2 1/1 Running 0 41m 10.80.0.52 +egress-proxies-3 1/1 Running 0 69s 10.80.2.56 +... +$ kubectl get endpointslice -n tailscale +NAME ADDRESSTYPE PORTS ENDPOINTS AGE +dns-server-default IPv4 3160,2181 10.80.2.56,10.80.0.51,10.80.2.54 + 1 more... 40m +kuard-egress-default IPv4 2688 10.80.0.51,10.80.2.54,10.80.0.52 + 1 more... 40m +``` + +- You can also test that the new `Pod` knows how to route the traffic. + +Find the `Pod`'s target port from the ExternalName Service that you created: + +``` +$ kubectl get svc kuard-egress -oyaml +apiVersion: v1 +kind: Service +metadata: + annotations: + tailscale.com/tailnet-ip: 100.64.1.230 + labels: + tailscale.com/proxy-group: egress-proxies + name: kuard-egress + namespace: default +spec: + externalName: kuard-egress-default.tailscale.svc.cluster.local + ports: + - name: http + port: 80 + protocol: TCP + targetPort: 2688 + type: ExternalName +``` +Try to route to the tailnet service using the new `Pod`'s IP: + +``` +$ kubectl exec -it proxy -- sh +/ # curl -vv 10.80.2.56:2688 +* Trying 10.80.2.56:2688... +* Connected to 10.80.2.56 (10.80.2.56) port 2688 +... +``` + +#### Remove a Pod from the 'proxy group' + +Scale replicas 4 -> 3: + +- `$ kubectl scale sts/egress-proxies -n tailscale --replicas=3` + +This change should get processed fairly fast. + +- Observe that once the `Pod` is gone, it's IP address is removed from the `EndpointSlice`(s): + +``` +$ kubectl get po -n tailscale -owide +NAME READY STATUS RESTARTS AGE IP +egress-proxies-0 1/1 Running 0 49m 10.80.0.51 +egress-proxies-1 1/1 Running 0 49m 10.80.2.54 +egress-proxies-2 1/1 Running 0 49m 10.80.0.52 +... 
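+# egress-proxies-3 (10.80.2.56) is gone from the Pod list above, and its IP is dropped from the EndpointSlices below: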
+$ kubectl get endpointslice -n tailscale +NAME ADDRESSTYPE PORTS ENDPOINTS AGE +dns-server-default IPv4 3160,2181 10.80.0.51,10.80.2.54,10.80.0.52 48m +kuard-egress-default IPv4 2688 10.80.0.51,10.80.2.54,10.80.0.52 48m +``` diff --git a/cmd/k8s-operator/egress-ha.yaml b/cmd/k8s-operator/egress-ha.yaml new file mode 100644 index 000000000..7eea1f287 --- /dev/null +++ b/cmd/k8s-operator/egress-ha.yaml @@ -0,0 +1,161 @@ +# This yaml contains roughly what the operator would create for ProxyGroup resource like: +# apiVersion: tailscale.com/v1alpha1 +# kind: ProxyGroup +# metadata: +# name: egress-proxies +# spec: +# replicas: 3 +# type: egress +# +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: egress-proxies + namespace: tailscale + labels: + tailscale.com/proxy-group: egress-proxies +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: egress-proxies + namespace: tailscale + labels: + tailscale.com/proxy-group: egress-proxies +rules: +- apiGroups: + - "" + resources: + - secrets + verbs: + - create + - delete + - deletecollection + - get + - list + - patch + - update + - watch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: egress-proxies + namespace: tailscale + labels: + tailscale.com/proxy-group: egress-proxies +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: egress-proxies +subjects: +- kind: ServiceAccount + name: egress-proxies +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: egress-proxies + namespace: tailscale + labels: + tailscale.com/proxy-group: egress-proxies + app: egress-proxies +spec: + replicas: 3 + selector: + matchLabels: + app: egress-proxies + serviceName: "" + template: + metadata: + labels: + app: egress-proxies + tailscale.com/proxy-group: egress-proxies + spec: + containers: + - env: + - name: POD_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + - name: TS_USERSPACE + value: "false" + - name: TS_KUBE_SECRET + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: TS_AUTH_ONCE + value: "true" + - name: TS_AUTHKEY + value: + - name: TS_HOSTNAME + value: egress-proxies + - name: TS_DEBUG_FIREWALL_MODE + value: iptables + - name: TS_EGRESS_SERVICES_PATH + value: /etc/egress-services/cfg + image: gcr.io/csi-test-290908/proxy:v0.0.8egresshapm + imagePullPolicy: IfNotPresent + name: tailscale + securityContext: + capabilities: + add: + - NET_ADMIN + volumeMounts: + - mountPath: /etc/egress-services + name: egress-services + dnsPolicy: ClusterFirst + restartPolicy: Always + schedulerName: default-scheduler + serviceAccount: egress-proxies + serviceAccountName: egress-proxies + volumes: + - configMap: + defaultMode: 420 + name: egress-proxies-egress-services + name: egress-services +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: egress-proxies-egress-services + namespace: tailscale + labels: + tailscale.com/proxy-group: egress-proxies +--- +kind: Secret +apiVersion: v1 +metadata: + name: egress-proxies-0 + namespace: tailscale + labels: + tailscale.com/proxy-group: egress-proxies +type: Opaque +--- +kind: Secret +apiVersion: v1 +metadata: + name: egress-proxies-1 + namespace: tailscale + labels: + tailscale.com/proxy-group: egress-proxies +type: Opaque +--- +kind: Secret +apiVersion: v1 +metadata: + name: egress-proxies-2 + namespace: tailscale + labels: + tailscale.com/proxy-group: egress-proxies +type: Opaque +--- +kind: Secret +apiVersion: v1 +metadata: + name: egress-proxies-3 + namespace: tailscale + labels: + tailscale.com/proxy-group: 
egress-proxies +type: Opaque \ No newline at end of file diff --git a/cmd/k8s-operator/egressha-eps.go b/cmd/k8s-operator/egressha-eps.go new file mode 100644 index 000000000..d8ffa33b2 --- /dev/null +++ b/cmd/k8s-operator/egressha-eps.go @@ -0,0 +1,185 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +//go:build !plan9 + +package main + +import ( + "context" + "encoding/json" + "fmt" + "reflect" + "strings" + + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + kube "tailscale.com/k8s-operator" + "tailscale.com/types/ptr" +) + +type egressHAEndpointSliceReconciler struct { + client.Client + logger *zap.SugaredLogger +} + +// Get EndpointSlice +// Retrieve all proxy group Pods +func (ehr *egressHAEndpointSliceReconciler) Reconcile(ctx context.Context, req reconcile.Request) (res reconcile.Result, err error) { + logger := ehr.logger.With("Service", req.NamespacedName) + logger.Debugf("starting reconcile") + defer logger.Debugf("reconcile finished") + + eps := new(discoveryv1.EndpointSlice) + err = ehr.Get(ctx, req.NamespacedName, eps) + if apierrors.IsNotFound(err) { + logger.Debugf("EndpointSlice not found") + return reconcile.Result{}, nil + } + if err != nil { + return reconcile.Result{}, fmt.Errorf("failed to get EndpointSlice: %w", err) + } + if !eps.DeletionTimestamp.IsZero() { + logger.Debugf("EnpointSlice is being deleted") + return res, nil + } + oldEps := eps.DeepCopy() + proxyGroupName := eps.Labels["tailscale.com/proxy-group"] + egressServiceName := eps.Labels["tailscale.com/egress-service"] + + cm := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-egress-services", proxyGroupName), + Namespace: "tailscale", + }, + } + err = ehr.Get(ctx, client.ObjectKeyFromObject(cm), cm) + if apierrors.IsNotFound(err) { + logger.Debugf("ConfigMap %s not found", cm.Name) + return res, nil + } + if err != nil { + return res, fmt.Errorf("error retrieving ConfigMap %s: %w", cm.Name, err) + } + wantsCfgBS, ok := cm.BinaryData["cfg"] + if !ok { + // nothing here + logger.Debugf("egress-services config is empty") + return res, nil + } + wantsCfg := &kube.EgressServices{} + if err := json.Unmarshal(wantsCfgBS, wantsCfg); err != nil { + return res, fmt.Errorf("error unmarshalling egress services config: %w", err) + } + wantsEgressCfg, ok := (*wantsCfg)[egressServiceName] + if !ok { + logger.Debugf("egress services config does not contain config for %s", egressServiceName) + return res, nil + } + // get all proxy pods + podList := &corev1.PodList{} + if err := ehr.List(ctx, podList, client.MatchingLabels(map[string]string{"tailscale.com/proxy-group": proxyGroupName})); err != nil { + return res, fmt.Errorf("error listing Pods for %s ProxyGroup: %w", proxyGroupName, err) + } + if len(podList.Items) == 0 { + logger.Debugf("no Pods") + return res, nil + } + // also remove any leftover ones + // for each pod + newEndpoints := make([]discoveryv1.Endpoint, 0) + for _, pod := range podList.Items { + if !pod.DeletionTimestamp.IsZero() { + logger.Debugf("Pod %s is being deleted, ignore", pod.Name) + continue + } + // TODO: maybe some more Pod readiness checks + podIP := pod.Status.PodIP + // get the associated state Secret + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: pod.Name, + Namespace: 
pod.Namespace, + }, + } + err := ehr.Get(ctx, client.ObjectKeyFromObject(secret), secret) + if apierrors.IsNotFound(err) { + logger.Debugf("state Secret %s not yet exists", secret.Name) + continue + } + if err != nil { + return res, fmt.Errorf("error getting state Secret %s: %w", secret.Name, err) + } + svcStatusBS := secret.Data["egress-services"] + if len(svcStatusBS) == 0 { + // nothing ready here + logger.Debugf("state Secret %s does not yet have egress services status", secret.Name) + continue + } + svcStatus := &kube.EgressServicesStatus{} + if err := json.Unmarshal(svcStatusBS, svcStatus); err != nil { + return res, fmt.Errorf("error unmarshalling service status: %v", err) + } + thisSvcStatus, ok := (*svcStatus)[egressServiceName] + if !ok { + logger.Debugf("state Secret %s does not yet have status for egress service %s", secret.Name, egressServiceName) + continue + } + if !strings.EqualFold(podIP, thisSvcStatus.PodIP) { + logger.Debugf("got Pod IP %s, want Pod IP %s, not yet ready", thisSvcStatus.PodIP, podIP) + continue + } + if !strings.EqualFold(wantsEgressCfg.TailnetTarget.IP, thisSvcStatus.TailnetTarget.IP) { + logger.Debugf("got tailnet target IP %s, want %s, not yet ready", thisSvcStatus.TailnetTarget.IP, wantsEgressCfg.TailnetTarget.IP) + continue + } + if !reflect.DeepEqual(wantsEgressCfg.Ports, thisSvcStatus.Ports) { + logger.Debugf("got ports %+#v, wants ports %+#v", thisSvcStatus.Ports, wantsEgressCfg.Ports) + continue + } + // appears like the proxy's firewall should be ready to route traffic for this egress service + newEndpoints = append(newEndpoints, discoveryv1.Endpoint{ + Hostname: (*string)(&pod.UID), + Addresses: []string{podIP}, + Conditions: discoveryv1.EndpointConditions{ + Ready: ptr.To(true), + Serving: ptr.To(true), + Terminating: ptr.To(false), + }, + }) + } + eps.Endpoints = newEndpoints + if !reflect.DeepEqual(eps, oldEps) { + if err := ehr.Update(ctx, eps); err != nil { + return res, fmt.Errorf("error updating EndpointSlice: %v", err) + } + } + // TODO: or maybe do this elsewhere + if len(eps.Endpoints) > 0 { + extSvcName := eps.Labels["tailscale.com/external-service-name"] + extSvcNamespace := eps.Labels["tailscale.com/external-service-namespace"] + svc := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: extSvcName, + Namespace: extSvcNamespace, + }, + } + if err := ehr.Get(ctx, client.ObjectKeyFromObject(svc), svc); err != nil { + // unexpected + return res, fmt.Errorf("error getting ExternalName Service %s/%s: %w", extSvcName, extSvcNamespace, err) + } + clusterSvcFQDN := fmt.Sprintf("%s.tailscale.svc.cluster.local", egressServiceName) + if !strings.EqualFold(svc.Spec.ExternalName, clusterSvcFQDN) { + svc.Spec.ExternalName = clusterSvcFQDN + if err := ehr.Update(ctx, svc); err != nil { + return res, fmt.Errorf("error updating ExternalName service %s/%s: %w", extSvcName, extSvcNamespace, err) + } + } + } + return res, nil +} diff --git a/cmd/k8s-operator/egressha.go b/cmd/k8s-operator/egressha.go new file mode 100644 index 000000000..2d191f859 --- /dev/null +++ b/cmd/k8s-operator/egressha.go @@ -0,0 +1,209 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +//go:build !plan9 + +package main + +import ( + "context" + "encoding/json" + "fmt" + "math/rand/v2" + "reflect" + + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + 
"k8s.io/apimachinery/pkg/util/sets" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + kube "tailscale.com/k8s-operator" + "tailscale.com/util/mak" +) + +// Reconciles Services with tailscale.com/tailnet-ip annotation and +// tailscale.com/proxy-group label. +type egressHAServiceReconciler struct { + client.Client + logger *zap.SugaredLogger +} + +func (ehr *egressHAServiceReconciler) Reconcile(ctx context.Context, req reconcile.Request) (res reconcile.Result, err error) { + logger := ehr.logger.With("Service", req.NamespacedName) + logger.Debugf("starting reconcile") + defer logger.Debugf("reconcile finished") + + svc := new(corev1.Service) + err = ehr.Get(ctx, req.NamespacedName, svc) + if apierrors.IsNotFound(err) { + logger.Debugf("Service not found") + return reconcile.Result{}, nil + } + if err != nil { + return reconcile.Result{}, fmt.Errorf("failed to get Service: %w", err) + } + if !svc.DeletionTimestamp.IsZero() { + logger.Debugf("Service is being deleted") + // TODO: cleanup + return res, nil + } + + // TODO: probably will have to switch to an annotation as else it's too confusing + proxyGroupName := svc.Labels["tailscale.com/proxy-group"] + if proxyGroupName == "" { + logger.Debugf("not reconciling Service without tailscale.com/proxy-group label") + return res, nil + } + // TODO: also validate that the ProxyGroup is for egress service type + + tailnetIP := svc.Annotations["tailscale.com/tailnet-ip"] + if tailnetIP == "" { + logger.Debugf("not reconciling Service without tailscale.com/tailnet-ip annotation") + return res, nil + } + // get the egress services config for these proxies + cm := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-egress-services", proxyGroupName), + Namespace: "tailscale", // hardcoded for this prototype + }, + } + err = ehr.Get(ctx, client.ObjectKeyFromObject(cm), cm) + if apierrors.IsNotFound(err) { + logger.Debugf("egress services ConfigMap for %s not yet created, waiting", proxyGroupName) + return res, nil + } + if err != nil { + return res, fmt.Errorf("error retrieving egress service config map for %s", proxyGroupName) + } + oldCM := cm.DeepCopy() + config := &kube.EgressServices{} + if len(cm.BinaryData["cfg"]) != 0 { + if err := json.Unmarshal(cm.BinaryData["cfg"], config); err != nil { + return res, fmt.Errorf("error unmarshaling egress services config %v: %v", cm.BinaryData["cfg"], err) + } + } + + svcConfig := kube.EgressService{ + TailnetTarget: kube.TailnetTarget{ + IP: tailnetIP, + }, + Ports: []kube.PortMap{}, + } + + oldSvcSpec := svc.DeepCopy() + // TODO: only do this stuff if needed + svcList := &corev1.ServiceList{} + if err := ehr.List(ctx, svcList, client.MatchingLabels(map[string]string{"tailscale.com/proxy-group": proxyGroupName})); err != nil { + return res, fmt.Errorf("error listing Services: %v", err) + } + usedPorts := sets.NewInt32() + for _, s := range svcList.Items { + for _, p := range s.Spec.Ports { + usedPorts.Insert(p.Port) + } + } + // loop over ports, for each port that does not yet have a target port set, allocate one + epsPorts := []discoveryv1.EndpointPort{} + for i, portmap := range svc.Spec.Ports { + if portmap.TargetPort.String() == "" || portmap.TargetPort.IntVal == portmap.Port { + logger.Debugf("need to allocate target port for port %d", portmap.Port) + // TODO: this is why tailscale.com/proxy-group has to be a label- but we can instead add markers in cache and make it an annotation + // get a random port + foundFreePort := false + var 
suggestPort int32 = 0 + for !foundFreePort { + suggestPort = rand.Int32N(4000) + 1 // don't want 0, otherwise doesn't matter, we're root in the container and this is not going to be a sidecar + if !usedPorts.Has(suggestPort) { + foundFreePort = true + } + } + svc.Spec.Ports[i].TargetPort = intstr.FromInt32(suggestPort) + } + svcConfig.Ports = append(svcConfig.Ports, kube.PortMap{Src: uint16(portmap.Port), Protocol: string(portmap.Protocol), Dst: uint16(svc.Spec.Ports[i].TargetPort.IntVal)}) + epsPorts = append(epsPorts, discoveryv1.EndpointPort{Protocol: &portmap.Protocol, Port: &svc.Spec.Ports[i].TargetPort.IntVal, Name: &svc.Spec.Ports[i].Name}) + } + if !reflect.DeepEqual(oldSvcSpec, svc.Spec) { + // update ports only + if _, err := createOrUpdate(ctx, ehr.Client, svc.Namespace, svc, func(s *corev1.Service) { s.Spec.Ports = svc.Spec.Ports }); err != nil { + return res, fmt.Errorf("error updating Service: %v", err) + } + } else { + logger.Debugf("update to service not needed") + } + // update configmap + egressSvcName := fmt.Sprintf("%s-%s", svc.Name, svc.Namespace) // TODO: or hostname + mak.Set(config, egressSvcName, svcConfig) + bs, err := json.Marshal(config) + if err != nil { + return res, fmt.Errorf("error updating service config: %v", err) + } + mak.Set(&cm.BinaryData, "cfg", bs) + if !reflect.DeepEqual(cm, oldCM) { + if err := ehr.Update(ctx, cm); err != nil { + return res, fmt.Errorf("error updating ConfigMap: %v", err) + } + } + logger.Debugf("updating EndpointSlice, line 151") + // ensure EndpointSlice + // TODO: ports? + eps := &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: egressSvcName, + Namespace: "tailscale", + Labels: map[string]string{ + "tailscale.com/egress-service": egressSvcName, + "tailscale.com/proxy-group": proxyGroupName, + "tailscale.com/external-service-name": svc.Name, + "tailscale.com/external-service-namespace": svc.Namespace, + "kubernetes.io/service-name": egressSvcName, + }, + }, + AddressType: "IPv4", + Ports: epsPorts, + } + err = ehr.Get(ctx, client.ObjectKeyFromObject(eps), &discoveryv1.EndpointSlice{}) + if apierrors.IsNotFound(err) { + logger.Debugf("creating EndpointSlice") + if err := ehr.Create(ctx, eps); err != nil { + logger.Debugf("error creating EndpointSlice: %v", err) + return res, fmt.Errorf("error creating EndpointSlice: %v", err) + } + } else if err != nil { + return res, fmt.Errorf("error retrieving EnpointSlice %s: %w", eps.Name, err) + } + // TODO: deal with port update + logger.Debugf("updating ClusterIP Service, line 174") + + // TODO: will need to generate a different name for the ClusterIP + // service as else this will prevent from creating egresses in ts + // namespace. 
ensure ClusterIP Service + clusterIPSvc := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: egressSvcName, + Namespace: "tailscale", + Labels: map[string]string{"tailscale.com/egress-service": egressSvcName, + "tailscale.com/proxy-group": proxyGroupName, + "tailscale.com/external-service-name": svc.Name, + "tailscale.com/external-service-namespace": svc.Namespace, + }, + }, + Spec: corev1.ServiceSpec{Ports: svc.Spec.Ports, Type: corev1.ServiceTypeClusterIP}, + } + // TODO: deal with ports update + err = ehr.Client.Get(ctx, client.ObjectKeyFromObject(clusterIPSvc), &corev1.Service{}) + if apierrors.IsNotFound(err) { + logger.Debugf("creating ClusterIP Service") + if err := ehr.Create(ctx, clusterIPSvc); err != nil { + logger.Debugf("error creating ClusterIP Service: %v", err) + return res, fmt.Errorf("error creating ClusterIP Service: %v", err) + } + } else if err != nil { + return res, fmt.Errorf("error retrieving ClusterIP Service: %v", err) + } + return res, nil +} diff --git a/cmd/k8s-operator/operator.go b/cmd/k8s-operator/operator.go index 18665bd8f..d279358ec 100644 --- a/cmd/k8s-operator/operator.go +++ b/cmd/k8s-operator/operator.go @@ -40,7 +40,6 @@ "tailscale.com/ipn/store/kubestore" tsapi "tailscale.com/k8s-operator/apis/v1alpha1" "tailscale.com/tsnet" - "tailscale.com/tstime" "tailscale.com/types/logger" "tailscale.com/version" ) @@ -235,6 +234,7 @@ func runReconcilers(opts reconcilerOpts) { Cache: cache.Options{ ByObject: map[client.Object]cache.ByObject{ &corev1.Secret{}: nsFilter, + &corev1.Pod{}: nsFilter, &corev1.ServiceAccount{}: nsFilter, &corev1.ConfigMap{}: nsFilter, &appsv1.StatefulSet{}: nsFilter, @@ -249,145 +249,35 @@ func runReconcilers(opts reconcilerOpts) { startlog.Fatalf("could not create manager: %v", err) } - svcFilter := handler.EnqueueRequestsFromMapFunc(serviceHandler) - svcChildFilter := handler.EnqueueRequestsFromMapFunc(managedResourceHandlerForType("svc")) - // If a ProxyClass changes, enqueue all Services labeled with that - // ProxyClass's name. - proxyClassFilterForSvc := handler.EnqueueRequestsFromMapFunc(proxyClassHandlerForSvc(mgr.GetClient(), startlog)) - - eventRecorder := mgr.GetEventRecorderFor("tailscale-operator") - ssr := &tailscaleSTSReconciler{ - Client: mgr.GetClient(), - tsnetServer: opts.tsServer, - tsClient: opts.tsClient, - defaultTags: strings.Split(opts.proxyTags, ","), - operatorNamespace: opts.tailscaleNamespace, - proxyImage: opts.proxyImage, - proxyPriorityClassName: opts.proxyPriorityClassName, - tsFirewallMode: opts.proxyFirewallMode, - } + svcFilter := handler.EnqueueRequestsFromMapFunc(egressHAServiceHandler) err = builder. ControllerManagedBy(mgr). - Named("service-reconciler"). + Named("egress-ha-service-reconciler"). Watches(&corev1.Service{}, svcFilter). - Watches(&appsv1.StatefulSet{}, svcChildFilter). - Watches(&corev1.Secret{}, svcChildFilter). - Watches(&tsapi.ProxyClass{}, proxyClassFilterForSvc). 
- Complete(&ServiceReconciler{ - ssr: ssr, - Client: mgr.GetClient(), - logger: opts.log.Named("service-reconciler"), - isDefaultLoadBalancer: opts.proxyActAsDefaultLoadBalancer, - recorder: eventRecorder, - tsNamespace: opts.tailscaleNamespace, - clock: tstime.DefaultClock{}, - proxyDefaultClass: opts.proxyDefaultClass, + Complete(&egressHAServiceReconciler{ + Client: mgr.GetClient(), + logger: opts.log.Named("egress-ha-service-reconciler"), }) if err != nil { - startlog.Fatalf("could not create service reconciler: %v", err) - } - ingressChildFilter := handler.EnqueueRequestsFromMapFunc(managedResourceHandlerForType("ingress")) - // If a ProxyClassChanges, enqueue all Ingresses labeled with that - // ProxyClass's name. - proxyClassFilterForIngress := handler.EnqueueRequestsFromMapFunc(proxyClassHandlerForIngress(mgr.GetClient(), startlog)) - // Enque Ingress if a managed Service or backend Service associated with a tailscale Ingress changes. - svcHandlerForIngress := handler.EnqueueRequestsFromMapFunc(serviceHandlerForIngress(mgr.GetClient(), startlog)) - err = builder. - ControllerManagedBy(mgr). - For(&networkingv1.Ingress{}). - Watches(&appsv1.StatefulSet{}, ingressChildFilter). - Watches(&corev1.Secret{}, ingressChildFilter). - Watches(&corev1.Service{}, svcHandlerForIngress). - Watches(&tsapi.ProxyClass{}, proxyClassFilterForIngress). - Complete(&IngressReconciler{ - ssr: ssr, - recorder: eventRecorder, - Client: mgr.GetClient(), - logger: opts.log.Named("ingress-reconciler"), - proxyDefaultClass: opts.proxyDefaultClass, - }) - if err != nil { - startlog.Fatalf("could not create ingress reconciler: %v", err) + startlog.Fatalf("could not create egress-ha-service reconciler: %v", err) } - connectorFilter := handler.EnqueueRequestsFromMapFunc(managedResourceHandlerForType("connector")) - // If a ProxyClassChanges, enqueue all Connectors that have - // .spec.proxyClass set to the name of this ProxyClass. - proxyClassFilterForConnector := handler.EnqueueRequestsFromMapFunc(proxyClassHandlerForConnector(mgr.GetClient(), startlog)) - err = builder.ControllerManagedBy(mgr). - For(&tsapi.Connector{}). - Watches(&appsv1.StatefulSet{}, connectorFilter). - Watches(&corev1.Secret{}, connectorFilter). - Watches(&tsapi.ProxyClass{}, proxyClassFilterForConnector). - Complete(&ConnectorReconciler{ - ssr: ssr, - recorder: eventRecorder, - Client: mgr.GetClient(), - logger: opts.log.Named("connector-reconciler"), - clock: tstime.DefaultClock{}, + epsFilter := handler.EnqueueRequestsFromMapFunc(egressHAEPSHandler) + assocResourceFilter := handler.EnqueueRequestsFromMapFunc(serviceHandlerForEgressProxyGroupPods(mgr.GetClient(), opts.log)) + err = builder. + ControllerManagedBy(mgr). + Named("egress-ha-eps-reconciler"). + Watches(&discoveryv1.EndpointSlice{}, epsFilter). + Watches(&corev1.Pod{}, assocResourceFilter). + Watches(&corev1.Secret{}, assocResourceFilter). + Complete(&egressHAEndpointSliceReconciler{ + Client: mgr.GetClient(), + logger: opts.log.Named("egress-ha-eps-reconciler"), }) if err != nil { - startlog.Fatalf("could not create connector reconciler: %v", err) - } - // TODO (irbekrm): switch to metadata-only watches for resources whose - // spec we don't need to inspect to reduce memory consumption. - // https://github.com/kubernetes-sigs/controller-runtime/issues/1159 - nameserverFilter := handler.EnqueueRequestsFromMapFunc(managedResourceHandlerForType("nameserver")) - err = builder.ControllerManagedBy(mgr). - For(&tsapi.DNSConfig{}). - Watches(&appsv1.Deployment{}, nameserverFilter). 
- Watches(&corev1.ConfigMap{}, nameserverFilter). - Watches(&corev1.Service{}, nameserverFilter). - Watches(&corev1.ServiceAccount{}, nameserverFilter). - Complete(&NameserverReconciler{ - recorder: eventRecorder, - tsNamespace: opts.tailscaleNamespace, - Client: mgr.GetClient(), - logger: opts.log.Named("nameserver-reconciler"), - clock: tstime.DefaultClock{}, - }) - if err != nil { - startlog.Fatalf("could not create nameserver reconciler: %v", err) - } - err = builder.ControllerManagedBy(mgr). - For(&tsapi.ProxyClass{}). - Complete(&ProxyClassReconciler{ - Client: mgr.GetClient(), - recorder: eventRecorder, - logger: opts.log.Named("proxyclass-reconciler"), - clock: tstime.DefaultClock{}, - }) - if err != nil { - startlog.Fatal("could not create proxyclass reconciler: %v", err) - } - logger := startlog.Named("dns-records-reconciler-event-handlers") - // On EndpointSlice events, if it is an EndpointSlice for an - // ingress/egress proxy headless Service, reconcile the headless - // Service. - dnsRREpsOpts := handler.EnqueueRequestsFromMapFunc(dnsRecordsReconcilerEndpointSliceHandler) - // On DNSConfig changes, reconcile all headless Services for - // ingress/egress proxies in operator namespace. - dnsRRDNSConfigOpts := handler.EnqueueRequestsFromMapFunc(enqueueAllIngressEgressProxySvcsInNS(opts.tailscaleNamespace, mgr.GetClient(), logger)) - // On Service events, if it is an ingress/egress proxy headless Service, reconcile it. - dnsRRServiceOpts := handler.EnqueueRequestsFromMapFunc(dnsRecordsReconcilerServiceHandler) - // On Ingress events, if it is a tailscale Ingress or if tailscale is the default ingress controller, reconcile the proxy - // headless Service. - dnsRRIngressOpts := handler.EnqueueRequestsFromMapFunc(dnsRecordsReconcilerIngressHandler(opts.tailscaleNamespace, opts.proxyActAsDefaultLoadBalancer, mgr.GetClient(), logger)) - err = builder.ControllerManagedBy(mgr). - Named("dns-records-reconciler"). - Watches(&corev1.Service{}, dnsRRServiceOpts). - Watches(&networkingv1.Ingress{}, dnsRRIngressOpts). - Watches(&discoveryv1.EndpointSlice{}, dnsRREpsOpts). - Watches(&tsapi.DNSConfig{}, dnsRRDNSConfigOpts). 
- Complete(&dnsRecordsReconciler{ - Client: mgr.GetClient(), - tsNamespace: opts.tailscaleNamespace, - logger: opts.log.Named("dns-records-reconciler"), - isDefaultLoadBalancer: opts.proxyActAsDefaultLoadBalancer, - }) - if err != nil { - startlog.Fatalf("could not create DNS records reconciler: %v", err) + startlog.Fatalf("could not create egress-ha-endpointslice reconciler: %v", err) } + startlog.Infof("Startup complete, operator running, version: %s", version.Long()) if err := mgr.Start(signals.SetupSignalHandler()); err != nil { startlog.Fatalf("could not start manager: %v", err) @@ -658,6 +548,65 @@ func serviceHandlerForIngress(cl client.Client, logger *zap.SugaredLogger) handl return reqs } } +func egressHAServiceHandler(_ context.Context, o client.Object) []reconcile.Request { + _, ok := o.GetLabels()["tailscale.com/proxy-group"] + if !ok { + return nil + } + _, ok = o.GetAnnotations()["tailscale.com/tailnet-ip"] + if !ok { + return nil + } + // If this is not a managed Service we want to enqueue it + return []reconcile.Request{ + { + NamespacedName: types.NamespacedName{ + Namespace: o.GetNamespace(), + Name: o.GetName(), + }, + }, + } +} +func egressHAEPSHandler(_ context.Context, o client.Object) []reconcile.Request { + _, ok := o.GetLabels()["tailscale.com/egress-service"] + if !ok { + return nil + } + // If this is not a managed Service we want to enqueue it + return []reconcile.Request{ + { + NamespacedName: types.NamespacedName{ + Namespace: o.GetNamespace(), + Name: o.GetName(), + }, + }, + } +} + +// On egress ProxyGroup Pod events, reconcile all EnpdointSlices for egress services exposed on that ProxyGroup +func serviceHandlerForEgressProxyGroupPods(cl client.Client, logger *zap.SugaredLogger) handler.MapFunc { + return func(_ context.Context, o client.Object) []reconcile.Request { + // TODO: type: egress + pg, ok := o.GetLabels()["tailscale.com/proxy-group"] + if !ok { + return nil + } + epsList := discoveryv1.EndpointSliceList{} + if err := cl.List(context.Background(), &epsList, client.MatchingLabels(map[string]string{"tailscale.com/proxy-group": pg})); err != nil { + logger.Debugf("error listing endpointslices: %v", err) + } + reqs := make([]reconcile.Request, 0) + for _, ep := range epsList.Items { + reqs = append(reqs, reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: ep.Namespace, + Name: ep.Name, + }, + }) + } + return reqs + } +} func serviceHandler(_ context.Context, o client.Object) []reconcile.Request { if isManagedByType(o, "svc") { diff --git a/ipn/store/kubestore/store_kube.go b/ipn/store/kubestore/store_kube.go index 0c90d06b3..44c4e6cfb 100644 --- a/ipn/store/kubestore/store_kube.go +++ b/ipn/store/kubestore/store_kube.go @@ -53,7 +53,7 @@ func (s *Store) String() string { return "kube.Store" } // ReadState implements the StateStore interface. func (s *Store) ReadState(id ipn.StateKey) ([]byte, error) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() secret, err := s.client.GetSecret(ctx, s.secretName) @@ -83,7 +83,7 @@ func sanitizeKey(k ipn.StateKey) string { // WriteState implements the StateStore interface. 
func (s *Store) WriteState(id ipn.StateKey, bs []byte) error { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() secret, err := s.client.GetSecret(ctx, s.secretName) diff --git a/k8s-operator/egressservices.go b/k8s-operator/egressservices.go new file mode 100644 index 000000000..2eb818c06 --- /dev/null +++ b/k8s-operator/egressservices.go @@ -0,0 +1,33 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +//go:build !plan9 + +package kube + +// TODO: figure out how to build a mechanism to dynamically update iptables/nftables rules +type EgressServices map[string]EgressService + +type EgressService struct { + TailnetTarget TailnetTarget `json:"tailnetTarget"` + Ports []PortMap `json:"ports"` +} + +type TailnetTarget struct { + IP string `json:"ip,omitempty"` + FQDN string `json:"fqdn,omitempty"` +} + +type PortMap struct { + Protocol string `json:"protocol"` + Src uint16 `json:"src"` + Dst uint16 `json:"dst"` +} + +type EgressServicesStatus map[string]EgressServiceStatus + +type EgressServiceStatus struct { + PodIP string `json:"podIP"` + TailnetTarget TailnetTarget `json:"tailnetTarget"` + Ports []PortMap `json:"ports"` +} diff --git a/kube/client.go b/kube/client.go index 62daa366e..6cc252916 100644 --- a/kube/client.go +++ b/kube/client.go @@ -253,7 +253,7 @@ type JSONPatch struct { // It currently (2023-03-02) only supports "add" and "remove" operations. func (c *client) JSONPatchSecret(ctx context.Context, name string, patch []JSONPatch) error { for _, p := range patch { - if p.Op != "remove" && p.Op != "add" { + if p.Op != "remove" && p.Op != "add" && p.Op != "replace" { panic(fmt.Errorf("unsupported JSON patch operation: %q", p.Op)) } } diff --git a/util/linuxfw/iptables_runner.go b/util/linuxfw/iptables_runner.go index 507f6cd48..a0f4135f5 100644 --- a/util/linuxfw/iptables_runner.go +++ b/util/linuxfw/iptables_runner.go @@ -371,6 +371,28 @@ func (i *iptablesRunner) AddDNATRule(origDst, dst netip.Addr) error { return table.Insert("nat", "PREROUTING", 1, "--destination", origDst.String(), "-j", "DNAT", "--to-destination", dst.String()) } +// For this prototype only - for the real implementation we will probably split +// this into custom chains to make this more readable/easier update-eable. 
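+// The inserted rule is roughly equivalent to:
+//
+//	iptables -t nat -I PREROUTING 1 -p <proto> --dport <srcPrt> --destination <origDst> -j DNAT --to-destination <dst>:<dstPrt>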
+func (i *iptablesRunner) AddDNATRuleForSrcPrt(origDst, dst netip.Addr, srcPrt, dstPrt uint16, proto uint8) error { + table := i.getIPTByAddr(dst) + protoS, err := protoName(proto) + if err != nil { + return err + } + return table.Insert("nat", "PREROUTING", 1, "-p", protoS, "--dport", fmt.Sprintf("%d", srcPrt), "--destination", origDst.String(), "-j", "DNAT", "--to-destination", fmt.Sprintf("%v:%v", dst, dstPrt)) +} + +func protoName(proto uint8) (string, error) { + switch proto { + case 6: + return "tcp", nil + case 11: + return "udp", nil + default: + return "", fmt.Errorf("unrecognized protocol code: %d", proto) + } +} + func (i *iptablesRunner) AddSNATRuleForDst(src, dst netip.Addr) error { table := i.getIPTByAddr(dst) return table.Insert("nat", "POSTROUTING", 1, "--destination", dst.String(), "-j", "SNAT", "--to-source", src.String()) diff --git a/util/linuxfw/nftables_runner.go b/util/linuxfw/nftables_runner.go index 317d84c12..be7106823 100644 --- a/util/linuxfw/nftables_runner.go +++ b/util/linuxfw/nftables_runner.go @@ -16,6 +16,7 @@ "strings" "github.com/google/nftables" + "github.com/google/nftables/binaryutil" "github.com/google/nftables/expr" "golang.org/x/sys/unix" "tailscale.com/net/tsaddr" @@ -101,6 +102,72 @@ func (n *nftablesRunner) ensurePreroutingChain(dst netip.Addr) (*nftables.Table, } return nat, preroutingCh, nil } +func (n *nftablesRunner) AddDNATRuleForSrcPrt(origDst netip.Addr, dst netip.Addr, origPrt, dstPrt uint16, proto uint8) error { + nat, preroutingCh, err := n.ensurePreroutingChain(dst) + if err != nil { + return err + } + var daddrOffset, fam, dadderLen uint32 + if origDst.Is4() { + daddrOffset = 16 + dadderLen = 4 + fam = unix.NFPROTO_IPV4 + } else { + daddrOffset = 24 + dadderLen = 16 + fam = unix.NFPROTO_IPV6 + } + dnatRule := &nftables.Rule{ + Table: nat, + Chain: preroutingCh, + Exprs: []expr.Any{ + &expr.Payload{ + DestRegister: 1, + Base: expr.PayloadBaseNetworkHeader, + Offset: daddrOffset, + Len: dadderLen, + }, + &expr.Cmp{ + Op: expr.CmpOpEq, + Register: 1, + Data: origDst.AsSlice(), + }, + &expr.Meta{Key: expr.MetaKeyL4PROTO, Register: 1}, + &expr.Cmp{ + Op: expr.CmpOpEq, + Register: 1, + Data: []byte{proto}, + }, + &expr.Payload{ + DestRegister: 1, + Base: expr.PayloadBaseTransportHeader, + Offset: 2, + Len: 2, + }, + &expr.Cmp{ + Op: expr.CmpOpEq, + Register: 1, + Data: binaryutil.BigEndian.PutUint16(origPrt), + }, + &expr.Immediate{ + Register: 1, + Data: dst.AsSlice(), + }, + &expr.Immediate{ + Register: 2, + Data: binaryutil.BigEndian.PutUint16(dstPrt), + }, + &expr.NAT{ + Type: expr.NATTypeDestNAT, + Family: fam, + RegAddrMin: 1, + RegProtoMin: 2, + }, + }, + } + n.conn.InsertRule(dnatRule) + return n.conn.Flush() +} func (n *nftablesRunner) AddDNATRule(origDst netip.Addr, dst netip.Addr) error { nat, preroutingCh, err := n.ensurePreroutingChain(dst) @@ -581,6 +648,8 @@ type NetfilterRunner interface { // DelMagicsockPortRule removes the rule created by AddMagicsockPortRule, // if it exists. DelMagicsockPortRule(port uint16, network string) error + // DNAT traffic to originDst:originPrt to dst:dstPort + AddDNATRuleForSrcPrt(origDst, dst netip.Addr, originPrt, dstPrt uint16, proto uint8) error } // New creates a NetfilterRunner, auto-detecting whether to use