cmd/k8s-operator, k8s-operator: support Static Endpoints on ProxyGroups (#16115)
Updates: #14674

Signed-off-by: chaosinthecrd <tom@tmlabs.co.uk>
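The core of the change: when a ProxyClass sets `spec.staticEndpoints.nodePort`, the operator provisions one NodePort Service per ProxyGroup replica, pairs the ExternalIP of each selected Node with that replica's NodePort, and writes the resulting addresses into the replica's tailscaled config as static endpoints. Below is a minimal, stdlib-only sketch of that address construction, not the operator code itself; the IPs and port are hypothetical stand-ins for the Nodes' ExternalIPs and the allocated NodePort.

```go
package main

import (
	"fmt"
	"net/netip"
)

// Mirrors the operator's cap of two advertised endpoints per replica.
const staticEndpointsMaxAddrs = 2

func main() {
	// Hypothetical inputs: node ExternalIPs and a NodePort allocated from
	// spec.staticEndpoints.nodePort.ports on the ProxyClass.
	nodeExternalIPs := []string{"198.51.100.10", "198.51.100.11", "198.51.100.12"}
	nodePort := uint16(30012)

	var staticEndpoints []netip.AddrPort
	for _, ip := range nodeExternalIPs {
		addr, err := netip.ParseAddr(ip)
		if err != nil {
			continue // unparsable addresses are skipped, as in the operator
		}
		staticEndpoints = append(staticEndpoints, netip.AddrPortFrom(addr, nodePort))
		if len(staticEndpoints) == staticEndpointsMaxAddrs {
			break
		}
	}

	// These values end up in the replica's tailscaled config
	// (ipn.ConfigVAlpha.StaticEndpoints) and are reported on the
	// ProxyGroup status as the device's static endpoints.
	fmt.Println(staticEndpoints)
}
```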
@@ -11,6 +11,7 @@ import (
 	"errors"
 	"fmt"
 	"net/http"
+	"net/netip"
 	"slices"
 	"strings"
 	"sync"
@@ -24,6 +25,7 @@ import (
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/client-go/tools/record"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -48,7 +50,8 @@ const (
 	reasonProxyGroupInvalid = "ProxyGroupInvalid"

 	// Copied from k8s.io/apiserver/pkg/registry/generic/registry/store.go@cccad306d649184bf2a0e319ba830c53f65c445c
-	optimisticLockErrorMsg = "the object has been modified; please apply your changes to the latest version and try again"
+	optimisticLockErrorMsg  = "the object has been modified; please apply your changes to the latest version and try again"
+	staticEndpointsMaxAddrs = 2
 )

 var (
@@ -174,7 +177,8 @@ func (r *ProxyGroupReconciler) Reconcile(ctx context.Context, req reconcile.Requ
 		}
 	}

-	if err = r.maybeProvision(ctx, pg, proxyClass); err != nil {
+	isProvisioned, err := r.maybeProvision(ctx, pg, proxyClass)
+	if err != nil {
 		reason := reasonProxyGroupCreationFailed
 		msg := fmt.Sprintf("error provisioning ProxyGroup resources: %s", err)
 		if strings.Contains(err.Error(), optimisticLockErrorMsg) {
@@ -185,9 +189,20 @@ func (r *ProxyGroupReconciler) Reconcile(ctx context.Context, req reconcile.Requ
 		} else {
 			r.recorder.Eventf(pg, corev1.EventTypeWarning, reason, msg)
 		}

 		return setStatusReady(pg, metav1.ConditionFalse, reason, msg)
 	}
+
+	if !isProvisioned {
+		if !apiequality.Semantic.DeepEqual(oldPGStatus, &pg.Status) {
+			// An error encountered here should get returned by the Reconcile function.
+			if updateErr := r.Client.Status().Update(ctx, pg); updateErr != nil {
+				return reconcile.Result{}, errors.Join(err, updateErr)
+			}
+		}
+		return
+	}
+
 	desiredReplicas := int(pgReplicas(pg))
 	if len(pg.Status.Devices) < desiredReplicas {
 		message := fmt.Sprintf("%d/%d ProxyGroup pods running", len(pg.Status.Devices), desiredReplicas)
@@ -230,15 +245,42 @@ func validateProxyClassForPG(logger *zap.SugaredLogger, pg *tsapi.ProxyGroup, pc
 	}
 }

-func (r *ProxyGroupReconciler) maybeProvision(ctx context.Context, pg *tsapi.ProxyGroup, proxyClass *tsapi.ProxyClass) error {
+func (r *ProxyGroupReconciler) maybeProvision(ctx context.Context, pg *tsapi.ProxyGroup, proxyClass *tsapi.ProxyClass) (isProvisioned bool, err error) {
 	logger := r.logger(pg.Name)
 	r.mu.Lock()
 	r.ensureAddedToGaugeForProxyGroup(pg)
 	r.mu.Unlock()

-	if err := r.ensureConfigSecretsCreated(ctx, pg, proxyClass); err != nil {
-		return fmt.Errorf("error provisioning config Secrets: %w", err)
+	svcToNodePorts := make(map[string]uint16)
+	var tailscaledPort *uint16
+	if proxyClass != nil && proxyClass.Spec.StaticEndpoints != nil {
+		svcToNodePorts, tailscaledPort, err = r.ensureNodePortServiceCreated(ctx, pg, proxyClass)
+		if err != nil {
+			wrappedErr := fmt.Errorf("error provisioning NodePort Services for static endpoints: %w", err)
+			var allocatePortErr *allocatePortsErr
+			if errors.As(err, &allocatePortErr) {
+				reason := reasonProxyGroupCreationFailed
+				msg := fmt.Sprintf("error provisioning ProxyGroup resources: %s", wrappedErr)
+				r.setStatusReady(pg, metav1.ConditionFalse, reason, msg, logger)
+				return false, nil
+			}
+			return false, wrappedErr
+		}
+	}
+
+	staticEndpoints, err := r.ensureConfigSecretsCreated(ctx, pg, proxyClass, svcToNodePorts)
+	if err != nil {
+		wrappedErr := fmt.Errorf("error provisioning config Secrets: %w", err)
+		var selectorErr *FindStaticEndpointErr
+		if errors.As(err, &selectorErr) {
+			reason := reasonProxyGroupCreationFailed
+			msg := fmt.Sprintf("error provisioning ProxyGroup resources: %s", wrappedErr)
+			r.setStatusReady(pg, metav1.ConditionFalse, reason, msg, logger)
+			return false, nil
+		}
+		return false, wrappedErr
 	}
+
 	// State secrets are precreated so we can use the ProxyGroup CR as their owner ref.
 	stateSecrets := pgStateSecrets(pg, r.tsNamespace)
 	for _, sec := range stateSecrets {
@@ -247,7 +289,7 @@ func (r *ProxyGroupReconciler) maybeProvision(ctx context.Context, pg *tsapi.Pro
 			s.ObjectMeta.Annotations = sec.ObjectMeta.Annotations
 			s.ObjectMeta.OwnerReferences = sec.ObjectMeta.OwnerReferences
 		}); err != nil {
-			return fmt.Errorf("error provisioning state Secrets: %w", err)
+			return false, fmt.Errorf("error provisioning state Secrets: %w", err)
 		}
 	}
 	sa := pgServiceAccount(pg, r.tsNamespace)
@@ -256,7 +298,7 @@ func (r *ProxyGroupReconciler) maybeProvision(ctx context.Context, pg *tsapi.Pro
 		s.ObjectMeta.Annotations = sa.ObjectMeta.Annotations
 		s.ObjectMeta.OwnerReferences = sa.ObjectMeta.OwnerReferences
 	}); err != nil {
-		return fmt.Errorf("error provisioning ServiceAccount: %w", err)
+		return false, fmt.Errorf("error provisioning ServiceAccount: %w", err)
 	}
 	role := pgRole(pg, r.tsNamespace)
 	if _, err := createOrUpdate(ctx, r.Client, r.tsNamespace, role, func(r *rbacv1.Role) {
@@ -265,7 +307,7 @@ func (r *ProxyGroupReconciler) maybeProvision(ctx context.Context, pg *tsapi.Pro
 		r.ObjectMeta.OwnerReferences = role.ObjectMeta.OwnerReferences
 		r.Rules = role.Rules
 	}); err != nil {
-		return fmt.Errorf("error provisioning Role: %w", err)
+		return false, fmt.Errorf("error provisioning Role: %w", err)
 	}
 	roleBinding := pgRoleBinding(pg, r.tsNamespace)
 	if _, err := createOrUpdate(ctx, r.Client, r.tsNamespace, roleBinding, func(r *rbacv1.RoleBinding) {
@@ -275,7 +317,7 @@ func (r *ProxyGroupReconciler) maybeProvision(ctx context.Context, pg *tsapi.Pro
 		r.RoleRef = roleBinding.RoleRef
 		r.Subjects = roleBinding.Subjects
 	}); err != nil {
-		return fmt.Errorf("error provisioning RoleBinding: %w", err)
+		return false, fmt.Errorf("error provisioning RoleBinding: %w", err)
 	}
 	if pg.Spec.Type == tsapi.ProxyGroupTypeEgress {
 		cm, hp := pgEgressCM(pg, r.tsNamespace)
@@ -284,7 +326,7 @@ func (r *ProxyGroupReconciler) maybeProvision(ctx context.Context, pg *tsapi.Pro
 			existing.ObjectMeta.OwnerReferences = cm.ObjectMeta.OwnerReferences
 			mak.Set(&existing.BinaryData, egressservices.KeyHEPPings, hp)
 		}); err != nil {
-			return fmt.Errorf("error provisioning egress ConfigMap %q: %w", cm.Name, err)
+			return false, fmt.Errorf("error provisioning egress ConfigMap %q: %w", cm.Name, err)
 		}
 	}
 	if pg.Spec.Type == tsapi.ProxyGroupTypeIngress {
@@ -293,12 +335,12 @@ func (r *ProxyGroupReconciler) maybeProvision(ctx context.Context, pg *tsapi.Pro
 			existing.ObjectMeta.Labels = cm.ObjectMeta.Labels
 			existing.ObjectMeta.OwnerReferences = cm.ObjectMeta.OwnerReferences
 		}); err != nil {
-			return fmt.Errorf("error provisioning ingress ConfigMap %q: %w", cm.Name, err)
+			return false, fmt.Errorf("error provisioning ingress ConfigMap %q: %w", cm.Name, err)
 		}
 	}
-	ss, err := pgStatefulSet(pg, r.tsNamespace, r.proxyImage, r.tsFirewallMode, proxyClass)
+	ss, err := pgStatefulSet(pg, r.tsNamespace, r.proxyImage, r.tsFirewallMode, tailscaledPort, proxyClass)
 	if err != nil {
-		return fmt.Errorf("error generating StatefulSet spec: %w", err)
+		return false, fmt.Errorf("error generating StatefulSet spec: %w", err)
 	}
 	cfg := &tailscaleSTSConfig{
 		proxyType: string(pg.Spec.Type),
@@ -306,7 +348,6 @@ func (r *ProxyGroupReconciler) maybeProvision(ctx context.Context, pg *tsapi.Pro
 	ss = applyProxyClassToStatefulSet(proxyClass, ss, cfg, logger)

 	updateSS := func(s *appsv1.StatefulSet) {
-
 		s.Spec = ss.Spec

 		s.ObjectMeta.Labels = ss.ObjectMeta.Labels
@@ -314,7 +355,7 @@ func (r *ProxyGroupReconciler) maybeProvision(ctx context.Context, pg *tsapi.Pro
 		s.ObjectMeta.OwnerReferences = ss.ObjectMeta.OwnerReferences
 	}
 	if _, err := createOrUpdate(ctx, r.Client, r.tsNamespace, ss, updateSS); err != nil {
-		return fmt.Errorf("error provisioning StatefulSet: %w", err)
+		return false, fmt.Errorf("error provisioning StatefulSet: %w", err)
 	}
 	mo := &metricsOpts{
 		tsNamespace: r.tsNamespace,
@@ -323,26 +364,150 @@ func (r *ProxyGroupReconciler) maybeProvision(ctx context.Context, pg *tsapi.Pro
 		proxyType:   "proxygroup",
 	}
 	if err := reconcileMetricsResources(ctx, logger, mo, proxyClass, r.Client); err != nil {
-		return fmt.Errorf("error reconciling metrics resources: %w", err)
+		return false, fmt.Errorf("error reconciling metrics resources: %w", err)
 	}

-	if err := r.cleanupDanglingResources(ctx, pg); err != nil {
-		return fmt.Errorf("error cleaning up dangling resources: %w", err)
+	if err := r.cleanupDanglingResources(ctx, pg, proxyClass); err != nil {
+		return false, fmt.Errorf("error cleaning up dangling resources: %w", err)
 	}

-	devices, err := r.getDeviceInfo(ctx, pg)
+	devices, err := r.getDeviceInfo(ctx, staticEndpoints, pg)
 	if err != nil {
-		return fmt.Errorf("failed to get device info: %w", err)
+		return false, fmt.Errorf("failed to get device info: %w", err)
 	}

 	pg.Status.Devices = devices

-	return nil
+	return true, nil
 }

+// getServicePortsForProxyGroups returns a map of ProxyGroup Service names to their NodePorts,
+// and a set of all allocated NodePorts for quick occupancy checking.
+func getServicePortsForProxyGroups(ctx context.Context, c client.Client, namespace string, portRanges tsapi.PortRanges) (map[string]uint16, set.Set[uint16], error) {
+	svcs := new(corev1.ServiceList)
+	matchingLabels := client.MatchingLabels(map[string]string{
+		LabelParentType: "proxygroup",
+	})
+
+	err := c.List(ctx, svcs, matchingLabels, client.InNamespace(namespace))
+	if err != nil {
+		return nil, nil, fmt.Errorf("failed to list ProxyGroup Services: %w", err)
+	}
+
+	svcToNodePorts := map[string]uint16{}
+	usedPorts := set.Set[uint16]{}
+	for _, svc := range svcs.Items {
+		if len(svc.Spec.Ports) == 1 && svc.Spec.Ports[0].NodePort != 0 {
+			p := uint16(svc.Spec.Ports[0].NodePort)
+			if portRanges.Contains(p) {
+				svcToNodePorts[svc.Name] = p
+				usedPorts.Add(p)
+			}
+		}
+	}
+
+	return svcToNodePorts, usedPorts, nil
+}
+
+type allocatePortsErr struct {
+	msg string
+}
+
+func (e *allocatePortsErr) Error() string {
+	return e.msg
+}
+
+func (r *ProxyGroupReconciler) allocatePorts(ctx context.Context, pg *tsapi.ProxyGroup, proxyClassName string, portRanges tsapi.PortRanges) (map[string]uint16, error) {
+	replicaCount := int(pgReplicas(pg))
+	svcToNodePorts, usedPorts, err := getServicePortsForProxyGroups(ctx, r.Client, r.tsNamespace, portRanges)
+	if err != nil {
+		return nil, &allocatePortsErr{msg: fmt.Sprintf("failed to find ports for existing ProxyGroup NodePort Services: %s", err.Error())}
+	}
+
+	replicasAllocated := 0
+	for i := range pgReplicas(pg) {
+		if _, ok := svcToNodePorts[pgNodePortServiceName(pg.Name, i)]; !ok {
+			svcToNodePorts[pgNodePortServiceName(pg.Name, i)] = 0
+		} else {
+			replicasAllocated++
+		}
+	}
+
+	for replica, port := range svcToNodePorts {
+		if port == 0 {
+			for p := range portRanges.All() {
+				if !usedPorts.Contains(p) {
+					svcToNodePorts[replica] = p
+					usedPorts.Add(p)
+					replicasAllocated++
+					break
+				}
+			}
+		}
+	}
+
+	if replicasAllocated < replicaCount {
+		return nil, &allocatePortsErr{msg: fmt.Sprintf("not enough available ports to allocate all replicas (needed %d, got %d). Field 'spec.staticEndpoints.nodePort.ports' on ProxyClass %q must have bigger range allocated", replicaCount, usedPorts.Len(), proxyClassName)}
+	}
+
+	return svcToNodePorts, nil
+}
+
+func (r *ProxyGroupReconciler) ensureNodePortServiceCreated(ctx context.Context, pg *tsapi.ProxyGroup, pc *tsapi.ProxyClass) (map[string]uint16, *uint16, error) {
+	// NOTE: (ChaosInTheCRD) we want the same TargetPort for every static endpoint NodePort Service for the ProxyGroup
+	tailscaledPort := getRandomPort()
+	svcs := []*corev1.Service{}
+	for i := range pgReplicas(pg) {
+		replicaName := pgNodePortServiceName(pg.Name, i)
+
+		svc := &corev1.Service{}
+		err := r.Get(ctx, types.NamespacedName{Name: replicaName, Namespace: r.tsNamespace}, svc)
+		if err != nil && !apierrors.IsNotFound(err) {
+			return nil, nil, fmt.Errorf("error getting Kubernetes Service %q: %w", replicaName, err)
+		}
+		if apierrors.IsNotFound(err) {
+			svcs = append(svcs, pgNodePortService(pg, replicaName, r.tsNamespace))
+		} else {
+			// NOTE: if we can we want to recover the random port used for tailscaled,
+			// as well as the NodePort previously used for that Service
+			if len(svc.Spec.Ports) == 1 {
+				if svc.Spec.Ports[0].Port != 0 {
+					tailscaledPort = uint16(svc.Spec.Ports[0].Port)
+				}
+			}
+			svcs = append(svcs, svc)
+		}
+	}
+
+	svcToNodePorts, err := r.allocatePorts(ctx, pg, pc.Name, pc.Spec.StaticEndpoints.NodePort.Ports)
+	if err != nil {
+		return nil, nil, fmt.Errorf("failed to allocate NodePorts to ProxyGroup Services: %w", err)
+	}
+
+	for _, svc := range svcs {
+		// NOTE: we know that every service is going to have 1 port here
+		svc.Spec.Ports[0].Port = int32(tailscaledPort)
+		svc.Spec.Ports[0].TargetPort = intstr.FromInt(int(tailscaledPort))
+		svc.Spec.Ports[0].NodePort = int32(svcToNodePorts[svc.Name])

+		_, err = createOrUpdate(ctx, r.Client, r.tsNamespace, svc, func(s *corev1.Service) {
+			s.ObjectMeta.Labels = svc.ObjectMeta.Labels
+			s.ObjectMeta.Annotations = svc.ObjectMeta.Annotations
+			s.ObjectMeta.OwnerReferences = svc.ObjectMeta.OwnerReferences
+			s.Spec.Selector = svc.Spec.Selector
+			s.Spec.Ports = svc.Spec.Ports
+		})
+		if err != nil {
+			return nil, nil, fmt.Errorf("error creating/updating Kubernetes NodePort Service %q: %w", svc.Name, err)
+		}
+	}
+
+	return svcToNodePorts, ptr.To(tailscaledPort), nil
+}
+
 // cleanupDanglingResources ensures we don't leak config secrets, state secrets, and
 // tailnet devices when the number of replicas specified is reduced.
-func (r *ProxyGroupReconciler) cleanupDanglingResources(ctx context.Context, pg *tsapi.ProxyGroup) error {
+func (r *ProxyGroupReconciler) cleanupDanglingResources(ctx context.Context, pg *tsapi.ProxyGroup, pc *tsapi.ProxyClass) error {
 	logger := r.logger(pg.Name)
 	metadata, err := r.getNodeMetadata(ctx, pg)
 	if err != nil {
@@ -371,6 +536,30 @@ func (r *ProxyGroupReconciler) cleanupDanglingResources(ctx context.Context, pg
 				return fmt.Errorf("error deleting config Secret %s: %w", configSecret.Name, err)
 			}
 		}
+		// NOTE(ChaosInTheCRD): we shouldn't need to get the service first, checking for a not found error should be enough
+		svc := &corev1.Service{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      fmt.Sprintf("%s-nodeport", m.stateSecret.Name),
+				Namespace: m.stateSecret.Namespace,
+			},
+		}
+		if err := r.Delete(ctx, svc); err != nil {
+			if !apierrors.IsNotFound(err) {
+				return fmt.Errorf("error deleting static endpoints Kubernetes Service %q: %w", svc.Name, err)
+			}
+		}
 	}
+
+	// If the ProxyClass has its StaticEndpoints config removed, we want to remove all of the NodePort Services
+	if pc != nil && pc.Spec.StaticEndpoints == nil {
+		labels := map[string]string{
+			kubetypes.LabelManaged: "true",
+			LabelParentType:        proxyTypeProxyGroup,
+			LabelParentName:        pg.Name,
+		}
+		if err := r.DeleteAllOf(ctx, &corev1.Service{}, client.InNamespace(r.tsNamespace), client.MatchingLabels(labels)); err != nil {
+			return fmt.Errorf("error deleting Kubernetes Services for static endpoints: %w", err)
+		}
+	}

 	return nil
@@ -396,7 +585,8 @@ func (r *ProxyGroupReconciler) maybeCleanup(ctx context.Context, pg *tsapi.Proxy
 	mo := &metricsOpts{
 		proxyLabels: pgLabels(pg.Name, nil),
 		tsNamespace: r.tsNamespace,
-		proxyType:   "proxygroup"}
+		proxyType:   "proxygroup",
+	}
 	if err := maybeCleanupMetricsResources(ctx, mo, r.Client); err != nil {
 		return false, fmt.Errorf("error cleaning up metrics resources: %w", err)
 	}
@@ -424,8 +614,9 @@ func (r *ProxyGroupReconciler) deleteTailnetDevice(ctx context.Context, id tailc
 	return nil
 }

-func (r *ProxyGroupReconciler) ensureConfigSecretsCreated(ctx context.Context, pg *tsapi.ProxyGroup, proxyClass *tsapi.ProxyClass) (err error) {
+func (r *ProxyGroupReconciler) ensureConfigSecretsCreated(ctx context.Context, pg *tsapi.ProxyGroup, proxyClass *tsapi.ProxyClass, svcToNodePorts map[string]uint16) (endpoints map[string][]netip.AddrPort, err error) {
 	logger := r.logger(pg.Name)
+	endpoints = make(map[string][]netip.AddrPort, pgReplicas(pg))
 	for i := range pgReplicas(pg) {
 		cfgSecret := &corev1.Secret{
 			ObjectMeta: metav1.ObjectMeta{
@@ -441,7 +632,7 @@ func (r *ProxyGroupReconciler) ensureConfigSecretsCreated(ctx context.Context, p
 			logger.Debugf("Secret %s/%s already exists", cfgSecret.GetNamespace(), cfgSecret.GetName())
 			existingCfgSecret = cfgSecret.DeepCopy()
 		} else if !apierrors.IsNotFound(err) {
-			return err
+			return nil, err
 		}

 		var authKey string
@@ -453,19 +644,32 @@ func (r *ProxyGroupReconciler) ensureConfigSecretsCreated(ctx context.Context, p
 			}
 			authKey, err = newAuthKey(ctx, r.tsClient, tags)
 			if err != nil {
-				return err
+				return nil, err
 			}
 		}

-		configs, err := pgTailscaledConfig(pg, proxyClass, i, authKey, existingCfgSecret)
+		replicaName := pgNodePortServiceName(pg.Name, i)
+		if len(svcToNodePorts) > 0 {
+			port, ok := svcToNodePorts[replicaName]
+			if !ok {
+				return nil, fmt.Errorf("could not find configured NodePort for ProxyGroup replica %q", replicaName)
+			}
+
+			endpoints[replicaName], err = r.findStaticEndpoints(ctx, existingCfgSecret, proxyClass, port, logger)
+			if err != nil {
+				return nil, fmt.Errorf("could not find static endpoints for replica %q: %w", replicaName, err)
+			}
+		}
+
+		configs, err := pgTailscaledConfig(pg, proxyClass, i, authKey, existingCfgSecret, endpoints[replicaName])
 		if err != nil {
-			return fmt.Errorf("error creating tailscaled config: %w", err)
+			return nil, fmt.Errorf("error creating tailscaled config: %w", err)
 		}

 		for cap, cfg := range configs {
 			cfgJSON, err := json.Marshal(cfg)
 			if err != nil {
-				return fmt.Errorf("error marshalling tailscaled config: %w", err)
+				return nil, fmt.Errorf("error marshalling tailscaled config: %w", err)
 			}
 			mak.Set(&cfgSecret.Data, tsoperator.TailscaledConfigFileName(cap), cfgJSON)
 		}
@@ -474,18 +678,111 @@ func (r *ProxyGroupReconciler) ensureConfigSecretsCreated(ctx context.Context, p
 			if !apiequality.Semantic.DeepEqual(existingCfgSecret, cfgSecret) {
 				logger.Debugf("Updating the existing ProxyGroup config Secret %s", cfgSecret.Name)
 				if err := r.Update(ctx, cfgSecret); err != nil {
-					return err
+					return nil, err
 				}
 			}
 		} else {
 			logger.Debugf("Creating a new config Secret %s for the ProxyGroup", cfgSecret.Name)
 			if err := r.Create(ctx, cfgSecret); err != nil {
-				return err
+				return nil, err
 			}
 		}
 	}

-	return nil
+	return endpoints, nil
 }

+type FindStaticEndpointErr struct {
+	msg string
+}
+
+func (e *FindStaticEndpointErr) Error() string {
+	return e.msg
+}
+
+// findStaticEndpoints returns up to two `netip.AddrPort` entries, derived from the ExternalIPs of Nodes that
+// match the `proxyClass`'s selector within the StaticEndpoints configuration. The port is set to the replica's NodePort Service Port.
+func (r *ProxyGroupReconciler) findStaticEndpoints(ctx context.Context, existingCfgSecret *corev1.Secret, proxyClass *tsapi.ProxyClass, port uint16, logger *zap.SugaredLogger) ([]netip.AddrPort, error) {
+	var currAddrs []netip.AddrPort
+	if existingCfgSecret != nil {
+		oldConfB := existingCfgSecret.Data[tsoperator.TailscaledConfigFileName(106)]
+		if len(oldConfB) > 0 {
+			var oldConf ipn.ConfigVAlpha
+			if err := json.Unmarshal(oldConfB, &oldConf); err == nil {
+				currAddrs = oldConf.StaticEndpoints
+			} else {
+				logger.Debugf("failed to unmarshal tailscaled config from secret %q: %v", existingCfgSecret.Name, err)
+			}
+		} else {
+			logger.Debugf("failed to get tailscaled config from secret %q: empty data", existingCfgSecret.Name)
+		}
+	}
+
+	nodes := new(corev1.NodeList)
+	selectors := client.MatchingLabels(proxyClass.Spec.StaticEndpoints.NodePort.Selector)
+
+	err := r.List(ctx, nodes, selectors)
+	if err != nil {
+		return nil, fmt.Errorf("failed to list nodes: %w", err)
+	}
+
+	if len(nodes.Items) == 0 {
+		return nil, &FindStaticEndpointErr{msg: fmt.Sprintf("failed to match nodes to configured Selectors on `spec.staticEndpoints.nodePort.selectors` field for ProxyClass %q", proxyClass.Name)}
+	}
+
+	endpoints := []netip.AddrPort{}
+
+	// NOTE(ChaosInTheCRD): Setting a hard limit of two static endpoints.
+	newAddrs := []netip.AddrPort{}
+	for _, n := range nodes.Items {
+		for _, a := range n.Status.Addresses {
+			if a.Type == corev1.NodeExternalIP {
+				addr := getStaticEndpointAddress(&a, port)
+				if addr == nil {
+					logger.Debugf("failed to parse %q address on node %q: %q", corev1.NodeExternalIP, n.Name, a.Address)
+					continue
+				}
+
+				// we want to add the currently used IPs first before
+				// adding new ones.
+				if currAddrs != nil && slices.Contains(currAddrs, *addr) {
+					endpoints = append(endpoints, *addr)
+				} else {
+					newAddrs = append(newAddrs, *addr)
+				}
+			}
+
+			if len(endpoints) == 2 {
+				break
+			}
+		}
+	}
+
+	// if the 2 endpoints limit hasn't been reached, we
+	// can start adding newIPs.
+	if len(endpoints) < 2 {
+		for _, a := range newAddrs {
+			endpoints = append(endpoints, a)
+			if len(endpoints) == 2 {
+				break
+			}
+		}
+	}
+
+	if len(endpoints) == 0 {
+		return nil, &FindStaticEndpointErr{msg: fmt.Sprintf("failed to find any `status.addresses` of type %q on nodes using configured Selectors on `spec.staticEndpoints.nodePort.selectors` for ProxyClass %q", corev1.NodeExternalIP, proxyClass.Name)}
+	}
+
+	return endpoints, nil
+}
+
+func getStaticEndpointAddress(a *corev1.NodeAddress, port uint16) *netip.AddrPort {
+	addr, err := netip.ParseAddr(a.Address)
+	if err != nil {
+		return nil
+	}
+
+	return ptr.To(netip.AddrPortFrom(addr, port))
+}
+
 // ensureAddedToGaugeForProxyGroup ensures the gauge metric for the ProxyGroup resource is updated when the ProxyGroup
@@ -514,7 +811,7 @@ func (r *ProxyGroupReconciler) ensureRemovedFromGaugeForProxyGroup(pg *tsapi.Pro
 	gaugeIngressProxyGroupResources.Set(int64(r.ingressProxyGroups.Len()))
 }

-func pgTailscaledConfig(pg *tsapi.ProxyGroup, class *tsapi.ProxyClass, idx int32, authKey string, oldSecret *corev1.Secret) (tailscaledConfigs, error) {
+func pgTailscaledConfig(pg *tsapi.ProxyGroup, class *tsapi.ProxyClass, idx int32, authKey string, oldSecret *corev1.Secret, staticEndpoints []netip.AddrPort) (tailscaledConfigs, error) {
 	conf := &ipn.ConfigVAlpha{
 		Version:   "alpha0",
 		AcceptDNS: "false",
@@ -531,6 +828,10 @@ func pgTailscaledConfig(pg *tsapi.ProxyGroup, class *tsapi.ProxyClass, idx int32
 		conf.AcceptRoutes = "true"
 	}

+	if len(staticEndpoints) > 0 {
+		conf.StaticEndpoints = staticEndpoints
+	}
+
 	deviceAuthed := false
 	for _, d := range pg.Status.Devices {
 		if d.Hostname == *conf.Hostname {
@@ -624,7 +925,7 @@ func (r *ProxyGroupReconciler) getNodeMetadata(ctx context.Context, pg *tsapi.Pr
 	return metadata, nil
 }

-func (r *ProxyGroupReconciler) getDeviceInfo(ctx context.Context, pg *tsapi.ProxyGroup) (devices []tsapi.TailnetDevice, _ error) {
+func (r *ProxyGroupReconciler) getDeviceInfo(ctx context.Context, staticEndpoints map[string][]netip.AddrPort, pg *tsapi.ProxyGroup) (devices []tsapi.TailnetDevice, _ error) {
 	metadata, err := r.getNodeMetadata(ctx, pg)
 	if err != nil {
 		return nil, err
@@ -638,10 +939,21 @@ func (r *ProxyGroupReconciler) getDeviceInfo(ctx context.Context, pg *tsapi.Prox
 		if !ok {
 			continue
 		}
-		devices = append(devices, tsapi.TailnetDevice{
+
+		dev := tsapi.TailnetDevice{
 			Hostname:   device.Hostname,
 			TailnetIPs: device.TailnetIPs,
-		})
+		}
+
+		if ep, ok := staticEndpoints[device.Hostname]; ok && len(ep) > 0 {
+			eps := make([]string, 0, len(ep))
+			for _, e := range ep {
+				eps = append(eps, e.String())
+			}
+			dev.StaticEndpoints = eps
+		}
+
+		devices = append(devices, dev)
 	}

 	return devices, nil
@@ -655,3 +967,8 @@ type nodeMetadata struct {
 	tsID    tailcfg.StableNodeID
 	dnsName string
 }
+
+func (pr *ProxyGroupReconciler) setStatusReady(pg *tsapi.ProxyGroup, status metav1.ConditionStatus, reason string, msg string, logger *zap.SugaredLogger) {
+	pr.recorder.Eventf(pg, corev1.EventTypeWarning, reason, msg)
+	tsoperator.SetProxyGroupCondition(pg, tsapi.ProxyGroupReady, status, reason, msg, pg.Generation, pr.clock, logger)
+}
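A note on the allocation strategy in allocatePorts above: a replica keeps the NodePort its Service already holds, otherwise it takes the first unused port from the ProxyClass's configured range, and if the range cannot cover every replica the ProxyGroup is reported not ready instead of failing the reconcile loop. A rough, stdlib-only sketch of that selection follows; the Service names, ports, and range are made up for illustration and do not come from the operator's API.

```go
package main

import "fmt"

func main() {
	// Hypothetical inputs: the configured range from spec.staticEndpoints.nodePort.ports,
	// ports already held by existing per-replica NodePort Services, and the replica
	// Services that need a port.
	portRange := []uint16{30000, 30001, 30002, 30003}
	existing := map[string]uint16{"pg-1-nodeport": 30001}
	replicas := []string{"pg-0-nodeport", "pg-1-nodeport", "pg-2-nodeport"}

	used := map[uint16]bool{}
	for _, p := range existing {
		used[p] = true
	}

	alloc := map[string]uint16{}
	for _, name := range replicas {
		if p, ok := existing[name]; ok {
			alloc[name] = p // keep a previously allocated port
			continue
		}
		for _, p := range portRange { // otherwise take the first free port in the range
			if !used[p] {
				alloc[name] = p
				used[p] = true
				break
			}
		}
	}

	if len(alloc) < len(replicas) {
		fmt.Println("range too small: widen spec.staticEndpoints.nodePort.ports on the ProxyClass")
		return
	}
	fmt.Println(alloc)
}
```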
|