cmd/k8s-operator,kube/kubetypes,k8s-operator/apis: reconcile L3 HA Services (#15961)

This reconciler allows users to make applications highly available at L3 by
leveraging Tailscale Virtual Services. Many Kubernetes Service's
(irrespective of the cluster they reside in) can be mapped to a
Tailscale Virtual Service, allowing access to these Services at L3.

Updates #15895

Signed-off-by: chaosinthecrd <tom@tmlabs.co.uk>
This commit is contained in:
Tom Meadows
2025-05-19 12:58:32 +01:00
committed by GitHub
parent d89aa29081
commit df8d51023e
13 changed files with 1431 additions and 21 deletions

View File

@@ -241,6 +241,7 @@ func runReconcilers(opts reconcilerOpts) {
nsFilter := cache.ByObject{
Field: client.InNamespace(opts.tailscaleNamespace).AsSelector(),
}
// We watch the ServiceMonitor CRD to ensure that reconcilers are re-triggered if user's workflows result in the
// ServiceMonitor CRD applied after some of our resources that define ServiceMonitor creation. This selector
// ensures that we only watch the ServiceMonitor CRD and that we don't cache full contents of it.
@@ -248,10 +249,13 @@ func runReconcilers(opts reconcilerOpts) {
Field: fields.SelectorFromSet(fields.Set{"metadata.name": serviceMonitorCRD}),
Transform: crdTransformer(startlog),
}
// TODO (irbekrm): stricter filtering what we watch/cache/call
// reconcilers on. c/r by default starts a watch on any
// resources that we GET via the controller manager's client.
mgrOpts := manager.Options{
// TODO (irbekrm): stricter filtering what we watch/cache/call
// reconcilers on. c/r by default starts a watch on any
// resources that we GET via the controller manager's client.
// The cache will apply the specified filters only to the object types listed below via ByObject.
// Other object types (e.g., EndpointSlices) can still be fetched or watched using the cached client, but they will not have any filtering applied.
Cache: cache.Options{
ByObject: map[client.Object]cache.ByObject{
&corev1.Secret{}: nsFilter,
@@ -260,7 +264,6 @@ func runReconcilers(opts reconcilerOpts) {
&corev1.ConfigMap{}: nsFilter,
&appsv1.StatefulSet{}: nsFilter,
&appsv1.Deployment{}: nsFilter,
&discoveryv1.EndpointSlice{}: nsFilter,
&rbacv1.Role{}: nsFilter,
&rbacv1.RoleBinding{}: nsFilter,
&apiextensionsv1.CustomResourceDefinition{}: serviceMonitorSelector,
@@ -368,6 +371,33 @@ func runReconcilers(opts reconcilerOpts) {
startlog.Fatalf("failed setting up indexer for HA Ingresses: %v", err)
}
ingressSvcFromEpsFilter := handler.EnqueueRequestsFromMapFunc(ingressSvcFromEps(mgr.GetClient(), opts.log.Named("service-pg-reconciler")))
err = builder.
ControllerManagedBy(mgr).
For(&corev1.Service{}).
Named("service-pg-reconciler").
Watches(&corev1.Secret{}, handler.EnqueueRequestsFromMapFunc(HAServicesFromSecret(mgr.GetClient(), startlog))).
Watches(&tsapi.ProxyGroup{}, ingressProxyGroupFilter).
Watches(&discoveryv1.EndpointSlice{}, ingressSvcFromEpsFilter).
Complete(&HAServiceReconciler{
recorder: eventRecorder,
tsClient: opts.tsClient,
tsnetServer: opts.tsServer,
defaultTags: strings.Split(opts.proxyTags, ","),
Client: mgr.GetClient(),
logger: opts.log.Named("service-pg-reconciler"),
lc: lc,
clock: tstime.DefaultClock{},
operatorID: id,
tsNamespace: opts.tailscaleNamespace,
})
if err != nil {
startlog.Fatalf("could not create service-pg-reconciler: %v", err)
}
if err := mgr.GetFieldIndexer().IndexField(context.Background(), new(corev1.Service), indexIngressProxyGroup, indexPGIngresses); err != nil {
startlog.Fatalf("failed setting up indexer for HA Services: %v", err)
}
connectorFilter := handler.EnqueueRequestsFromMapFunc(managedResourceHandlerForType("connector"))
// If a ProxyClassChanges, enqueue all Connectors that have
// .spec.proxyClass set to the name of this ProxyClass.
@@ -994,6 +1024,36 @@ func egressEpsFromPGStateSecrets(cl client.Client, ns string) handler.MapFunc {
}
}
func ingressSvcFromEps(cl client.Client, logger *zap.SugaredLogger) handler.MapFunc {
return func(ctx context.Context, o client.Object) []reconcile.Request {
svcName := o.GetLabels()[discoveryv1.LabelServiceName]
if svcName == "" {
return nil
}
svc := &corev1.Service{}
ns := o.GetNamespace()
if err := cl.Get(ctx, types.NamespacedName{Name: svcName, Namespace: ns}, svc); err != nil {
logger.Errorf("failed to get service: %v", err)
return nil
}
pgName := svc.Annotations[AnnotationProxyGroup]
if pgName == "" {
return nil
}
return []reconcile.Request{
{
NamespacedName: types.NamespacedName{
Namespace: ns,
Name: svcName,
},
},
}
}
}
// egressSvcFromEps is an event handler for EndpointSlices. If an EndpointSlice is for an egress ExternalName Service
// meant to be exposed on a ProxyGroup, returns a reconcile request for the Service.
func egressSvcFromEps(_ context.Context, o client.Object) []reconcile.Request {
@@ -1099,6 +1159,40 @@ func HAIngressesFromSecret(cl client.Client, logger *zap.SugaredLogger) handler.
}
}
// HAServiceFromSecret returns a handler that returns reconcile requests for
// all HA Services that should be reconciled in response to a Secret event.
func HAServicesFromSecret(cl client.Client, logger *zap.SugaredLogger) handler.MapFunc {
return func(ctx context.Context, o client.Object) []reconcile.Request {
secret, ok := o.(*corev1.Secret)
if !ok {
logger.Infof("[unexpected] Secret handler triggered for an object that is not a Secret")
return nil
}
if !isPGStateSecret(secret) {
return nil
}
pgName, ok := secret.ObjectMeta.Labels[LabelParentName]
if !ok {
return nil
}
svcList := &corev1.ServiceList{}
if err := cl.List(ctx, svcList, client.MatchingFields{indexIngressProxyGroup: pgName}); err != nil {
logger.Infof("error listing Services, skipping a reconcile for event on Secret %s: %v", secret.Name, err)
return nil
}
reqs := make([]reconcile.Request, 0)
for _, svc := range svcList.Items {
reqs = append(reqs, reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: svc.Namespace,
Name: svc.Name,
},
})
}
return reqs
}
}
// egressSvcsFromEgressProxyGroup is an event handler for egress ProxyGroups. It returns reconcile requests for all
// user-created ExternalName Services that should be exposed on this ProxyGroup.
func egressSvcsFromEgressProxyGroup(cl client.Client, logger *zap.SugaredLogger) handler.MapFunc {
@@ -1270,7 +1364,7 @@ func crdTransformer(log *zap.SugaredLogger) toolscache.TransformFunc {
}
}
// indexEgressServices adds a local index to a cached Tailscale egress Services meant to be exposed on a ProxyGroup. The
// indexEgressServices adds a local index to cached Tailscale egress Services meant to be exposed on a ProxyGroup. The
// index is used a list filter.
func indexEgressServices(o client.Object) []string {
if !isEgressSvcForProxyGroup(o) {
@@ -1279,8 +1373,8 @@ func indexEgressServices(o client.Object) []string {
return []string{o.GetAnnotations()[AnnotationProxyGroup]}
}
// indexPGIngresses adds a local index to a cached Tailscale Ingresses meant to be exposed on a ProxyGroup. The index is
// used a list filter.
// indexPGIngresses is used to select ProxyGroup-backed Services which are
// locally indexed in the cache for efficient listing without requiring labels.
func indexPGIngresses(o client.Object) []string {
if !hasProxyGroupAnnotation(o) {
return nil
@@ -1325,8 +1419,7 @@ func serviceHandlerForIngressPG(cl client.Client, logger *zap.SugaredLogger) han
}
func hasProxyGroupAnnotation(obj client.Object) bool {
ing := obj.(*networkingv1.Ingress)
return ing.Annotations[AnnotationProxyGroup] != ""
return obj.GetAnnotations()[AnnotationProxyGroup] != ""
}
func id(ctx context.Context, lc *local.Client) (string, error) {