Signed-off-by: Irbe Krumina <irbe@tailscale.com>
This commit is contained in:
Irbe Krumina 2025-05-01 20:59:51 +01:00
parent 01ee64d939
commit 9b846a4b4d
3 changed files with 39 additions and 43 deletions

View File

@ -31,7 +31,7 @@ type ingressProxy struct {
kc kubeclient.Client // never nil
stateSecret string // name of the kube state Secret
podIP string // never empty string
podIP string // never empty
}
func (ep *ingressProxy) run(ctx context.Context, opts ingressProxyOpts) error {
@ -39,8 +39,6 @@ func (ep *ingressProxy) run(ctx context.Context, opts ingressProxyOpts) error {
ep.configure(opts)
var tickChan <-chan time.Time
var eventChan <-chan fsnotify.Event
// TODO (irbekrm): take a look if this can be pulled into a single func
// shared with serve config loader.
if w, err := fsnotify.NewWatcher(); err != nil {
log.Printf("failed to create fsnotify watcher, timer-only mode: %v", err)
ticker := time.NewTicker(5 * time.Second)
@ -148,7 +146,6 @@ func (ep *ingressProxy) getRulesToAdd(cfgs *ingressservices.Configs, status *ing
}
func (ep *ingressProxy) syncIngressConfigs(cfgs *ingressservices.Configs, status *ingressservices.Status) error {
// Add new services, update rules for any that have changed.
rulesToAdd := ep.getRulesToAdd(cfgs, status)
rulesToDelete := ep.getRulesToDelete(cfgs, status)
@ -158,11 +155,9 @@ func (ep *ingressProxy) syncIngressConfigs(cfgs *ingressservices.Configs, status
if err := ensureIngressRulesAdded(rulesToAdd, ep.nfr); err != nil {
return fmt.Errorf("error adding rules: %w", err)
}
// Maybe SNAT?
return nil
}
// getConfigs gets the mounted egress service configuration.
func (ep *ingressProxy) getConfigs() (*ingressservices.Configs, error) {
j, err := os.ReadFile(ep.cfgPath)
if os.IsNotExist(err) {

View File

@ -372,8 +372,6 @@ func runReconcilers(opts reconcilerOpts) {
ControllerManagedBy(mgr).
For(&corev1.Service{}).
Named("service-pg-reconciler").
// TODO: this watch does not seem to work- does not if ProxyGroup created later
// maybe need to watch the ProxyGroup
Watches(&corev1.Secret{}, handler.EnqueueRequestsFromMapFunc(HAServicesFromSecret(mgr.GetClient(), startlog))).
Watches(&tsapi.ProxyGroup{}, ingressProxyGroupFilter).
Complete(&HAServiceReconciler{
@ -390,9 +388,9 @@ func runReconcilers(opts reconcilerOpts) {
if err != nil {
startlog.Fatalf("could not create service-pg-reconciler: %v", err)
}
// if err := mgr.GetFieldIndexer().IndexField(context.Background(), new(networkingv1.Ingress), indexIngressProxyGroup, indexPGIngresses); err != nil {
// startlog.Fatalf("failed setting up indexer for HA Ingresses: %v", err)
// }
if err := mgr.GetFieldIndexer().IndexField(context.Background(), new(corev1.Service), indexIngressProxyGroup, indexPGIngresses); err != nil {
startlog.Fatalf("failed setting up indexer for HA Services: %v", err)
}
connectorFilter := handler.EnqueueRequestsFromMapFunc(managedResourceHandlerForType("connector"))
// If a ProxyClassChanges, enqueue all Connectors that have
@ -1137,27 +1135,25 @@ func HAServicesFromSecret(cl client.Client, logger *zap.SugaredLogger) handler.M
if !isPGStateSecret(secret) {
return nil
}
_, ok = secret.ObjectMeta.Labels[LabelParentName]
pgName, ok := secret.ObjectMeta.Labels[LabelParentName]
if !ok {
return nil
}
// svcList := &corev1.ServiceList{}
// if err := cl.List(ctx, ingList, client.MatchingFields{indexIngressProxyGroup: pgName}); err != nil {
// logger.Infof("error listing Ingresses, skipping a reconcile for event on Secret %s: %v", secret.Name, err)
// return nil
// }
// reqs := make([]reconcile.Request, 0)
// for _, ing := range ingList.Items {
// reqs = append(reqs, reconcile.Request{
// NamespacedName: types.NamespacedName{
// Namespace: ing.Namespace,
// Name: ing.Name,
// },
// })
// }
// return reqs
return nil
svcList := &corev1.ServiceList{}
if err := cl.List(ctx, svcList, client.MatchingFields{indexIngressProxyGroup: pgName}); err != nil {
logger.Infof("error listing Services, skipping a reconcile for event on Secret %s: %v", secret.Name, err)
return nil
}
reqs := make([]reconcile.Request, 0)
for _, ing := range svcList.Items {
reqs = append(reqs, reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: ing.Namespace,
Name: ing.Name,
},
})
}
return reqs
}
}
@ -1341,8 +1337,9 @@ func indexEgressServices(o client.Object) []string {
return []string{o.GetAnnotations()[AnnotationProxyGroup]}
}
// indexPGIngresses adds a local index to a cached Tailscale Ingresses meant to be exposed on a ProxyGroup. The index is
// used a list filter.
// indexPGIngresses adds a local index to a cached Tailscale Ingresses and
// Services meant to be exposed on a ProxyGroup. The index is used a list
// filter.
func indexPGIngresses(o client.Object) []string {
if !hasProxyGroupAnnotation(o) {
return nil
@ -1387,8 +1384,7 @@ func serviceHandlerForIngressPG(cl client.Client, logger *zap.SugaredLogger) han
}
func hasProxyGroupAnnotation(obj client.Object) bool {
ing := obj.(*networkingv1.Ingress)
return ing.Annotations[AnnotationProxyGroup] != ""
return obj.GetAnnotations()[AnnotationProxyGroup] != ""
}
func id(ctx context.Context, lc *local.Client) (string, error) {

View File

@ -438,7 +438,7 @@ func (r *HAServiceReconciler) maybeCleanup(ctx context.Context, hostname string,
if err != nil {
return false, fmt.Errorf("error marshaling ingress config: %w", err)
}
mak.Set(&cm.BinaryData, serveConfigKey, cfgBytes)
mak.Set(&cm.BinaryData, ingressservices.IngressConfigKey, cfgBytes)
return true, r.Update(ctx, cm)
}
@ -631,7 +631,6 @@ func (a *HAServiceReconciler) backendRoutesSetup(ctx context.Context, serviceNam
logger.Debugf("Pod %q has config %q, but wants %q", pod.Name, gotCfgs.Configs.GetConfig(serviceName), wantsCfg)
return false, nil
}
return true, nil
}
@ -656,24 +655,32 @@ func (a *HAServiceReconciler) maybeUpdateAdvertiseServicesConfig(ctx context.Con
isAdvertised := idx >= 0
switch {
case !isAdvertised && !shouldBeAdvertised:
logger.Debugf("service '%s' shouldn't be advertised", serviceName)
logger.Debugf("service %q shouldn't be advertised", serviceName)
continue
case isAdvertised && shouldBeAdvertised:
logger.Debugf("service %q is already advertised", serviceName)
continue
case isAdvertised && !shouldBeAdvertised:
logger.Debugf("deleting advertisement for service '%s'", serviceName)
logger.Debugf("deleting advertisement for service %q", serviceName)
conf.AdvertiseServices = slices.Delete(conf.AdvertiseServices, idx, idx+1)
case shouldBeAdvertised:
ready, err := a.backendRoutesSetup(ctx, serviceName.String(), secret.Name, pgName, cfg, logger)
replicaName, ok := strings.CutSuffix(secret.Name, "-config")
if !ok {
logger.Infof("[unexpected] unable to determine replica name from config secret name %q, unable to determine if backend routing has been configured", secret.Name)
return nil
}
ready, err := a.backendRoutesSetup(ctx, serviceName.String(), replicaName, pgName, cfg, logger)
if err != nil {
return fmt.Errorf("error checking backend routes: %w", err)
}
if !ready {
logger.Debugf("service '%s' is not ready to be advertised", serviceName)
logger.Debugf("service %q is not ready to be advertised", serviceName)
continue
}
logger.Debugf("advertising service '%s' in secret with name '%s'", serviceName, secret.DeepCopy().Name)
conf.AdvertiseServices = append(conf.AdvertiseServices, serviceName.String())
}
confB, err := json.Marshal(conf)
if err != nil {
return fmt.Errorf("error marshalling ProxyGroup config: %w", err)
@ -681,15 +688,13 @@ func (a *HAServiceReconciler) maybeUpdateAdvertiseServicesConfig(ctx context.Con
mak.Set(&secret.Data, fileName, confB)
updated = true
}
if updated {
logger.Debugf("updating secret with name '%s'", &secret.Name)
logger.Debugf("updating Secret %q", secret.Name)
if err := a.Update(ctx, &secret); err != nil {
return fmt.Errorf("error updating ProxyGroup config Secret: %w", err)
}
}
}
return nil
}