2024-10-04 13:11:35 +01:00
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build !plan9
package main
import (
"context"
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
"math/rand/v2"
"reflect"
"slices"
"strings"
"sync"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/storage/names"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
tsoperator "tailscale.com/k8s-operator"
tsapi "tailscale.com/k8s-operator/apis/v1alpha1"
"tailscale.com/kube/egressservices"
"tailscale.com/kube/kubetypes"
"tailscale.com/tstime"
"tailscale.com/util/clientmetric"
"tailscale.com/util/mak"
"tailscale.com/util/set"
)
const (
reasonEgressSvcInvalid = "EgressSvcInvalid"
reasonEgressSvcValid = "EgressSvcValid"
reasonEgressSvcCreationFailed = "EgressSvcCreationFailed"
reasonProxyGroupNotReady = "ProxyGroupNotReady"
2024-10-07 20:12:56 +01:00
labelProxyGroup = "tailscale.com/proxy-group"
2024-10-04 13:11:35 +01:00
labelSvcType = "tailscale.com/svc-type" // ingress or egress
typeEgress = "egress"
// maxPorts is the maximum number of ports that can be exposed on a
// container. In practice this will be ports in range [3000 - 4000). The
// high range should make it easier to distinguish container ports from
// the tailnet target ports for debugging purposes (i.e when reading
// netfilter rules). The limit of 10000 is somewhat arbitrary, the
// assumption is that this would not be hit in practice.
maxPorts = 10000
indexEgressProxyGroup = ".metadata.annotations.egress-proxy-group"
)
var gaugeEgressServices = clientmetric . NewGauge ( kubetypes . MetricEgressServiceCount )
// egressSvcsReconciler reconciles user created ExternalName Services that specify a tailnet
// endpoint that should be exposed to cluster workloads and an egress ProxyGroup
// on whose proxies it should be exposed.
type egressSvcsReconciler struct {
client . Client
logger * zap . SugaredLogger
recorder record . EventRecorder
clock tstime . Clock
tsNamespace string
mu sync . Mutex // protects following
svcs set . Slice [ types . UID ] // UIDs of all currently managed egress Services for ProxyGroup
}
// Reconcile reconciles an ExternalName Service that specifies a tailnet target and a ProxyGroup on whose proxies should
// forward cluster traffic to the target.
// For an ExternalName Service the reconciler:
//
// - for each port N defined on the ExternalName Service, allocates a port X in range [3000- 4000), unique for the
// ProxyGroup proxies. Proxies will forward cluster traffic received on port N to port M on the tailnet target
//
// - creates a ClusterIP Service in the operator's namespace with portmappings for all M->N port pairs. This will allow
// cluster workloads to send traffic on the user-defined tailnet target port and get it transparently mapped to the
// randomly selected port on proxy Pods.
//
// - creates an EndpointSlice in the operator's namespace with kubernetes.io/service-name label pointing to the
// ClusterIP Service. The endpoints will get dynamically updates to proxy Pod IPs as the Pods become ready to route
// traffic to the tailnet target. kubernetes.io/service-name label ensures that kube-proxy sets up routing rules to
// forward cluster traffic received on ClusterIP Service's IP address to the endpoints (Pod IPs).
//
// - updates the egress service config in a ConfigMap mounted to the ProxyGroup proxies with the tailnet target and the
// portmappings.
func ( esr * egressSvcsReconciler ) Reconcile ( ctx context . Context , req reconcile . Request ) ( res reconcile . Result , err error ) {
l := esr . logger . With ( "Service" , req . NamespacedName )
defer l . Info ( "reconcile finished" )
svc := new ( corev1 . Service )
if err = esr . Get ( ctx , req . NamespacedName , svc ) ; apierrors . IsNotFound ( err ) {
l . Info ( "Service not found" )
return res , nil
} else if err != nil {
return res , fmt . Errorf ( "failed to get Service: %w" , err )
}
// Name of the 'egress service', meaning the tailnet target.
tailnetSvc := tailnetSvcName ( svc )
l = l . With ( "tailnet-service" , tailnetSvc )
// Note that resources for egress Services are only cleaned up when the
// Service is actually deleted (and not if, for example, user decides to
// remove the Tailscale annotation from it). This should be fine- we
// assume that the egress ExternalName Services are always created for
// Tailscale operator specifically.
if ! svc . DeletionTimestamp . IsZero ( ) {
l . Info ( "Service is being deleted, ensuring resource cleanup" )
return res , esr . maybeCleanup ( ctx , svc , l )
}
oldStatus := svc . Status . DeepCopy ( )
defer func ( ) {
if ! apiequality . Semantic . DeepEqual ( oldStatus , svc . Status ) {
err = errors . Join ( err , esr . Status ( ) . Update ( ctx , svc ) )
}
} ( )
// Validate the user-created ExternalName Service and the associated ProxyGroup.
if ok , err := esr . validateClusterResources ( ctx , svc , l ) ; err != nil {
return res , fmt . Errorf ( "error validating cluster resources: %w" , err )
} else if ! ok {
return res , nil
}
if ! slices . Contains ( svc . Finalizers , FinalizerName ) {
l . Infof ( "configuring tailnet service" ) // logged exactly once
svc . Finalizers = append ( svc . Finalizers , FinalizerName )
if err := esr . Update ( ctx , svc ) ; err != nil {
err := fmt . Errorf ( "failed to add finalizer: %w" , err )
r := svcConfiguredReason ( svc , false , l )
tsoperator . SetServiceCondition ( svc , tsapi . EgressSvcConfigured , metav1 . ConditionFalse , r , err . Error ( ) , esr . clock , l )
return res , err
}
esr . mu . Lock ( )
esr . svcs . Add ( svc . UID )
gaugeEgressServices . Set ( int64 ( esr . svcs . Len ( ) ) )
esr . mu . Unlock ( )
}
if err := esr . maybeCleanupProxyGroupConfig ( ctx , svc , l ) ; err != nil {
err = fmt . Errorf ( "cleaning up resources for previous ProxyGroup failed: %w" , err )
r := svcConfiguredReason ( svc , false , l )
tsoperator . SetServiceCondition ( svc , tsapi . EgressSvcConfigured , metav1 . ConditionFalse , r , err . Error ( ) , esr . clock , l )
return res , err
}
return res , esr . maybeProvision ( ctx , svc , l )
}
func ( esr * egressSvcsReconciler ) maybeProvision ( ctx context . Context , svc * corev1 . Service , l * zap . SugaredLogger ) ( err error ) {
l . Debug ( "maybe provision" )
r := svcConfiguredReason ( svc , false , l )
st := metav1 . ConditionFalse
defer func ( ) {
msg := r
if st != metav1 . ConditionTrue && err != nil {
msg = err . Error ( )
}
tsoperator . SetServiceCondition ( svc , tsapi . EgressSvcConfigured , st , r , msg , esr . clock , l )
} ( )
crl := egressSvcChildResourceLabels ( svc )
clusterIPSvc , err := getSingleObject [ corev1 . Service ] ( ctx , esr . Client , esr . tsNamespace , crl )
if err != nil {
err = fmt . Errorf ( "error retrieving ClusterIP Service: %w" , err )
return err
}
if clusterIPSvc == nil {
clusterIPSvc = esr . clusterIPSvcForEgress ( crl )
}
upToDate := svcConfigurationUpToDate ( svc , l )
provisioned := true
if ! upToDate {
if clusterIPSvc , provisioned , err = esr . provision ( ctx , svc . Annotations [ AnnotationProxyGroup ] , svc , clusterIPSvc , l ) ; err != nil {
return err
}
}
if ! provisioned {
l . Infof ( "unable to provision cluster resources" )
return nil
}
// Update ExternalName Service to point at the ClusterIP Service.
clusterDomain := retrieveClusterDomain ( esr . tsNamespace , l )
clusterIPSvcFQDN := fmt . Sprintf ( "%s.%s.svc.%s" , clusterIPSvc . Name , clusterIPSvc . Namespace , clusterDomain )
if svc . Spec . ExternalName != clusterIPSvcFQDN {
l . Infof ( "Configuring ExternalName Service to point to ClusterIP Service %s" , clusterIPSvcFQDN )
svc . Spec . ExternalName = clusterIPSvcFQDN
if err = esr . Update ( ctx , svc ) ; err != nil {
err = fmt . Errorf ( "error updating ExternalName Service: %w" , err )
return err
}
}
r = svcConfiguredReason ( svc , true , l )
st = metav1 . ConditionTrue
return nil
}
func ( esr * egressSvcsReconciler ) provision ( ctx context . Context , proxyGroupName string , svc , clusterIPSvc * corev1 . Service , l * zap . SugaredLogger ) ( * corev1 . Service , bool , error ) {
l . Infof ( "updating configuration..." )
usedPorts , err := esr . usedPortsForPG ( ctx , proxyGroupName )
if err != nil {
return nil , false , fmt . Errorf ( "error calculating used ports for ProxyGroup %s: %w" , proxyGroupName , err )
}
oldClusterIPSvc := clusterIPSvc . DeepCopy ( )
// loop over ClusterIP Service ports, remove any that are not needed.
for i := len ( clusterIPSvc . Spec . Ports ) - 1 ; i >= 0 ; i -- {
pm := clusterIPSvc . Spec . Ports [ i ]
found := false
for _ , wantsPM := range svc . Spec . Ports {
if wantsPM . Port == pm . Port && strings . EqualFold ( string ( wantsPM . Protocol ) , string ( pm . Protocol ) ) {
found = true
break
}
}
if ! found {
l . Debugf ( "portmapping %s:%d -> %s:%d is no longer required, removing" , pm . Protocol , pm . TargetPort . IntVal , pm . Protocol , pm . Port )
clusterIPSvc . Spec . Ports = slices . Delete ( clusterIPSvc . Spec . Ports , i , i + 1 )
}
}
// loop over ExternalName Service ports, for each one not found on
// ClusterIP Service produce new target port and add a portmapping to
// the ClusterIP Service.
for _ , wantsPM := range svc . Spec . Ports {
found := false
for _ , gotPM := range clusterIPSvc . Spec . Ports {
if wantsPM . Port == gotPM . Port && strings . EqualFold ( string ( wantsPM . Protocol ) , string ( gotPM . Protocol ) ) {
found = true
break
}
}
if ! found {
// Calculate a free port to expose on container and add
// a new PortMap to the ClusterIP Service.
if usedPorts . Len ( ) == maxPorts {
// TODO(irbekrm): refactor to avoid extra reconciles here. Low priority as in practice,
// the limit should not be hit.
return nil , false , fmt . Errorf ( "unable to allocate additional ports on ProxyGroup %s, %d ports already used. Create another ProxyGroup or open an issue if you believe this is unexpected." , proxyGroupName , maxPorts )
}
p := unusedPort ( usedPorts )
l . Debugf ( "mapping tailnet target port %d to container port %d" , wantsPM . Port , p )
usedPorts . Insert ( p )
clusterIPSvc . Spec . Ports = append ( clusterIPSvc . Spec . Ports , corev1 . ServicePort {
Name : wantsPM . Name ,
Protocol : wantsPM . Protocol ,
Port : wantsPM . Port ,
TargetPort : intstr . FromInt32 ( p ) ,
} )
}
}
if ! reflect . DeepEqual ( clusterIPSvc , oldClusterIPSvc ) {
if clusterIPSvc , err = createOrUpdate ( ctx , esr . Client , esr . tsNamespace , clusterIPSvc , func ( svc * corev1 . Service ) {
svc . Labels = clusterIPSvc . Labels
svc . Spec = clusterIPSvc . Spec
} ) ; err != nil {
return nil , false , fmt . Errorf ( "error ensuring ClusterIP Service: %v" , err )
}
}
crl := egressSvcChildResourceLabels ( svc )
// TODO(irbekrm): support IPv6, but need to investigate how kube proxy
// sets up Service -> Pod routing when IPv6 is involved.
crl [ discoveryv1 . LabelServiceName ] = clusterIPSvc . Name
crl [ discoveryv1 . LabelManagedBy ] = "tailscale.com"
eps := & discoveryv1 . EndpointSlice {
ObjectMeta : metav1 . ObjectMeta {
Name : fmt . Sprintf ( "%s-ipv4" , clusterIPSvc . Name ) ,
Namespace : esr . tsNamespace ,
Labels : crl ,
} ,
AddressType : discoveryv1 . AddressTypeIPv4 ,
Ports : epsPortsFromSvc ( clusterIPSvc ) ,
}
if eps , err = createOrUpdate ( ctx , esr . Client , esr . tsNamespace , eps , func ( e * discoveryv1 . EndpointSlice ) {
e . Labels = eps . Labels
e . AddressType = eps . AddressType
e . Ports = eps . Ports
for _ , p := range e . Endpoints {
p . Conditions . Ready = nil
}
} ) ; err != nil {
return nil , false , fmt . Errorf ( "error ensuring EndpointSlice: %w" , err )
}
cm , cfgs , err := egressSvcsConfigs ( ctx , esr . Client , proxyGroupName , esr . tsNamespace )
if err != nil {
return nil , false , fmt . Errorf ( "error retrieving egress services configuration: %w" , err )
}
if cm == nil {
l . Info ( "ConfigMap not yet created, waiting.." )
return nil , false , nil
}
tailnetSvc := tailnetSvcName ( svc )
gotCfg := ( * cfgs ) [ tailnetSvc ]
wantsCfg := egressSvcCfg ( svc , clusterIPSvc )
if ! reflect . DeepEqual ( gotCfg , wantsCfg ) {
l . Debugf ( "updating egress services ConfigMap %s" , cm . Name )
mak . Set ( cfgs , tailnetSvc , wantsCfg )
bs , err := json . Marshal ( cfgs )
if err != nil {
return nil , false , fmt . Errorf ( "error marshalling egress services configs: %w" , err )
}
mak . Set ( & cm . BinaryData , egressservices . KeyEgressServices , bs )
if err := esr . Update ( ctx , cm ) ; err != nil {
return nil , false , fmt . Errorf ( "error updating egress services ConfigMap: %w" , err )
}
}
l . Infof ( "egress service configuration has been updated" )
return clusterIPSvc , true , nil
}
func ( esr * egressSvcsReconciler ) maybeCleanup ( ctx context . Context , svc * corev1 . Service , logger * zap . SugaredLogger ) error {
logger . Info ( "ensuring that resources created for egress service are deleted" )
// Delete egress service config from the ConfigMap mounted by the proxies.
if err := esr . ensureEgressSvcCfgDeleted ( ctx , svc , logger ) ; err != nil {
return fmt . Errorf ( "error deleting egress service config: %w" , err )
}
// Delete the ClusterIP Service and EndpointSlice for the egress
// service.
types := [ ] client . Object {
& corev1 . Service { } ,
& discoveryv1 . EndpointSlice { } ,
}
crl := egressSvcChildResourceLabels ( svc )
for _ , typ := range types {
if err := esr . DeleteAllOf ( ctx , typ , client . InNamespace ( esr . tsNamespace ) , client . MatchingLabels ( crl ) ) ; err != nil {
return fmt . Errorf ( "error deleting %s: %w" , typ , err )
}
}
ix := slices . Index ( svc . Finalizers , FinalizerName )
if ix != - 1 {
logger . Debug ( "Removing Tailscale finalizer from Service" )
svc . Finalizers = append ( svc . Finalizers [ : ix ] , svc . Finalizers [ ix + 1 : ] ... )
if err := esr . Update ( ctx , svc ) ; err != nil {
return fmt . Errorf ( "failed to remove finalizer: %w" , err )
}
}
esr . mu . Lock ( )
esr . svcs . Remove ( svc . UID )
gaugeEgressServices . Set ( int64 ( esr . svcs . Len ( ) ) )
esr . mu . Unlock ( )
logger . Info ( "successfully cleaned up resources for egress Service" )
return nil
}
func ( esr * egressSvcsReconciler ) maybeCleanupProxyGroupConfig ( ctx context . Context , svc * corev1 . Service , l * zap . SugaredLogger ) error {
wantsProxyGroup := svc . Annotations [ AnnotationProxyGroup ]
cond := tsoperator . GetServiceCondition ( svc , tsapi . EgressSvcConfigured )
if cond == nil {
return nil
}
ss := strings . Split ( cond . Reason , ":" )
if len ( ss ) < 3 {
return nil
}
if strings . EqualFold ( wantsProxyGroup , ss [ 2 ] ) {
return nil
}
esr . logger . Infof ( "egress Service configured on ProxyGroup %s, wants ProxyGroup %s, cleaning up..." , ss [ 2 ] , wantsProxyGroup )
if err := esr . ensureEgressSvcCfgDeleted ( ctx , svc , l ) ; err != nil {
return fmt . Errorf ( "error deleting egress service config: %w" , err )
}
return nil
}
// usedPortsForPG calculates the currently used match ports for ProxyGroup
// containers. It does that by looking by retrieving all target ports of all
// ClusterIP Services created for egress services exposed on this ProxyGroup's
// proxies.
// TODO(irbekrm): this is currently good enough because we only have a single worker and
// because these Services are created by us, so we can always expect to get the
// latest ClusterIP Services via the controller cache. It will not work as well
// once we split into multiple workers- at that point we probably want to set
// used ports on ProxyGroup's status.
func ( esr * egressSvcsReconciler ) usedPortsForPG ( ctx context . Context , pg string ) ( sets . Set [ int32 ] , error ) {
svcList := & corev1 . ServiceList { }
if err := esr . List ( ctx , svcList , client . InNamespace ( esr . tsNamespace ) , client . MatchingLabels ( map [ string ] string { labelProxyGroup : pg } ) ) ; err != nil {
return nil , fmt . Errorf ( "error listing Services: %w" , err )
}
usedPorts := sets . New [ int32 ] ( )
for _ , s := range svcList . Items {
for _ , p := range s . Spec . Ports {
usedPorts . Insert ( p . TargetPort . IntVal )
}
}
return usedPorts , nil
}
// clusterIPSvcForEgress returns a template for the ClusterIP Service created
// for an egress service exposed on ProxyGroup proxies. The ClusterIP Service
// has no selector. Traffic sent to it will be routed to the endpoints defined
// by an EndpointSlice created for this egress service.
func ( esr * egressSvcsReconciler ) clusterIPSvcForEgress ( crl map [ string ] string ) * corev1 . Service {
return & corev1 . Service {
ObjectMeta : metav1 . ObjectMeta {
2024-10-07 20:12:56 +01:00
GenerateName : svcNameBase ( crl [ LabelParentName ] ) ,
2024-10-04 13:11:35 +01:00
Namespace : esr . tsNamespace ,
Labels : crl ,
} ,
Spec : corev1 . ServiceSpec {
Type : corev1 . ServiceTypeClusterIP ,
} ,
}
}
func ( esr * egressSvcsReconciler ) ensureEgressSvcCfgDeleted ( ctx context . Context , svc * corev1 . Service , logger * zap . SugaredLogger ) error {
crl := egressSvcChildResourceLabels ( svc )
2024-10-07 20:12:56 +01:00
cmName := pgEgressCMName ( crl [ labelProxyGroup ] )
2024-10-04 13:11:35 +01:00
cm := & corev1 . ConfigMap {
ObjectMeta : metav1 . ObjectMeta {
Name : cmName ,
Namespace : esr . tsNamespace ,
} ,
}
l := logger . With ( "ConfigMap" , client . ObjectKeyFromObject ( cm ) )
l . Debug ( "ensuring that egress service configuration is removed from proxy config" )
if err := esr . Get ( ctx , client . ObjectKeyFromObject ( cm ) , cm ) ; apierrors . IsNotFound ( err ) {
l . Debugf ( "ConfigMap not found" )
return nil
} else if err != nil {
return fmt . Errorf ( "error retrieving ConfigMap: %w" , err )
}
bs := cm . BinaryData [ egressservices . KeyEgressServices ]
if len ( bs ) == 0 {
l . Debugf ( "ConfigMap does not contain egress service configs" )
return nil
}
cfgs := & egressservices . Configs { }
if err := json . Unmarshal ( bs , cfgs ) ; err != nil {
return fmt . Errorf ( "error unmarshalling egress services configs" )
}
tailnetSvc := tailnetSvcName ( svc )
_ , ok := ( * cfgs ) [ tailnetSvc ]
if ! ok {
l . Debugf ( "ConfigMap does not contain egress service config, likely because it was already deleted" )
return nil
}
l . Infof ( "before deleting config %+#v" , * cfgs )
delete ( * cfgs , tailnetSvc )
l . Infof ( "after deleting config %+#v" , * cfgs )
bs , err := json . Marshal ( cfgs )
if err != nil {
return fmt . Errorf ( "error marshalling egress services configs: %w" , err )
}
mak . Set ( & cm . BinaryData , egressservices . KeyEgressServices , bs )
return esr . Update ( ctx , cm )
}
func ( esr * egressSvcsReconciler ) validateClusterResources ( ctx context . Context , svc * corev1 . Service , l * zap . SugaredLogger ) ( bool , error ) {
proxyGroupName := svc . Annotations [ AnnotationProxyGroup ]
pg := & tsapi . ProxyGroup {
ObjectMeta : metav1 . ObjectMeta {
Name : proxyGroupName ,
} ,
}
if err := esr . Get ( ctx , client . ObjectKeyFromObject ( pg ) , pg ) ; apierrors . IsNotFound ( err ) {
l . Infof ( "ProxyGroup %q not found, waiting..." , proxyGroupName )
tsoperator . SetServiceCondition ( svc , tsapi . EgressSvcValid , metav1 . ConditionUnknown , reasonProxyGroupNotReady , reasonProxyGroupNotReady , esr . clock , l )
2024-10-07 20:12:56 +01:00
tsoperator . RemoveServiceCondition ( svc , tsapi . EgressSvcConfigured )
2024-10-04 13:11:35 +01:00
return false , nil
} else if err != nil {
err := fmt . Errorf ( "unable to retrieve ProxyGroup %s: %w" , proxyGroupName , err )
tsoperator . SetServiceCondition ( svc , tsapi . EgressSvcValid , metav1 . ConditionUnknown , reasonProxyGroupNotReady , err . Error ( ) , esr . clock , l )
2024-10-07 20:12:56 +01:00
tsoperator . RemoveServiceCondition ( svc , tsapi . EgressSvcConfigured )
2024-10-04 13:11:35 +01:00
return false , err
}
if ! tsoperator . ProxyGroupIsReady ( pg ) {
l . Infof ( "ProxyGroup %s is not ready, waiting..." , proxyGroupName )
tsoperator . SetServiceCondition ( svc , tsapi . EgressSvcValid , metav1 . ConditionUnknown , reasonProxyGroupNotReady , reasonProxyGroupNotReady , esr . clock , l )
2024-10-07 20:12:56 +01:00
tsoperator . RemoveServiceCondition ( svc , tsapi . EgressSvcConfigured )
2024-10-04 13:11:35 +01:00
return false , nil
}
if violations := validateEgressService ( svc , pg ) ; len ( violations ) > 0 {
msg := fmt . Sprintf ( "invalid egress Service: %s" , strings . Join ( violations , ", " ) )
esr . recorder . Event ( svc , corev1 . EventTypeWarning , "INVALIDSERVICE" , msg )
l . Info ( msg )
tsoperator . SetServiceCondition ( svc , tsapi . EgressSvcValid , metav1 . ConditionFalse , reasonEgressSvcInvalid , msg , esr . clock , l )
2024-10-07 20:12:56 +01:00
tsoperator . RemoveServiceCondition ( svc , tsapi . EgressSvcConfigured )
2024-10-04 13:11:35 +01:00
return false , nil
}
l . Debugf ( "egress service is valid" )
tsoperator . SetServiceCondition ( svc , tsapi . EgressSvcValid , metav1 . ConditionTrue , reasonEgressSvcValid , reasonEgressSvcValid , esr . clock , l )
return true , nil
}
func validateEgressService ( svc * corev1 . Service , pg * tsapi . ProxyGroup ) [ ] string {
violations := validateService ( svc )
// We check that only one of these two is set in the earlier validateService function.
if svc . Annotations [ AnnotationTailnetTargetFQDN ] == "" && svc . Annotations [ AnnotationTailnetTargetIP ] == "" {
violations = append ( violations , fmt . Sprintf ( "egress Service for ProxyGroup must have one of %s, %s annotations set" , AnnotationTailnetTargetFQDN , AnnotationTailnetTargetIP ) )
}
if len ( svc . Spec . Ports ) == 0 {
violations = append ( violations , "egress Service for ProxyGroup must have at least one target Port specified" )
}
if svc . Spec . Type != corev1 . ServiceTypeExternalName {
violations = append ( violations , fmt . Sprintf ( "unexpected egress Service type %s. The only supported type is ExternalName." , svc . Spec . Type ) )
}
if pg . Spec . Type != tsapi . ProxyGroupTypeEgress {
violations = append ( violations , fmt . Sprintf ( "egress Service references ProxyGroup of type %s, must be type %s" , pg . Spec . Type , tsapi . ProxyGroupTypeEgress ) )
}
return violations
}
// egressSvcNameBase returns a name base that can be passed to
// ObjectMeta.GenerateName to generate a name for the ClusterIP Service.
// The generated name needs to be short enough so that it can later be used to
// generate a valid Kubernetes resource name for the EndpointSlice in form
// 'ipv4-|ipv6-<ClusterIP Service name>.
// A valid Kubernetes resource name must not be longer than 253 chars.
func svcNameBase ( s string ) string {
// -ipv4 - ipv6
const maxClusterIPSvcNameLength = 253 - 5
base := fmt . Sprintf ( "ts-%s-" , s )
generator := names . SimpleNameGenerator
for {
generatedName := generator . GenerateName ( base )
excess := len ( generatedName ) - maxClusterIPSvcNameLength
if excess <= 0 {
return base
}
base = base [ : len ( base ) - 1 - excess ] // cut off the excess chars
base = base + "-" // re-instate the dash
}
}
// unusedPort returns a port in range [3000 - 4000). The caller must ensure that
// usedPorts does not contain all ports in range [3000 - 4000).
func unusedPort ( usedPorts sets . Set [ int32 ] ) int32 {
foundFreePort := false
var suggestPort int32
for ! foundFreePort {
suggestPort = rand . Int32N ( maxPorts ) + 3000
if ! usedPorts . Has ( suggestPort ) {
foundFreePort = true
}
}
return suggestPort
}
// tailnetTargetFromSvc returns a tailnet target for the given egress Service.
// Service must contain exactly one of tailscale.com/tailnet-ip,
// tailscale.com/tailnet-fqdn annotations.
func tailnetTargetFromSvc ( svc * corev1 . Service ) egressservices . TailnetTarget {
if fqdn := svc . Annotations [ AnnotationTailnetTargetFQDN ] ; fqdn != "" {
return egressservices . TailnetTarget {
FQDN : fqdn ,
}
}
return egressservices . TailnetTarget {
IP : svc . Annotations [ AnnotationTailnetTargetIP ] ,
}
}
func egressSvcCfg ( externalNameSvc , clusterIPSvc * corev1 . Service ) egressservices . Config {
tt := tailnetTargetFromSvc ( externalNameSvc )
cfg := egressservices . Config { TailnetTarget : tt }
for _ , svcPort := range clusterIPSvc . Spec . Ports {
pm := portMap ( svcPort )
mak . Set ( & cfg . Ports , pm , struct { } { } )
}
return cfg
}
func portMap ( p corev1 . ServicePort ) egressservices . PortMap {
// TODO (irbekrm): out of bounds check?
return egressservices . PortMap { Protocol : string ( p . Protocol ) , MatchPort : uint16 ( p . TargetPort . IntVal ) , TargetPort : uint16 ( p . Port ) }
}
func isEgressSvcForProxyGroup ( obj client . Object ) bool {
s , ok := obj . ( * corev1 . Service )
if ! ok {
return false
}
annots := s . ObjectMeta . Annotations
return annots [ AnnotationProxyGroup ] != "" && ( annots [ AnnotationTailnetTargetFQDN ] != "" || annots [ AnnotationTailnetTargetIP ] != "" )
}
// egressSvcConfig returns a ConfigMap that contains egress services configuration for the provided ProxyGroup as well
// as unmarshalled configuration from the ConfigMap.
func egressSvcsConfigs ( ctx context . Context , cl client . Client , proxyGroupName , tsNamespace string ) ( cm * corev1 . ConfigMap , cfgs * egressservices . Configs , err error ) {
2024-10-07 20:12:56 +01:00
name := pgEgressCMName ( proxyGroupName )
2024-10-04 13:11:35 +01:00
cm = & corev1 . ConfigMap {
ObjectMeta : metav1 . ObjectMeta {
2024-10-07 20:12:56 +01:00
Name : name ,
2024-10-04 13:11:35 +01:00
Namespace : tsNamespace ,
} ,
}
if err := cl . Get ( ctx , client . ObjectKeyFromObject ( cm ) , cm ) ; err != nil {
2024-10-07 20:12:56 +01:00
return nil , nil , fmt . Errorf ( "error retrieving egress services ConfigMap %s: %v" , name , err )
2024-10-04 13:11:35 +01:00
}
cfgs = & egressservices . Configs { }
if len ( cm . BinaryData [ egressservices . KeyEgressServices ] ) != 0 {
if err := json . Unmarshal ( cm . BinaryData [ egressservices . KeyEgressServices ] , cfgs ) ; err != nil {
return nil , nil , fmt . Errorf ( "error unmarshaling egress services config %v: %w" , cm . BinaryData [ egressservices . KeyEgressServices ] , err )
}
}
return cm , cfgs , nil
}
// egressSvcChildResourceLabels returns labels that should be applied to the
// ClusterIP Service and the EndpointSlice created for the egress service.
// TODO(irbekrm): we currently set a bunch of labels based on Kubernetes
// resource names (ProxyGroup, Service). Maximum allowed label length is 63
// chars whilst the maximum allowed resource name length is 253 chars, so we
// should probably validate and truncate (?) the names is they are too long.
func egressSvcChildResourceLabels ( svc * corev1 . Service ) map [ string ] string {
return map [ string ] string {
2024-10-07 20:12:56 +01:00
LabelManaged : "true" ,
LabelParentType : "svc" ,
LabelParentName : svc . Name ,
LabelParentNamespace : svc . Namespace ,
labelProxyGroup : svc . Annotations [ AnnotationProxyGroup ] ,
labelSvcType : typeEgress ,
2024-10-04 13:11:35 +01:00
}
}
func svcConfigurationUpToDate ( svc * corev1 . Service , l * zap . SugaredLogger ) bool {
cond := tsoperator . GetServiceCondition ( svc , tsapi . EgressSvcConfigured )
if cond == nil {
return false
}
if cond . Status != metav1 . ConditionTrue {
return false
}
wantsReadyReason := svcConfiguredReason ( svc , true , l )
return strings . EqualFold ( wantsReadyReason , cond . Reason )
}
func cfgHash ( c cfg , l * zap . SugaredLogger ) string {
bs , err := json . Marshal ( c )
if err != nil {
// Don't use l.Error as that messes up component logs with, in this case, unnecessary stack trace.
l . Infof ( "error marhsalling Config: %v" , err )
return ""
}
h := sha256 . New ( )
if _ , err := h . Write ( bs ) ; err != nil {
// Don't use l.Error as that messes up component logs with, in this case, unnecessary stack trace.
l . Infof ( "error producing Config hash: %v" , err )
return ""
}
return fmt . Sprintf ( "%x" , h . Sum ( nil ) )
}
type cfg struct {
Ports [ ] corev1 . ServicePort ` json:"ports" `
TailnetTarget egressservices . TailnetTarget ` json:"tailnetTarget" `
ProxyGroup string ` json:"proxyGroup" `
}
func svcConfiguredReason ( svc * corev1 . Service , configured bool , l * zap . SugaredLogger ) string {
var r string
if configured {
r = "ConfiguredFor:"
} else {
r = fmt . Sprintf ( "ConfigurationFailed:%s" , r )
}
r += fmt . Sprintf ( "ProxyGroup:%s" , svc . Annotations [ AnnotationProxyGroup ] )
tt := tailnetTargetFromSvc ( svc )
s := cfg {
Ports : svc . Spec . Ports ,
TailnetTarget : tt ,
ProxyGroup : svc . Annotations [ AnnotationProxyGroup ] ,
}
r += fmt . Sprintf ( ":Config:%s" , cfgHash ( s , l ) )
return r
}
// tailnetSvc accepts and ExternalName Service name and returns a name that will be used to distinguish this tailnet
// service from other tailnet services exposed to cluster workloads.
func tailnetSvcName ( extNSvc * corev1 . Service ) string {
return fmt . Sprintf ( "%s-%s" , extNSvc . Namespace , extNSvc . Name )
}
// epsPortsFromSvc takes the ClusterIP Service created for an egress service and
// returns its Port array in a form that can be used for an EndpointSlice.
func epsPortsFromSvc ( svc * corev1 . Service ) ( ep [ ] discoveryv1 . EndpointPort ) {
for _ , p := range svc . Spec . Ports {
ep = append ( ep , discoveryv1 . EndpointPort {
Protocol : & p . Protocol ,
Port : & p . TargetPort . IntVal ,
Name : & p . Name ,
} )
}
return ep
}