From 31c2c61e77a3d2b3978de3a0ca306f979fdca900 Mon Sep 17 00:00:00 2001 From: Irbe Krumina Date: Fri, 2 May 2025 12:10:40 +0100 Subject: [PATCH] WIP Signed-off-by: Irbe Krumina --- cmd/containerboot/ingressservices.go | 95 ++++++++++++++++--------- cmd/containerboot/main.go | 3 +- cmd/k8s-operator/svc-for-pg.go | 39 +++++++++- kube/ingressservices/ingressservices.go | 4 +- 4 files changed, 103 insertions(+), 38 deletions(-) diff --git a/cmd/containerboot/ingressservices.go b/cmd/containerboot/ingressservices.go index 1e8ecd59e..dab731654 100644 --- a/cmd/containerboot/ingressservices.go +++ b/cmd/containerboot/ingressservices.go @@ -23,15 +23,23 @@ import ( "tailscale.com/util/mak" ) +// ingressProxy corresponds to a Kubernetes Operator's network layer ingress +// proxy. It configures firewall rules (iptables or nftables) to proxy tailnet +// traffic to Kubernetes Services. Currently this is only used for network +// layer proxies in HA mode. type ingressProxy struct { - cfgPath string // path to a directory with ingress services config files + cfgPath string // path to ingress configfile. - nfr linuxfw.NetfilterRunner // never nil + // nfr is the netfilter runner used to configure firewall rules. + // Never nil. + nfr linuxfw.NetfilterRunner kc kubeclient.Client // never nil - stateSecret string // name of the kube state Secret + stateSecret string // Secret that holds Tailscale state - podIP string // never empty + // Pod's IP addresses are used as an identifier of this partcular Pod. + podIPv4 string // empty if Pod does not have IPv4 address + podIPv6 string // empty if Pod does not have IPv6 address } func (ep *ingressProxy) run(ctx context.Context, opts ingressProxyOpts) error { @@ -73,41 +81,45 @@ func (ep *ingressProxy) run(ctx context.Context, opts ingressProxyOpts) error { type ingressProxyOpts struct { cfgPath string - nfr linuxfw.NetfilterRunner - kc kubeclient.Client + nfr linuxfw.NetfilterRunner // never nil + kc kubeclient.Client // never nil stateSecret string - podIP string // TODO: change to IP hash maybe + podIPv4 string + podIPv6 string } -// applyOpts configures egress proxy using the provided options. func (ep *ingressProxy) configure(opts ingressProxyOpts) { ep.cfgPath = opts.cfgPath ep.nfr = opts.nfr ep.kc = opts.kc ep.stateSecret = opts.stateSecret - ep.podIP = opts.podIP + ep.podIPv4 = opts.podIPv4 + ep.podIPv6 = opts.podIPv6 } +// sync reconciles proxy's firewall rules (iptables or nftables) on ingress config changes: +// - ensures that new firewall rules are added +// - ensures that old firewall rules are deleted +// - updates ingress proxy's status in the state Secret func (ep *ingressProxy) sync(ctx context.Context) error { cfgs, err := ep.getConfigs() if err != nil { - return fmt.Errorf("error retrieving ingress service configs: %w", err) + return fmt.Errorf("ingress proxy: error retrieving configs: %w", err) } status, err := ep.getStatus(ctx) if err != nil { - return fmt.Errorf("error retrieving current ingress proxy status: %w", err) + return fmt.Errorf("ingress proxy: error retrieving current status: %w", err) } - // get status if err := ep.syncIngressConfigs(cfgs, status); err != nil { - return fmt.Errorf("error syncing ingress service configs: %w", err) + return fmt.Errorf("ingress proxy: error syncing configs: %w", err) } var existingConfigs *ingressservices.Configs if status != nil { existingConfigs = &status.Configs } - if !ingresServicesStatusIsEqual(cfgs, existingConfigs) { + if !(ingresServicesStatusIsEqual(cfgs, existingConfigs) && ep.isCurrentStatus(status)) { if err := ep.setStatus(ctx, cfgs); err != nil { - return fmt.Errorf("error setting ingress proxy status: %w", err) + return fmt.Errorf("ingress proxy: error setting status: %w", err) } } return nil @@ -117,12 +129,16 @@ func (ep *ingressProxy) getRulesToDelete(cfgs *ingressservices.Configs, status * if status == nil { return nil } - for svcName, cfg := range status.Configs { - needed := cfgs.GetConfig(svcName) - if reflect.DeepEqual(needed, cfg) { + for vipSvc, gotCfg := range status.Configs { + if cfgs == nil { + mak.Set(&rulesToDelete, vipSvc, gotCfg) continue } - mak.Set(&rulesToDelete, svcName, cfg) + wantsCfg := cfgs.GetConfig(vipSvc) + if wantsCfg != nil && reflect.DeepEqual(*wantsCfg, gotCfg) { + continue + } + mak.Set(&rulesToDelete, vipSvc, gotCfg) } return rulesToDelete } @@ -131,29 +147,41 @@ func (ep *ingressProxy) getRulesToAdd(cfgs *ingressservices.Configs, status *ing if cfgs == nil { return nil } - for svcName, cfg := range *cfgs { - if status == nil { - mak.Set(&rulesToAdd, svcName, cfg) + for vipSvc, wantsCfg := range *cfgs { + if status == nil || !ep.isCurrentStatus(status) { + mak.Set(&rulesToAdd, vipSvc, wantsCfg) continue } - existing := status.Configs.GetConfig(svcName) - if reflect.DeepEqual(existing, cfg) { - continue + gotCfg := status.Configs.GetConfig(vipSvc) + if gotCfg == nil || !reflect.DeepEqual(wantsCfg, *gotCfg) { + mak.Set(&rulesToAdd, vipSvc, wantsCfg) } - mak.Set(&rulesToAdd, svcName, cfg) } return rulesToAdd } +// isCurrentStatus returns true if the status of an ingress proxy as read from +// the proxy's state Secret is the status of the current proxy Pod. We use +// Pod's IP address to determine that the status is for this Pod. +func (ep *ingressProxy) isCurrentStatus(status *ingressservices.Status) bool { + if status == nil { + return true + } + return status.PodIPv4 == ep.podIPv4 && status.PodIPv6 == ep.podIPv6 +} + func (ep *ingressProxy) syncIngressConfigs(cfgs *ingressservices.Configs, status *ingressservices.Status) error { + log.Printf("syncing ingress service configs with status %+#v", status) rulesToAdd := ep.getRulesToAdd(cfgs, status) rulesToDelete := ep.getRulesToDelete(cfgs, status) + log.Printf("ingress rules to add: %v", rulesToAdd) + log.Printf("ingress rules to delete: %v", rulesToDelete) if err := ensureIngressRulesDeleted(rulesToDelete, ep.nfr); err != nil { return fmt.Errorf("error deleting ingress rules: %w", err) } if err := ensureIngressRulesAdded(rulesToAdd, ep.nfr); err != nil { - return fmt.Errorf("error adding rules: %w", err) + return fmt.Errorf("error adding ingress rules: %w", err) } return nil } @@ -193,16 +221,17 @@ func (ep *ingressProxy) getStatus(ctx context.Context) (*ingressservices.Status, if err := json.Unmarshal([]byte(raw), status); err != nil { return nil, fmt.Errorf("error unmarshalling previous config: %w", err) } - if reflect.DeepEqual(status.PodIP, ep.podIP) { - return status, nil - } - return nil, nil + return status, nil } func (ep *ingressProxy) setStatus(ctx context.Context, newCfg *ingressservices.Configs) error { // Pod IP is used to determine if a stored status applies to THIS proxy Pod. - status := &ingressservices.Status{Configs: *newCfg} - status.PodIP = ep.podIP + status := &ingressservices.Status{} + if newCfg != nil { + status.Configs = *newCfg + } + status.PodIPv4 = ep.podIPv4 + status.PodIPv6 = ep.podIPv6 secret, err := ep.kc.GetSecret(ctx, ep.stateSecret) if err != nil { return fmt.Errorf("error retrieving state Secret: %w", err) diff --git a/cmd/containerboot/main.go b/cmd/containerboot/main.go index 72e1b8aab..954330897 100644 --- a/cmd/containerboot/main.go +++ b/cmd/containerboot/main.go @@ -703,7 +703,8 @@ runLoop: nfr: nfr, kc: kc, stateSecret: cfg.KubeSecret, - podIP: cfg.PodIPv4, + podIPv4: cfg.PodIPv4, + podIPv6: cfg.PodIPv6, } go func() { if err := ip.run(ctx, opts); err != nil { diff --git a/cmd/k8s-operator/svc-for-pg.go b/cmd/k8s-operator/svc-for-pg.go index 35133cac4..ec8cd5b92 100644 --- a/cmd/k8s-operator/svc-for-pg.go +++ b/cmd/k8s-operator/svc-for-pg.go @@ -599,9 +599,11 @@ func (r *HAServiceReconciler) cleanupVIPService(ctx context.Context, name tailcf } func (a *HAServiceReconciler) backendRoutesSetup(ctx context.Context, serviceName, replicaName, pgName string, wantsCfg *ingressservices.Config, logger *zap.SugaredLogger) (bool, error) { + logger.Debugf("checking backend routes for service '%s'", serviceName) pod := &corev1.Pod{} err := a.Get(ctx, client.ObjectKey{Namespace: a.tsNamespace, Name: replicaName}, pod) if apierrors.IsNotFound(err) { + logger.Debugf("Pod %q not found", replicaName) return false, nil } if err != nil { @@ -610,6 +612,7 @@ func (a *HAServiceReconciler) backendRoutesSetup(ctx context.Context, serviceNam secret := &corev1.Secret{} err = a.Get(ctx, client.ObjectKey{Namespace: a.tsNamespace, Name: replicaName}, secret) if apierrors.IsNotFound(err) { + logger.Debugf("Secret %q not found", replicaName) return false, nil } if err != nil { @@ -623,8 +626,12 @@ func (a *HAServiceReconciler) backendRoutesSetup(ctx context.Context, serviceNam if err := json.Unmarshal(gotCfgB, &gotCfgs); err != nil { return false, fmt.Errorf("error unmarshalling ingress config: %w", err) } - if gotCfgs.PodIP != pod.Status.PodIP { // TODO: consider multiple IPs - logger.Debugf("Pod %q has IP %q, but wants %q", pod.Name, gotCfgs.PodIP, pod.Status.PodIP) + statusUpToDate, err := isCurrentStatus(gotCfgs, pod, logger) + if err != nil { + return false, fmt.Errorf("error checking ingress config status: %w", err) + } + if !statusUpToDate { + logger.Debugf("Pod %q is not ready to advertise VIPService", pod.Name) return false, nil } if !reflect.DeepEqual(gotCfgs.Configs.GetConfig(serviceName), wantsCfg) { @@ -634,6 +641,34 @@ func (a *HAServiceReconciler) backendRoutesSetup(ctx context.Context, serviceNam return true, nil } +func isCurrentStatus(gotCfgs ingressservices.Status, pod *corev1.Pod, logger *zap.SugaredLogger) (bool, error) { + ips := pod.Status.PodIPs + if len(ips) == 0 { + logger.Debugf("Pod %q does not yet have IPs, unable to determine if status is up to date", pod.Name) + return false, nil + } + + if len(ips) > 2 { + return false, fmt.Errorf("POD_IPs can contain at most 2 IPs, got %d (%v)", len(ips), ips) + } + var podIPv4, podIPv6 string + for _, ip := range ips { + parsed, err := netip.ParseAddr(ip.IP) + if err != nil { + return false, fmt.Errorf("error parsing IP address %s: %w", ip.IP, err) + } + if parsed.Is4() { + podIPv4 = parsed.String() + continue + } + podIPv6 = parsed.String() + } + if podIPv4 != gotCfgs.PodIPv4 || podIPv6 != gotCfgs.PodIPv6 { + return false, nil + } + return true, nil +} + func (a *HAServiceReconciler) maybeUpdateAdvertiseServicesConfig(ctx context.Context, pgName string, serviceName tailcfg.ServiceName, cfg *ingressservices.Config, shouldBeAdvertised bool, logger *zap.SugaredLogger) (err error) { logger.Debugf("checking advertisement for service '%s'", serviceName) // Get all config Secrets for this ProxyGroup. diff --git a/kube/ingressservices/ingressservices.go b/kube/ingressservices/ingressservices.go index 8a05cdc92..57f57d168 100644 --- a/kube/ingressservices/ingressservices.go +++ b/kube/ingressservices/ingressservices.go @@ -22,8 +22,8 @@ func (cfgs *Configs) GetConfig(name string) *Config { type Status struct { Configs Configs `json:"configs,omitempty"` - // PodIP is sufficiently unique to distinguish status that belongs to this Pod. - PodIP string `json:"podIP,omitempty"` + PodIPv4 string `json:"podIPv4,omitempty"` + PodIPv6 string `json:"podIPv6,omitempty"` } type Mapping struct {