diff --git a/cmd/k8s-operator/proxygroup.go b/cmd/k8s-operator/proxygroup.go
index 936a29325..77a644e67 100644
--- a/cmd/k8s-operator/proxygroup.go
+++ b/cmd/k8s-operator/proxygroup.go
@@ -11,7 +11,10 @@ import (
 	"encoding/json"
 	"fmt"
 	"net/http"
+	"net/netip"
 	"slices"
+	"sort"
+	"strconv"
 	"strings"
 	"sync"
 
@@ -25,6 +28,7 @@ import (
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/client-go/tools/record"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -43,11 +47,15 @@ import (
 )
 
 const (
+	nodePortType        = "NodePort"
+	directConnPortName  = "direct-connection-proxy"
+	directConnProxyPort = 30052
 	reasonProxyGroupCreationFailed = "ProxyGroupCreationFailed"
 	reasonProxyGroupReady          = "ProxyGroupReady"
 	reasonProxyGroupCreating       = "ProxyGroupCreating"
 	reasonProxyGroupInvalid        = "ProxyGroupInvalid"
 
+	statefulSetPodNameSelector = "statefulset.kubernetes.io/pod-name"
 	// Copied from k8s.io/apiserver/pkg/registry/generic/registry/store.go@cccad306d649184bf2a0e319ba830c53f65c445c
 	optimisticLockErrorMsg = "the object has been modified; please apply your changes to the latest version and try again"
 )
@@ -206,20 +214,191 @@ func (r *ProxyGroupReconciler) Reconcile(ctx context.Context, req reconcile.Requ
 	return setStatusReady(pg, metav1.ConditionTrue, reasonProxyGroupReady, reasonProxyGroupReady)
 }
 
-func (r *ProxyGroupReconciler) maybeExposeViaNodePort(ctx context.Context, pc *tsapi.ProxyClass, pg *tsapi.ProxyGroup) error {
-	// TODO: make NodePort a const
-	if pc == nil || pc.Spec.TailnetListenerConfig == nil || pc.Spec.TailnetListenerConfig.Type != "NodePort" {
-		return nil
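+// allocatePorts assigns one NodePort from the validated port ranges to each
+// ProxyGroup replica, keyed by replica name, and errors if the ranges do not
+// contain enough ports for every replica.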
+func allocatePorts(pg *tsapi.ProxyGroup, pr []string, ports map[string]int32) error {
+	ranges, err := validatePortRanges(pr)
+	if err != nil {
+		return fmt.Errorf("configured port ranges invalid: %w", err)
 	}
-
-	// 1. Create a NodePort Service per each replica
-	// TODO: support setting NodePort range
-	for i := range *(pg.Spec.Replicas) {
+	i := 0
+	replicaCount := int(*pg.Spec.Replicas)
+	for _, r := range ranges {
+		for p := r.Start; p <= r.End && len(ports) < replicaCount; p++ {
+			ports[fmt.Sprintf("%s-%d", pg.Name, i)] = int32(p)
+			i++
+		}
+		if i-1 >= replicaCount {
+			break
+		}
+	}
+
+	if len(ports) < replicaCount {
+		return fmt.Errorf("not enough ports in configured ranges: needed %d, found %d", replicaCount, len(ports))
 	}
 
 	return nil
 }
 
+func validateRange(s int, e int) error {
+	if s < 0 || s > 65535 {
+		return fmt.Errorf("invalid port value: %d", s)
+	}
+	if e < 0 || e > 65535 {
+		return fmt.Errorf("invalid port value: %d", e)
+	}
+	if s > e {
+		return fmt.Errorf("invalid port range: '%d-%d'", s, e)
+	}
+
+	return nil
+}
+
+type portRange struct {
+	Start  int
+	End    int
+	String string
+}
+
+// validatePortRanges parses entries of the form "80" or "100-200", validates
+// them, and returns them sorted by start port, erroring on any overlap.
+func validatePortRanges(pr []string) ([]portRange, error) {
+	ranges := []portRange{}
+	for _, p := range pr {
+		parts := strings.Split(p, "-")
+		switch len(parts) {
+		case 1:
+			s, err := strconv.Atoi(parts[0])
+			if err != nil {
+				return nil, fmt.Errorf("failed to parse port range %q: %w", p, err)
+			}
+			e := s
+
+			err = validateRange(s, e)
+			if err != nil {
+				return nil, err
+			}
+
+			ranges = append(ranges, portRange{Start: s, End: e, String: p})
+		case 2:
+			s, err := strconv.Atoi(parts[0])
+			if err != nil {
+				return nil, fmt.Errorf("failed to parse port range %q: %w", p, err)
+			}
+			e, err := strconv.Atoi(parts[1])
+			if err != nil {
+				return nil, fmt.Errorf("failed to parse port range %q: %w", p, err)
+			}
+
+			err = validateRange(s, e)
+			if err != nil {
+				return nil, err
+			}
+
+			ranges = append(ranges, portRange{Start: s, End: e, String: p})
+		default:
+			return nil, fmt.Errorf("failed to parse port range %q", p)
+		}
+	}
+
+	if len(ranges) < 2 {
+		return ranges, nil
+	}
+
+	sort.Slice(ranges, func(i, j int) bool {
+		return ranges[i].Start < ranges[j].Start
+	})
+
+	for i := 1; i < len(ranges); i++ {
+		prev := ranges[i-1]
+		curr := ranges[i]
+		if curr.Start <= prev.End {
+			return nil, fmt.Errorf("overlapping ranges: %q and %q", prev.String, curr.String)
+		}
+	}
+
+	return ranges, nil
+}
+
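+// maybeExposeViaNodePort ensures one NodePort Service per ProxyGroup replica
+// when the ProxyClass requests NodePort exposure, and returns the NodePort
+// assigned to each replica keyed by replica name. If no port ranges are
+// configured, the NodePort is left for Kubernetes to allocate and is read
+// back from the created Service.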
+func (r *ProxyGroupReconciler) maybeExposeViaNodePort(ctx context.Context, pc *tsapi.ProxyClass, pg *tsapi.ProxyGroup, logger *zap.SugaredLogger) (map[string]int32, error) {
+	if pc == nil || pc.Spec.TailnetListenerConfig == nil || pc.Spec.TailnetListenerConfig.Type != nodePortType {
+		return nil, nil
+	}
+
+	ports := make(map[string]int32)
+	pr := pc.Spec.TailnetListenerConfig.NodePortConfig.PortRanges
+	if len(pr) == 0 {
+		logger.Infof("no port ranges specified in ProxyClass config, leaving NodePort unspecified")
+	} else {
+		err := allocatePorts(pg, pr, ports)
+		if err != nil {
+			return nil, fmt.Errorf("failed to allocate NodePorts to ProxyGroup Services: %w", err)
+		}
+	}
+
+	for i := range *(pg.Spec.Replicas) {
+		replicaName := fmt.Sprintf("%s-%d", pg.Name, i)
+		port, ok := ports[replicaName]
+		if !ok {
+			// NOTE: if port ranges have not been configured we want to leave Kubernetes to set the NodePort
+			port = 0
+		}
+
+		svc := &corev1.Service{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:            replicaName,
+				Namespace:       r.tsNamespace,
+				Labels:          pgLabels(pg.Name, nil),
+				OwnerReferences: pgOwnerReference(pg),
+			},
+			Spec: corev1.ServiceSpec{
+				Type: corev1.ServiceTypeNodePort,
+				Ports: []corev1.ServicePort{
+					{
+						Name:       directConnPortName,
+						Port:       int32(directConnProxyPort),
+						Protocol:   corev1.ProtocolUDP,
+						NodePort:   int32(port),
+						TargetPort: intstr.FromInt(directConnProxyPort),
+					},
+				},
+				Selector: map[string]string{
+					statefulSetPodNameSelector: replicaName,
+				},
+			},
+		}
+
+		createOrUpdate(ctx, r.Client, r.tsNamespace, svc, func(s *corev1.Service) {
+			s.ObjectMeta.Labels = svc.ObjectMeta.Labels
+			s.ObjectMeta.Annotations = svc.ObjectMeta.Annotations
+			s.ObjectMeta.OwnerReferences = svc.ObjectMeta.OwnerReferences
+			s.Spec.Selector = svc.Spec.Selector
+			if port != 0 {
+				s.Spec.Ports = svc.Spec.Ports
+			}
+		})
+
+		if port == 0 {
+			if err := r.Get(ctx, client.ObjectKeyFromObject(svc), svc); err != nil && !apierrors.IsNotFound(err) {
+				return nil, fmt.Errorf("error retrieving Kubernetes NodePort Service %s: %w", svc.Name, err)
+			}
+
+			for _, p := range svc.Spec.Ports {
+				if p.Name == directConnPortName {
+					port = p.NodePort
+					ports[replicaName] = port
+				}
+			}
+
+			if port == 0 {
+				logger.Warnf("ProxyGroup %q replica %q NodePort not configured", pg.Name, svc.Name)
+				return nil, nil
+			}
+
+			logger.Infof("ProxyGroup %q replica %q exposed on NodePort %d. Please ensure the appropriate firewall rules are configured to expose it on the desired network.", pg.Name, svc.Name, port)
+		}
+	}
+
+	return ports, nil
+}
+
 // validateProxyClassForPG applies custom validation logic for ProxyClass applied to ProxyGroup.
 func validateProxyClassForPG(logger *zap.SugaredLogger, pg *tsapi.ProxyGroup, pc *tsapi.ProxyClass) {
 	if pg.Spec.Type == tsapi.ProxyGroupTypeIngress {
@@ -251,7 +430,12 @@ func (r *ProxyGroupReconciler) maybeProvision(ctx context.Context, pg *tsapi.Pro
 	r.ensureAddedToGaugeForProxyGroup(pg)
 	r.mu.Unlock()
 
-	cfgHash, err := r.ensureConfigSecretsCreated(ctx, pg, proxyClass)
+	ports, err := r.maybeExposeViaNodePort(ctx, proxyClass, pg, logger)
+	if err != nil {
+		return fmt.Errorf("error provisioning NodePort Services: %w", err)
+	}
+
+	cfgHash, err := r.ensureConfigSecretsCreated(ctx, pg, proxyClass, ports)
 	if err != nil {
 		return fmt.Errorf("error provisioning config Secrets: %w", err)
 	}
@@ -326,7 +510,6 @@ func (r *ProxyGroupReconciler) maybeProvision(ctx context.Context, pg *tsapi.Pro
 	}
 
 	updateSS := func(s *appsv1.StatefulSet) {
-
 		// This is a temporary workaround to ensure that egress ProxyGroup proxies with capver older than 110
 		// are restarted when tailscaled configfile contents have changed.
 		// This workaround ensures that:
@@ -435,7 +618,8 @@ func (r *ProxyGroupReconciler) maybeCleanup(ctx context.Context, pg *tsapi.Proxy
 	mo := &metricsOpts{
 		proxyLabels: pgLabels(pg.Name, nil),
 		tsNamespace: r.tsNamespace,
-		proxyType:   "proxygroup"}
+		proxyType:   "proxygroup",
+	}
 	if err := maybeCleanupMetricsResources(ctx, mo, r.Client); err != nil {
 		return false, fmt.Errorf("error cleaning up metrics resources: %w", err)
 	}
@@ -463,7 +647,7 @@ func (r *ProxyGroupReconciler) deleteTailnetDevice(ctx context.Context, id tailc
 	return nil
 }
 
-func (r *ProxyGroupReconciler) ensureConfigSecretsCreated(ctx context.Context, pg *tsapi.ProxyGroup, proxyClass *tsapi.ProxyClass) (hash string, err error) {
+func (r *ProxyGroupReconciler) ensureConfigSecretsCreated(ctx context.Context, pg *tsapi.ProxyGroup, proxyClass *tsapi.ProxyClass, ports map[string]int32) (hash string, err error) {
 	logger := r.logger(pg.Name)
 	var configSHA256Sum string
 	for i := range pgReplicas(pg) {
@@ -497,7 +681,25 @@ func (r *ProxyGroupReconciler) ensureConfigSecretsCreated(ctx context.Context, p
 			}
 		}
 
-		configs, err := pgTailscaledConfig(pg, proxyClass, i, authKey, existingCfgSecret)
+		endpoints := []netip.AddrPort{}
+		if proxyClass != nil && proxyClass.Spec.TailnetListenerConfig != nil && proxyClass.Spec.TailnetListenerConfig.Type == nodePortType {
+			replicaName := fmt.Sprintf("%s-%d", pg.Name, i)
+			port, ok := ports[replicaName]
+			if !ok {
+				err := fmt.Errorf("could not find configured NodePort for ProxyGroup replica %q", replicaName)
+				logger.Warn(err.Error())
+				return "", err
+			}
+
+			eps, err := r.findStaticEndpoints(ctx, port, proxyClass, logger)
+			if err != nil {
+				err := fmt.Errorf("could not find static endpoints for replica %q: %w", replicaName, err)
+				logger.Warn(err.Error())
+				return "", err
+			}
+			endpoints = eps
+		}
+
+		configs, err := pgTailscaledConfig(pg, proxyClass, i, authKey, existingCfgSecret, endpoints)
 		if err != nil {
 			return "", fmt.Errorf("error creating tailscaled config: %w", err)
 		}
@@ -554,6 +756,36 @@ func (r *ProxyGroupReconciler) ensureConfigSecretsCreated(ctx context.Context, p
 	return configSHA256Sum, nil
 }
 
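+// findStaticEndpoints lists the nodes matching the NodePort config selector
+// and returns their ExternalIP addresses joined with the given NodePort, for
+// use as static endpoints in the tailscaled config.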
+func (r *ProxyGroupReconciler) findStaticEndpoints(ctx context.Context, port int32, proxyClass *tsapi.ProxyClass, logger *zap.SugaredLogger) ([]netip.AddrPort, error) {
+	nodes := new(corev1.NodeList)
+	err := r.List(ctx, nodes, client.MatchingLabels(proxyClass.Spec.TailnetListenerConfig.NodePortConfig.Selector))
+	if err != nil {
+		return nil, fmt.Errorf("failed to list nodes: %w", err)
+	}
+
+	if len(nodes.Items) == 0 {
+		err := fmt.Errorf("failed to match nodes to configured NodeSelectors in TailnetListenerConfig")
+		logger.Warn(err.Error())
+		return nil, err
+	}
+
+	endpoints := make([]netip.AddrPort, 0)
+	for _, n := range nodes.Items {
+		for _, a := range n.Status.Addresses {
+			if a.Type == corev1.NodeExternalIP {
+				addrPort := fmt.Sprintf("%s:%d", a.Address, port)
+				ap, err := netip.ParseAddrPort(addrPort)
+				if err != nil {
+					logger.Debugf("failed to parse external address on node %q: %q", n.Name, addrPort)
+					continue
+				}
+				logger.Debugf("adding endpoint %q to staticEndpoints config", addrPort)
+				endpoints = append(endpoints, ap)
+			}
+		}
+	}
+
+	return endpoints, nil
+}
+
 // ensureAddedToGaugeForProxyGroup ensures the gauge metric for the ProxyGroup resource is updated when the ProxyGroup
 // is created. r.mu must be held.
 func (r *ProxyGroupReconciler) ensureAddedToGaugeForProxyGroup(pg *tsapi.ProxyGroup) {
@@ -580,7 +812,7 @@ func (r *ProxyGroupReconciler) ensureRemovedFromGaugeForProxyGroup(pg *tsapi.Pro
 	gaugeIngressProxyGroupResources.Set(int64(r.ingressProxyGroups.Len()))
 }
 
-func pgTailscaledConfig(pg *tsapi.ProxyGroup, class *tsapi.ProxyClass, idx int32, authKey string, oldSecret *corev1.Secret) (tailscaledConfigs, error) {
+func pgTailscaledConfig(pg *tsapi.ProxyGroup, class *tsapi.ProxyClass, idx int32, authKey string, oldSecret *corev1.Secret, staticEndpoints []netip.AddrPort) (tailscaledConfigs, error) {
 	conf := &ipn.ConfigVAlpha{
 		Version:   "alpha0",
 		AcceptDNS: "false",
@@ -597,6 +829,10 @@ func pgTailscaledConfig(pg *tsapi.ProxyGroup, class *tsapi.ProxyClass, idx int32
 		conf.AcceptRoutes = "true"
 	}
 
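+	// Advertise the NodePort-derived static endpoints for this replica, if
+	// any, so that peers can attempt direct connections to it.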
+	if len(staticEndpoints) > 0 {
+		conf.StaticEndpoints = staticEndpoints
+	}
+
 	deviceAuthed := false
 	for _, d := range pg.Status.Devices {
 		if d.Hostname == *conf.Hostname {
diff --git a/cmd/k8s-operator/proxygroup_specs.go b/cmd/k8s-operator/proxygroup_specs.go
index 1d12c39e0..ee53b0e91 100644
--- a/cmd/k8s-operator/proxygroup_specs.go
+++ b/cmd/k8s-operator/proxygroup_specs.go
@@ -144,6 +144,13 @@ func pgStatefulSet(pg *tsapi.ProxyGroup, namespace, image, tsFirewallMode string
 		},
 	}
 
+	if proxyClass != nil && proxyClass.Spec.TailnetListenerConfig != nil && len(proxyClass.Spec.TailnetListenerConfig.NodePortConfig.PortRanges) > 0 {
+		// Pin tailscaled's UDP port to the port targeted by the per-replica NodePort Services.
+		envs = append(envs, corev1.EnvVar{
+			Name:  "PORT",
+			Value: strconv.Itoa(directConnProxyPort),
+		})
+	}
+
 	if tsFirewallMode != "" {
 		envs = append(envs, corev1.EnvVar{
 			Name:  "TS_DEBUG_FIREWALL_MODE",
diff --git a/k8s-operator/apis/v1alpha1/types_proxyclass.go b/k8s-operator/apis/v1alpha1/types_proxyclass.go
index 3b6bf8376..f1e850a37 100644
--- a/k8s-operator/apis/v1alpha1/types_proxyclass.go
+++ b/k8s-operator/apis/v1alpha1/types_proxyclass.go
@@ -92,6 +92,7 @@ type TailnetListenerConfig struct {
 }
 
 type TailnetListenerConfigMode string
+
 type NodePort struct {
 	PortRanges []string          `json:"portRanges,omitempty"`
 	Selector   map[string]string `json:"selector,omitempty"`
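
Below is a minimal, illustrative sketch (not part of the change) of a ProxyClass that opts a ProxyGroup into NodePort exposure, written against the types referenced in this diff. The tsapi.ProxyClassSpec type name and the pointer shape of NodePortConfig are assumptions; TailnetListenerConfig, Type, NodePortConfig, PortRanges, and Selector are taken from the code above.

package example

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	tsapi "tailscale.com/k8s-operator/apis/v1alpha1"
)

// exampleProxyClass is illustrative only.
func exampleProxyClass() *tsapi.ProxyClass {
	return &tsapi.ProxyClass{
		ObjectMeta: metav1.ObjectMeta{Name: "nodeport-direct"},
		Spec: tsapi.ProxyClassSpec{
			TailnetListenerConfig: &tsapi.TailnetListenerConfig{
				Type: "NodePort", // matches the nodePortType const above
				NodePortConfig: &tsapi.NodePort{ // pointer shape assumed
					// One NodePort per ProxyGroup replica is allocated from these ranges.
					PortRanges: []string{"30052", "31000-31010"},
					// Only nodes matching this selector contribute ExternalIP
					// addresses to the staticEndpoints tailscaled config.
					Selector: map[string]string{"kubernetes.io/os": "linux"},
				},
			},
		},
	}
}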