// Copyright (c) Tailscale Inc & AUTHORS // SPDX-License-Identifier: BSD-3-Clause //go:build !plan9 package main import ( "context" "crypto/sha256" "encoding/json" "errors" "fmt" "math/rand/v2" "reflect" "slices" "strings" "sync" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" discoveryv1 "k8s.io/api/discovery/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/storage/names" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" tsoperator "tailscale.com/k8s-operator" tsapi "tailscale.com/k8s-operator/apis/v1alpha1" "tailscale.com/kube/egressservices" "tailscale.com/kube/kubetypes" "tailscale.com/tstime" "tailscale.com/util/clientmetric" "tailscale.com/util/mak" "tailscale.com/util/set" ) const ( reasonEgressSvcInvalid = "EgressSvcInvalid" reasonEgressSvcValid = "EgressSvcValid" reasonEgressSvcCreationFailed = "EgressSvcCreationFailed" reasonProxyGroupNotReady = "ProxyGroupNotReady" labelProxyGroup = "tailscale.com/proxy-group" labelSvcType = "tailscale.com/svc-type" // ingress or egress typeEgress = "egress" // maxPorts is the maximum number of ports that can be exposed on a // container. In practice this will be ports in range [10000 - 11000). The // high range should make it easier to distinguish container ports from // the tailnet target ports for debugging purposes (i.e when reading // netfilter rules). The limit of 1000 is somewhat arbitrary, the // assumption is that this would not be hit in practice. maxPorts = 1000 indexEgressProxyGroup = ".metadata.annotations.egress-proxy-group" ) var gaugeEgressServices = clientmetric.NewGauge(kubetypes.MetricEgressServiceCount) // egressSvcsReconciler reconciles user created ExternalName Services that specify a tailnet // endpoint that should be exposed to cluster workloads and an egress ProxyGroup // on whose proxies it should be exposed. type egressSvcsReconciler struct { client.Client logger *zap.SugaredLogger recorder record.EventRecorder clock tstime.Clock tsNamespace string mu sync.Mutex // protects following svcs set.Slice[types.UID] // UIDs of all currently managed egress Services for ProxyGroup } // Reconcile reconciles an ExternalName Service that specifies a tailnet target and a ProxyGroup on whose proxies should // forward cluster traffic to the target. // For an ExternalName Service the reconciler: // // - for each port N defined on the ExternalName Service, allocates a port X in range [3000- 4000), unique for the // ProxyGroup proxies. Proxies will forward cluster traffic received on port N to port M on the tailnet target // // - creates a ClusterIP Service in the operator's namespace with portmappings for all M->N port pairs. This will allow // cluster workloads to send traffic on the user-defined tailnet target port and get it transparently mapped to the // randomly selected port on proxy Pods. // // - creates an EndpointSlice in the operator's namespace with kubernetes.io/service-name label pointing to the // ClusterIP Service. The endpoints will get dynamically updates to proxy Pod IPs as the Pods become ready to route // traffic to the tailnet target. kubernetes.io/service-name label ensures that kube-proxy sets up routing rules to // forward cluster traffic received on ClusterIP Service's IP address to the endpoints (Pod IPs). // // - updates the egress service config in a ConfigMap mounted to the ProxyGroup proxies with the tailnet target and the // portmappings. func (esr *egressSvcsReconciler) Reconcile(ctx context.Context, req reconcile.Request) (res reconcile.Result, err error) { l := esr.logger.With("Service", req.NamespacedName) defer l.Info("reconcile finished") svc := new(corev1.Service) if err = esr.Get(ctx, req.NamespacedName, svc); apierrors.IsNotFound(err) { l.Info("Service not found") return res, nil } else if err != nil { return res, fmt.Errorf("failed to get Service: %w", err) } // Name of the 'egress service', meaning the tailnet target. tailnetSvc := tailnetSvcName(svc) l = l.With("tailnet-service", tailnetSvc) // Note that resources for egress Services are only cleaned up when the // Service is actually deleted (and not if, for example, user decides to // remove the Tailscale annotation from it). This should be fine- we // assume that the egress ExternalName Services are always created for // Tailscale operator specifically. if !svc.DeletionTimestamp.IsZero() { l.Info("Service is being deleted, ensuring resource cleanup") return res, esr.maybeCleanup(ctx, svc, l) } oldStatus := svc.Status.DeepCopy() defer func() { if !apiequality.Semantic.DeepEqual(oldStatus, svc.Status) { err = errors.Join(err, esr.Status().Update(ctx, svc)) } }() // Validate the user-created ExternalName Service and the associated ProxyGroup. if ok, err := esr.validateClusterResources(ctx, svc, l); err != nil { return res, fmt.Errorf("error validating cluster resources: %w", err) } else if !ok { return res, nil } if !slices.Contains(svc.Finalizers, FinalizerName) { svc.Finalizers = append(svc.Finalizers, FinalizerName) if err := esr.updateSvcSpec(ctx, svc); err != nil { err := fmt.Errorf("failed to add finalizer: %w", err) r := svcConfiguredReason(svc, false, l) tsoperator.SetServiceCondition(svc, tsapi.EgressSvcConfigured, metav1.ConditionFalse, r, err.Error(), esr.clock, l) return res, err } esr.mu.Lock() esr.svcs.Add(svc.UID) gaugeEgressServices.Set(int64(esr.svcs.Len())) esr.mu.Unlock() } if err := esr.maybeCleanupProxyGroupConfig(ctx, svc, l); err != nil { err = fmt.Errorf("cleaning up resources for previous ProxyGroup failed: %w", err) r := svcConfiguredReason(svc, false, l) tsoperator.SetServiceCondition(svc, tsapi.EgressSvcConfigured, metav1.ConditionFalse, r, err.Error(), esr.clock, l) return res, err } return res, esr.maybeProvision(ctx, svc, l) } func (esr *egressSvcsReconciler) maybeProvision(ctx context.Context, svc *corev1.Service, l *zap.SugaredLogger) (err error) { r := svcConfiguredReason(svc, false, l) st := metav1.ConditionFalse defer func() { msg := r if st != metav1.ConditionTrue && err != nil { msg = err.Error() } tsoperator.SetServiceCondition(svc, tsapi.EgressSvcConfigured, st, r, msg, esr.clock, l) }() crl := egressSvcChildResourceLabels(svc) clusterIPSvc, err := getSingleObject[corev1.Service](ctx, esr.Client, esr.tsNamespace, crl) if err != nil { err = fmt.Errorf("error retrieving ClusterIP Service: %w", err) return err } if clusterIPSvc == nil { clusterIPSvc = esr.clusterIPSvcForEgress(crl) } upToDate := svcConfigurationUpToDate(svc, l) provisioned := true if !upToDate { if clusterIPSvc, provisioned, err = esr.provision(ctx, svc.Annotations[AnnotationProxyGroup], svc, clusterIPSvc, l); err != nil { return err } } if !provisioned { l.Infof("unable to provision cluster resources") return nil } // Update ExternalName Service to point at the ClusterIP Service. clusterDomain := retrieveClusterDomain(esr.tsNamespace, l) clusterIPSvcFQDN := fmt.Sprintf("%s.%s.svc.%s", clusterIPSvc.Name, clusterIPSvc.Namespace, clusterDomain) if svc.Spec.ExternalName != clusterIPSvcFQDN { l.Infof("Configuring ExternalName Service to point to ClusterIP Service %s", clusterIPSvcFQDN) svc.Spec.ExternalName = clusterIPSvcFQDN if err = esr.updateSvcSpec(ctx, svc); err != nil { err = fmt.Errorf("error updating ExternalName Service: %w", err) return err } } r = svcConfiguredReason(svc, true, l) st = metav1.ConditionTrue return nil } func (esr *egressSvcsReconciler) provision(ctx context.Context, proxyGroupName string, svc, clusterIPSvc *corev1.Service, l *zap.SugaredLogger) (*corev1.Service, bool, error) { l.Infof("updating configuration...") usedPorts, err := esr.usedPortsForPG(ctx, proxyGroupName) if err != nil { return nil, false, fmt.Errorf("error calculating used ports for ProxyGroup %s: %w", proxyGroupName, err) } oldClusterIPSvc := clusterIPSvc.DeepCopy() // loop over ClusterIP Service ports, remove any that are not needed. for i := len(clusterIPSvc.Spec.Ports) - 1; i >= 0; i-- { pm := clusterIPSvc.Spec.Ports[i] found := false for _, wantsPM := range svc.Spec.Ports { if wantsPM.Port == pm.Port && strings.EqualFold(string(wantsPM.Protocol), string(pm.Protocol)) { // We don't use the port name to distinguish this port internally, but Kubernetes // require that, for Service ports with more than one name each port is uniquely named. // So we can always pick the port name from the ExternalName Service as at this point we // know that those are valid names because Kuberentes already validated it once. Note // that users could have changed an unnamed port to a named port and might have changed // port names- this should still work. // https://kubernetes.io/docs/concepts/services-networking/service/#multi-port-services // See also https://github.com/tailscale/tailscale/issues/13406#issuecomment-2507230388 clusterIPSvc.Spec.Ports[i].Name = wantsPM.Name found = true break } } if !found { l.Debugf("portmapping %s:%d -> %s:%d is no longer required, removing", pm.Protocol, pm.TargetPort.IntVal, pm.Protocol, pm.Port) clusterIPSvc.Spec.Ports = slices.Delete(clusterIPSvc.Spec.Ports, i, i+1) } } // loop over ExternalName Service ports, for each one not found on // ClusterIP Service produce new target port and add a portmapping to // the ClusterIP Service. for _, wantsPM := range svc.Spec.Ports { found := false for _, gotPM := range clusterIPSvc.Spec.Ports { if wantsPM.Port == gotPM.Port && strings.EqualFold(string(wantsPM.Protocol), string(gotPM.Protocol)) { found = true break } } if !found { // Calculate a free port to expose on container and add // a new PortMap to the ClusterIP Service. if usedPorts.Len() >= maxPorts { // TODO(irbekrm): refactor to avoid extra reconciles here. Low priority as in practice, // the limit should not be hit. return nil, false, fmt.Errorf("unable to allocate additional ports on ProxyGroup %s, %d ports already used. Create another ProxyGroup or open an issue if you believe this is unexpected.", proxyGroupName, maxPorts) } p := unusedPort(usedPorts) l.Debugf("mapping tailnet target port %d to container port %d", wantsPM.Port, p) usedPorts.Insert(p) clusterIPSvc.Spec.Ports = append(clusterIPSvc.Spec.Ports, corev1.ServicePort{ Name: wantsPM.Name, Protocol: wantsPM.Protocol, Port: wantsPM.Port, TargetPort: intstr.FromInt32(p), }) } } if !reflect.DeepEqual(clusterIPSvc, oldClusterIPSvc) { if clusterIPSvc, err = createOrUpdate(ctx, esr.Client, esr.tsNamespace, clusterIPSvc, func(svc *corev1.Service) { svc.Labels = clusterIPSvc.Labels svc.Spec = clusterIPSvc.Spec }); err != nil { return nil, false, fmt.Errorf("error ensuring ClusterIP Service: %v", err) } } crl := egressSvcEpsLabels(svc, clusterIPSvc) // TODO(irbekrm): support IPv6, but need to investigate how kube proxy // sets up Service -> Pod routing when IPv6 is involved. eps := &discoveryv1.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%s-ipv4", clusterIPSvc.Name), Namespace: esr.tsNamespace, Labels: crl, }, AddressType: discoveryv1.AddressTypeIPv4, Ports: epsPortsFromSvc(clusterIPSvc), } if eps, err = createOrUpdate(ctx, esr.Client, esr.tsNamespace, eps, func(e *discoveryv1.EndpointSlice) { e.Labels = eps.Labels e.AddressType = eps.AddressType e.Ports = eps.Ports for _, p := range e.Endpoints { p.Conditions.Ready = nil } }); err != nil { return nil, false, fmt.Errorf("error ensuring EndpointSlice: %w", err) } cm, cfgs, err := egressSvcsConfigs(ctx, esr.Client, proxyGroupName, esr.tsNamespace) if err != nil { return nil, false, fmt.Errorf("error retrieving egress services configuration: %w", err) } if cm == nil { l.Info("ConfigMap not yet created, waiting..") return nil, false, nil } tailnetSvc := tailnetSvcName(svc) gotCfg := (*cfgs)[tailnetSvc] wantsCfg := egressSvcCfg(svc, clusterIPSvc) if !reflect.DeepEqual(gotCfg, wantsCfg) { l.Debugf("updating egress services ConfigMap %s", cm.Name) mak.Set(cfgs, tailnetSvc, wantsCfg) bs, err := json.Marshal(cfgs) if err != nil { return nil, false, fmt.Errorf("error marshalling egress services configs: %w", err) } mak.Set(&cm.BinaryData, egressservices.KeyEgressServices, bs) if err := esr.Update(ctx, cm); err != nil { return nil, false, fmt.Errorf("error updating egress services ConfigMap: %w", err) } } l.Infof("egress service configuration has been updated") return clusterIPSvc, true, nil } func (esr *egressSvcsReconciler) maybeCleanup(ctx context.Context, svc *corev1.Service, logger *zap.SugaredLogger) error { logger.Info("ensuring that resources created for egress service are deleted") // Delete egress service config from the ConfigMap mounted by the proxies. if err := esr.ensureEgressSvcCfgDeleted(ctx, svc, logger); err != nil { return fmt.Errorf("error deleting egress service config: %w", err) } // Delete the ClusterIP Service and EndpointSlice for the egress // service. types := []client.Object{ &corev1.Service{}, &discoveryv1.EndpointSlice{}, } crl := egressSvcChildResourceLabels(svc) for _, typ := range types { if err := esr.DeleteAllOf(ctx, typ, client.InNamespace(esr.tsNamespace), client.MatchingLabels(crl)); err != nil { return fmt.Errorf("error deleting %s: %w", typ, err) } } ix := slices.Index(svc.Finalizers, FinalizerName) if ix != -1 { logger.Debug("Removing Tailscale finalizer from Service") svc.Finalizers = append(svc.Finalizers[:ix], svc.Finalizers[ix+1:]...) if err := esr.Update(ctx, svc); err != nil { return fmt.Errorf("failed to remove finalizer: %w", err) } } esr.mu.Lock() esr.svcs.Remove(svc.UID) gaugeEgressServices.Set(int64(esr.svcs.Len())) esr.mu.Unlock() logger.Info("successfully cleaned up resources for egress Service") return nil } func (esr *egressSvcsReconciler) maybeCleanupProxyGroupConfig(ctx context.Context, svc *corev1.Service, l *zap.SugaredLogger) error { wantsProxyGroup := svc.Annotations[AnnotationProxyGroup] cond := tsoperator.GetServiceCondition(svc, tsapi.EgressSvcConfigured) if cond == nil { return nil } ss := strings.Split(cond.Reason, ":") if len(ss) < 3 { return nil } if strings.EqualFold(wantsProxyGroup, ss[2]) { return nil } esr.logger.Infof("egress Service configured on ProxyGroup %s, wants ProxyGroup %s, cleaning up...", ss[2], wantsProxyGroup) if err := esr.ensureEgressSvcCfgDeleted(ctx, svc, l); err != nil { return fmt.Errorf("error deleting egress service config: %w", err) } return nil } // usedPortsForPG calculates the currently used match ports for ProxyGroup // containers. It does that by looking by retrieving all target ports of all // ClusterIP Services created for egress services exposed on this ProxyGroup's // proxies. // TODO(irbekrm): this is currently good enough because we only have a single worker and // because these Services are created by us, so we can always expect to get the // latest ClusterIP Services via the controller cache. It will not work as well // once we split into multiple workers- at that point we probably want to set // used ports on ProxyGroup's status. func (esr *egressSvcsReconciler) usedPortsForPG(ctx context.Context, pg string) (sets.Set[int32], error) { svcList := &corev1.ServiceList{} if err := esr.List(ctx, svcList, client.InNamespace(esr.tsNamespace), client.MatchingLabels(map[string]string{labelProxyGroup: pg})); err != nil { return nil, fmt.Errorf("error listing Services: %w", err) } usedPorts := sets.New[int32]() for _, s := range svcList.Items { for _, p := range s.Spec.Ports { usedPorts.Insert(p.TargetPort.IntVal) } } return usedPorts, nil } // clusterIPSvcForEgress returns a template for the ClusterIP Service created // for an egress service exposed on ProxyGroup proxies. The ClusterIP Service // has no selector. Traffic sent to it will be routed to the endpoints defined // by an EndpointSlice created for this egress service. func (esr *egressSvcsReconciler) clusterIPSvcForEgress(crl map[string]string) *corev1.Service { return &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ GenerateName: svcNameBase(crl[LabelParentName]), Namespace: esr.tsNamespace, Labels: crl, }, Spec: corev1.ServiceSpec{ Type: corev1.ServiceTypeClusterIP, }, } } func (esr *egressSvcsReconciler) ensureEgressSvcCfgDeleted(ctx context.Context, svc *corev1.Service, logger *zap.SugaredLogger) error { crl := egressSvcChildResourceLabels(svc) cmName := pgEgressCMName(crl[labelProxyGroup]) cm := &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Name: cmName, Namespace: esr.tsNamespace, }, } l := logger.With("ConfigMap", client.ObjectKeyFromObject(cm)) l.Debug("ensuring that egress service configuration is removed from proxy config") if err := esr.Get(ctx, client.ObjectKeyFromObject(cm), cm); apierrors.IsNotFound(err) { l.Debugf("ConfigMap not found") return nil } else if err != nil { return fmt.Errorf("error retrieving ConfigMap: %w", err) } bs := cm.BinaryData[egressservices.KeyEgressServices] if len(bs) == 0 { l.Debugf("ConfigMap does not contain egress service configs") return nil } cfgs := &egressservices.Configs{} if err := json.Unmarshal(bs, cfgs); err != nil { return fmt.Errorf("error unmarshalling egress services configs") } tailnetSvc := tailnetSvcName(svc) _, ok := (*cfgs)[tailnetSvc] if !ok { l.Debugf("ConfigMap does not contain egress service config, likely because it was already deleted") return nil } l.Infof("before deleting config %+#v", *cfgs) delete(*cfgs, tailnetSvc) l.Infof("after deleting config %+#v", *cfgs) bs, err := json.Marshal(cfgs) if err != nil { return fmt.Errorf("error marshalling egress services configs: %w", err) } mak.Set(&cm.BinaryData, egressservices.KeyEgressServices, bs) return esr.Update(ctx, cm) } func (esr *egressSvcsReconciler) validateClusterResources(ctx context.Context, svc *corev1.Service, l *zap.SugaredLogger) (bool, error) { proxyGroupName := svc.Annotations[AnnotationProxyGroup] pg := &tsapi.ProxyGroup{ ObjectMeta: metav1.ObjectMeta{ Name: proxyGroupName, }, } if err := esr.Get(ctx, client.ObjectKeyFromObject(pg), pg); apierrors.IsNotFound(err) { l.Infof("ProxyGroup %q not found, waiting...", proxyGroupName) tsoperator.SetServiceCondition(svc, tsapi.EgressSvcValid, metav1.ConditionUnknown, reasonProxyGroupNotReady, reasonProxyGroupNotReady, esr.clock, l) tsoperator.RemoveServiceCondition(svc, tsapi.EgressSvcConfigured) return false, nil } else if err != nil { err := fmt.Errorf("unable to retrieve ProxyGroup %s: %w", proxyGroupName, err) tsoperator.SetServiceCondition(svc, tsapi.EgressSvcValid, metav1.ConditionUnknown, reasonProxyGroupNotReady, err.Error(), esr.clock, l) tsoperator.RemoveServiceCondition(svc, tsapi.EgressSvcConfigured) return false, err } if !tsoperator.ProxyGroupIsReady(pg) { l.Infof("ProxyGroup %s is not ready, waiting...", proxyGroupName) tsoperator.SetServiceCondition(svc, tsapi.EgressSvcValid, metav1.ConditionUnknown, reasonProxyGroupNotReady, reasonProxyGroupNotReady, esr.clock, l) tsoperator.RemoveServiceCondition(svc, tsapi.EgressSvcConfigured) return false, nil } if violations := validateEgressService(svc, pg); len(violations) > 0 { msg := fmt.Sprintf("invalid egress Service: %s", strings.Join(violations, ", ")) esr.recorder.Event(svc, corev1.EventTypeWarning, "INVALIDSERVICE", msg) l.Info(msg) tsoperator.SetServiceCondition(svc, tsapi.EgressSvcValid, metav1.ConditionFalse, reasonEgressSvcInvalid, msg, esr.clock, l) tsoperator.RemoveServiceCondition(svc, tsapi.EgressSvcConfigured) return false, nil } l.Debugf("egress service is valid") tsoperator.SetServiceCondition(svc, tsapi.EgressSvcValid, metav1.ConditionTrue, reasonEgressSvcValid, reasonEgressSvcValid, esr.clock, l) return true, nil } func validateEgressService(svc *corev1.Service, pg *tsapi.ProxyGroup) []string { violations := validateService(svc) // We check that only one of these two is set in the earlier validateService function. if svc.Annotations[AnnotationTailnetTargetFQDN] == "" && svc.Annotations[AnnotationTailnetTargetIP] == "" { violations = append(violations, fmt.Sprintf("egress Service for ProxyGroup must have one of %s, %s annotations set", AnnotationTailnetTargetFQDN, AnnotationTailnetTargetIP)) } if len(svc.Spec.Ports) == 0 { violations = append(violations, "egress Service for ProxyGroup must have at least one target Port specified") } if svc.Spec.Type != corev1.ServiceTypeExternalName { violations = append(violations, fmt.Sprintf("unexpected egress Service type %s. The only supported type is ExternalName.", svc.Spec.Type)) } if pg.Spec.Type != tsapi.ProxyGroupTypeEgress { violations = append(violations, fmt.Sprintf("egress Service references ProxyGroup of type %s, must be type %s", pg.Spec.Type, tsapi.ProxyGroupTypeEgress)) } return violations } // egressSvcNameBase returns a name base that can be passed to // ObjectMeta.GenerateName to generate a name for the ClusterIP Service. // The generated name needs to be short enough so that it can later be used to // generate a valid Kubernetes resource name for the EndpointSlice in form // 'ipv4-|ipv6-. // A valid Kubernetes resource name must not be longer than 253 chars. func svcNameBase(s string) string { // -ipv4 - ipv6 const maxClusterIPSvcNameLength = 253 - 5 base := fmt.Sprintf("ts-%s-", s) generator := names.SimpleNameGenerator for { generatedName := generator.GenerateName(base) excess := len(generatedName) - maxClusterIPSvcNameLength if excess <= 0 { return base } base = base[:len(base)-1-excess] // cut off the excess chars base = base + "-" // re-instate the dash } } // unusedPort returns a port in range [10000 - 11000). The caller must ensure that // usedPorts does not contain all ports in range [10000 - 11000). func unusedPort(usedPorts sets.Set[int32]) int32 { foundFreePort := false var suggestPort int32 for !foundFreePort { suggestPort = rand.Int32N(maxPorts) + 10000 if !usedPorts.Has(suggestPort) { foundFreePort = true } } return suggestPort } // tailnetTargetFromSvc returns a tailnet target for the given egress Service. // Service must contain exactly one of tailscale.com/tailnet-ip, // tailscale.com/tailnet-fqdn annotations. func tailnetTargetFromSvc(svc *corev1.Service) egressservices.TailnetTarget { if fqdn := svc.Annotations[AnnotationTailnetTargetFQDN]; fqdn != "" { return egressservices.TailnetTarget{ FQDN: fqdn, } } return egressservices.TailnetTarget{ IP: svc.Annotations[AnnotationTailnetTargetIP], } } func egressSvcCfg(externalNameSvc, clusterIPSvc *corev1.Service) egressservices.Config { tt := tailnetTargetFromSvc(externalNameSvc) cfg := egressservices.Config{TailnetTarget: tt} for _, svcPort := range clusterIPSvc.Spec.Ports { pm := portMap(svcPort) mak.Set(&cfg.Ports, pm, struct{}{}) } return cfg } func portMap(p corev1.ServicePort) egressservices.PortMap { // TODO (irbekrm): out of bounds check? return egressservices.PortMap{Protocol: string(p.Protocol), MatchPort: uint16(p.TargetPort.IntVal), TargetPort: uint16(p.Port)} } func isEgressSvcForProxyGroup(obj client.Object) bool { s, ok := obj.(*corev1.Service) if !ok { return false } annots := s.ObjectMeta.Annotations return annots[AnnotationProxyGroup] != "" && (annots[AnnotationTailnetTargetFQDN] != "" || annots[AnnotationTailnetTargetIP] != "") } // egressSvcConfig returns a ConfigMap that contains egress services configuration for the provided ProxyGroup as well // as unmarshalled configuration from the ConfigMap. func egressSvcsConfigs(ctx context.Context, cl client.Client, proxyGroupName, tsNamespace string) (cm *corev1.ConfigMap, cfgs *egressservices.Configs, err error) { name := pgEgressCMName(proxyGroupName) cm = &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: tsNamespace, }, } if err := cl.Get(ctx, client.ObjectKeyFromObject(cm), cm); err != nil { return nil, nil, fmt.Errorf("error retrieving egress services ConfigMap %s: %v", name, err) } cfgs = &egressservices.Configs{} if len(cm.BinaryData[egressservices.KeyEgressServices]) != 0 { if err := json.Unmarshal(cm.BinaryData[egressservices.KeyEgressServices], cfgs); err != nil { return nil, nil, fmt.Errorf("error unmarshaling egress services config %v: %w", cm.BinaryData[egressservices.KeyEgressServices], err) } } return cm, cfgs, nil } // egressSvcChildResourceLabels returns labels that should be applied to the // ClusterIP Service and the EndpointSlice created for the egress service. // TODO(irbekrm): we currently set a bunch of labels based on Kubernetes // resource names (ProxyGroup, Service). Maximum allowed label length is 63 // chars whilst the maximum allowed resource name length is 253 chars, so we // should probably validate and truncate (?) the names is they are too long. func egressSvcChildResourceLabels(svc *corev1.Service) map[string]string { return map[string]string{ LabelManaged: "true", LabelParentType: "svc", LabelParentName: svc.Name, LabelParentNamespace: svc.Namespace, labelProxyGroup: svc.Annotations[AnnotationProxyGroup], labelSvcType: typeEgress, } } // egressEpsLabels returns labels to be added to an EndpointSlice created for an egress service. func egressSvcEpsLabels(extNSvc, clusterIPSvc *corev1.Service) map[string]string { l := egressSvcChildResourceLabels(extNSvc) // Adding this label is what makes kube proxy set up rules to route traffic sent to the clusterIP Service to the // endpoints defined on this EndpointSlice. // https://kubernetes.io/docs/concepts/services-networking/endpoint-slices/#ownership l[discoveryv1.LabelServiceName] = clusterIPSvc.Name // Kubernetes recommends setting this label. // https://kubernetes.io/docs/concepts/services-networking/endpoint-slices/#management l[discoveryv1.LabelManagedBy] = "tailscale.com" return l } func svcConfigurationUpToDate(svc *corev1.Service, l *zap.SugaredLogger) bool { cond := tsoperator.GetServiceCondition(svc, tsapi.EgressSvcConfigured) if cond == nil { return false } if cond.Status != metav1.ConditionTrue { return false } wantsReadyReason := svcConfiguredReason(svc, true, l) return strings.EqualFold(wantsReadyReason, cond.Reason) } func cfgHash(c cfg, l *zap.SugaredLogger) string { bs, err := json.Marshal(c) if err != nil { // Don't use l.Error as that messes up component logs with, in this case, unnecessary stack trace. l.Infof("error marhsalling Config: %v", err) return "" } h := sha256.New() if _, err := h.Write(bs); err != nil { // Don't use l.Error as that messes up component logs with, in this case, unnecessary stack trace. l.Infof("error producing Config hash: %v", err) return "" } return fmt.Sprintf("%x", h.Sum(nil)) } type cfg struct { Ports []corev1.ServicePort `json:"ports"` TailnetTarget egressservices.TailnetTarget `json:"tailnetTarget"` ProxyGroup string `json:"proxyGroup"` } func svcConfiguredReason(svc *corev1.Service, configured bool, l *zap.SugaredLogger) string { var r string if configured { r = "ConfiguredFor:" } else { r = fmt.Sprintf("ConfigurationFailed:%s", r) } r += fmt.Sprintf("ProxyGroup:%s", svc.Annotations[AnnotationProxyGroup]) tt := tailnetTargetFromSvc(svc) s := cfg{ Ports: svc.Spec.Ports, TailnetTarget: tt, ProxyGroup: svc.Annotations[AnnotationProxyGroup], } r += fmt.Sprintf(":Config:%s", cfgHash(s, l)) return r } // tailnetSvc accepts and ExternalName Service name and returns a name that will be used to distinguish this tailnet // service from other tailnet services exposed to cluster workloads. func tailnetSvcName(extNSvc *corev1.Service) string { return fmt.Sprintf("%s-%s", extNSvc.Namespace, extNSvc.Name) } // epsPortsFromSvc takes the ClusterIP Service created for an egress service and // returns its Port array in a form that can be used for an EndpointSlice. func epsPortsFromSvc(svc *corev1.Service) (ep []discoveryv1.EndpointPort) { for _, p := range svc.Spec.Ports { ep = append(ep, discoveryv1.EndpointPort{ Protocol: &p.Protocol, Port: &p.TargetPort.IntVal, Name: &p.Name, }) } return ep } // updateSvcSpec ensures that the given Service's spec is updated in cluster, but the local Service object still retains // the not-yet-applied status. // TODO(irbekrm): once we do SSA for these patch updates, this will no longer be needed. func (esr *egressSvcsReconciler) updateSvcSpec(ctx context.Context, svc *corev1.Service) error { st := svc.Status.DeepCopy() err := esr.Update(ctx, svc) svc.Status = *st return err }