cmd/k8s-operator, k8s-operator: support direct connections on ProxyGroups

updates: #14674

Signed-off-by: chaosinthecrd <tom@tmlabs.co.uk>
This commit is contained in:
chaosinthecrd 2025-05-22 16:22:48 +01:00
parent 95dcd133dd
commit 05ecda9855
No known key found for this signature in database
GPG Key ID: 87942E75F71EF65D
3 changed files with 257 additions and 13 deletions

View File

@ -11,7 +11,10 @@ import (
"encoding/json"
"fmt"
"net/http"
"net/netip"
"slices"
"sort"
"strconv"
"strings"
"sync"
@ -25,6 +28,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
@ -43,11 +47,15 @@ import (
)
const (
nodePortType = "NodePort"
directConnPortName = "direct-connection-proxy"
directConnProxyPort = 30052
reasonProxyGroupCreationFailed = "ProxyGroupCreationFailed"
reasonProxyGroupReady = "ProxyGroupReady"
reasonProxyGroupCreating = "ProxyGroupCreating"
reasonProxyGroupInvalid = "ProxyGroupInvalid"
statefulSetPodNameSelector = "statefulset.kubernetes.io/pod-name"
// Copied from k8s.io/apiserver/pkg/registry/generic/registry/store.go@cccad306d649184bf2a0e319ba830c53f65c445c
optimisticLockErrorMsg = "the object has been modified; please apply your changes to the latest version and try again"
)
@ -206,18 +214,189 @@ func (r *ProxyGroupReconciler) Reconcile(ctx context.Context, req reconcile.Requ
return setStatusReady(pg, metav1.ConditionTrue, reasonProxyGroupReady, reasonProxyGroupReady)
}
func (r *ProxyGroupReconciler) maybeExposeViaNodePort(ctx context.Context, pc *tsapi.ProxyClass, pg *tsapi.ProxyGroup) error {
// TODO: make NodePort a const
if pc == nil || pc.Spec.TailnetListenerConfig == nil || pc.Spec.TailnetListenerConfig.Type != "NodePort" {
// allocatePorts assigns one NodePort from the configured port ranges to each
// ProxyGroup replica, storing the assignments in ports keyed by replica name
// (e.g. "<pg-name>-0"). It returns an error if the configured ranges are
// invalid or do not contain enough ports for every replica.
func allocatePorts(pg *tsapi.ProxyGroup, pr []string, ports map[string]int32) error {
	ranges, err := validatePortRanges(pr)
	if err != nil {
		return fmt.Errorf("configured port ranges invalid: %w", err)
	}
	if pg.Spec.Replicas == nil {
		// Guard against a nil-pointer panic when the replica count is unset.
		// NOTE(review): other code paths may default an unset replica count;
		// confirm whether callers always set it before reaching here.
		return fmt.Errorf("ProxyGroup %q has no replica count set", pg.Name)
	}
	replicaCount := int(*pg.Spec.Replicas)
	i := 0
	for _, r := range ranges {
		for p := r.Start; p <= r.End && len(ports) < replicaCount; p++ {
			ports[fmt.Sprintf("%s-%d", pg.Name, i)] = int32(p)
			i++
		}
		// Stop once every replica has a port. (The original checked
		// i-1 >= replicaCount, an off-by-one that ran one range too far.)
		if len(ports) >= replicaCount {
			break
		}
	}
	if len(ports) < replicaCount {
		return fmt.Errorf("not enough ports in configured ranges: needed %d, found %d", replicaCount, len(ports))
	}
	return nil
}
// 1. Create a NodePort Service for each replica
// TODO: support setting NodePort range
// validateRange checks that s and e are both valid port numbers (0-65535)
// and that they form a non-empty range (s <= e).
func validateRange(s int, e int) error {
	// Fixed: the original used %q on an int, which formats the value as a
	// quoted rune literal rather than the port number.
	if s < 0 || s > 65535 {
		return fmt.Errorf("invalid port value: %d", s)
	}
	if e < 0 || e > 65535 {
		return fmt.Errorf("invalid port value: %d", e)
	}
	if s > e {
		return fmt.Errorf("invalid port range: '%d-%d'", s, e)
	}
	return nil
}

// portRange is a single inclusive range of ports parsed from a "start-end"
// (or single "port") string. String retains the original textual form for
// error reporting.
type portRange struct {
	Start  int
	End    int
	String string
}

// validatePortRanges parses each entry of pr as either a single port ("80")
// or an inclusive range ("100-200"), validates every bound, and returns the
// parsed ranges sorted by start port. It rejects malformed entries and
// ranges that overlap one another (adjacent ranges are allowed).
func validatePortRanges(pr []string) ([]portRange, error) {
	ranges := []portRange{}
	for _, p := range pr {
		parts := strings.Split(p, "-")
		var s, e int
		switch len(parts) {
		case 1:
			v, err := strconv.Atoi(parts[0])
			if err != nil {
				return nil, fmt.Errorf("failed to parse port range %q: %w", p, err)
			}
			s, e = v, v
		case 2:
			var err error
			if s, err = strconv.Atoi(parts[0]); err != nil {
				return nil, fmt.Errorf("failed to parse port range %q: %w", p, err)
			}
			if e, err = strconv.Atoi(parts[1]); err != nil {
				return nil, fmt.Errorf("failed to parse port range %q: %w", p, err)
			}
		default:
			return nil, fmt.Errorf("failed to parse port range %q", p)
		}
		if err := validateRange(s, e); err != nil {
			return nil, err
		}
		ranges = append(ranges, portRange{Start: s, End: e, String: p})
	}
	if len(ranges) < 2 {
		return ranges, nil
	}
	sort.Slice(ranges, func(i, j int) bool {
		return ranges[i].Start < ranges[j].Start
	})
	// After sorting, any overlap must occur between neighbours.
	for i := 1; i < len(ranges); i++ {
		prev := ranges[i-1]
		curr := ranges[i]
		if curr.Start <= prev.End {
			return nil, fmt.Errorf("overlapping ranges: %q and %q", prev.String, curr.String)
		}
	}
	return ranges, nil
}
// maybeExposeViaNodePort ensures a NodePort Service exists for each
// ProxyGroup replica so that proxies can receive direct connections on a
// node-level port. It returns a map of replica name -> assigned NodePort.
// It returns (nil, nil) when the ProxyClass does not request NodePort
// exposure, or when a Kubernetes-assigned NodePort is not yet visible.
func (r *ProxyGroupReconciler) maybeExposeViaNodePort(ctx context.Context, pc *tsapi.ProxyClass, pg *tsapi.ProxyGroup, logger *zap.SugaredLogger) (map[string]int32, error) {
	if pc == nil || pc.Spec.TailnetListenerConfig == nil || pc.Spec.TailnetListenerConfig.Type != nodePortType {
		return nil, nil
	}
	ports := make(map[string]int32)
	pr := pc.Spec.TailnetListenerConfig.NodePortConfig.PortRanges
	if len(pr) == 0 {
		logger.Infof("no port ranges specified in ProxyClass config, leaving NodePort unspecified")
	} else {
		if err := allocatePorts(pg, pr, ports); err != nil {
			return nil, fmt.Errorf("failed to allocate NodePorts to ProxyGroup Services: %w", err)
		}
	}
	for i := range *(pg.Spec.Replicas) {
		replicaName := fmt.Sprintf("%s-%d", pg.Name, i)
		port, ok := ports[replicaName]
		if !ok {
			// NOTE: if port ranges have not been configured we want to leave
			// Kubernetes to set the NodePort (NodePort: 0 means auto-assign).
			port = 0
		}
		svc := &corev1.Service{
			ObjectMeta: metav1.ObjectMeta{
				Name:            replicaName,
				Namespace:       r.tsNamespace,
				Labels:          pgLabels(pg.Name, nil),
				OwnerReferences: pgOwnerReference(pg),
			},
			Spec: corev1.ServiceSpec{
				Type: corev1.ServiceTypeNodePort,
				Ports: []corev1.ServicePort{
					{
						Name:       directConnPortName,
						Port:       int32(directConnProxyPort),
						Protocol:   corev1.ProtocolUDP,
						NodePort:   int32(port),
						TargetPort: intstr.FromInt(directConnProxyPort),
					},
				},
				// Pin each Service to exactly one StatefulSet Pod.
				Selector: map[string]string{
					statefulSetPodNameSelector: replicaName,
				},
			},
		}
		// Fixed: the createOrUpdate error was previously ignored, so a failed
		// apply would go unnoticed and stale ports could be reported.
		if _, err := createOrUpdate(ctx, r.Client, r.tsNamespace, svc, func(s *corev1.Service) {
			s.ObjectMeta.Labels = svc.ObjectMeta.Labels
			s.ObjectMeta.Annotations = svc.ObjectMeta.Annotations
			s.ObjectMeta.OwnerReferences = svc.ObjectMeta.OwnerReferences
			s.Spec.Selector = svc.Spec.Selector
			// Only overwrite ports when we allocated one ourselves; otherwise
			// keep whatever NodePort Kubernetes assigned.
			if port != 0 {
				s.Spec.Ports = svc.Spec.Ports
			}
		}); err != nil {
			return nil, fmt.Errorf("failed to create or update NodePort Service %q: %w", svc.Name, err)
		}
		if port == 0 {
			// Read back the Service to discover the Kubernetes-assigned port.
			if err := r.Get(ctx, client.ObjectKeyFromObject(svc), svc); err != nil && !apierrors.IsNotFound(err) {
				return nil, fmt.Errorf("error retrieving Kubernetes NodePort Service %s: %w", svc.Name, err)
			}
			for _, p := range svc.Spec.Ports {
				if p.Name == directConnPortName {
					port = p.NodePort
					ports[replicaName] = port
				}
			}
			if port == 0 {
				// Fixed: Warn/Info were called with printf verbs but no
				// arguments; use the formatting variants with real args.
				logger.Warnf("ProxyGroup %q replica %q NodePort not configured", pg.Name, replicaName)
				return nil, nil
			}
			logger.Infof("ProxyGroup %q replica %q exposed on NodePort %d. Please ensure the appropriate firewall rules are configured to expose it on the desired network.", pg.Name, svc.Name, port)
		}
	}
	return ports, nil
}
// validateProxyClassForPG applies custom validation logic for ProxyClass applied to ProxyGroup.
@ -251,7 +430,12 @@ func (r *ProxyGroupReconciler) maybeProvision(ctx context.Context, pg *tsapi.Pro
r.ensureAddedToGaugeForProxyGroup(pg)
r.mu.Unlock()
cfgHash, err := r.ensureConfigSecretsCreated(ctx, pg, proxyClass)
ports, err := r.maybeExposeViaNodePort(ctx, proxyClass, pg, logger)
if err != nil {
return fmt.Errorf("error getting device info: %w", err)
}
cfgHash, err := r.ensureConfigSecretsCreated(ctx, pg, proxyClass, ports)
if err != nil {
return fmt.Errorf("error provisioning config Secrets: %w", err)
}
@ -326,7 +510,6 @@ func (r *ProxyGroupReconciler) maybeProvision(ctx context.Context, pg *tsapi.Pro
}
updateSS := func(s *appsv1.StatefulSet) {
// This is a temporary workaround to ensure that egress ProxyGroup proxies with capver older than 110
// are restarted when tailscaled configfile contents have changed.
// This workaround ensures that:
@ -435,7 +618,8 @@ func (r *ProxyGroupReconciler) maybeCleanup(ctx context.Context, pg *tsapi.Proxy
mo := &metricsOpts{
proxyLabels: pgLabels(pg.Name, nil),
tsNamespace: r.tsNamespace,
proxyType: "proxygroup"}
proxyType: "proxygroup",
}
if err := maybeCleanupMetricsResources(ctx, mo, r.Client); err != nil {
return false, fmt.Errorf("error cleaning up metrics resources: %w", err)
}
@ -463,7 +647,7 @@ func (r *ProxyGroupReconciler) deleteTailnetDevice(ctx context.Context, id tailc
return nil
}
func (r *ProxyGroupReconciler) ensureConfigSecretsCreated(ctx context.Context, pg *tsapi.ProxyGroup, proxyClass *tsapi.ProxyClass) (hash string, err error) {
func (r *ProxyGroupReconciler) ensureConfigSecretsCreated(ctx context.Context, pg *tsapi.ProxyGroup, proxyClass *tsapi.ProxyClass, ports map[string]int32) (hash string, err error) {
logger := r.logger(pg.Name)
var configSHA256Sum string
for i := range pgReplicas(pg) {
@ -497,7 +681,25 @@ func (r *ProxyGroupReconciler) ensureConfigSecretsCreated(ctx context.Context, p
}
}
configs, err := pgTailscaledConfig(pg, proxyClass, i, authKey, existingCfgSecret)
endpoints := []netip.AddrPort{}
if proxyClass != nil && proxyClass.Spec.TailnetListenerConfig.Type == nodePortType {
replicaName := fmt.Sprintf("%s-%d", pg.Name, i)
port, ok := ports[replicaName]
if !ok {
err := fmt.Errorf("could not find configured NodePort for ProxyGroup replica %q", replicaName)
logger.Warn(err.Error())
return "", err
}
err := r.findStaticEndpoints(ctx, port, endpoints, proxyClass, logger)
if err != nil {
err := fmt.Errorf("could not find static endpoints for replica %q: %w", replicaName, err)
logger.Warn(err.Error())
return "", err
}
}
configs, err := pgTailscaledConfig(pg, proxyClass, i, authKey, existingCfgSecret, endpoints)
if err != nil {
return "", fmt.Errorf("error creating tailscaled config: %w", err)
}
@ -554,6 +756,36 @@ func (r *ProxyGroupReconciler) ensureConfigSecretsCreated(ctx context.Context, p
return configSHA256Sum, nil
}
// findStaticEndpoints lists the Nodes matching the NodePortConfig selector
// and combines each Node's external IP with port into a netip.AddrPort for
// use as a static endpoint in tailscaled config.
//
// NOTE(review): endpoints is a slice passed by value, so appends made here
// grow a new backing array that the caller never sees — as written, the
// caller's slice stays empty. The signature is kept intact to avoid breaking
// the existing caller, but this should be changed to return the collected
// []netip.AddrPort in a follow-up.
func (r *ProxyGroupReconciler) findStaticEndpoints(ctx context.Context, port int32, endpoints []netip.AddrPort, proxyClass *tsapi.ProxyClass, logger *zap.SugaredLogger) error {
	nodes := new(corev1.NodeList)
	if err := r.List(ctx, nodes, client.MatchingLabels(proxyClass.Spec.TailnetListenerConfig.NodePortConfig.Selector)); err != nil {
		return fmt.Errorf("failed to list nodes: %w", err)
	}
	if len(nodes.Items) == 0 {
		err := fmt.Errorf("failed to match nodes to configured NodeSelectors in TailnetListenerConfig")
		logger.Warn(err.Error())
		return err
	}
	for _, n := range nodes.Items {
		for _, a := range n.Status.Addresses {
			if a.Type != corev1.NodeExternalIP {
				continue
			}
			addrPort := fmt.Sprintf("%s:%d", a.Address, port)
			ap, err := netip.ParseAddrPort(addrPort)
			if err != nil {
				// Fixed: previously a parse failure only logged and then
				// appended the zero AddrPort anyway; skip it instead.
				logger.Debugf("failed to parse external address on node %q: %q", n.Name, addrPort)
				continue
			}
			logger.Debugf("adding endpoint %q to staticEndpoints config", addrPort)
			endpoints = append(endpoints, ap)
		}
	}
	return nil
}
// ensureAddedToGaugeForProxyGroup ensures the gauge metric for the ProxyGroup resource is updated when the ProxyGroup
// is created. r.mu must be held.
func (r *ProxyGroupReconciler) ensureAddedToGaugeForProxyGroup(pg *tsapi.ProxyGroup) {
@ -580,7 +812,7 @@ func (r *ProxyGroupReconciler) ensureRemovedFromGaugeForProxyGroup(pg *tsapi.Pro
gaugeIngressProxyGroupResources.Set(int64(r.ingressProxyGroups.Len()))
}
func pgTailscaledConfig(pg *tsapi.ProxyGroup, class *tsapi.ProxyClass, idx int32, authKey string, oldSecret *corev1.Secret) (tailscaledConfigs, error) {
func pgTailscaledConfig(pg *tsapi.ProxyGroup, class *tsapi.ProxyClass, idx int32, authKey string, oldSecret *corev1.Secret, staticEndpoints []netip.AddrPort) (tailscaledConfigs, error) {
conf := &ipn.ConfigVAlpha{
Version: "alpha0",
AcceptDNS: "false",
@ -597,6 +829,10 @@ func pgTailscaledConfig(pg *tsapi.ProxyGroup, class *tsapi.ProxyClass, idx int32
conf.AcceptRoutes = "true"
}
if len(staticEndpoints) > 0 {
conf.StaticEndpoints = staticEndpoints
}
deviceAuthed := false
for _, d := range pg.Status.Devices {
if d.Hostname == *conf.Hostname {

View File

@ -144,6 +144,13 @@ func pgStatefulSet(pg *tsapi.ProxyGroup, namespace, image, tsFirewallMode string
},
}
if len(proxyClass.Spec.TailnetListenerConfig.NodePortConfig.PortRanges) > 0 {
envs = append(envs, corev1.EnvVar{
Name: "PORT",
Value: strconv.Itoa(directConnProxyPort),
})
}
if tsFirewallMode != "" {
envs = append(envs, corev1.EnvVar{
Name: "TS_DEBUG_FIREWALL_MODE",

View File

@ -92,6 +92,7 @@ type TailnetListenerConfig struct {
}
type TailnetListenerConfigMode string
type NodePort struct {
PortRanges []string `json:"portRanges,omitempty"`
Selector map[string]string `json:"selector,omitempty"`