mirror of
https://github.com/tailscale/tailscale.git
synced 2024-11-29 04:55:31 +00:00
e8bb5d1be5
Adds a new reconciler that reconciles ExternalName Services that define a tailnet target that should be exposed to cluster workloads on a ProxyGroup's proxies. The reconciler ensures that, for each such Service, the config mounted to the proxies is updated with the tailnet target definition and that an EndpointSlice and a ClusterIP Service are created for the Service. Adds a new reconciler that ensures that, as proxy Pods become ready to route traffic to a tailnet target, the EndpointSlice for the target is updated with the Pods' endpoints. Updates tailscale/tailscale#13406 Signed-off-by: Irbe Krumina <irbe@tailscale.com>
707 lines
27 KiB
Go
707 lines
27 KiB
Go
// Copyright (c) Tailscale Inc & AUTHORS
|
|
// SPDX-License-Identifier: BSD-3-Clause
|
|
|
|
//go:build !plan9
|
|
|
|
package main
|
|
|
|
import (
|
|
"context"
|
|
"crypto/sha256"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"math/rand/v2"
|
|
"reflect"
|
|
"slices"
|
|
"strings"
|
|
"sync"
|
|
|
|
"go.uber.org/zap"
|
|
corev1 "k8s.io/api/core/v1"
|
|
discoveryv1 "k8s.io/api/discovery/v1"
|
|
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/types"
|
|
"k8s.io/apimachinery/pkg/util/intstr"
|
|
"k8s.io/apimachinery/pkg/util/sets"
|
|
"k8s.io/apiserver/pkg/storage/names"
|
|
"k8s.io/client-go/tools/record"
|
|
"sigs.k8s.io/controller-runtime/pkg/client"
|
|
"sigs.k8s.io/controller-runtime/pkg/reconcile"
|
|
tsoperator "tailscale.com/k8s-operator"
|
|
tsapi "tailscale.com/k8s-operator/apis/v1alpha1"
|
|
"tailscale.com/kube/egressservices"
|
|
"tailscale.com/kube/kubetypes"
|
|
"tailscale.com/tstime"
|
|
"tailscale.com/util/clientmetric"
|
|
"tailscale.com/util/mak"
|
|
"tailscale.com/util/set"
|
|
)
|
|
|
|
const (
	// Reason strings used on the conditions of an egress ExternalName Service.
	reasonEgressSvcInvalid        = "EgressSvcInvalid"
	reasonEgressSvcValid          = "EgressSvcValid"
	reasonEgressSvcCreationFailed = "EgressSvcCreationFailed"
	reasonProxyGroupNotReady      = "ProxyGroupNotReady"

	// Label keys set on the child resources (ClusterIP Service,
	// EndpointSlice) created for an egress Service.
	labelProxyGroup           = "tailscale.com/proxy-group"
	labelProxyGroupType       = "tailscale.com/proxy-group-type"
	labelExternalSvcName      = "tailscale.com/external-service-name"
	labelExternalSvcNamespace = "tailscale.com/external-service-namespace"

	labelSvcType = "tailscale.com/svc-type" // ingress or egress
	typeEgress   = "egress"
	// maxPorts is the maximum number of ports that can be exposed on a
	// container. Ports are picked from the range [3000 - 13000), i.e.
	// maxPorts ports starting at 3000. The high range should make it easier
	// to distinguish container ports from the tailnet target ports for
	// debugging purposes (i.e. when reading netfilter rules). The limit of
	// 10000 is somewhat arbitrary, the assumption is that this would not be
	// hit in practice.
	maxPorts = 10000

	indexEgressProxyGroup = ".metadata.annotations.egress-proxy-group"

	// egressSvcsCMNameTemplate is the format string for the name of the
	// ConfigMap that holds egress services config for a ProxyGroup; the verb
	// is replaced with the ProxyGroup name.
	egressSvcsCMNameTemplate = "proxy-cfg-%s"
)
|
|
|
|
// gaugeEgressServices is a client metric gauge that egressSvcsReconciler sets
// to the number of egress Services it currently tracks whenever that set
// changes.
var gaugeEgressServices = clientmetric.NewGauge(kubetypes.MetricEgressServiceCount)
|
|
|
|
// egressSvcsReconciler reconciles user created ExternalName Services that specify a tailnet
|
|
// endpoint that should be exposed to cluster workloads and an egress ProxyGroup
|
|
// on whose proxies it should be exposed.
|
|
type egressSvcsReconciler struct {
|
|
client.Client
|
|
logger *zap.SugaredLogger
|
|
recorder record.EventRecorder
|
|
clock tstime.Clock
|
|
tsNamespace string
|
|
|
|
mu sync.Mutex // protects following
|
|
svcs set.Slice[types.UID] // UIDs of all currently managed egress Services for ProxyGroup
|
|
}
|
|
|
|
// Reconcile reconciles an ExternalName Service that specifies a tailnet target and a ProxyGroup on whose proxies should
|
|
// forward cluster traffic to the target.
|
|
// For an ExternalName Service the reconciler:
|
|
//
|
|
// - for each port N defined on the ExternalName Service, allocates a port X in range [3000- 4000), unique for the
|
|
// ProxyGroup proxies. Proxies will forward cluster traffic received on port N to port M on the tailnet target
|
|
//
|
|
// - creates a ClusterIP Service in the operator's namespace with portmappings for all M->N port pairs. This will allow
|
|
// cluster workloads to send traffic on the user-defined tailnet target port and get it transparently mapped to the
|
|
// randomly selected port on proxy Pods.
|
|
//
|
|
// - creates an EndpointSlice in the operator's namespace with kubernetes.io/service-name label pointing to the
|
|
// ClusterIP Service. The endpoints will get dynamically updates to proxy Pod IPs as the Pods become ready to route
|
|
// traffic to the tailnet target. kubernetes.io/service-name label ensures that kube-proxy sets up routing rules to
|
|
// forward cluster traffic received on ClusterIP Service's IP address to the endpoints (Pod IPs).
|
|
//
|
|
// - updates the egress service config in a ConfigMap mounted to the ProxyGroup proxies with the tailnet target and the
|
|
// portmappings.
|
|
func (esr *egressSvcsReconciler) Reconcile(ctx context.Context, req reconcile.Request) (res reconcile.Result, err error) {
|
|
l := esr.logger.With("Service", req.NamespacedName)
|
|
defer l.Info("reconcile finished")
|
|
|
|
svc := new(corev1.Service)
|
|
if err = esr.Get(ctx, req.NamespacedName, svc); apierrors.IsNotFound(err) {
|
|
l.Info("Service not found")
|
|
return res, nil
|
|
} else if err != nil {
|
|
return res, fmt.Errorf("failed to get Service: %w", err)
|
|
}
|
|
|
|
// Name of the 'egress service', meaning the tailnet target.
|
|
tailnetSvc := tailnetSvcName(svc)
|
|
l = l.With("tailnet-service", tailnetSvc)
|
|
|
|
// Note that resources for egress Services are only cleaned up when the
|
|
// Service is actually deleted (and not if, for example, user decides to
|
|
// remove the Tailscale annotation from it). This should be fine- we
|
|
// assume that the egress ExternalName Services are always created for
|
|
// Tailscale operator specifically.
|
|
if !svc.DeletionTimestamp.IsZero() {
|
|
l.Info("Service is being deleted, ensuring resource cleanup")
|
|
return res, esr.maybeCleanup(ctx, svc, l)
|
|
}
|
|
|
|
oldStatus := svc.Status.DeepCopy()
|
|
defer func() {
|
|
if !apiequality.Semantic.DeepEqual(oldStatus, svc.Status) {
|
|
err = errors.Join(err, esr.Status().Update(ctx, svc))
|
|
}
|
|
}()
|
|
|
|
// Validate the user-created ExternalName Service and the associated ProxyGroup.
|
|
if ok, err := esr.validateClusterResources(ctx, svc, l); err != nil {
|
|
return res, fmt.Errorf("error validating cluster resources: %w", err)
|
|
} else if !ok {
|
|
return res, nil
|
|
}
|
|
|
|
if !slices.Contains(svc.Finalizers, FinalizerName) {
|
|
l.Infof("configuring tailnet service") // logged exactly once
|
|
svc.Finalizers = append(svc.Finalizers, FinalizerName)
|
|
if err := esr.Update(ctx, svc); err != nil {
|
|
err := fmt.Errorf("failed to add finalizer: %w", err)
|
|
r := svcConfiguredReason(svc, false, l)
|
|
tsoperator.SetServiceCondition(svc, tsapi.EgressSvcConfigured, metav1.ConditionFalse, r, err.Error(), esr.clock, l)
|
|
return res, err
|
|
}
|
|
esr.mu.Lock()
|
|
esr.svcs.Add(svc.UID)
|
|
gaugeEgressServices.Set(int64(esr.svcs.Len()))
|
|
esr.mu.Unlock()
|
|
}
|
|
|
|
if err := esr.maybeCleanupProxyGroupConfig(ctx, svc, l); err != nil {
|
|
err = fmt.Errorf("cleaning up resources for previous ProxyGroup failed: %w", err)
|
|
r := svcConfiguredReason(svc, false, l)
|
|
tsoperator.SetServiceCondition(svc, tsapi.EgressSvcConfigured, metav1.ConditionFalse, r, err.Error(), esr.clock, l)
|
|
return res, err
|
|
}
|
|
|
|
return res, esr.maybeProvision(ctx, svc, l)
|
|
}
|
|
|
|
func (esr *egressSvcsReconciler) maybeProvision(ctx context.Context, svc *corev1.Service, l *zap.SugaredLogger) (err error) {
|
|
l.Debug("maybe provision")
|
|
r := svcConfiguredReason(svc, false, l)
|
|
st := metav1.ConditionFalse
|
|
defer func() {
|
|
msg := r
|
|
if st != metav1.ConditionTrue && err != nil {
|
|
msg = err.Error()
|
|
}
|
|
tsoperator.SetServiceCondition(svc, tsapi.EgressSvcConfigured, st, r, msg, esr.clock, l)
|
|
}()
|
|
|
|
crl := egressSvcChildResourceLabels(svc)
|
|
clusterIPSvc, err := getSingleObject[corev1.Service](ctx, esr.Client, esr.tsNamespace, crl)
|
|
if err != nil {
|
|
err = fmt.Errorf("error retrieving ClusterIP Service: %w", err)
|
|
return err
|
|
}
|
|
if clusterIPSvc == nil {
|
|
clusterIPSvc = esr.clusterIPSvcForEgress(crl)
|
|
}
|
|
upToDate := svcConfigurationUpToDate(svc, l)
|
|
provisioned := true
|
|
if !upToDate {
|
|
if clusterIPSvc, provisioned, err = esr.provision(ctx, svc.Annotations[AnnotationProxyGroup], svc, clusterIPSvc, l); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if !provisioned {
|
|
l.Infof("unable to provision cluster resources")
|
|
return nil
|
|
}
|
|
|
|
// Update ExternalName Service to point at the ClusterIP Service.
|
|
clusterDomain := retrieveClusterDomain(esr.tsNamespace, l)
|
|
clusterIPSvcFQDN := fmt.Sprintf("%s.%s.svc.%s", clusterIPSvc.Name, clusterIPSvc.Namespace, clusterDomain)
|
|
if svc.Spec.ExternalName != clusterIPSvcFQDN {
|
|
l.Infof("Configuring ExternalName Service to point to ClusterIP Service %s", clusterIPSvcFQDN)
|
|
svc.Spec.ExternalName = clusterIPSvcFQDN
|
|
if err = esr.Update(ctx, svc); err != nil {
|
|
err = fmt.Errorf("error updating ExternalName Service: %w", err)
|
|
return err
|
|
}
|
|
}
|
|
r = svcConfiguredReason(svc, true, l)
|
|
st = metav1.ConditionTrue
|
|
return nil
|
|
}
|
|
|
|
func (esr *egressSvcsReconciler) provision(ctx context.Context, proxyGroupName string, svc, clusterIPSvc *corev1.Service, l *zap.SugaredLogger) (*corev1.Service, bool, error) {
|
|
l.Infof("updating configuration...")
|
|
usedPorts, err := esr.usedPortsForPG(ctx, proxyGroupName)
|
|
if err != nil {
|
|
return nil, false, fmt.Errorf("error calculating used ports for ProxyGroup %s: %w", proxyGroupName, err)
|
|
}
|
|
|
|
oldClusterIPSvc := clusterIPSvc.DeepCopy()
|
|
// loop over ClusterIP Service ports, remove any that are not needed.
|
|
for i := len(clusterIPSvc.Spec.Ports) - 1; i >= 0; i-- {
|
|
pm := clusterIPSvc.Spec.Ports[i]
|
|
found := false
|
|
for _, wantsPM := range svc.Spec.Ports {
|
|
if wantsPM.Port == pm.Port && strings.EqualFold(string(wantsPM.Protocol), string(pm.Protocol)) {
|
|
found = true
|
|
break
|
|
}
|
|
}
|
|
if !found {
|
|
l.Debugf("portmapping %s:%d -> %s:%d is no longer required, removing", pm.Protocol, pm.TargetPort.IntVal, pm.Protocol, pm.Port)
|
|
clusterIPSvc.Spec.Ports = slices.Delete(clusterIPSvc.Spec.Ports, i, i+1)
|
|
}
|
|
}
|
|
|
|
// loop over ExternalName Service ports, for each one not found on
|
|
// ClusterIP Service produce new target port and add a portmapping to
|
|
// the ClusterIP Service.
|
|
for _, wantsPM := range svc.Spec.Ports {
|
|
found := false
|
|
for _, gotPM := range clusterIPSvc.Spec.Ports {
|
|
if wantsPM.Port == gotPM.Port && strings.EqualFold(string(wantsPM.Protocol), string(gotPM.Protocol)) {
|
|
found = true
|
|
break
|
|
}
|
|
}
|
|
if !found {
|
|
// Calculate a free port to expose on container and add
|
|
// a new PortMap to the ClusterIP Service.
|
|
if usedPorts.Len() == maxPorts {
|
|
// TODO(irbekrm): refactor to avoid extra reconciles here. Low priority as in practice,
|
|
// the limit should not be hit.
|
|
return nil, false, fmt.Errorf("unable to allocate additional ports on ProxyGroup %s, %d ports already used. Create another ProxyGroup or open an issue if you believe this is unexpected.", proxyGroupName, maxPorts)
|
|
}
|
|
p := unusedPort(usedPorts)
|
|
l.Debugf("mapping tailnet target port %d to container port %d", wantsPM.Port, p)
|
|
usedPorts.Insert(p)
|
|
clusterIPSvc.Spec.Ports = append(clusterIPSvc.Spec.Ports, corev1.ServicePort{
|
|
Name: wantsPM.Name,
|
|
Protocol: wantsPM.Protocol,
|
|
Port: wantsPM.Port,
|
|
TargetPort: intstr.FromInt32(p),
|
|
})
|
|
}
|
|
}
|
|
if !reflect.DeepEqual(clusterIPSvc, oldClusterIPSvc) {
|
|
if clusterIPSvc, err = createOrUpdate(ctx, esr.Client, esr.tsNamespace, clusterIPSvc, func(svc *corev1.Service) {
|
|
svc.Labels = clusterIPSvc.Labels
|
|
svc.Spec = clusterIPSvc.Spec
|
|
}); err != nil {
|
|
return nil, false, fmt.Errorf("error ensuring ClusterIP Service: %v", err)
|
|
}
|
|
}
|
|
|
|
crl := egressSvcChildResourceLabels(svc)
|
|
// TODO(irbekrm): support IPv6, but need to investigate how kube proxy
|
|
// sets up Service -> Pod routing when IPv6 is involved.
|
|
crl[discoveryv1.LabelServiceName] = clusterIPSvc.Name
|
|
crl[discoveryv1.LabelManagedBy] = "tailscale.com"
|
|
eps := &discoveryv1.EndpointSlice{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: fmt.Sprintf("%s-ipv4", clusterIPSvc.Name),
|
|
Namespace: esr.tsNamespace,
|
|
Labels: crl,
|
|
},
|
|
AddressType: discoveryv1.AddressTypeIPv4,
|
|
Ports: epsPortsFromSvc(clusterIPSvc),
|
|
}
|
|
if eps, err = createOrUpdate(ctx, esr.Client, esr.tsNamespace, eps, func(e *discoveryv1.EndpointSlice) {
|
|
e.Labels = eps.Labels
|
|
e.AddressType = eps.AddressType
|
|
e.Ports = eps.Ports
|
|
for _, p := range e.Endpoints {
|
|
p.Conditions.Ready = nil
|
|
}
|
|
}); err != nil {
|
|
return nil, false, fmt.Errorf("error ensuring EndpointSlice: %w", err)
|
|
}
|
|
|
|
cm, cfgs, err := egressSvcsConfigs(ctx, esr.Client, proxyGroupName, esr.tsNamespace)
|
|
if err != nil {
|
|
return nil, false, fmt.Errorf("error retrieving egress services configuration: %w", err)
|
|
}
|
|
if cm == nil {
|
|
l.Info("ConfigMap not yet created, waiting..")
|
|
return nil, false, nil
|
|
}
|
|
tailnetSvc := tailnetSvcName(svc)
|
|
gotCfg := (*cfgs)[tailnetSvc]
|
|
wantsCfg := egressSvcCfg(svc, clusterIPSvc)
|
|
if !reflect.DeepEqual(gotCfg, wantsCfg) {
|
|
l.Debugf("updating egress services ConfigMap %s", cm.Name)
|
|
mak.Set(cfgs, tailnetSvc, wantsCfg)
|
|
bs, err := json.Marshal(cfgs)
|
|
if err != nil {
|
|
return nil, false, fmt.Errorf("error marshalling egress services configs: %w", err)
|
|
}
|
|
mak.Set(&cm.BinaryData, egressservices.KeyEgressServices, bs)
|
|
if err := esr.Update(ctx, cm); err != nil {
|
|
return nil, false, fmt.Errorf("error updating egress services ConfigMap: %w", err)
|
|
}
|
|
}
|
|
l.Infof("egress service configuration has been updated")
|
|
return clusterIPSvc, true, nil
|
|
}
|
|
|
|
func (esr *egressSvcsReconciler) maybeCleanup(ctx context.Context, svc *corev1.Service, logger *zap.SugaredLogger) error {
|
|
logger.Info("ensuring that resources created for egress service are deleted")
|
|
|
|
// Delete egress service config from the ConfigMap mounted by the proxies.
|
|
if err := esr.ensureEgressSvcCfgDeleted(ctx, svc, logger); err != nil {
|
|
return fmt.Errorf("error deleting egress service config: %w", err)
|
|
}
|
|
|
|
// Delete the ClusterIP Service and EndpointSlice for the egress
|
|
// service.
|
|
types := []client.Object{
|
|
&corev1.Service{},
|
|
&discoveryv1.EndpointSlice{},
|
|
}
|
|
crl := egressSvcChildResourceLabels(svc)
|
|
for _, typ := range types {
|
|
if err := esr.DeleteAllOf(ctx, typ, client.InNamespace(esr.tsNamespace), client.MatchingLabels(crl)); err != nil {
|
|
return fmt.Errorf("error deleting %s: %w", typ, err)
|
|
}
|
|
}
|
|
|
|
ix := slices.Index(svc.Finalizers, FinalizerName)
|
|
if ix != -1 {
|
|
logger.Debug("Removing Tailscale finalizer from Service")
|
|
svc.Finalizers = append(svc.Finalizers[:ix], svc.Finalizers[ix+1:]...)
|
|
if err := esr.Update(ctx, svc); err != nil {
|
|
return fmt.Errorf("failed to remove finalizer: %w", err)
|
|
}
|
|
}
|
|
esr.mu.Lock()
|
|
esr.svcs.Remove(svc.UID)
|
|
gaugeEgressServices.Set(int64(esr.svcs.Len()))
|
|
esr.mu.Unlock()
|
|
logger.Info("successfully cleaned up resources for egress Service")
|
|
return nil
|
|
}
|
|
|
|
func (esr *egressSvcsReconciler) maybeCleanupProxyGroupConfig(ctx context.Context, svc *corev1.Service, l *zap.SugaredLogger) error {
|
|
wantsProxyGroup := svc.Annotations[AnnotationProxyGroup]
|
|
cond := tsoperator.GetServiceCondition(svc, tsapi.EgressSvcConfigured)
|
|
if cond == nil {
|
|
return nil
|
|
}
|
|
ss := strings.Split(cond.Reason, ":")
|
|
if len(ss) < 3 {
|
|
return nil
|
|
}
|
|
if strings.EqualFold(wantsProxyGroup, ss[2]) {
|
|
return nil
|
|
}
|
|
esr.logger.Infof("egress Service configured on ProxyGroup %s, wants ProxyGroup %s, cleaning up...", ss[2], wantsProxyGroup)
|
|
if err := esr.ensureEgressSvcCfgDeleted(ctx, svc, l); err != nil {
|
|
return fmt.Errorf("error deleting egress service config: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// usedPortsForPG calculates the currently used match ports for ProxyGroup
|
|
// containers. It does that by looking by retrieving all target ports of all
|
|
// ClusterIP Services created for egress services exposed on this ProxyGroup's
|
|
// proxies.
|
|
// TODO(irbekrm): this is currently good enough because we only have a single worker and
|
|
// because these Services are created by us, so we can always expect to get the
|
|
// latest ClusterIP Services via the controller cache. It will not work as well
|
|
// once we split into multiple workers- at that point we probably want to set
|
|
// used ports on ProxyGroup's status.
|
|
func (esr *egressSvcsReconciler) usedPortsForPG(ctx context.Context, pg string) (sets.Set[int32], error) {
|
|
svcList := &corev1.ServiceList{}
|
|
if err := esr.List(ctx, svcList, client.InNamespace(esr.tsNamespace), client.MatchingLabels(map[string]string{labelProxyGroup: pg})); err != nil {
|
|
return nil, fmt.Errorf("error listing Services: %w", err)
|
|
}
|
|
usedPorts := sets.New[int32]()
|
|
for _, s := range svcList.Items {
|
|
for _, p := range s.Spec.Ports {
|
|
usedPorts.Insert(p.TargetPort.IntVal)
|
|
}
|
|
}
|
|
return usedPorts, nil
|
|
}
|
|
|
|
// clusterIPSvcForEgress returns a template for the ClusterIP Service created
|
|
// for an egress service exposed on ProxyGroup proxies. The ClusterIP Service
|
|
// has no selector. Traffic sent to it will be routed to the endpoints defined
|
|
// by an EndpointSlice created for this egress service.
|
|
func (esr *egressSvcsReconciler) clusterIPSvcForEgress(crl map[string]string) *corev1.Service {
|
|
return &corev1.Service{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
GenerateName: svcNameBase(crl[labelExternalSvcName]),
|
|
Namespace: esr.tsNamespace,
|
|
Labels: crl,
|
|
},
|
|
Spec: corev1.ServiceSpec{
|
|
Type: corev1.ServiceTypeClusterIP,
|
|
},
|
|
}
|
|
}
|
|
|
|
func (esr *egressSvcsReconciler) ensureEgressSvcCfgDeleted(ctx context.Context, svc *corev1.Service, logger *zap.SugaredLogger) error {
|
|
crl := egressSvcChildResourceLabels(svc)
|
|
cmName := fmt.Sprintf(egressSvcsCMNameTemplate, crl[labelProxyGroup])
|
|
cm := &corev1.ConfigMap{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: cmName,
|
|
Namespace: esr.tsNamespace,
|
|
},
|
|
}
|
|
l := logger.With("ConfigMap", client.ObjectKeyFromObject(cm))
|
|
l.Debug("ensuring that egress service configuration is removed from proxy config")
|
|
if err := esr.Get(ctx, client.ObjectKeyFromObject(cm), cm); apierrors.IsNotFound(err) {
|
|
l.Debugf("ConfigMap not found")
|
|
return nil
|
|
} else if err != nil {
|
|
return fmt.Errorf("error retrieving ConfigMap: %w", err)
|
|
}
|
|
bs := cm.BinaryData[egressservices.KeyEgressServices]
|
|
if len(bs) == 0 {
|
|
l.Debugf("ConfigMap does not contain egress service configs")
|
|
return nil
|
|
}
|
|
cfgs := &egressservices.Configs{}
|
|
if err := json.Unmarshal(bs, cfgs); err != nil {
|
|
return fmt.Errorf("error unmarshalling egress services configs")
|
|
}
|
|
tailnetSvc := tailnetSvcName(svc)
|
|
_, ok := (*cfgs)[tailnetSvc]
|
|
if !ok {
|
|
l.Debugf("ConfigMap does not contain egress service config, likely because it was already deleted")
|
|
return nil
|
|
}
|
|
l.Infof("before deleting config %+#v", *cfgs)
|
|
delete(*cfgs, tailnetSvc)
|
|
l.Infof("after deleting config %+#v", *cfgs)
|
|
bs, err := json.Marshal(cfgs)
|
|
if err != nil {
|
|
return fmt.Errorf("error marshalling egress services configs: %w", err)
|
|
}
|
|
mak.Set(&cm.BinaryData, egressservices.KeyEgressServices, bs)
|
|
return esr.Update(ctx, cm)
|
|
}
|
|
|
|
func (esr *egressSvcsReconciler) validateClusterResources(ctx context.Context, svc *corev1.Service, l *zap.SugaredLogger) (bool, error) {
|
|
proxyGroupName := svc.Annotations[AnnotationProxyGroup]
|
|
pg := &tsapi.ProxyGroup{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: proxyGroupName,
|
|
},
|
|
}
|
|
if err := esr.Get(ctx, client.ObjectKeyFromObject(pg), pg); apierrors.IsNotFound(err) {
|
|
l.Infof("ProxyGroup %q not found, waiting...", proxyGroupName)
|
|
tsoperator.SetServiceCondition(svc, tsapi.EgressSvcValid, metav1.ConditionUnknown, reasonProxyGroupNotReady, reasonProxyGroupNotReady, esr.clock, l)
|
|
return false, nil
|
|
} else if err != nil {
|
|
err := fmt.Errorf("unable to retrieve ProxyGroup %s: %w", proxyGroupName, err)
|
|
tsoperator.SetServiceCondition(svc, tsapi.EgressSvcValid, metav1.ConditionUnknown, reasonProxyGroupNotReady, err.Error(), esr.clock, l)
|
|
return false, err
|
|
}
|
|
if !tsoperator.ProxyGroupIsReady(pg) {
|
|
l.Infof("ProxyGroup %s is not ready, waiting...", proxyGroupName)
|
|
tsoperator.SetServiceCondition(svc, tsapi.EgressSvcValid, metav1.ConditionUnknown, reasonProxyGroupNotReady, reasonProxyGroupNotReady, esr.clock, l)
|
|
return false, nil
|
|
}
|
|
|
|
if violations := validateEgressService(svc, pg); len(violations) > 0 {
|
|
msg := fmt.Sprintf("invalid egress Service: %s", strings.Join(violations, ", "))
|
|
esr.recorder.Event(svc, corev1.EventTypeWarning, "INVALIDSERVICE", msg)
|
|
l.Info(msg)
|
|
tsoperator.SetServiceCondition(svc, tsapi.EgressSvcValid, metav1.ConditionFalse, reasonEgressSvcInvalid, msg, esr.clock, l)
|
|
return false, nil
|
|
}
|
|
l.Debugf("egress service is valid")
|
|
tsoperator.SetServiceCondition(svc, tsapi.EgressSvcValid, metav1.ConditionTrue, reasonEgressSvcValid, reasonEgressSvcValid, esr.clock, l)
|
|
return true, nil
|
|
}
|
|
|
|
func validateEgressService(svc *corev1.Service, pg *tsapi.ProxyGroup) []string {
|
|
violations := validateService(svc)
|
|
|
|
// We check that only one of these two is set in the earlier validateService function.
|
|
if svc.Annotations[AnnotationTailnetTargetFQDN] == "" && svc.Annotations[AnnotationTailnetTargetIP] == "" {
|
|
violations = append(violations, fmt.Sprintf("egress Service for ProxyGroup must have one of %s, %s annotations set", AnnotationTailnetTargetFQDN, AnnotationTailnetTargetIP))
|
|
}
|
|
if len(svc.Spec.Ports) == 0 {
|
|
violations = append(violations, "egress Service for ProxyGroup must have at least one target Port specified")
|
|
}
|
|
if svc.Spec.Type != corev1.ServiceTypeExternalName {
|
|
violations = append(violations, fmt.Sprintf("unexpected egress Service type %s. The only supported type is ExternalName.", svc.Spec.Type))
|
|
}
|
|
if pg.Spec.Type != tsapi.ProxyGroupTypeEgress {
|
|
violations = append(violations, fmt.Sprintf("egress Service references ProxyGroup of type %s, must be type %s", pg.Spec.Type, tsapi.ProxyGroupTypeEgress))
|
|
}
|
|
return violations
|
|
}
|
|
|
|
// egressSvcNameBase returns a name base that can be passed to
|
|
// ObjectMeta.GenerateName to generate a name for the ClusterIP Service.
|
|
// The generated name needs to be short enough so that it can later be used to
|
|
// generate a valid Kubernetes resource name for the EndpointSlice in form
|
|
// 'ipv4-|ipv6-<ClusterIP Service name>.
|
|
// A valid Kubernetes resource name must not be longer than 253 chars.
|
|
func svcNameBase(s string) string {
|
|
// -ipv4 - ipv6
|
|
const maxClusterIPSvcNameLength = 253 - 5
|
|
base := fmt.Sprintf("ts-%s-", s)
|
|
generator := names.SimpleNameGenerator
|
|
for {
|
|
generatedName := generator.GenerateName(base)
|
|
excess := len(generatedName) - maxClusterIPSvcNameLength
|
|
if excess <= 0 {
|
|
return base
|
|
}
|
|
base = base[:len(base)-1-excess] // cut off the excess chars
|
|
base = base + "-" // re-instate the dash
|
|
}
|
|
}
|
|
|
|
// unusedPort returns a port in range [3000 - 4000). The caller must ensure that
|
|
// usedPorts does not contain all ports in range [3000 - 4000).
|
|
func unusedPort(usedPorts sets.Set[int32]) int32 {
|
|
foundFreePort := false
|
|
var suggestPort int32
|
|
for !foundFreePort {
|
|
suggestPort = rand.Int32N(maxPorts) + 3000
|
|
if !usedPorts.Has(suggestPort) {
|
|
foundFreePort = true
|
|
}
|
|
}
|
|
return suggestPort
|
|
}
|
|
|
|
// tailnetTargetFromSvc returns a tailnet target for the given egress Service.
|
|
// Service must contain exactly one of tailscale.com/tailnet-ip,
|
|
// tailscale.com/tailnet-fqdn annotations.
|
|
func tailnetTargetFromSvc(svc *corev1.Service) egressservices.TailnetTarget {
|
|
if fqdn := svc.Annotations[AnnotationTailnetTargetFQDN]; fqdn != "" {
|
|
return egressservices.TailnetTarget{
|
|
FQDN: fqdn,
|
|
}
|
|
}
|
|
return egressservices.TailnetTarget{
|
|
IP: svc.Annotations[AnnotationTailnetTargetIP],
|
|
}
|
|
}
|
|
|
|
func egressSvcCfg(externalNameSvc, clusterIPSvc *corev1.Service) egressservices.Config {
|
|
tt := tailnetTargetFromSvc(externalNameSvc)
|
|
cfg := egressservices.Config{TailnetTarget: tt}
|
|
for _, svcPort := range clusterIPSvc.Spec.Ports {
|
|
pm := portMap(svcPort)
|
|
mak.Set(&cfg.Ports, pm, struct{}{})
|
|
}
|
|
return cfg
|
|
}
|
|
|
|
func portMap(p corev1.ServicePort) egressservices.PortMap {
|
|
// TODO (irbekrm): out of bounds check?
|
|
return egressservices.PortMap{Protocol: string(p.Protocol), MatchPort: uint16(p.TargetPort.IntVal), TargetPort: uint16(p.Port)}
|
|
}
|
|
|
|
func isEgressSvcForProxyGroup(obj client.Object) bool {
|
|
s, ok := obj.(*corev1.Service)
|
|
if !ok {
|
|
return false
|
|
}
|
|
annots := s.ObjectMeta.Annotations
|
|
return annots[AnnotationProxyGroup] != "" && (annots[AnnotationTailnetTargetFQDN] != "" || annots[AnnotationTailnetTargetIP] != "")
|
|
}
|
|
|
|
// egressSvcConfig returns a ConfigMap that contains egress services configuration for the provided ProxyGroup as well
|
|
// as unmarshalled configuration from the ConfigMap.
|
|
func egressSvcsConfigs(ctx context.Context, cl client.Client, proxyGroupName, tsNamespace string) (cm *corev1.ConfigMap, cfgs *egressservices.Configs, err error) {
|
|
cmName := fmt.Sprintf(egressSvcsCMNameTemplate, proxyGroupName)
|
|
cm = &corev1.ConfigMap{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: cmName,
|
|
Namespace: tsNamespace,
|
|
},
|
|
}
|
|
if err := cl.Get(ctx, client.ObjectKeyFromObject(cm), cm); err != nil {
|
|
return nil, nil, fmt.Errorf("error retrieving egress services ConfigMap %s: %v", cmName, err)
|
|
}
|
|
cfgs = &egressservices.Configs{}
|
|
if len(cm.BinaryData[egressservices.KeyEgressServices]) != 0 {
|
|
if err := json.Unmarshal(cm.BinaryData[egressservices.KeyEgressServices], cfgs); err != nil {
|
|
return nil, nil, fmt.Errorf("error unmarshaling egress services config %v: %w", cm.BinaryData[egressservices.KeyEgressServices], err)
|
|
}
|
|
}
|
|
return cm, cfgs, nil
|
|
}
|
|
|
|
// egressSvcChildResourceLabels returns labels that should be applied to the
|
|
// ClusterIP Service and the EndpointSlice created for the egress service.
|
|
// TODO(irbekrm): we currently set a bunch of labels based on Kubernetes
|
|
// resource names (ProxyGroup, Service). Maximum allowed label length is 63
|
|
// chars whilst the maximum allowed resource name length is 253 chars, so we
|
|
// should probably validate and truncate (?) the names is they are too long.
|
|
func egressSvcChildResourceLabels(svc *corev1.Service) map[string]string {
|
|
return map[string]string{
|
|
LabelManaged: "true",
|
|
labelProxyGroup: svc.Annotations[AnnotationProxyGroup],
|
|
labelExternalSvcName: svc.Name,
|
|
labelExternalSvcNamespace: svc.Namespace,
|
|
labelSvcType: typeEgress,
|
|
}
|
|
}
|
|
|
|
func svcConfigurationUpToDate(svc *corev1.Service, l *zap.SugaredLogger) bool {
|
|
cond := tsoperator.GetServiceCondition(svc, tsapi.EgressSvcConfigured)
|
|
if cond == nil {
|
|
return false
|
|
}
|
|
if cond.Status != metav1.ConditionTrue {
|
|
return false
|
|
}
|
|
wantsReadyReason := svcConfiguredReason(svc, true, l)
|
|
return strings.EqualFold(wantsReadyReason, cond.Reason)
|
|
}
|
|
|
|
func cfgHash(c cfg, l *zap.SugaredLogger) string {
|
|
bs, err := json.Marshal(c)
|
|
if err != nil {
|
|
// Don't use l.Error as that messes up component logs with, in this case, unnecessary stack trace.
|
|
l.Infof("error marhsalling Config: %v", err)
|
|
return ""
|
|
}
|
|
h := sha256.New()
|
|
if _, err := h.Write(bs); err != nil {
|
|
// Don't use l.Error as that messes up component logs with, in this case, unnecessary stack trace.
|
|
l.Infof("error producing Config hash: %v", err)
|
|
return ""
|
|
}
|
|
return fmt.Sprintf("%x", h.Sum(nil))
|
|
}
|
|
|
|
type cfg struct {
|
|
Ports []corev1.ServicePort `json:"ports"`
|
|
TailnetTarget egressservices.TailnetTarget `json:"tailnetTarget"`
|
|
ProxyGroup string `json:"proxyGroup"`
|
|
}
|
|
|
|
func svcConfiguredReason(svc *corev1.Service, configured bool, l *zap.SugaredLogger) string {
|
|
var r string
|
|
if configured {
|
|
r = "ConfiguredFor:"
|
|
} else {
|
|
r = fmt.Sprintf("ConfigurationFailed:%s", r)
|
|
}
|
|
r += fmt.Sprintf("ProxyGroup:%s", svc.Annotations[AnnotationProxyGroup])
|
|
tt := tailnetTargetFromSvc(svc)
|
|
s := cfg{
|
|
Ports: svc.Spec.Ports,
|
|
TailnetTarget: tt,
|
|
ProxyGroup: svc.Annotations[AnnotationProxyGroup],
|
|
}
|
|
r += fmt.Sprintf(":Config:%s", cfgHash(s, l))
|
|
return r
|
|
}
|
|
|
|
// tailnetSvc accepts and ExternalName Service name and returns a name that will be used to distinguish this tailnet
|
|
// service from other tailnet services exposed to cluster workloads.
|
|
func tailnetSvcName(extNSvc *corev1.Service) string {
|
|
return fmt.Sprintf("%s-%s", extNSvc.Namespace, extNSvc.Name)
|
|
}
|
|
|
|
// epsPortsFromSvc takes the ClusterIP Service created for an egress service and
|
|
// returns its Port array in a form that can be used for an EndpointSlice.
|
|
func epsPortsFromSvc(svc *corev1.Service) (ep []discoveryv1.EndpointPort) {
|
|
for _, p := range svc.Spec.Ports {
|
|
ep = append(ep, discoveryv1.EndpointPort{
|
|
Protocol: &p.Protocol,
|
|
Port: &p.TargetPort.IntVal,
|
|
Name: &p.Name,
|
|
})
|
|
}
|
|
return ep
|
|
}
|