From e9ef402d70fe26e5a96d4f28c916c2da6bf5ddc5 Mon Sep 17 00:00:00 2001 From: Irbe Krumina Date: Wed, 30 Apr 2025 08:28:58 +0100 Subject: [PATCH] WIP Signed-off-by: Irbe Krumina --- cmd/containerboot/ingressservices.go | 278 ++++++++++++++++++ cmd/containerboot/main.go | 19 ++ cmd/containerboot/settings.go | 27 +- .../deploy/examples/proxygroup.yaml | 19 +- cmd/k8s-operator/ingress-for-pg.go | 4 +- cmd/k8s-operator/operator.go | 2 + cmd/k8s-operator/proxygroup_specs.go | 5 + cmd/k8s-operator/svc-for-pg.go | 21 +- kube/ingressservices/ingressservices.go | 25 +- util/linuxfw/iptables_for_svcs.go | 56 +++- util/linuxfw/nftables_runner.go | 9 + 11 files changed, 433 insertions(+), 32 deletions(-) create mode 100644 cmd/containerboot/ingressservices.go diff --git a/cmd/containerboot/ingressservices.go b/cmd/containerboot/ingressservices.go new file mode 100644 index 000000000..73bfc889a --- /dev/null +++ b/cmd/containerboot/ingressservices.go @@ -0,0 +1,278 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +//go:build linux + +package main + +import ( + "context" + "encoding/json" + "fmt" + "log" + "net/netip" + "os" + "path/filepath" + "reflect" + "time" + + "github.com/fsnotify/fsnotify" + "tailscale.com/kube/ingressservices" + "tailscale.com/kube/kubeclient" + "tailscale.com/util/linuxfw" + "tailscale.com/util/mak" +) + +type ingressProxy struct { + cfgPath string // path to a directory with ingress services config files + + nfr linuxfw.NetfilterRunner // never nil + + kc kubeclient.Client // never nil + stateSecret string // name of the kube state Secret + + podIP string // never empty string +} + +func (ep *ingressProxy) run(ctx context.Context, opts ingressProxyOpts) error { + log.Printf("starting ingress proxy...") + ep.configure(opts) + var tickChan <-chan time.Time + var eventChan <-chan fsnotify.Event + // TODO (irbekrm): take a look if this can be pulled into a single func + // shared with serve config loader. + if w, err := fsnotify.NewWatcher(); err != nil { + log.Printf("failed to create fsnotify watcher, timer-only mode: %v", err) + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + tickChan = ticker.C + } else { + defer w.Close() + dir := filepath.Dir(ep.cfgPath) + if err := w.Add(dir); err != nil { + return fmt.Errorf("failed to add fsnotify watch for %v: %w", dir, err) + } + eventChan = w.Events + } + + if err := ep.sync(ctx); err != nil { + return err + } + for { + select { + case <-ctx.Done(): + return nil + case <-tickChan: + log.Printf("periodic sync, ensuring firewall config is up to date...") + case <-eventChan: + log.Printf("config file change detected, ensuring firewall config is up to date...") + } + if err := ep.sync(ctx); err != nil { + return fmt.Errorf("error syncing ingress service config: %w", err) + } + } +} + +type ingressProxyOpts struct { + cfgPath string + nfr linuxfw.NetfilterRunner + kc kubeclient.Client + stateSecret string + podIP string // TODO: change to IP hash maybe +} + +// applyOpts configures egress proxy using the provided options. 
+func (ep *ingressProxy) configure(opts ingressProxyOpts) { + ep.cfgPath = opts.cfgPath + ep.nfr = opts.nfr + ep.kc = opts.kc + ep.stateSecret = opts.stateSecret + ep.podIP = opts.podIP +} + +func (ep *ingressProxy) sync(ctx context.Context) error { + cfgs, err := ep.getConfigs() + if err != nil { + return fmt.Errorf("error retrieving ingress service configs: %w", err) + } + status, err := ep.getStatus(ctx) + if err != nil { + return fmt.Errorf("error retrieving current ingress proxy status: %w", err) + } + // get status + if err := ep.syncIngressConfigs(cfgs, status); err != nil { + return fmt.Errorf("error syncing ingress service configs: %w", err) + } + var existingConfigs *ingressservices.Configs + if status != nil { + existingConfigs = &status.Configs + } + if !ingresServicesStatusIsEqual(cfgs, existingConfigs) { + if err := ep.setStatus(ctx, cfgs); err != nil { + return fmt.Errorf("error setting ingress proxy status: %w", err) + } + } + return nil +} + +func (ep *ingressProxy) getRulesToDelete(cfgs *ingressservices.Configs, status *ingressservices.Status) (rulesToDelete map[string]ingressservices.Config) { + if status == nil { + return nil + } + for svcName, cfg := range status.Configs { + needed := cfgs.GetConfig(svcName) + if reflect.DeepEqual(needed, cfg) { + continue + } + mak.Set(&rulesToDelete, svcName, cfg) + } + return rulesToDelete +} + +func (ep *ingressProxy) getRulesToAdd(cfgs *ingressservices.Configs, status *ingressservices.Status) (rulesToAdd map[string]ingressservices.Config) { + if cfgs == nil { + return nil + } + for svcName, cfg := range *cfgs { + if status == nil { + mak.Set(&rulesToAdd, svcName, cfg) + continue + } + existing := status.Configs.GetConfig(svcName) + if reflect.DeepEqual(existing, cfg) { + continue + } + mak.Set(&rulesToAdd, svcName, cfg) + } + return rulesToAdd +} + +func (ep *ingressProxy) syncIngressConfigs(cfgs *ingressservices.Configs, status *ingressservices.Status) error { + // Add new services, update rules for any that have changed. + rulesToAdd := ep.getRulesToAdd(cfgs, status) + rulesToDelete := ep.getRulesToDelete(cfgs, status) + + if err := ensureIngressRulesDeleted(rulesToDelete, ep.nfr); err != nil { + return fmt.Errorf("error deleting ingress rules: %w", err) + } + if err := ensureIngressRulesAdded(rulesToAdd, ep.nfr); err != nil { + return fmt.Errorf("error adding rules: %w", err) + } + // Maybe SNAT? + return nil +} + +// getConfigs gets the mounted egress service configuration. +func (ep *ingressProxy) getConfigs() (*ingressservices.Configs, error) { + j, err := os.ReadFile(ep.cfgPath) + if os.IsNotExist(err) { + return nil, nil + } + if err != nil { + return nil, err + } + if len(j) == 0 || string(j) == "" { + return nil, nil + } + cfg := &ingressservices.Configs{} + if err := json.Unmarshal(j, &cfg); err != nil { + return nil, err + } + return cfg, nil +} + +// getStatus gets the current status of the configured firewall. The current +// status is stored in state Secret. Returns nil status if no status that +// applies to the current proxy Pod was found. Uses the Pod IP to determine if a +// status found in the state Secret applies to this proxy Pod. 
+func (ep *ingressProxy) getStatus(ctx context.Context) (*ingressservices.Status, error) { + secret, err := ep.kc.GetSecret(ctx, ep.stateSecret) + if err != nil { + return nil, fmt.Errorf("error retrieving state secret: %w", err) + } + status := &ingressservices.Status{} + raw, ok := secret.Data[ingressservices.IngressConfigKey] + if !ok { + return nil, nil + } + if err := json.Unmarshal([]byte(raw), status); err != nil { + return nil, fmt.Errorf("error unmarshalling previous config: %w", err) + } + if reflect.DeepEqual(status.PodIP, ep.podIP) { + return status, nil + } + return nil, nil +} + +func (ep *ingressProxy) setStatus(ctx context.Context, newCfg *ingressservices.Configs) error { + // Pod IP is used to determine if a stored status applies to THIS proxy Pod. + status := &ingressservices.Status{Configs: *newCfg} + status.PodIP = ep.podIP + secret, err := ep.kc.GetSecret(ctx, ep.stateSecret) + if err != nil { + return fmt.Errorf("error retrieving state Secret: %w", err) + } + bs, err := json.Marshal(status) + if err != nil { + return fmt.Errorf("error marshalling service config: %w", err) + } + secret.Data[ingressservices.IngressConfigKey] = bs + patch := kubeclient.JSONPatch{ + Op: "replace", + Path: fmt.Sprintf("/data/%s", ingressservices.IngressConfigKey), + Value: bs, + } + if err := ep.kc.JSONPatchResource(ctx, ep.stateSecret, kubeclient.TypeSecrets, []kubeclient.JSONPatch{patch}); err != nil { + return fmt.Errorf("error patching state Secret: %w", err) + } + return nil +} + +func ensureIngressRulesAdded(cfgs map[string]ingressservices.Config, nfr linuxfw.NetfilterRunner) error { + for serviceName, cfg := range cfgs { + f := func(svcName string, vipIP, clusterIP netip.Addr) error { + log.Printf("ensureIngressRulesAdded VIPService %s with IP %s to cluster IP %s", serviceName, vipIP, clusterIP) + return nfr.EnsureDNATRuleForSvc(svcName, vipIP, clusterIP) + } + if cfg.IPv4Mapping != nil { + if err := f(serviceName, cfg.IPv4Mapping.VIPServiceIP, cfg.IPv4Mapping.ClusterIP); err != nil { + return fmt.Errorf("error adding ingress rule for %s: %w", serviceName, err) + } + } + if cfg.IPv6Mapping != nil { + if err := f(serviceName, cfg.IPv6Mapping.VIPServiceIP, cfg.IPv6Mapping.ClusterIP); err != nil { + return fmt.Errorf("error adding ingress rule for %s: %w", serviceName, err) + } + } + } + return nil +} +func ensureIngressRulesDeleted(cfgs map[string]ingressservices.Config, nfr linuxfw.NetfilterRunner) error { + for serviceName, cfg := range cfgs { + f := func(svcName string, vipIP, clusterIP netip.Addr) error { + log.Printf("ensureIngressRulesDeleted VIPService %s with IP %s to cluster IP %s", serviceName, vipIP, clusterIP) + return nfr.DeleteDNATRuleForSvc(svcName, vipIP, clusterIP) + } + if cfg.IPv4Mapping != nil { + if err := f(serviceName, cfg.IPv4Mapping.VIPServiceIP, cfg.IPv4Mapping.ClusterIP); err != nil { + return fmt.Errorf("error deleting ingress rule for %s: %w", serviceName, err) + } + } + if cfg.IPv6Mapping != nil { + if err := f(serviceName, cfg.IPv6Mapping.VIPServiceIP, cfg.IPv6Mapping.ClusterIP); err != nil { + return fmt.Errorf("error deleting ingress rule for %s: %w", serviceName, err) + } + } + } + return nil +} + +func ingresServicesStatusIsEqual(st, st1 *ingressservices.Configs) bool { + if st == nil && st1 == nil { + return true + } + if st == nil || st1 == nil { + return false + } + return reflect.DeepEqual(*st, *st1) +} diff --git a/cmd/containerboot/main.go b/cmd/containerboot/main.go index 9425571e6..72e1b8aab 100644 --- a/cmd/containerboot/main.go +++ 
b/cmd/containerboot/main.go @@ -441,6 +441,7 @@ authLoop: // egressSvcsErrorChan will get an error sent to it if this containerboot instance is configured to expose 1+ // egress services in HA mode and errored. var egressSvcsErrorChan = make(chan error) + var ingressSvcsErrorChan = make(chan error) defer t.Stop() // resetTimer resets timer for when to next attempt to resolve the DNS // name for the proxy configured with TS_EXPERIMENTAL_DEST_DNS_NAME. The @@ -694,6 +695,22 @@ runLoop: } }() } + ip := ingressProxy{} + if cfg.IngressProxiesCfgPath != "" { + log.Printf("configuring ingress proxy using configuration file at %s", cfg.IngressProxiesCfgPath) + opts := ingressProxyOpts{ + cfgPath: cfg.IngressProxiesCfgPath, + nfr: nfr, + kc: kc, + stateSecret: cfg.KubeSecret, + podIP: cfg.PodIPv4, + } + go func() { + if err := ip.run(ctx, opts); err != nil { + ingressSvcsErrorChan <- err + } + }() + } // Wait on tailscaled process. It won't be cleaned up by default when the // container exits as it is not PID1. TODO (irbekrm): perhaps we can replace the @@ -738,6 +755,8 @@ runLoop: resetTimer(false) case e := <-egressSvcsErrorChan: return fmt.Errorf("egress proxy failed: %v", e) + case e := <-ingressSvcsErrorChan: + return fmt.Errorf("ingress proxy failed: %v", e) } } wg.Wait() diff --git a/cmd/containerboot/settings.go b/cmd/containerboot/settings.go index c62db5340..0ac9c828e 100644 --- a/cmd/containerboot/settings.go +++ b/cmd/containerboot/settings.go @@ -64,16 +64,17 @@ type settings struct { // when setting up rules to proxy cluster traffic to cluster ingress // target. // Deprecated: use PodIPv4, PodIPv6 instead to support dual stack clusters - PodIP string - PodIPv4 string - PodIPv6 string - PodUID string - HealthCheckAddrPort string - LocalAddrPort string - MetricsEnabled bool - HealthCheckEnabled bool - DebugAddrPort string - EgressProxiesCfgPath string + PodIP string + PodIPv4 string + PodIPv6 string + PodUID string + HealthCheckAddrPort string + LocalAddrPort string + MetricsEnabled bool + HealthCheckEnabled bool + DebugAddrPort string + EgressProxiesCfgPath string + IngressProxiesCfgPath string // CertShareMode is set for Kubernetes Pods running cert share mode. // Possible values are empty (containerboot doesn't run any certs // logic), 'ro' (for Pods that shold never attempt to issue/renew @@ -114,6 +115,7 @@ func configFromEnv() (*settings, error) { HealthCheckEnabled: defaultBool("TS_ENABLE_HEALTH_CHECK", false), DebugAddrPort: defaultEnv("TS_DEBUG_ADDR_PORT", ""), EgressProxiesCfgPath: defaultEnv("TS_EGRESS_PROXIES_CONFIG_PATH", ""), + IngressProxiesCfgPath: defaultEnv("TS_INGRESS_PROXIES_CONFIG_PATH", ""), PodUID: defaultEnv("POD_UID", ""), } podIPs, ok := os.LookupEnv("POD_IPS") @@ -219,6 +221,9 @@ func (s *settings) validate() error { if s.EgressProxiesCfgPath != "" && !(s.InKubernetes && s.KubeSecret != "") { return errors.New("TS_EGRESS_PROXIES_CONFIG_PATH is only supported for Tailscale running on Kubernetes") } + if s.IngressProxiesCfgPath != "" && !(s.InKubernetes && s.KubeSecret != "") { + return errors.New("TS_INGRESS_PROXIES_CONFIG_PATH is only supported for Tailscale running on Kubernetes") + } return nil } @@ -308,7 +313,7 @@ func isOneStepConfig(cfg *settings) bool { // as an L3 proxy, proxying to an endpoint provided via one of the config env // vars. 
func isL3Proxy(cfg *settings) bool { - return cfg.ProxyTargetIP != "" || cfg.ProxyTargetDNSName != "" || cfg.TailnetTargetIP != "" || cfg.TailnetTargetFQDN != "" || cfg.AllowProxyingClusterTrafficViaIngress || cfg.EgressProxiesCfgPath != "" + return cfg.ProxyTargetIP != "" || cfg.ProxyTargetDNSName != "" || cfg.TailnetTargetIP != "" || cfg.TailnetTargetFQDN != "" || cfg.AllowProxyingClusterTrafficViaIngress || cfg.EgressProxiesCfgPath != "" || cfg.IngressProxiesCfgPath != "" } // hasKubeStateStore returns true if the state must be stored in a Kubernetes diff --git a/cmd/k8s-operator/deploy/examples/proxygroup.yaml b/cmd/k8s-operator/deploy/examples/proxygroup.yaml index 337d87f0b..f695b0d9f 100644 --- a/cmd/k8s-operator/deploy/examples/proxygroup.yaml +++ b/cmd/k8s-operator/deploy/examples/proxygroup.yaml @@ -1,7 +1,20 @@ apiVersion: tailscale.com/v1alpha1 kind: ProxyGroup metadata: - name: egress-proxies + name: ingress-proxies spec: - type: egress - replicas: 3 + type: ingress + replicas: 2 + proxyClass: prod +--- +apiVersion: tailscale.com/v1alpha1 +kind: ProxyClass +metadata: + name: prod +spec: + statefulSet: + pod: + tailscaleContainer: + env: + - name: TS_DEBUG_FIREWALL_MODE + value: "iptables" \ No newline at end of file diff --git a/cmd/k8s-operator/ingress-for-pg.go b/cmd/k8s-operator/ingress-for-pg.go index 3df5a07ee..16f890aa9 100644 --- a/cmd/k8s-operator/ingress-for-pg.go +++ b/cmd/k8s-operator/ingress-for-pg.go @@ -54,7 +54,8 @@ const ( // well as the default HTTPS endpoint). annotationHTTPEndpoint = "tailscale.com/http-endpoint" - labelDomain = "tailscale.com/domain" + labelDomain = "tailscale.com/domain" + managedVIPServiceComment = "This VIPService is managed by the Tailscale Kubernetes Operator, do not modify" ) var gaugePGIngressResources = clientmetric.NewGauge(kubetypes.MetricIngressPGResourceCount) @@ -314,7 +315,6 @@ func (r *HAIngressReconciler) maybeProvision(ctx context.Context, hostname strin vipPorts = append(vipPorts, "80") } - const managedVIPServiceComment = "This VIPService is managed by the Tailscale Kubernetes Operator, do not modify" vipSvc := &tailscale.VIPService{ Name: serviceName, Tags: tags, diff --git a/cmd/k8s-operator/operator.go b/cmd/k8s-operator/operator.go index 2315dc835..4d4aa45ce 100644 --- a/cmd/k8s-operator/operator.go +++ b/cmd/k8s-operator/operator.go @@ -372,6 +372,8 @@ func runReconcilers(opts reconcilerOpts) { ControllerManagedBy(mgr). For(&corev1.Service{}). Named("service-pg-reconciler"). + // TODO: this watch does not seem to work- does not if ProxyGroup created later + // maybe need to watch the ProxyGroup Watches(&corev1.Secret{}, handler.EnqueueRequestsFromMapFunc(HAServicesFromSecret(mgr.GetClient(), startlog))). Watches(&tsapi.ProxyGroup{}, ingressProxyGroupFilter). 
Complete(&HAServiceReconciler{ diff --git a/cmd/k8s-operator/proxygroup_specs.go b/cmd/k8s-operator/proxygroup_specs.go index 0cf88b738..1d12c39e0 100644 --- a/cmd/k8s-operator/proxygroup_specs.go +++ b/cmd/k8s-operator/proxygroup_specs.go @@ -18,6 +18,7 @@ import ( "sigs.k8s.io/yaml" tsapi "tailscale.com/k8s-operator/apis/v1alpha1" "tailscale.com/kube/egressservices" + "tailscale.com/kube/ingressservices" "tailscale.com/kube/kubetypes" "tailscale.com/types/ptr" ) @@ -175,6 +176,10 @@ func pgStatefulSet(pg *tsapi.ProxyGroup, namespace, image, tsFirewallMode string Name: "TS_INTERNAL_APP", Value: kubetypes.AppProxyGroupIngress, }, + corev1.EnvVar{ + Name: "TS_INGRESS_PROXIES_CONFIG_PATH", + Value: fmt.Sprintf("/etc/proxies/%s", ingressservices.IngressConfigKey), + }, corev1.EnvVar{ Name: "TS_SERVE_CONFIG", Value: fmt.Sprintf("/etc/proxies/%s", serveConfigKey), diff --git a/cmd/k8s-operator/svc-for-pg.go b/cmd/k8s-operator/svc-for-pg.go index 58478e925..6356e3292 100644 --- a/cmd/k8s-operator/svc-for-pg.go +++ b/cmd/k8s-operator/svc-for-pg.go @@ -325,17 +325,23 @@ func (r *HAServiceReconciler) maybeProvision(ctx context.Context, hostname strin } if ip.Is4() { - mak.Set(&cfg.IPv4Mapping, vipv4, ip) + cfg.IPv4Mapping = &ingressservices.Mapping{ + ClusterIP: ip, + VIPServiceIP: vipv4, + } } else if ip.Is6() { - mak.Set(&cfg.IPv6Mapping, vipv6, ip) + cfg.IPv6Mapping = &ingressservices.Mapping{ + ClusterIP: ip, + VIPServiceIP: vipv6, + } } } existingCfg := cfgs[serviceName.String()] if !reflect.DeepEqual(existingCfg, cfg) { - logger.Infof("Updating ingress config") + logger.Infof("Updating ingress config adding %+#v", cfg) mak.Set(&cfgs, serviceName.String(), cfg) - cfgBytes, err := json.Marshal(cfg) + cfgBytes, err := json.Marshal(cfgs) if err != nil { return false, fmt.Errorf("error marshaling ingress config: %w", err) } @@ -347,9 +353,9 @@ func (r *HAServiceReconciler) maybeProvision(ctx context.Context, hostname strin // 5. Update tailscaled's AdvertiseServices config, which should add the VIPService // IPs to the ProxyGroup Pods' AllowedIPs in the next netmap update if approved. - // if err = r.maybeUpdateAdvertiseServicesConfig(ctx, pg.Name, serviceName, mode, logger); err != nil { - // return false, fmt.Errorf("failed to update tailscaled config: %w", err) - // } + if err = r.maybeUpdateAdvertiseServicesConfig(ctx, pg.Name, serviceName, mode, logger); err != nil { + return false, fmt.Errorf("failed to update tailscaled config: %w", err) + } // 6. Update Ingress status if ProxyGroup Pods are ready. // count, err := r.numberPodsAdvertising(ctx, pg.Name, serviceName) @@ -628,6 +634,7 @@ func (r *HAServiceReconciler) cleanupVIPService(ctx context.Context, name tailcf func (a *HAServiceReconciler) maybeUpdateAdvertiseServicesConfig(ctx context.Context, pgName string, serviceName tailcfg.ServiceName, mode serviceAdvertisementMode, logger *zap.SugaredLogger) (err error) { // Get all config Secrets for this ProxyGroup. + // Get all Pods secrets := &corev1.SecretList{} if err := a.List(ctx, secrets, client.InNamespace(a.tsNamespace), client.MatchingLabels(pgSecretLabels(pgName, "config"))); err != nil { return fmt.Errorf("failed to list config Secrets: %w", err) diff --git a/kube/ingressservices/ingressservices.go b/kube/ingressservices/ingressservices.go index c6430fa9c..8a05cdc92 100644 --- a/kube/ingressservices/ingressservices.go +++ b/kube/ingressservices/ingressservices.go @@ -10,10 +10,29 @@ const ( // service name. 
type Configs map[string]Config -type Mapping map[netip.Addr]netip.Addr +func (cfgs *Configs) GetConfig(name string) *Config { + if cfgs == nil { + return nil + } + if cfg, ok := (*cfgs)[name]; ok { + return &cfg + } + return nil +} + +type Status struct { + Configs Configs `json:"configs,omitempty"` + // PodIP is sufficiently unique to distinguish status that belongs to this Pod. + PodIP string `json:"podIP,omitempty"` +} + +type Mapping struct { + VIPServiceIP netip.Addr `json:"VIPServiceIP"` + ClusterIP netip.Addr `json:"ClusterIP"` +} // Config is an ingress service configuration. type Config struct { - IPv4Mapping Mapping `json:"IPv4Mapping"` - IPv6Mapping Mapping `json:"IPv6Mapping"` + IPv4Mapping *Mapping `json:"IPv4Mapping,omitempty"` + IPv6Mapping *Mapping `json:"IPv6Mapping,omitempty"` } diff --git a/util/linuxfw/iptables_for_svcs.go b/util/linuxfw/iptables_for_svcs.go index 8e0f5d48d..b9e8d1060 100644 --- a/util/linuxfw/iptables_for_svcs.go +++ b/util/linuxfw/iptables_for_svcs.go @@ -24,10 +24,10 @@ func (i *iptablesRunner) EnsurePortMapRuleForSvc(svc, tun string, targetIP netip if err != nil { return fmt.Errorf("error checking if rule exists: %w", err) } - if !exists { - return table.Append("nat", "PREROUTING", args...) + if exists { + return nil } - return nil + return table.Append("nat", "PREROUTING", args...) } // DeleteMapRuleForSvc constructs a prerouting rule as would be created by @@ -40,10 +40,38 @@ func (i *iptablesRunner) DeletePortMapRuleForSvc(svc, excludeI string, targetIP if err != nil { return fmt.Errorf("error checking if rule exists: %w", err) } - if exists { - return table.Delete("nat", "PREROUTING", args...) + if !exists { + return nil } - return nil + return table.Delete("nat", "PREROUTING", args...) +} + +// origDst is the VIPService IP address, dst is cluster Service address. +func (i *iptablesRunner) EnsureDNATRuleForSvc(svcName string, origDst, dst netip.Addr) error { + table := i.getIPTByAddr(dst) + args := argsForIngressRule(svcName, origDst, dst) + exists, err := table.Exists("nat", "PREROUTING", args...) + if err != nil { + return fmt.Errorf("error checking if rule exists: %w", err) + } + if exists { + return nil + } + return table.Append("nat", "PREROUTING", args...) +} + +// origDst is the VIPService IP address, dst is cluster Service address. +func (i *iptablesRunner) DeleteDNATRuleForSvc(svcName string, origDst, dst netip.Addr) error { + table := i.getIPTByAddr(dst) + args := argsForIngressRule(svcName, origDst, dst) + exists, err := table.Exists("nat", "PREROUTING", args...) + if err != nil { + return fmt.Errorf("error checking if rule exists: %w", err) + } + if !exists { + return nil + } + return table.Delete("nat", "PREROUTING", args...) } // DeleteSvc constructs all possible rules that would have been created by @@ -72,8 +100,24 @@ func argsForPortMapRule(svc, excludeI string, targetIP netip.Addr, pm PortMap) [ } } +func argsForIngressRule(svcName string, origDst, targetIP netip.Addr) []string { + c := commentForIngressSvc(svcName, origDst, targetIP) + return []string{ + "--destination", origDst.String(), + "-m", "comment", "--comment", c, + "-j", "DNAT", + "--to-destination", targetIP.String(), + } +} + // commentForSvc generates a comment to be added to an iptables DNAT rule for a // service. This is for iptables debugging/readability purposes only. 
func commentForSvc(svc string, pm PortMap) string { return fmt.Sprintf("%s:%s:%d -> %s:%d", svc, pm.Protocol, pm.MatchPort, pm.Protocol, pm.TargetPort) } + +// commentForIngressSvc generates a comment to be added to an iptables DNAT rule for a +// service. This is for iptables debugging/readability purposes only. +func commentForIngressSvc(svc string, vip, clusterIP netip.Addr) string { + return fmt.Sprintf("svc: %s, %s -> %s", svc, vip.String(), clusterIP.String()) +} diff --git a/util/linuxfw/nftables_runner.go b/util/linuxfw/nftables_runner.go index 0f411521b..e8415d4e0 100644 --- a/util/linuxfw/nftables_runner.go +++ b/util/linuxfw/nftables_runner.go @@ -555,6 +555,8 @@ type NetfilterRunner interface { EnsurePortMapRuleForSvc(svc, tun string, targetIP netip.Addr, pm PortMap) error DeletePortMapRuleForSvc(svc, tun string, targetIP netip.Addr, pm PortMap) error + EnsureDNATRuleForSvc(svcName string, origDst, dst netip.Addr) error + DeleteDNATRuleForSvc(svcName string, origDst, dst netip.Addr) error DeleteSvc(svc, tun string, targetIPs []netip.Addr, pm []PortMap) error @@ -2053,3 +2055,10 @@ func snatRule(t *nftables.Table, ch *nftables.Chain, src, dst netip.Addr, meta [ UserData: meta, } } + +func (nfr *nftablesRunner) EnsureDNATRuleForSvc(svcName string, origDst, dst netip.Addr) error { + return nil +} +func (nfr *nftablesRunner) DeleteDNATRuleForSvc(svcName string, origDst, dst netip.Addr) error { + return nil +}
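
Reviewer note (not part of the patch): below is a minimal, hypothetical sketch of the ingress services configuration shape that this patch introduces — the JSON the operator writes for a ProxyGroup and that containerboot's ingressProxy.getConfigs later reads. It assumes the tailscale.com/kube/ingressservices types as modified above (Configs, Config, Mapping); the service name and the VIPService/cluster IPs are invented example values, not taken from the patch.

// sketch.go — illustrative only; assumes the kube/ingressservices types from this patch.
package main

import (
	"encoding/json"
	"fmt"
	"net/netip"

	"tailscale.com/kube/ingressservices"
)

func main() {
	cfgs := ingressservices.Configs{
		// Keyed by Tailscale service name (serviceName.String() in svc-for-pg.go).
		// "svc:my-app" and the addresses below are made-up examples.
		"svc:my-app": {
			IPv4Mapping: &ingressservices.Mapping{
				VIPServiceIP: netip.MustParseAddr("100.80.0.1"), // example VIPService address
				ClusterIP:    netip.MustParseAddr("10.0.153.7"), // example cluster Service address
			},
		},
	}

	// This is the payload the operator stores for the ProxyGroup and that
	// ingressProxy.sync compares against the status recorded in the state Secret.
	bs, err := json.MarshalIndent(cfgs, "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(bs))

	// For each mapping, ensureIngressRulesAdded calls EnsureDNATRuleForSvc, which in
	// iptables mode appends a nat PREROUTING rule along the lines of:
	//   --destination 100.80.0.1 -m comment --comment "svc: svc:my-app, 100.80.0.1 -> 10.0.153.7" \
	//   -j DNAT --to-destination 10.0.153.7
}

The printed JSON is a map keyed by service name, each entry holding optional "IPv4Mapping"/"IPv6Mapping" objects with "VIPServiceIP" and "ClusterIP" strings; setStatus then records the same Configs plus the Pod IP under ingressservices.IngressConfigKey in the state Secret so that a later sync can tell whether the firewall rules for this Pod are already up to date.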