cmd/k8s-operator: ensure HA Ingress can operate in multicluster mode. (#15157)

Update the owner reference mechanism so that:
- if, during HA Ingress resource creation, a VIPService that already
carries some other operator's owner reference is found, just update
the owner references to add one for this operator
- if, during HA Ingress deletion, the VIPService is found to have owner
reference(s) from another operator, don't delete the VIPService; just
remove this operator's owner reference
- requeue after any HA Ingress reconcile that resulted in a VIPService
update, to guard against overwrites due to concurrent operations from
different clusters (see the sketch below).
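
For illustration, a minimal standalone sketch of the owner-reference
bookkeeping described above. The OwnerRef and comment types mirror the ones
this change adds to ingress-for-pg.go; claim and release are hypothetical
helper names used only for exposition (the real logic lives in
ownerRefsComment and cleanupVIPService in the diff below):

package main

import (
	"encoding/json"
	"fmt"
	"slices"
)

// OwnerRef identifies one operator instance by the stable ID of its
// Tailscale device (same shape as the type added in this commit).
type OwnerRef struct {
	OperatorID string `json:"operatorID,omitempty"`
}

// comment is the JSON payload stored in the VIPService Comment field.
type comment struct {
	OwnerRefs []OwnerRef `json:"ownerRefs,omitempty"`
}

// claim adds this operator's owner reference, keeping any references
// that operators in other clusters have already recorded.
func claim(raw, operatorID string) (string, error) {
	var c comment
	if raw != "" {
		if err := json.Unmarshal([]byte(raw), &c); err != nil {
			return "", err
		}
	}
	ref := OwnerRef{OperatorID: operatorID}
	if !slices.Contains(c.OwnerRefs, ref) {
		c.OwnerRefs = append(c.OwnerRefs, ref)
	}
	b, err := json.Marshal(c)
	return string(b), err
}

// release removes this operator's owner reference and reports whether
// no references remain (only then may the VIPService be deleted).
func release(raw, operatorID string) (string, bool, error) {
	var c comment
	if err := json.Unmarshal([]byte(raw), &c); err != nil {
		return "", false, err
	}
	c.OwnerRefs = slices.DeleteFunc(c.OwnerRefs, func(r OwnerRef) bool {
		return r.OperatorID == operatorID
	})
	b, err := json.Marshal(c)
	return string(b), len(c.OwnerRefs) == 0, err
}

func main() {
	s, _ := claim(`{"ownerRefs":[{"operatorID":"operator-2"}]}`, "operator-1")
	fmt.Println(s)
	// {"ownerRefs":[{"operatorID":"operator-2"},{"operatorID":"operator-1"}]}
	s, empty, _ := release(s, "operator-1")
	fmt.Println(s, empty)
	// {"ownerRefs":[{"operatorID":"operator-2"}]} false
}

Deletion is gated on the reference list draining to empty, which is what
makes concurrent create/delete from different clusters safe.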

Updates tailscale/corp#24795


Signed-off-by: Irbe Krumina <irbe@tailscale.com>
Irbe Krumina authored 2025-03-06 15:13:10 -08:00, committed by GitHub
parent 9d7f2719bb
commit 74a2373e1d
4 changed files with 480 additions and 195 deletions

cmd/k8s-operator/ingress-for-pg.go

@@ -15,6 +15,9 @@ import (
 	"slices"
 	"strings"
 	"sync"
+	"time"
+
+	"math/rand/v2"
 
 	"go.uber.org/zap"
 	corev1 "k8s.io/api/core/v1"
@@ -53,9 +56,9 @@ const (
 var gaugePGIngressResources = clientmetric.NewGauge(kubetypes.MetricIngressPGResourceCount)
 
-// IngressPGReconciler is a controller that reconciles Tailscale Ingresses should be exposed on an ingress ProxyGroup
-// (in HA mode).
-type IngressPGReconciler struct {
+// HAIngressReconciler is a controller that reconciles Tailscale Ingresses
+// that should be exposed on an ingress ProxyGroup (in HA mode).
+type HAIngressReconciler struct {
 	client.Client
 	recorder record.EventRecorder
@@ -65,6 +68,7 @@ type IngressPGReconciler struct {
 	tsNamespace string
 	lc          localClient
 	defaultTags []string
+	operatorID  string // stableID of the operator's Tailscale device
 
 	mu sync.Mutex // protects following
 	// managedIngresses is a set of all ingress resources that we're currently
@@ -72,20 +76,29 @@ type IngressPGReconciler struct {
 	managedIngresses set.Slice[types.UID]
 }
 
-// Reconcile reconciles Ingresses that should be exposed over Tailscale in HA mode (on a ProxyGroup). It looks at all
-// Ingresses with tailscale.com/proxy-group annotation. For each such Ingress, it ensures that a VIPService named after
-// the hostname of the Ingress exists and is up to date. It also ensures that the serve config for the ingress
-// ProxyGroup is updated to route traffic for the VIPService to the Ingress's backend Services.
-// When an Ingress is deleted or unexposed, the VIPService and the associated serve config are cleaned up.
-// Ingress hostname change also results in the VIPService for the previous hostname being cleaned up and a new VIPService
-// being created for the new hostname.
-func (a *IngressPGReconciler) Reconcile(ctx context.Context, req reconcile.Request) (res reconcile.Result, err error) {
-	logger := a.logger.With("Ingress", req.NamespacedName)
+// Reconcile reconciles Ingresses that should be exposed over Tailscale in HA
+// mode (on a ProxyGroup). It looks at all Ingresses with
+// tailscale.com/proxy-group annotation. For each such Ingress, it ensures that
+// a VIPService named after the hostname of the Ingress exists and is up to
+// date. It also ensures that the serve config for the ingress ProxyGroup is
+// updated to route traffic for the VIPService to the Ingress's backend
+// Services. Ingress hostname change also results in the VIPService for the
+// previous hostname being cleaned up and a new VIPService being created for
+// the new hostname.
+//
+// HA Ingresses support a multi-cluster Ingress setup.
+// Each VIPService contains a list of owner references that uniquely identify
+// the Ingress resource and the operator. When an Ingress that acts as a
+// backend is being deleted, the corresponding VIPService is only deleted if the
+// only owner reference that it contains is for this Ingress. If other owner
+// references are found, then the cleanup operation only removes this Ingress'
+// owner reference.
+func (r *HAIngressReconciler) Reconcile(ctx context.Context, req reconcile.Request) (res reconcile.Result, err error) {
+	logger := r.logger.With("Ingress", req.NamespacedName)
 	logger.Debugf("starting reconcile")
 	defer logger.Debugf("reconcile finished")
 
 	ing := new(networkingv1.Ingress)
-	err = a.Get(ctx, req.NamespacedName, ing)
+	err = r.Get(ctx, req.NamespacedName, ing)
 	if apierrors.IsNotFound(err) {
 		// Request object not found, could have been deleted after reconcile request.
 		logger.Debugf("Ingress not found, assuming it was deleted")
@@ -99,57 +112,71 @@ func (a *IngressPGReconciler) Reconcile(ctx context.Context, req reconcile.Reque
 	hostname := hostnameForIngress(ing)
 	logger = logger.With("hostname", hostname)
 
-	if !ing.DeletionTimestamp.IsZero() || !shouldExpose(ing) {
-		return res, a.maybeCleanup(ctx, hostname, ing, logger)
+	// needsRequeue is set to true if the underlying VIPService has changed as a result of this reconcile. If that
+	// is the case, we reconcile the Ingress one more time to ensure that concurrent updates to the VIPService in a
+	// multi-cluster Ingress setup have not resulted in another actor overwriting our VIPService update.
+	needsRequeue := false
+	if !ing.DeletionTimestamp.IsZero() || !r.shouldExpose(ing) {
+		needsRequeue, err = r.maybeCleanup(ctx, hostname, ing, logger)
+	} else {
+		needsRequeue, err = r.maybeProvision(ctx, hostname, ing, logger)
 	}
-
-	if err := a.maybeProvision(ctx, hostname, ing, logger); err != nil {
-		return res, fmt.Errorf("failed to provision: %w", err)
+	if err != nil {
+		return res, err
+	}
+	if needsRequeue {
+		res = reconcile.Result{RequeueAfter: requeueInterval()}
 	}
 	return res, nil
 }
 
-// maybeProvision ensures that the VIPService and serve config for the Ingress are created or updated.
-func (a *IngressPGReconciler) maybeProvision(ctx context.Context, hostname string, ing *networkingv1.Ingress, logger *zap.SugaredLogger) error {
-	if err := validateIngressClass(ctx, a.Client); err != nil {
+// maybeProvision ensures that a VIPService for this Ingress exists and is up to date and that the serve config for the
+// corresponding ProxyGroup contains the Ingress backend's definition.
+// If a VIPService does not exist, it will be created.
+// If a VIPService exists, but only with owner references from other operator instances, an owner reference for this
+// operator instance is added.
+// If a VIPService exists, but does not have an owner reference from any operator, we error out, assuming that the
+// VIPService was created by an unknown actor.
+// Returns true if the operation resulted in a VIPService update.
+func (r *HAIngressReconciler) maybeProvision(ctx context.Context, hostname string, ing *networkingv1.Ingress, logger *zap.SugaredLogger) (svcsChanged bool, err error) {
+	if err := validateIngressClass(ctx, r.Client); err != nil {
 		logger.Infof("error validating tailscale IngressClass: %v.", err)
-		return nil
+		return false, nil
 	}
+
 	// Get and validate ProxyGroup readiness
 	pgName := ing.Annotations[AnnotationProxyGroup]
 	if pgName == "" {
 		logger.Infof("[unexpected] no ProxyGroup annotation, skipping VIPService provisioning")
-		return nil
+		return false, nil
 	}
 	logger = logger.With("ProxyGroup", pgName)
 
 	pg := &tsapi.ProxyGroup{}
-	if err := a.Get(ctx, client.ObjectKey{Name: pgName}, pg); err != nil {
+	if err := r.Get(ctx, client.ObjectKey{Name: pgName}, pg); err != nil {
 		if apierrors.IsNotFound(err) {
 			logger.Infof("ProxyGroup %q does not exist", pgName)
-			return nil
+			return false, nil
 		}
-		return fmt.Errorf("getting ProxyGroup %q: %w", pgName, err)
+		return false, fmt.Errorf("getting ProxyGroup %q: %w", pgName, err)
 	}
 	if !tsoperator.ProxyGroupIsReady(pg) {
-		// TODO(irbekrm): we need to reconcile ProxyGroup Ingresses on ProxyGroup changes to not miss the status update
-		// in this case.
-		logger.Infof("ProxyGroup %q is not ready", pgName)
-		return nil
+		logger.Infof("ProxyGroup %q is not (yet) ready", pgName)
+		return false, nil
 	}
 
 	// Validate Ingress configuration
-	if err := a.validateIngress(ing, pg); err != nil {
+	if err := r.validateIngress(ctx, ing, pg); err != nil {
 		logger.Infof("invalid Ingress configuration: %v", err)
-		a.recorder.Event(ing, corev1.EventTypeWarning, "InvalidIngressConfiguration", err.Error())
-		return nil
+		r.recorder.Event(ing, corev1.EventTypeWarning, "InvalidIngressConfiguration", err.Error())
+		return false, nil
 	}
 
-	if !IsHTTPSEnabledOnTailnet(a.tsnetServer) {
-		a.recorder.Event(ing, corev1.EventTypeWarning, "HTTPSNotEnabled", "HTTPS is not enabled on the tailnet; ingress may not work")
+	if !IsHTTPSEnabledOnTailnet(r.tsnetServer) {
+		r.recorder.Event(ing, corev1.EventTypeWarning, "HTTPSNotEnabled", "HTTPS is not enabled on the tailnet; ingress may not work")
 	}
+
+	logger = logger.With("proxy-group", pg.Name)
+
 	if !slices.Contains(ing.Finalizers, FinalizerNamePG) {
 		// This log line is printed exactly once during initial provisioning,
 		// because once the finalizer is in place this block gets skipped. So,
@@ -157,64 +184,78 @@ func (a *IngressPGReconciler) maybeProvision(ctx context.Context, hostname strin
 		// multi-reconcile operation is underway.
 		logger.Infof("exposing Ingress over tailscale")
 		ing.Finalizers = append(ing.Finalizers, FinalizerNamePG)
-		if err := a.Update(ctx, ing); err != nil {
-			return fmt.Errorf("failed to add finalizer: %w", err)
+		if err := r.Update(ctx, ing); err != nil {
+			return false, fmt.Errorf("failed to add finalizer: %w", err)
 		}
-		a.mu.Lock()
-		a.managedIngresses.Add(ing.UID)
-		gaugePGIngressResources.Set(int64(a.managedIngresses.Len()))
-		a.mu.Unlock()
+		r.mu.Lock()
+		r.managedIngresses.Add(ing.UID)
+		gaugePGIngressResources.Set(int64(r.managedIngresses.Len()))
+		r.mu.Unlock()
 	}
 
-	// 1. Ensure that if Ingress' hostname has changed, any VIPService resources corresponding to the old hostname
-	// are cleaned up.
-	// In practice, this function will ensure that any VIPServices that are associated with the provided ProxyGroup
-	// and no longer owned by an Ingress are cleaned up. This is fine- it is not expensive and ensures that in edge
-	// cases (a single update changed both hostname and removed ProxyGroup annotation) the VIPService is more likely
-	// to be (eventually) removed.
-	if err := a.maybeCleanupProxyGroup(ctx, pgName, logger); err != nil {
-		return fmt.Errorf("failed to cleanup VIPService resources for ProxyGroup: %w", err)
-	}
-	// 2. Ensure that there isn't a VIPService with the same hostname already created and not owned by this Ingress.
-	// TODO(irbekrm): perhaps in future we could have record names being stored on VIPServices. I am not certain if
-	// there might not be edge cases (custom domains, etc?) where attempting to determine the DNS name of the
-	// VIPService in this way won't be incorrect.
-	tcd, err := a.tailnetCertDomain(ctx)
-	if err != nil {
-		return fmt.Errorf("error determining DNS name base: %w", err)
+	// 1. Ensure that if Ingress' hostname has changed, any VIPService
+	// resources corresponding to the old hostname are cleaned up.
+	// In practice, this function will ensure that any VIPServices that are
+	// associated with the provided ProxyGroup and no longer owned by an
+	// Ingress are cleaned up. This is fine - it is not expensive and ensures
+	// that in edge cases (a single update changed both hostname and removed
+	// ProxyGroup annotation) the VIPService is more likely to be
+	// (eventually) removed.
+	svcsChanged, err = r.maybeCleanupProxyGroup(ctx, pgName, logger)
+	if err != nil {
+		return false, fmt.Errorf("failed to cleanup VIPService resources for ProxyGroup: %w", err)
+	}
+
+	// 2. Ensure that there isn't a VIPService with the same hostname
+	// already created and not owned by this Ingress.
+	// TODO(irbekrm): perhaps in future we could have record names being
+	// stored on VIPServices. I am not certain if there might not be edge
+	// cases (custom domains, etc?) where attempting to determine the DNS
+	// name of the VIPService in this way won't be incorrect.
+	tcd, err := r.tailnetCertDomain(ctx)
+	if err != nil {
+		return false, fmt.Errorf("error determining DNS name base: %w", err)
 	}
 	dnsName := hostname + "." + tcd
 	serviceName := tailcfg.ServiceName("svc:" + hostname)
-	existingVIPSvc, err := a.tsClient.GetVIPService(ctx, serviceName)
-	// TODO(irbekrm): here and when creating the VIPService, verify if the error is not terminal (and therefore
-	// should not be reconciled). For example, if the hostname is already a hostname of a Tailscale node, the GET
-	// here will fail.
+	existingVIPSvc, err := r.tsClient.GetVIPService(ctx, serviceName)
+	// TODO(irbekrm): here and when creating the VIPService, verify if the
+	// error is not terminal (and therefore should not be reconciled). For
+	// example, if the hostname is already a hostname of a Tailscale node,
+	// the GET here will fail.
 	if err != nil {
 		errResp := &tailscale.ErrResponse{}
 		if ok := errors.As(err, errResp); ok && errResp.Status != http.StatusNotFound {
-			return fmt.Errorf("error getting VIPService %q: %w", hostname, err)
+			return false, fmt.Errorf("error getting VIPService %q: %w", hostname, err)
 		}
 	}
-	if existingVIPSvc != nil && !isVIPServiceForIngress(existingVIPSvc, ing) {
-		logger.Infof("VIPService %q for MagicDNS name %q already exists, but is not owned by this Ingress. Please delete it manually and recreate this Ingress to proceed or create an Ingress for a different MagicDNS name", hostname, dnsName)
-		a.recorder.Event(ing, corev1.EventTypeWarning, "ConflictingVIPServiceExists", fmt.Sprintf("VIPService %q for MagicDNS name %q already exists, but is not owned by this Ingress. Please delete it manually to proceed or create an Ingress for a different MagicDNS name", hostname, dnsName))
-		return nil
+	// Generate the VIPService comment for new or existing VIPService. This
+	// checks and ensures that VIPService's owner references are updated for
+	// this Ingress and errors if that is not possible (i.e. because it
+	// appears that the VIPService has been created by a non-operator
+	// actor).
+	svcComment, err := r.ownerRefsComment(existingVIPSvc)
+	if err != nil {
+		const instr = "To proceed, you can either manually delete the existing VIPService or choose a different MagicDNS name at `.spec.tls.hosts[0]` in the Ingress definition"
+		msg := fmt.Sprintf("error ensuring ownership of VIPService %s: %v. %s", hostname, err, instr)
+		logger.Warn(msg)
+		r.recorder.Event(ing, corev1.EventTypeWarning, "InvalidVIPService", msg)
+		return false, nil
 	}
 
-	// 3. Ensure that the serve config for the ProxyGroup contains the VIPService
-	cm, cfg, err := a.proxyGroupServeConfig(ctx, pgName)
+	// 3. Ensure that the serve config for the ProxyGroup contains the VIPService.
+	cm, cfg, err := r.proxyGroupServeConfig(ctx, pgName)
 	if err != nil {
-		return fmt.Errorf("error getting Ingress serve config: %w", err)
+		return false, fmt.Errorf("error getting Ingress serve config: %w", err)
 	}
 	if cm == nil {
 		logger.Infof("no Ingress serve config ConfigMap found, unable to update serve config. Ensure that ProxyGroup is healthy.")
-		return nil
+		return svcsChanged, nil
 	}
 
 	ep := ipn.HostPort(fmt.Sprintf("%s:443", dnsName))
-	handlers, err := handlersForIngress(ctx, ing, a.Client, a.recorder, dnsName, logger)
+	handlers, err := handlersForIngress(ctx, ing, r.Client, r.recorder, dnsName, logger)
 	if err != nil {
-		return fmt.Errorf("failed to get handlers for Ingress: %w", err)
+		return false, fmt.Errorf("failed to get handlers for Ingress: %w", err)
 	}
 	ingCfg := &ipn.ServiceConfig{
 		TCP: map[uint16]*ipn.TCPPortHandler{
@@ -250,16 +291,16 @@ func (a *IngressPGReconciler) maybeProvision(ctx context.Context, hostname strin
 		mak.Set(&cfg.Services, serviceName, ingCfg)
 		cfgBytes, err := json.Marshal(cfg)
 		if err != nil {
-			return fmt.Errorf("error marshaling serve config: %w", err)
+			return false, fmt.Errorf("error marshaling serve config: %w", err)
 		}
 		mak.Set(&cm.BinaryData, serveConfigKey, cfgBytes)
-		if err := a.Update(ctx, cm); err != nil {
-			return fmt.Errorf("error updating serve config: %w", err)
+		if err := r.Update(ctx, cm); err != nil {
+			return false, fmt.Errorf("error updating serve config: %w", err)
 		}
 	}
 
 	// 4. Ensure that the VIPService exists and is up to date.
-	tags := a.defaultTags
+	tags := r.defaultTags
 	if tstr, ok := ing.Annotations[AnnotationTags]; ok {
 		tags = strings.Split(tstr, ",")
 	}
@@ -273,27 +314,32 @@ func (a *IngressPGReconciler) maybeProvision(ctx context.Context, hostname strin
 		Name:    serviceName,
 		Tags:    tags,
 		Ports:   vipPorts,
-		Comment: fmt.Sprintf(VIPSvcOwnerRef, ing.UID),
+		Comment: svcComment,
 	}
 	if existingVIPSvc != nil {
 		vipSvc.Addrs = existingVIPSvc.Addrs
 	}
+	// TODO(irbekrm): right now if two Ingress resources attempt to apply different VIPService configs (different
+	// tags, or HTTP endpoint settings) we can end up reconciling those in a loop. We should detect when an Ingress
+	// with the same generation number has been reconciled ~more than N times and stop attempting to apply updates.
 	if existingVIPSvc == nil ||
 		!reflect.DeepEqual(vipSvc.Tags, existingVIPSvc.Tags) ||
-		!reflect.DeepEqual(vipSvc.Ports, existingVIPSvc.Ports) {
+		!reflect.DeepEqual(vipSvc.Ports, existingVIPSvc.Ports) ||
+		!strings.EqualFold(vipSvc.Comment, existingVIPSvc.Comment) {
 		logger.Infof("Ensuring VIPService %q exists and is up to date", hostname)
-		if err := a.tsClient.CreateOrUpdateVIPService(ctx, vipSvc); err != nil {
-			logger.Infof("error creating VIPService: %v", err)
-			return fmt.Errorf("error creating VIPService: %w", err)
+		if err := r.tsClient.CreateOrUpdateVIPService(ctx, vipSvc); err != nil {
+			return false, fmt.Errorf("error creating VIPService: %w", err)
 		}
 	}
 
 	// 5. Update tailscaled's AdvertiseServices config, which should add the VIPService
 	// IPs to the ProxyGroup Pods' AllowedIPs in the next netmap update if approved.
-	if err = a.maybeUpdateAdvertiseServicesConfig(ctx, pg.Name, serviceName, true, logger); err != nil {
-		return fmt.Errorf("failed to update tailscaled config: %w", err)
+	if err = r.maybeUpdateAdvertiseServicesConfig(ctx, pg.Name, serviceName, true, logger); err != nil {
+		return false, fmt.Errorf("failed to update tailscaled config: %w", err)
 	}
 
+	// TODO(irbekrm): check that the replicas are ready to route traffic for the VIPService before updating Ingress
+	// status.
 	// 6. Update Ingress status
 	oldStatus := ing.Status.DeepCopy()
 	ports := []networkingv1.IngressPortStatus{
@@ -315,30 +361,29 @@ func (a *IngressPGReconciler) maybeProvision(ctx context.Context, hostname strin
 		},
 	}
 	if apiequality.Semantic.DeepEqual(oldStatus, ing.Status) {
-		return nil
+		return svcsChanged, nil
 	}
-	if err := a.Status().Update(ctx, ing); err != nil {
-		return fmt.Errorf("failed to update Ingress status: %w", err)
+	if err := r.Status().Update(ctx, ing); err != nil {
+		return false, fmt.Errorf("failed to update Ingress status: %w", err)
 	}
-	return nil
+	return svcsChanged, nil
 }
 
-// maybeCleanupProxyGroup ensures that if an Ingress hostname has changed, any VIPService resources created for the
-// Ingress' ProxyGroup corresponding to the old hostname are cleaned up. A run of this function will ensure that any
-// VIPServices that are associated with the provided ProxyGroup and no longer owned by an Ingress are cleaned up.
-func (a *IngressPGReconciler) maybeCleanupProxyGroup(ctx context.Context, pgName string, logger *zap.SugaredLogger) error {
+// maybeCleanupProxyGroup ensures that any VIPServices that are associated with the provided ProxyGroup and no longer
+// managed by this operator's instance are deleted, if not owned by other operator instances, else the owner reference
+// is cleaned up. Returns true if the operation resulted in existing VIPService updates (owner reference removal).
+func (r *HAIngressReconciler) maybeCleanupProxyGroup(ctx context.Context, proxyGroupName string, logger *zap.SugaredLogger) (svcsChanged bool, err error) {
 	// Get serve config for the ProxyGroup
-	cm, cfg, err := a.proxyGroupServeConfig(ctx, pgName)
+	cm, cfg, err := r.proxyGroupServeConfig(ctx, proxyGroupName)
 	if err != nil {
-		return fmt.Errorf("getting serve config: %w", err)
+		return false, fmt.Errorf("getting serve config: %w", err)
 	}
 	if cfg == nil {
-		return nil // ProxyGroup does not have any VIPServices
+		return false, nil // ProxyGroup does not have any VIPServices
 	}
 
 	ingList := &networkingv1.IngressList{}
-	if err := a.List(ctx, ingList); err != nil {
-		return fmt.Errorf("listing Ingresses: %w", err)
+	if err := r.List(ctx, ingList); err != nil {
+		return false, fmt.Errorf("listing Ingresses: %w", err)
 	}
 	serveConfigChanged := false
 	// For each VIPService in serve config...
@@ -357,26 +402,21 @@ func (a *IngressPGReconciler) maybeCleanupProxyGroup(ctx context.Context, pgName
 			logger.Infof("VIPService %q is not owned by any Ingress, cleaning up", vipServiceName)
 
 			// Delete the VIPService from control if necessary.
-			svc, err := a.tsClient.GetVIPService(ctx, vipServiceName)
-			if err != nil {
-				errResp := &tailscale.ErrResponse{}
-				if ok := errors.As(err, errResp); !ok || errResp.Status != http.StatusNotFound {
-					return err
-				}
-			}
+			svc, _ := r.tsClient.GetVIPService(ctx, vipServiceName)
 			if svc != nil && isVIPServiceForAnyIngress(svc) {
 				logger.Infof("cleaning up orphaned VIPService %q", vipServiceName)
-				if err := a.tsClient.DeleteVIPService(ctx, vipServiceName); err != nil {
+				svcsChanged, err = r.cleanupVIPService(ctx, vipServiceName, logger)
+				if err != nil {
 					errResp := &tailscale.ErrResponse{}
 					if !errors.As(err, &errResp) || errResp.Status != http.StatusNotFound {
-						return fmt.Errorf("deleting VIPService %q: %w", vipServiceName, err)
+						return false, fmt.Errorf("deleting VIPService %q: %w", vipServiceName, err)
 					}
 				}
 			}
 
 			// Make sure the VIPService is not advertised in tailscaled or serve config.
-			if err = a.maybeUpdateAdvertiseServicesConfig(ctx, pgName, vipServiceName, false, logger); err != nil {
-				return fmt.Errorf("failed to update tailscaled config services: %w", err)
+			if err = r.maybeUpdateAdvertiseServicesConfig(ctx, proxyGroupName, vipServiceName, false, logger); err != nil {
+				return false, fmt.Errorf("failed to update tailscaled config services: %w", err)
 			}
 			delete(cfg.Services, vipServiceName)
 			serveConfigChanged = true
@@ -386,55 +426,67 @@ func (a *IngressPGReconciler) maybeCleanupProxyGroup(ctx context.Context, pgName
 	if serveConfigChanged {
 		cfgBytes, err := json.Marshal(cfg)
 		if err != nil {
-			return fmt.Errorf("marshaling serve config: %w", err)
+			return false, fmt.Errorf("marshaling serve config: %w", err)
 		}
 		mak.Set(&cm.BinaryData, serveConfigKey, cfgBytes)
-		if err := a.Update(ctx, cm); err != nil {
-			return fmt.Errorf("updating serve config: %w", err)
+		if err := r.Update(ctx, cm); err != nil {
+			return false, fmt.Errorf("updating serve config: %w", err)
 		}
 	}
-
-	return nil
+	return svcsChanged, nil
 }
 
 // maybeCleanup ensures that any resources, such as a VIPService created for this Ingress, are cleaned up when the
-// Ingress is being deleted or is unexposed.
-func (a *IngressPGReconciler) maybeCleanup(ctx context.Context, hostname string, ing *networkingv1.Ingress, logger *zap.SugaredLogger) error {
+// Ingress is being deleted or is unexposed. The cleanup is safe for a multi-cluster setup - the VIPService is only
+// deleted if it does not contain any other owner references. If it does, the cleanup only removes the owner reference
+// corresponding to this Ingress.
+func (r *HAIngressReconciler) maybeCleanup(ctx context.Context, hostname string, ing *networkingv1.Ingress, logger *zap.SugaredLogger) (svcChanged bool, err error) {
 	logger.Debugf("Ensuring any resources for Ingress are cleaned up")
 	ix := slices.Index(ing.Finalizers, FinalizerNamePG)
 	if ix < 0 {
 		logger.Debugf("no finalizer, nothing to do")
-		a.mu.Lock()
-		defer a.mu.Unlock()
-		a.managedIngresses.Remove(ing.UID)
-		gaugePGIngressResources.Set(int64(a.managedIngresses.Len()))
-		return nil
-	}
-
-	// 1. Check if there is a VIPService created for this Ingress.
-	pg := ing.Annotations[AnnotationProxyGroup]
-	cm, cfg, err := a.proxyGroupServeConfig(ctx, pg)
-	if err != nil {
-		return fmt.Errorf("error getting ProxyGroup serve config: %w", err)
-	}
-	serviceName := tailcfg.ServiceName("svc:" + hostname)
-	// VIPService is always first added to serve config and only then created in the Tailscale API, so if it is not
-	// found in the serve config, we can assume that there is no VIPService. TODO(irbekrm): once we have ingress
-	// ProxyGroup, we will probably add currently exposed VIPServices to its status. At that point, we can use the
-	// status rather than checking the serve config each time.
-	if cfg == nil || cfg.Services == nil || cfg.Services[serviceName] == nil {
-		return nil
+		return false, nil
 	}
-	logger.Infof("Ensuring that VIPService %q configuration is cleaned up", hostname)
-	// 2. Delete the VIPService.
-	if err := a.deleteVIPServiceIfExists(ctx, serviceName, ing, logger); err != nil {
-		return fmt.Errorf("error deleting VIPService: %w", err)
+
+	// Ensure that if cleanup succeeded Ingress finalizers are removed.
+	defer func() {
+		if err != nil {
+			return
+		}
+		if e := r.deleteFinalizer(ctx, ing, logger); e != nil {
+			err = errors.Join(err, e)
+		}
+	}()
+
+	// 1. Check if there is a VIPService associated with this Ingress.
+	pg := ing.Annotations[AnnotationProxyGroup]
+	cm, cfg, err := r.proxyGroupServeConfig(ctx, pg)
+	if err != nil {
+		return false, fmt.Errorf("error getting ProxyGroup serve config: %w", err)
+	}
+	serviceName := tailcfg.ServiceName("svc:" + hostname)
+	// VIPService is always first added to serve config and only then created in the Tailscale API, so if it is not
+	// found in the serve config, we can assume that there is no VIPService. (If the serve config does not exist at
+	// all, it is possible that the ProxyGroup has been deleted before cleaning up the Ingress, so carry on with
+	// cleanup).
+	if cfg != nil && cfg.Services != nil && cfg.Services[serviceName] == nil {
+		return false, nil
+	}
+
+	// 2. Clean up the VIPService resources.
+	svcChanged, err = r.cleanupVIPService(ctx, serviceName, logger)
+	if err != nil {
+		return false, fmt.Errorf("error deleting VIPService: %w", err)
+	}
+
+	if cfg == nil || cfg.Services == nil { // user probably deleted the ProxyGroup
+		return svcChanged, nil
 	}
 
 	// 3. Unadvertise the VIPService in tailscaled config.
-	if err = a.maybeUpdateAdvertiseServicesConfig(ctx, pg, serviceName, false, logger); err != nil {
-		return fmt.Errorf("failed to update tailscaled config services: %w", err)
+	if err = r.maybeUpdateAdvertiseServicesConfig(ctx, pg, serviceName, false, logger); err != nil {
+		return false, fmt.Errorf("failed to update tailscaled config services: %w", err)
 	}
 
 	// 4. Remove the VIPService from the serve config for the ProxyGroup.
@@ -442,24 +494,13 @@ func (a *IngressPGReconciler) maybeCleanup(ctx context.Context, hostname string,
 	delete(cfg.Services, serviceName)
 	cfgBytes, err := json.Marshal(cfg)
 	if err != nil {
-		return fmt.Errorf("error marshaling serve config: %w", err)
+		return false, fmt.Errorf("error marshaling serve config: %w", err)
 	}
 	mak.Set(&cm.BinaryData, serveConfigKey, cfgBytes)
-	if err := a.Update(ctx, cm); err != nil {
-		return fmt.Errorf("error updating ConfigMap %q: %w", cm.Name, err)
-	}
-	if err := a.deleteFinalizer(ctx, ing, logger); err != nil {
-		return fmt.Errorf("failed to remove finalizer: %w", err)
-	}
-	a.mu.Lock()
-	defer a.mu.Unlock()
-	a.managedIngresses.Remove(ing.UID)
-	gaugePGIngressResources.Set(int64(a.managedIngresses.Len()))
-	return nil
+	return svcChanged, r.Update(ctx, cm)
 }
 
-func (a *IngressPGReconciler) deleteFinalizer(ctx context.Context, ing *networkingv1.Ingress, logger *zap.SugaredLogger) error {
+func (r *HAIngressReconciler) deleteFinalizer(ctx context.Context, ing *networkingv1.Ingress, logger *zap.SugaredLogger) error {
 	found := false
 	ing.Finalizers = slices.DeleteFunc(ing.Finalizers, func(f string) bool {
 		found = true
@@ -470,9 +511,13 @@ func (a *IngressPGReconciler) deleteFinalizer(ctx context.Context, ing *networki
 	}
 	logger.Debug("ensure %q finalizer is removed", FinalizerNamePG)
 
-	if err := a.Update(ctx, ing); err != nil {
+	if err := r.Update(ctx, ing); err != nil {
 		return fmt.Errorf("failed to remove finalizer %q: %w", FinalizerNamePG, err)
 	}
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	r.managedIngresses.Remove(ing.UID)
+	gaugePGIngressResources.Set(int64(r.managedIngresses.Len()))
 	return nil
 }
@@ -480,15 +525,15 @@ func pgIngressCMName(pg string) string {
 	return fmt.Sprintf("%s-ingress-config", pg)
 }
 
-func (a *IngressPGReconciler) proxyGroupServeConfig(ctx context.Context, pg string) (cm *corev1.ConfigMap, cfg *ipn.ServeConfig, err error) {
+func (r *HAIngressReconciler) proxyGroupServeConfig(ctx context.Context, pg string) (cm *corev1.ConfigMap, cfg *ipn.ServeConfig, err error) {
 	name := pgIngressCMName(pg)
 	cm = &corev1.ConfigMap{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      name,
-			Namespace: a.tsNamespace,
+			Namespace: r.tsNamespace,
 		},
 	}
-	if err := a.Get(ctx, client.ObjectKeyFromObject(cm), cm); err != nil && !apierrors.IsNotFound(err) {
+	if err := r.Get(ctx, client.ObjectKeyFromObject(cm), cm); err != nil && !apierrors.IsNotFound(err) {
 		return nil, nil, fmt.Errorf("error retrieving ingress serve config ConfigMap %s: %v", name, err)
 	}
 	if apierrors.IsNotFound(err) {
@@ -508,16 +553,16 @@ type localClient interface {
 }
 
 // tailnetCertDomain returns the base domain (TCD) of the current tailnet.
-func (a *IngressPGReconciler) tailnetCertDomain(ctx context.Context) (string, error) {
-	st, err := a.lc.StatusWithoutPeers(ctx)
+func (r *HAIngressReconciler) tailnetCertDomain(ctx context.Context) (string, error) {
+	st, err := r.lc.StatusWithoutPeers(ctx)
 	if err != nil {
 		return "", fmt.Errorf("error getting tailscale status: %w", err)
 	}
 	return st.CurrentTailnet.MagicDNSSuffix, nil
 }
 
-// shouldExpose returns true if the Ingress should be exposed over Tailscale in HA mode (on a ProxyGroup)
-func shouldExpose(ing *networkingv1.Ingress) bool {
+// shouldExpose returns true if the Ingress should be exposed over Tailscale in HA mode (on a ProxyGroup).
+func (r *HAIngressReconciler) shouldExpose(ing *networkingv1.Ingress) bool {
 	isTSIngress := ing != nil &&
 		ing.Spec.IngressClassName != nil &&
 		*ing.Spec.IngressClassName == tailscaleIngressClassName
@@ -525,13 +570,6 @@ func shouldExpose(ing *networkingv1.Ingress) bool {
 	return isTSIngress && pgAnnot != ""
 }
 
-func isVIPServiceForIngress(svc *tailscale.VIPService, ing *networkingv1.Ingress) bool {
-	if svc == nil || ing == nil {
-		return false
-	}
-	return strings.EqualFold(svc.Comment, fmt.Sprintf(VIPSvcOwnerRef, ing.UID))
-}
-
 func isVIPServiceForAnyIngress(svc *tailscale.VIPService) bool {
 	if svc == nil {
 		return false
@@ -545,7 +583,7 @@ func isVIPServiceForAnyIngress(svc *tailscale.VIPService) bool {
 // - The derived hostname is a valid DNS label
 // - The referenced ProxyGroup exists and is of type 'ingress'
 // - Ingress' TLS block is valid
-func (a *IngressPGReconciler) validateIngress(ing *networkingv1.Ingress, pg *tsapi.ProxyGroup) error {
+func (r *HAIngressReconciler) validateIngress(ctx context.Context, ing *networkingv1.Ingress, pg *tsapi.ProxyGroup) error {
 	var errs []error
 
 	// Validate tags if present
@@ -581,30 +619,66 @@ func (a *IngressPGReconciler) validateIngress(ing *networkingv1.Ingress, pg *tsa
 		errs = append(errs, fmt.Errorf("ProxyGroup %q is not ready", pg.Name))
 	}
 
+	// It is invalid to have multiple Ingress resources for the same VIPService in one cluster.
+	ingList := &networkingv1.IngressList{}
+	if err := r.List(ctx, ingList); err != nil {
+		errs = append(errs, fmt.Errorf("[unexpected] error listing Ingresses: %w", err))
+		return errors.Join(errs...)
+	}
+	for _, i := range ingList.Items {
+		if r.shouldExpose(&i) && hostnameForIngress(&i) == hostname && i.Name != ing.Name {
+			errs = append(errs, fmt.Errorf("found duplicate Ingress %q for hostname %q - multiple Ingresses for the same hostname in the same cluster are not allowed", i.Name, hostname))
+		}
+	}
 	return errors.Join(errs...)
 }
 
-// deleteVIPServiceIfExists attempts to delete the VIPService if it exists and is owned by the given Ingress.
-func (a *IngressPGReconciler) deleteVIPServiceIfExists(ctx context.Context, name tailcfg.ServiceName, ing *networkingv1.Ingress, logger *zap.SugaredLogger) error {
-	svc, err := a.tsClient.GetVIPService(ctx, name)
+// cleanupVIPService deletes any VIPService by the provided name if it is not owned by operator instances other than
+// this one. If a VIPService is found, but contains other owner references, only this operator's owner reference is
+// removed. If a VIPService by the given name is not found or does not contain this operator's owner reference, it
+// does nothing.
+// It returns true if an existing VIPService was updated to remove the owner reference, as well as any error that
+// occurred.
+func (r *HAIngressReconciler) cleanupVIPService(ctx context.Context, name tailcfg.ServiceName, logger *zap.SugaredLogger) (updated bool, _ error) {
+	svc, err := r.tsClient.GetVIPService(ctx, name)
 	if err != nil {
 		errResp := &tailscale.ErrResponse{}
 		if ok := errors.As(err, errResp); ok && errResp.Status == http.StatusNotFound {
-			return nil
+			return false, nil
 		}
-		return fmt.Errorf("error getting VIPService: %w", err)
+		return false, fmt.Errorf("error getting VIPService: %w", err)
 	}
-
-	if !isVIPServiceForIngress(svc, ing) {
-		return nil
+	if svc == nil {
+		return false, nil
+	}
+	c, err := parseComment(svc)
+	if err != nil {
+		return false, fmt.Errorf("error parsing VIPService comment: %w", err)
+	}
+	if c == nil || len(c.OwnerRefs) == 0 {
+		return false, nil
+	}
+	// Comparing with the operatorID only means that we will not be able to
+	// clean up VIPServices in cases where the operator was deleted from the
+	// cluster before deleting the Ingress. Perhaps the comparison could be
+	// 'if or.OperatorID == r.operatorID || or.ingressUID == r.ingressUID'.
+	ix := slices.IndexFunc(c.OwnerRefs, func(or OwnerRef) bool {
+		return or.OperatorID == r.operatorID
+	})
+	if ix == -1 {
+		return false, nil
+	}
+	if len(c.OwnerRefs) == 1 {
+		logger.Infof("Deleting VIPService %q", name)
+		return false, r.tsClient.DeleteVIPService(ctx, name)
 	}
-	logger.Infof("Deleting VIPService %q", name)
-	if err = a.tsClient.DeleteVIPService(ctx, name); err != nil {
-		return fmt.Errorf("error deleting VIPService: %w", err)
+	c.OwnerRefs = slices.Delete(c.OwnerRefs, ix, ix+1)
+	logger.Infof("Updating VIPService %q to remove our owner reference", name)
+	json, err := json.Marshal(c)
+	if err != nil {
+		return false, fmt.Errorf("error marshalling updated VIPService owner reference: %w", err)
 	}
-	return nil
+	svc.Comment = string(json)
+	return true, r.tsClient.CreateOrUpdateVIPService(ctx, svc)
 }
@@ -615,7 +689,7 @@ func isHTTPEndpointEnabled(ing *networkingv1.Ingress) bool {
 	return ing.Annotations[annotationHTTPEndpoint] == "enabled"
 }
 
-func (a *IngressPGReconciler) maybeUpdateAdvertiseServicesConfig(ctx context.Context, pgName string, serviceName tailcfg.ServiceName, shouldBeAdvertised bool, logger *zap.SugaredLogger) (err error) {
+func (a *HAIngressReconciler) maybeUpdateAdvertiseServicesConfig(ctx context.Context, pgName string, serviceName tailcfg.ServiceName, shouldBeAdvertised bool, logger *zap.SugaredLogger) (err error) {
 	logger.Debugf("Updating ProxyGroup tailscaled configs to advertise service %q: %v", serviceName, shouldBeAdvertised)
 
 	// Get all config Secrets for this ProxyGroup.
@@ -665,3 +739,75 @@ func (a *IngressPGReconciler) maybeUpdateAdvertiseServicesConfig(ctx context.Con
 	}
 	return nil
 }
+
+// OwnerRef is an owner reference that uniquely identifies a Tailscale
+// Kubernetes operator instance.
+type OwnerRef struct {
+	// OperatorID is the stable ID of the operator's Tailscale device.
+	OperatorID string `json:"operatorID,omitempty"`
+}
+
+// comment is the content of the VIPService.Comment field.
+type comment struct {
+	// OwnerRefs is a list of owner references that identify all operator
+	// instances that manage this VIPService.
+	OwnerRefs []OwnerRef `json:"ownerRefs,omitempty"`
+}
+
+// ownerRefsComment returns a VIPService Comment that includes an owner
+// reference for this operator instance for the provided VIPService. If the
+// VIPService is nil, a new comment with the owner ref is returned. If the
+// VIPService is not nil, the existing comment is returned with the owner
+// reference added, if not already present. If the VIPService is not nil, but
+// does not contain a comment, we return an error as this likely means that
+// the VIPService was created by something other than a Tailscale Kubernetes
+// operator.
+func (r *HAIngressReconciler) ownerRefsComment(svc *tailscale.VIPService) (string, error) {
+	ref := OwnerRef{
+		OperatorID: r.operatorID,
+	}
+	if svc == nil {
+		c := &comment{OwnerRefs: []OwnerRef{ref}}
+		json, err := json.Marshal(c)
+		if err != nil {
+			return "", fmt.Errorf("[unexpected] unable to marshal VIPService comment contents: %w, please report this", err)
+		}
+		return string(json), nil
+	}
+	c, err := parseComment(svc)
+	if err != nil {
+		return "", fmt.Errorf("error parsing existing VIPService comment: %w", err)
+	}
+	if c == nil || len(c.OwnerRefs) == 0 {
+		return "", fmt.Errorf("VIPService %s exists, but does not contain a Comment field with owner references - not proceeding as this is likely a resource created by something other than a Tailscale Kubernetes operator", svc.Name)
+	}
+	if slices.Contains(c.OwnerRefs, ref) { // up to date
+		return svc.Comment, nil
+	}
+	c.OwnerRefs = append(c.OwnerRefs, ref)
+	json, err := json.Marshal(c)
+	if err != nil {
+		return "", fmt.Errorf("error marshalling updated owner references: %w", err)
+	}
+	return string(json), nil
+}
+
+// parseComment returns the parsed VIPService comment, or nil if none is found.
+func parseComment(vipSvc *tailscale.VIPService) (*comment, error) {
+	if vipSvc.Comment == "" {
+		return nil, nil
+	}
+	c := &comment{}
+	if err := json.Unmarshal([]byte(vipSvc.Comment), c); err != nil {
+		return nil, fmt.Errorf("error parsing VIPService Comment field %q: %w", vipSvc.Comment, err)
+	}
+	return c, nil
+}
+
+// requeueInterval returns a time duration between 5 and 10 minutes, which is
+// the period of time after which an HA Ingress, whose VIPService has been newly
+// created or changed, needs to be requeued. This is to protect against
+// VIPService owner references being overwritten as a result of concurrent
+// updates during multi-cluster Ingress create/update operations.
+func requeueInterval() time.Duration {
+	return time.Duration(rand.N(5)+5) * time.Minute
+}
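
A note on the jitter above: if every operator requeued after a fixed delay,
two operators that raced on the same VIPService could re-check (and
potentially re-overwrite) it at the same moment again. A standalone sketch of
the same sampling, using the math/rand/v2 import this change adds:

package main

import (
	"fmt"
	"math/rand/v2"
	"time"
)

// Same computation as requeueInterval above: rand.N(5) is uniform over
// 0..4, so the delay is a whole number of minutes in [5m, 10m).
func requeueInterval() time.Duration {
	return time.Duration(rand.N(5)+5) * time.Minute
}

func main() {
	for range 5 {
		fmt.Println(requeueInterval()) // e.g. 7m0s, 5m0s, 9m0s, ...
	}
}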

cmd/k8s-operator/ingress-for-pg_test.go

@@ -23,6 +23,7 @@ import (
 	"k8s.io/client-go/tools/record"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/client/fake"
+	"tailscale.com/internal/client/tailscale"
 	"tailscale.com/ipn"
 	"tailscale.com/ipn/ipnstate"
 	tsoperator "tailscale.com/k8s-operator"
@@ -190,6 +191,15 @@ func TestValidateIngress(t *testing.T) {
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      "test-ingress",
 			Namespace: "default",
+			Annotations: map[string]string{
+				AnnotationProxyGroup: "test-pg",
+			},
+		},
+		Spec: networkingv1.IngressSpec{
+			IngressClassName: ptr.To("tailscale"),
+			TLS: []networkingv1.IngressTLS{
+				{Hosts: []string{"test"}},
+			},
 		},
 	}
@@ -213,10 +223,11 @@ func TestValidateIngress(t *testing.T) {
 	}
 
 	tests := []struct {
-		name    string
-		ing     *networkingv1.Ingress
-		pg      *tsapi.ProxyGroup
-		wantErr string
+		name         string
+		ing          *networkingv1.Ingress
+		pg           *tsapi.ProxyGroup
+		existingIngs []networkingv1.Ingress
+		wantErr      string
 	}{
 		{
 			name: "valid_ingress_with_hostname",
@@ -306,12 +317,38 @@ func TestValidateIngress(t *testing.T) {
 			},
 			wantErr: "ProxyGroup \"test-pg\" is not ready",
 		},
+		{
+			name: "duplicate_hostname",
+			ing:  baseIngress,
+			pg:   readyProxyGroup,
+			existingIngs: []networkingv1.Ingress{{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "existing-ingress",
+					Namespace: "default",
+					Annotations: map[string]string{
+						AnnotationProxyGroup: "test-pg",
+					},
+				},
+				Spec: networkingv1.IngressSpec{
+					IngressClassName: ptr.To("tailscale"),
+					TLS: []networkingv1.IngressTLS{
+						{Hosts: []string{"test"}},
+					},
+				},
+			}},
+			wantErr: `found duplicate Ingress "existing-ingress" for hostname "test" - multiple Ingresses for the same hostname in the same cluster are not allowed`,
+		},
 	}
 
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			r := &IngressPGReconciler{}
-			err := r.validateIngress(tt.ing, tt.pg)
+			fc := fake.NewClientBuilder().
+				WithScheme(tsapi.GlobalScheme).
+				WithObjects(tt.ing).
+				WithLists(&networkingv1.IngressList{Items: tt.existingIngs}).
+				Build()
+			r := &HAIngressReconciler{Client: fc}
+			err := r.validateIngress(context.Background(), tt.ing, tt.pg)
 			if (err == nil && tt.wantErr != "") || (err != nil && err.Error() != tt.wantErr) {
 				t.Errorf("validateIngress() error = %v, wantErr %v", err, tt.wantErr)
 			}
@@ -493,8 +530,7 @@ func verifyTailscaledConfig(t *testing.T, fc client.Client, expectedServices []s
 	})
 }
 
-func setupIngressTest(t *testing.T) (*IngressPGReconciler, client.Client, *fakeTSClient) {
-	t.Helper()
+func setupIngressTest(t *testing.T) (*HAIngressReconciler, client.Client, *fakeTSClient) {
 
 	tsIngressClass := &networkingv1.IngressClass{
 		ObjectMeta: metav1.ObjectMeta{Name: "tailscale"},
@@ -552,9 +588,9 @@ func setupIngressTest(t *testing.T) (*IngressPGReconciler, client.Client, *fakeT
 	if err := fc.Status().Update(context.Background(), pg); err != nil {
 		t.Fatal(err)
 	}
-	fakeTsnetServer := &fakeTSNetServer{certDomains: []string{"foo.com"}}
 
 	ft := &fakeTSClient{}
+	fakeTsnetServer := &fakeTSNetServer{certDomains: []string{"foo.com"}}
 	zl, err := zap.NewDevelopment()
 	if err != nil {
 		t.Fatal(err)
@@ -568,12 +604,12 @@ func setupIngressTest(t *testing.T) (*IngressPGReconciler, client.Client, *fakeT
 		},
 	}
 
-	ingPGR := &IngressPGReconciler{
+	ingPGR := &HAIngressReconciler{
 		Client:      fc,
 		tsClient:    ft,
-		tsnetServer: fakeTsnetServer,
 		defaultTags: []string{"tag:k8s"},
 		tsNamespace: "operator-ns",
+		tsnetServer: fakeTsnetServer,
 		logger:      zl.Sugar(),
 		recorder:    record.NewFakeRecorder(10),
 		lc:          lc,
@@ -581,3 +617,87 @@ func setupIngressTest(t *testing.T) (*IngressPGReconciler, client.Client, *fakeT
 
 	return ingPGR, fc, ft
 }
+
+func TestIngressPGReconciler_MultiCluster(t *testing.T) {
+	ingPGR, fc, ft := setupIngressTest(t)
+	ingPGR.operatorID = "operator-1"
+
+	// Create initial Ingress
+	ing := &networkingv1.Ingress{
+		TypeMeta: metav1.TypeMeta{Kind: "Ingress", APIVersion: "networking.k8s.io/v1"},
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "test-ingress",
+			Namespace: "default",
+			UID:       types.UID("1234-UID"),
+			Annotations: map[string]string{
+				"tailscale.com/proxy-group": "test-pg",
+			},
+		},
+		Spec: networkingv1.IngressSpec{
+			IngressClassName: ptr.To("tailscale"),
+			TLS: []networkingv1.IngressTLS{
+				{Hosts: []string{"my-svc"}},
+			},
+		},
+	}
+	mustCreate(t, fc, ing)
+
+	// Simulate existing VIPService from another cluster
+	existingVIPSvc := &tailscale.VIPService{
+		Name:    "svc:my-svc",
+		Comment: `{"ownerrefs":[{"operatorID":"operator-2"}]}`,
+	}
+	ft.vipServices = map[tailcfg.ServiceName]*tailscale.VIPService{
+		"svc:my-svc": existingVIPSvc,
+	}
+
+	// Verify reconciliation adds our operator reference
+	expectReconciled(t, ingPGR, "default", "test-ingress")
+
+	vipSvc, err := ft.GetVIPService(context.Background(), "svc:my-svc")
+	if err != nil {
+		t.Fatalf("getting VIPService: %v", err)
+	}
+	if vipSvc == nil {
+		t.Fatal("VIPService not found")
+	}
+
+	c := &comment{}
+	if err := json.Unmarshal([]byte(vipSvc.Comment), c); err != nil {
+		t.Fatalf("parsing comment: %v", err)
+	}
+
+	wantOwnerRefs := []OwnerRef{
+		{OperatorID: "operator-2"},
+		{OperatorID: "operator-1"},
+	}
+	if !reflect.DeepEqual(c.OwnerRefs, wantOwnerRefs) {
+		t.Errorf("incorrect owner refs\ngot:  %+v\nwant: %+v", c.OwnerRefs, wantOwnerRefs)
+	}
+
+	// Delete the Ingress and verify VIPService still exists with one owner ref
+	if err := fc.Delete(context.Background(), ing); err != nil {
+		t.Fatalf("deleting Ingress: %v", err)
+	}
+	expectRequeue(t, ingPGR, "default", "test-ingress")
+
+	vipSvc, err = ft.GetVIPService(context.Background(), "svc:my-svc")
+	if err != nil {
+		t.Fatalf("getting VIPService after deletion: %v", err)
+	}
+	if vipSvc == nil {
+		t.Fatal("VIPService was incorrectly deleted")
+	}
+
+	c = &comment{}
+	if err := json.Unmarshal([]byte(vipSvc.Comment), c); err != nil {
+		t.Fatalf("parsing comment after deletion: %v", err)
+	}
+
+	wantOwnerRefs = []OwnerRef{
+		{OperatorID: "operator-2"},
+	}
+	if !reflect.DeepEqual(c.OwnerRefs, wantOwnerRefs) {
+		t.Errorf("incorrect owner refs after deletion\ngot:  %+v\nwant: %+v", c.OwnerRefs, wantOwnerRefs)
+	}
+}

cmd/k8s-operator/ingress.go

@@ -73,6 +73,7 @@ func (a *IngressReconciler) Reconcile(ctx context.Context, req reconcile.Request
 		return reconcile.Result{}, fmt.Errorf("failed to get ing: %w", err)
 	}
 	if !ing.DeletionTimestamp.IsZero() || !a.shouldExpose(ing) {
+		// TODO(irbekrm): this message is confusing if the Ingress is an HA Ingress
 		logger.Debugf("ingress is being deleted or should not be exposed, cleaning up")
 		return reconcile.Result{}, a.maybeCleanup(ctx, logger, ing)
 	}

cmd/k8s-operator/operator.go

@@ -9,6 +9,7 @@ package main
 
 import (
 	"context"
+	"fmt"
 	"net/http"
 	"os"
 	"regexp"
@@ -39,6 +40,7 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/manager"
 	"sigs.k8s.io/controller-runtime/pkg/manager/signals"
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"
+	"tailscale.com/client/local"
 	"tailscale.com/client/tailscale"
 	"tailscale.com/hostinfo"
 	"tailscale.com/ipn"
@@ -335,6 +337,10 @@ func runReconcilers(opts reconcilerOpts) {
 	if err != nil {
 		startlog.Fatalf("could not get local client: %v", err)
 	}
+	id, err := id(context.Background(), lc)
+	if err != nil {
+		startlog.Fatalf("error determining stable ID of the operator's Tailscale device: %v", err)
+	}
 	ingressProxyGroupFilter := handler.EnqueueRequestsFromMapFunc(ingressesFromIngressProxyGroup(mgr.GetClient(), opts.log))
 	err = builder.
 		ControllerManagedBy(mgr).
@@ -342,7 +348,7 @@ func runReconcilers(opts reconcilerOpts) {
 		Named("ingress-pg-reconciler").
 		Watches(&corev1.Service{}, handler.EnqueueRequestsFromMapFunc(serviceHandlerForIngressPG(mgr.GetClient(), startlog))).
 		Watches(&tsapi.ProxyGroup{}, ingressProxyGroupFilter).
-		Complete(&IngressPGReconciler{
+		Complete(&HAIngressReconciler{
 			recorder:    eventRecorder,
 			tsClient:    opts.tsClient,
 			tsnetServer: opts.tsServer,
@@ -350,6 +356,7 @@ func runReconcilers(opts reconcilerOpts) {
 			Client:      mgr.GetClient(),
 			logger:      opts.log.Named("ingress-pg-reconciler"),
 			lc:          lc,
+			operatorID:  id,
 			tsNamespace: opts.tailscaleNamespace,
 		})
 	if err != nil {
@@ -1262,3 +1269,14 @@ func hasProxyGroupAnnotation(obj client.Object) bool {
 	ing := obj.(*networkingv1.Ingress)
 	return ing.Annotations[AnnotationProxyGroup] != ""
 }
+
+func id(ctx context.Context, lc *local.Client) (string, error) {
+	st, err := lc.StatusWithoutPeers(ctx)
+	if err != nil {
+		return "", fmt.Errorf("error getting tailscale status: %w", err)
+	}
+	if st.Self == nil {
+		return "", fmt.Errorf("unexpected: device's status does not contain node's metadata")
+	}
+	return string(st.Self.ID), nil
+}