k8s-operator: adding endpointslice based advertising/unadvertising

Signed-off-by: chaosinthecrd <tom@tmlabs.co.uk>
This commit is contained in:
chaosinthecrd 2025-05-07 19:24:09 +01:00
parent 2767a7d4cb
commit 9c49c2e9d3
No known key found for this signature in database
GPG Key ID: 87942E75F71EF65D
2 changed files with 80 additions and 7 deletions

View File

@ -240,6 +240,7 @@ func runReconcilers(opts reconcilerOpts) {
nsFilter := cache.ByObject{
Field: client.InNamespace(opts.tailscaleNamespace).AsSelector(),
}
// We watch the ServiceMonitor CRD to ensure that reconcilers are re-triggered if user's workflows result in the
// ServiceMonitor CRD applied after some of our resources that define ServiceMonitor creation. This selector
// ensures that we only watch the ServiceMonitor CRD and that we don't cache full contents of it.
@ -247,6 +248,7 @@ func runReconcilers(opts reconcilerOpts) {
Field: fields.SelectorFromSet(fields.Set{"metadata.name": serviceMonitorCRD}),
Transform: crdTransformer(startlog),
}
mgrOpts := manager.Options{
// TODO (irbekrm): stricter filtering what we watch/cache/call
// reconcilers on. c/r by default starts a watch on any
@ -259,7 +261,6 @@ func runReconcilers(opts reconcilerOpts) {
&corev1.ConfigMap{}: nsFilter,
&appsv1.StatefulSet{}: nsFilter,
&appsv1.Deployment{}: nsFilter,
&discoveryv1.EndpointSlice{}: nsFilter,
&rbacv1.Role{}: nsFilter,
&rbacv1.RoleBinding{}: nsFilter,
&apiextensionsv1.CustomResourceDefinition{}: serviceMonitorSelector,
@ -368,12 +369,14 @@ func runReconcilers(opts reconcilerOpts) {
}
// svcProxyGroupFilter := handler.EnqueueRequestsFromMapFunc(ingressesFromIngressProxyGroup(mgr.GetClient(), opts.log))
ingressSvcFromEpsFilter := handler.EnqueueRequestsFromMapFunc(ingressSvcFromEps(mgr.GetClient()))
err = builder.
ControllerManagedBy(mgr).
For(&corev1.Service{}).
Named("service-pg-reconciler").
Watches(&corev1.Secret{}, handler.EnqueueRequestsFromMapFunc(HAServicesFromSecret(mgr.GetClient(), startlog))).
Watches(&tsapi.ProxyGroup{}, ingressProxyGroupFilter).
Watches(&discoveryv1.EndpointSlice{}, ingressSvcFromEpsFilter).
Complete(&HAServiceReconciler{
recorder: eventRecorder,
tsClient: opts.tsClient,
@ -1019,6 +1022,35 @@ func egressEpsFromPGStateSecrets(cl client.Client, ns string) handler.MapFunc {
}
}
func ingressSvcFromEps(cl client.Client) handler.MapFunc {
return func(ctx context.Context, o client.Object) []reconcile.Request {
svcName := o.GetLabels()[discoveryv1.LabelServiceName]
if svcName == "" {
return nil
}
svc := &corev1.Service{}
ns := o.GetNamespace()
if err := cl.Get(ctx, types.NamespacedName{Name: svcName, Namespace: ns}, svc); err != nil {
return nil
}
pgName := svc.Annotations[AnnotationProxyGroup]
if pgName == "" {
return nil
}
return []reconcile.Request{
{
NamespacedName: types.NamespacedName{
Namespace: ns,
Name: svcName,
},
},
}
}
}
// egressSvcFromEps is an event handler for EndpointSlices. If an EndpointSlice is for an egress ExternalName Service
// meant to be exposed on a ProxyGroup, returns a reconcile request for the Service.
func egressSvcFromEps(_ context.Context, o client.Object) []reconcile.Request {

View File

@ -19,6 +19,7 @@ import (
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -63,7 +64,6 @@ const (
// Services that should be exposed on an ingress ProxyGroup (in HA mode).
type HAServiceReconciler struct {
client.Client
isDefaultLoadBalancer bool
recorder record.EventRecorder
logger *zap.SugaredLogger
@ -357,7 +357,7 @@ func (r *HAServiceReconciler) maybeProvision(ctx context.Context, hostname strin
// IPs to the ProxyGroup Pods' AllowedIPs in the next netmap update if approved.
// NOTE (ChaosInTheCRD): afaik, we conclusively know here that we *want* to advertise (we don't have to wait for certs or anything)
// We do however need to ensure that the service should be unadvertised during cleanup
if err = r.maybeUpdateAdvertiseServicesConfig(ctx, pg.Name, serviceName, &cfg, true, logger); err != nil {
if err = r.maybeUpdateAdvertiseServicesConfig(ctx, svc, pg.Name, serviceName, &cfg, true, logger); err != nil {
return false, fmt.Errorf("failed to update tailscaled config: %w", err)
}
@ -444,7 +444,7 @@ func (r *HAServiceReconciler) maybeCleanup(ctx context.Context, hostname string,
// 2. Unadvertise the VIPService.
pgName := svc.Annotations[AnnotationProxyGroup]
if err = r.maybeUpdateAdvertiseServicesConfig(ctx, pgName, serviceName, nil, false, logger); err != nil {
if err = r.maybeUpdateAdvertiseServicesConfig(ctx, svc, pgName, serviceName, nil, false, logger); err != nil {
return false, fmt.Errorf("failed to update tailscaled config services: %w", err)
}
@ -495,7 +495,7 @@ func (r *HAServiceReconciler) maybeCleanupProxyGroup(ctx context.Context, proxyG
logger.Infof("VIPService %q is not owned by any Service, cleaning up", vipSvcName)
// Make sure the VIPService is not advertised in tailscaled or serve config.
if err = r.maybeUpdateAdvertiseServicesConfig(ctx, proxyGroupName, tailcfg.ServiceName(vipSvcName), &cfg, false, logger); err != nil {
if err = r.maybeUpdateAdvertiseServicesConfig(ctx, nil, proxyGroupName, tailcfg.ServiceName(vipSvcName), &cfg, false, logger); err != nil {
return false, fmt.Errorf("failed to update tailscaled config services: %w", err)
}
@ -684,7 +684,7 @@ func isCurrentStatus(gotCfgs ingressservices.Status, pod *corev1.Pod, logger *za
return true, nil
}
func (a *HAServiceReconciler) maybeUpdateAdvertiseServicesConfig(ctx context.Context, pgName string, serviceName tailcfg.ServiceName, cfg *ingressservices.Config, shouldBeAdvertised bool, logger *zap.SugaredLogger) (err error) {
func (a *HAServiceReconciler) maybeUpdateAdvertiseServicesConfig(ctx context.Context, svc *corev1.Service, pgName string, serviceName tailcfg.ServiceName, cfg *ingressservices.Config, shouldBeAdvertised bool, logger *zap.SugaredLogger) (err error) {
logger.Debugf("checking advertisement for service '%s'", serviceName)
// Get all config Secrets for this ProxyGroup.
// Get all Pods
@ -693,6 +693,13 @@ func (a *HAServiceReconciler) maybeUpdateAdvertiseServicesConfig(ctx context.Con
return fmt.Errorf("failed to list config Secrets: %w", err)
}
if svc != nil && shouldBeAdvertised {
shouldBeAdvertised, err = a.checkEndpointsReady(ctx, svc, logger)
if err != nil {
return fmt.Errorf("failed to check readiness of Service '%s' endpoints: %w", svc.Name, err)
}
}
for _, secret := range secrets.Items {
var updated bool
for fileName, confB := range secret.Data {
@ -713,7 +720,6 @@ func (a *HAServiceReconciler) maybeUpdateAdvertiseServicesConfig(ctx context.Con
case isAdvertised && !shouldBeAdvertised:
logger.Debugf("deleting advertisement for service %q", serviceName)
conf.AdvertiseServices = slices.Delete(conf.AdvertiseServices, idx, idx+1)
case shouldBeAdvertised:
replicaName, ok := strings.CutSuffix(secret.Name, "-config")
if !ok {
@ -850,3 +856,38 @@ func ingressSvcsConfigs(ctx context.Context, cl client.Client, proxyGroupName, t
}
return cm, cfgs, nil
}
func (r *HAServiceReconciler) getEndpointSlicesForService(ctx context.Context, svc *corev1.Service, logger *zap.SugaredLogger) ([]discoveryv1.EndpointSlice, error) {
logger.Debugf("looking for endpoint slices for svc with name '%s' in namespace '%s' matching label '%s=%s'", svc.Name, svc.Namespace, discoveryv1.LabelServiceName, svc.Name)
labels := map[string]string{discoveryv1.LabelServiceName: svc.Name} // https://kubernetes.io/docs/concepts/services-networking/endpoint-slices/#ownership
eps := new(discoveryv1.EndpointSliceList)
if err := r.List(ctx, eps, client.InNamespace(svc.Namespace), client.MatchingLabels(labels)); err != nil {
return nil, fmt.Errorf("error listing EndpointSlices: %w", err)
}
if len(eps.Items) == 0 {
logger.Debugf("Service '%s' EndpointSlice does not yet exist. We will reconcile again once it's created", svc.Name)
return nil, nil
}
logger.Debugf("found %d endpoint slices", len(eps.DeepCopy().Items))
return eps.Items, nil
}
func (r *HAServiceReconciler) checkEndpointsReady(ctx context.Context, svc *corev1.Service, logger *zap.SugaredLogger) (bool, error) {
epss, err := r.getEndpointSlicesForService(ctx, svc, logger)
if err != nil {
return false, fmt.Errorf("failed to list endpointslices for svc %q: %w", svc.Name, err)
}
for _, eps := range epss {
for _, ep := range eps.Endpoints {
if *ep.Conditions.Ready {
return true, nil
}
}
}
logger.Debugf("could not find any ready Endpoints in EndpointSlice")
return false, nil
}