Signed-off-by: Irbe Krumina <irbe@tailscale.com>
Irbe Krumina 2025-04-30 08:28:58 +01:00
parent 0eb1ccccce
commit e9ef402d70
11 changed files with 433 additions and 32 deletions

View File

@ -0,0 +1,278 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build linux
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"net/netip"
"os"
"path/filepath"
"reflect"
"time"
"github.com/fsnotify/fsnotify"
"tailscale.com/kube/ingressservices"
"tailscale.com/kube/kubeclient"
"tailscale.com/util/linuxfw"
"tailscale.com/util/mak"
)
// ingressProxy keeps the host firewall's DNAT rules for ingress services in
// sync with the mounted ingress services config and records the applied
// config in the proxy's state Secret.
type ingressProxy struct {
cfgPath string // path to the mounted ingress services config file
nfr linuxfw.NetfilterRunner // never nil
kc kubeclient.Client // never nil
stateSecret string // name of the kube state Secret
podIP string // never empty string
}
// run starts the ingress proxy and re-syncs firewall rules whenever the
// directory containing the config file changes (or every 5 seconds if a
// filesystem watch could not be established).
func (ep *ingressProxy) run(ctx context.Context, opts ingressProxyOpts) error {
log.Printf("starting ingress proxy...")
ep.configure(opts)
var tickChan <-chan time.Time
var eventChan <-chan fsnotify.Event
// TODO (irbekrm): take a look if this can be pulled into a single func
// shared with serve config loader.
if w, err := fsnotify.NewWatcher(); err != nil {
log.Printf("failed to create fsnotify watcher, timer-only mode: %v", err)
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
tickChan = ticker.C
} else {
defer w.Close()
dir := filepath.Dir(ep.cfgPath)
if err := w.Add(dir); err != nil {
return fmt.Errorf("failed to add fsnotify watch for %v: %w", dir, err)
}
eventChan = w.Events
}
if err := ep.sync(ctx); err != nil {
return err
}
for {
select {
case <-ctx.Done():
return nil
case <-tickChan:
log.Printf("periodic sync, ensuring firewall config is up to date...")
case <-eventChan:
log.Printf("config file change detected, ensuring firewall config is up to date...")
}
if err := ep.sync(ctx); err != nil {
return fmt.Errorf("error syncing ingress service config: %w", err)
}
}
}
type ingressProxyOpts struct {
cfgPath string
nfr linuxfw.NetfilterRunner
kc kubeclient.Client
stateSecret string
podIP string // TODO: change to IP hash maybe
}
// configure sets up the ingress proxy using the provided options.
func (ep *ingressProxy) configure(opts ingressProxyOpts) {
ep.cfgPath = opts.cfgPath
ep.nfr = opts.nfr
ep.kc = opts.kc
ep.stateSecret = opts.stateSecret
ep.podIP = opts.podIP
}
// sync reconciles the firewall state with the desired ingress service
// configuration and, if anything changed, records the newly applied config in
// the state Secret.
func (ep *ingressProxy) sync(ctx context.Context) error {
cfgs, err := ep.getConfigs()
if err != nil {
return fmt.Errorf("error retrieving ingress service configs: %w", err)
}
status, err := ep.getStatus(ctx)
if err != nil {
return fmt.Errorf("error retrieving current ingress proxy status: %w", err)
}
// Reconcile firewall rules with the desired configs.
if err := ep.syncIngressConfigs(cfgs, status); err != nil {
return fmt.Errorf("error syncing ingress service configs: %w", err)
}
var existingConfigs *ingressservices.Configs
if status != nil {
existingConfigs = &status.Configs
}
if !ingressServicesStatusIsEqual(cfgs, existingConfigs) {
if err := ep.setStatus(ctx, cfgs); err != nil {
return fmt.Errorf("error setting ingress proxy status: %w", err)
}
}
return nil
}
// getRulesToDelete returns the currently applied configs that are no longer
// present in, or differ from, the desired configuration and whose firewall
// rules should therefore be removed.
func (ep *ingressProxy) getRulesToDelete(cfgs *ingressservices.Configs, status *ingressservices.Status) (rulesToDelete map[string]ingressservices.Config) {
if status == nil {
return nil
}
for svcName, cfg := range status.Configs {
needed := cfgs.GetConfig(svcName)
if reflect.DeepEqual(needed, cfg) {
continue
}
mak.Set(&rulesToDelete, svcName, cfg)
}
return rulesToDelete
}
// getRulesToAdd returns the desired configs that are not yet applied, or that
// differ from what is currently applied, and whose firewall rules should
// therefore be created.
func (ep *ingressProxy) getRulesToAdd(cfgs *ingressservices.Configs, status *ingressservices.Status) (rulesToAdd map[string]ingressservices.Config) {
if cfgs == nil {
return nil
}
for svcName, cfg := range *cfgs {
if status == nil {
mak.Set(&rulesToAdd, svcName, cfg)
continue
}
existing := status.Configs.GetConfig(svcName)
if reflect.DeepEqual(existing, cfg) {
continue
}
mak.Set(&rulesToAdd, svcName, cfg)
}
return rulesToAdd
}
func (ep *ingressProxy) syncIngressConfigs(cfgs *ingressservices.Configs, status *ingressservices.Status) error {
// Add new services, update rules for any that have changed.
rulesToAdd := ep.getRulesToAdd(cfgs, status)
rulesToDelete := ep.getRulesToDelete(cfgs, status)
if err := ensureIngressRulesDeleted(rulesToDelete, ep.nfr); err != nil {
return fmt.Errorf("error deleting ingress rules: %w", err)
}
if err := ensureIngressRulesAdded(rulesToAdd, ep.nfr); err != nil {
return fmt.Errorf("error adding rules: %w", err)
}
// Maybe SNAT?
return nil
}
// getConfigs gets the mounted ingress service configuration.
func (ep *ingressProxy) getConfigs() (*ingressservices.Configs, error) {
j, err := os.ReadFile(ep.cfgPath)
if os.IsNotExist(err) {
return nil, nil
}
if err != nil {
return nil, err
}
if len(j) == 0 {
return nil, nil
}
cfg := &ingressservices.Configs{}
if err := json.Unmarshal(j, cfg); err != nil {
return nil, err
}
return cfg, nil
}
// getStatus gets the current status of the configured firewall. The current
// status is stored in state Secret. Returns nil status if no status that
// applies to the current proxy Pod was found. Uses the Pod IP to determine if a
// status found in the state Secret applies to this proxy Pod.
func (ep *ingressProxy) getStatus(ctx context.Context) (*ingressservices.Status, error) {
secret, err := ep.kc.GetSecret(ctx, ep.stateSecret)
if err != nil {
return nil, fmt.Errorf("error retrieving state secret: %w", err)
}
status := &ingressservices.Status{}
raw, ok := secret.Data[ingressservices.IngressConfigKey]
if !ok {
return nil, nil
}
if err := json.Unmarshal([]byte(raw), status); err != nil {
return nil, fmt.Errorf("error unmarshalling previous config: %w", err)
}
if status.PodIP == ep.podIP {
return status, nil
}
return nil, nil
}
// setStatus records the currently applied ingress config in the proxy's state
// Secret, tagged with this Pod's IP so that it can later be determined whether
// a stored status applies to THIS proxy Pod.
func (ep *ingressProxy) setStatus(ctx context.Context, newCfg *ingressservices.Configs) error {
status := &ingressservices.Status{Configs: *newCfg, PodIP: ep.podIP}
secret, err := ep.kc.GetSecret(ctx, ep.stateSecret)
if err != nil {
return fmt.Errorf("error retrieving state Secret: %w", err)
}
bs, err := json.Marshal(status)
if err != nil {
return fmt.Errorf("error marshalling service config: %w", err)
}
secret.Data[ingressservices.IngressConfigKey] = bs
patch := kubeclient.JSONPatch{
Op: "replace",
Path: fmt.Sprintf("/data/%s", ingressservices.IngressConfigKey),
Value: bs,
}
if err := ep.kc.JSONPatchResource(ctx, ep.stateSecret, kubeclient.TypeSecrets, []kubeclient.JSONPatch{patch}); err != nil {
return fmt.Errorf("error patching state Secret: %w", err)
}
return nil
}
// ensureIngressRulesAdded ensures that, for each provided config, a DNAT rule
// exists mapping the VIPService IP to the cluster Service IP.
func ensureIngressRulesAdded(cfgs map[string]ingressservices.Config, nfr linuxfw.NetfilterRunner) error {
for serviceName, cfg := range cfgs {
f := func(svcName string, vipIP, clusterIP netip.Addr) error {
log.Printf("ensureIngressRulesAdded VIPService %s with IP %s to cluster IP %s", serviceName, vipIP, clusterIP)
return nfr.EnsureDNATRuleForSvc(svcName, vipIP, clusterIP)
}
if cfg.IPv4Mapping != nil {
if err := f(serviceName, cfg.IPv4Mapping.VIPServiceIP, cfg.IPv4Mapping.ClusterIP); err != nil {
return fmt.Errorf("error adding ingress rule for %s: %w", serviceName, err)
}
}
if cfg.IPv6Mapping != nil {
if err := f(serviceName, cfg.IPv6Mapping.VIPServiceIP, cfg.IPv6Mapping.ClusterIP); err != nil {
return fmt.Errorf("error adding ingress rule for %s: %w", serviceName, err)
}
}
}
return nil
}
// ensureIngressRulesDeleted ensures that the DNAT rules for each provided
// config are removed.
func ensureIngressRulesDeleted(cfgs map[string]ingressservices.Config, nfr linuxfw.NetfilterRunner) error {
for serviceName, cfg := range cfgs {
f := func(svcName string, vipIP, clusterIP netip.Addr) error {
log.Printf("ensureIngressRulesDeleted VIPService %s with IP %s to cluster IP %s", serviceName, vipIP, clusterIP)
return nfr.DeleteDNATRuleForSvc(svcName, vipIP, clusterIP)
}
if cfg.IPv4Mapping != nil {
if err := f(serviceName, cfg.IPv4Mapping.VIPServiceIP, cfg.IPv4Mapping.ClusterIP); err != nil {
return fmt.Errorf("error deleting ingress rule for %s: %w", serviceName, err)
}
}
if cfg.IPv6Mapping != nil {
if err := f(serviceName, cfg.IPv6Mapping.VIPServiceIP, cfg.IPv6Mapping.ClusterIP); err != nil {
return fmt.Errorf("error deleting ingress rule for %s: %w", serviceName, err)
}
}
}
return nil
}
// ingressServicesStatusIsEqual reports whether the two sets of ingress service
// configs are equal.
func ingressServicesStatusIsEqual(st, st1 *ingressservices.Configs) bool {
if st == nil && st1 == nil {
return true
}
if st == nil || st1 == nil {
return false
}
return reflect.DeepEqual(*st, *st1)
}
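For context, the mounted config file read by getConfigs above is a JSON-serialized ingressservices.Configs map keyed by service name. A minimal sketch of what such a file might contain (the service name and IP addresses below are purely illustrative and do not appear in this commit):

{
  "example-ingress": {
    "IPv4Mapping": {
      "VIPServiceIP": "100.100.100.100",
      "ClusterIP": "10.0.153.30"
    }
  }
}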

View File

@ -441,6 +441,7 @@ authLoop:
// egressSvcsErrorChan will get an error sent to it if this containerboot instance is configured to expose 1+
// egress services in HA mode and errored.
var egressSvcsErrorChan = make(chan error)
var ingressSvcsErrorChan = make(chan error)
defer t.Stop()
// resetTimer resets timer for when to next attempt to resolve the DNS
// name for the proxy configured with TS_EXPERIMENTAL_DEST_DNS_NAME. The
@ -694,6 +695,22 @@ runLoop:
}
}()
}
ip := ingressProxy{}
if cfg.IngressProxiesCfgPath != "" {
log.Printf("configuring ingress proxy using configuration file at %s", cfg.IngressProxiesCfgPath)
opts := ingressProxyOpts{
cfgPath: cfg.IngressProxiesCfgPath,
nfr: nfr,
kc: kc,
stateSecret: cfg.KubeSecret,
podIP: cfg.PodIPv4,
}
go func() {
if err := ip.run(ctx, opts); err != nil {
ingressSvcsErrorChan <- err
}
}()
}
// Wait on tailscaled process. It won't be cleaned up by default when the
// container exits as it is not PID1. TODO (irbekrm): perhaps we can replace the
@ -738,6 +755,8 @@ runLoop:
resetTimer(false)
case e := <-egressSvcsErrorChan:
return fmt.Errorf("egress proxy failed: %v", e)
case e := <-ingressSvcsErrorChan:
return fmt.Errorf("ingress proxy failed: %v", e)
}
}
wg.Wait()

View File

@ -64,16 +64,17 @@ type settings struct {
// when setting up rules to proxy cluster traffic to cluster ingress
// target.
// Deprecated: use PodIPv4, PodIPv6 instead to support dual stack clusters
PodIP string
PodIPv4 string
PodIPv6 string
PodUID string
HealthCheckAddrPort string
LocalAddrPort string
MetricsEnabled bool
HealthCheckEnabled bool
DebugAddrPort string
EgressProxiesCfgPath string
PodIP string
PodIPv4 string
PodIPv6 string
PodUID string
HealthCheckAddrPort string
LocalAddrPort string
MetricsEnabled bool
HealthCheckEnabled bool
DebugAddrPort string
EgressProxiesCfgPath string
IngressProxiesCfgPath string
// CertShareMode is set for Kubernetes Pods running cert share mode.
// Possible values are empty (containerboot doesn't run any certs
// logic), 'ro' (for Pods that should never attempt to issue/renew
@ -114,6 +115,7 @@ func configFromEnv() (*settings, error) {
HealthCheckEnabled: defaultBool("TS_ENABLE_HEALTH_CHECK", false),
DebugAddrPort: defaultEnv("TS_DEBUG_ADDR_PORT", ""),
EgressProxiesCfgPath: defaultEnv("TS_EGRESS_PROXIES_CONFIG_PATH", ""),
IngressProxiesCfgPath: defaultEnv("TS_INGRESS_PROXIES_CONFIG_PATH", ""),
PodUID: defaultEnv("POD_UID", ""),
}
podIPs, ok := os.LookupEnv("POD_IPS")
@ -219,6 +221,9 @@ func (s *settings) validate() error {
if s.EgressProxiesCfgPath != "" && !(s.InKubernetes && s.KubeSecret != "") {
return errors.New("TS_EGRESS_PROXIES_CONFIG_PATH is only supported for Tailscale running on Kubernetes")
}
if s.IngressProxiesCfgPath != "" && !(s.InKubernetes && s.KubeSecret != "") {
return errors.New("TS_INGRESS_PROXIES_CONFIG_PATH is only supported for Tailscale running on Kubernetes")
}
return nil
}
@ -308,7 +313,7 @@ func isOneStepConfig(cfg *settings) bool {
// as an L3 proxy, proxying to an endpoint provided via one of the config env
// vars.
func isL3Proxy(cfg *settings) bool {
return cfg.ProxyTargetIP != "" || cfg.ProxyTargetDNSName != "" || cfg.TailnetTargetIP != "" || cfg.TailnetTargetFQDN != "" || cfg.AllowProxyingClusterTrafficViaIngress || cfg.EgressProxiesCfgPath != ""
return cfg.ProxyTargetIP != "" || cfg.ProxyTargetDNSName != "" || cfg.TailnetTargetIP != "" || cfg.TailnetTargetFQDN != "" || cfg.AllowProxyingClusterTrafficViaIngress || cfg.EgressProxiesCfgPath != "" || cfg.IngressProxiesCfgPath != ""
}
// hasKubeStateStore returns true if the state must be stored in a Kubernetes

View File

@ -1,7 +1,20 @@
apiVersion: tailscale.com/v1alpha1
kind: ProxyGroup
metadata:
name: egress-proxies
name: ingress-proxies
spec:
type: egress
replicas: 3
type: ingress
replicas: 2
proxyClass: prod
---
apiVersion: tailscale.com/v1alpha1
kind: ProxyClass
metadata:
name: prod
spec:
statefulSet:
pod:
tailscaleContainer:
env:
- name: TS_DEBUG_FIREWALL_MODE
value: "iptables"

View File

@ -54,7 +54,8 @@ const (
// well as the default HTTPS endpoint).
annotationHTTPEndpoint = "tailscale.com/http-endpoint"
labelDomain = "tailscale.com/domain"
labelDomain = "tailscale.com/domain"
managedVIPServiceComment = "This VIPService is managed by the Tailscale Kubernetes Operator, do not modify"
)
var gaugePGIngressResources = clientmetric.NewGauge(kubetypes.MetricIngressPGResourceCount)
@ -314,7 +315,6 @@ func (r *HAIngressReconciler) maybeProvision(ctx context.Context, hostname strin
vipPorts = append(vipPorts, "80")
}
const managedVIPServiceComment = "This VIPService is managed by the Tailscale Kubernetes Operator, do not modify"
vipSvc := &tailscale.VIPService{
Name: serviceName,
Tags: tags,

View File

@ -372,6 +372,8 @@ func runReconcilers(opts reconcilerOpts) {
ControllerManagedBy(mgr).
For(&corev1.Service{}).
Named("service-pg-reconciler").
// TODO: this watch does not seem to work if the ProxyGroup is created later -
// maybe we need to watch the ProxyGroup instead.
Watches(&corev1.Secret{}, handler.EnqueueRequestsFromMapFunc(HAServicesFromSecret(mgr.GetClient(), startlog))).
Watches(&tsapi.ProxyGroup{}, ingressProxyGroupFilter).
Complete(&HAServiceReconciler{

View File

@ -18,6 +18,7 @@ import (
"sigs.k8s.io/yaml"
tsapi "tailscale.com/k8s-operator/apis/v1alpha1"
"tailscale.com/kube/egressservices"
"tailscale.com/kube/ingressservices"
"tailscale.com/kube/kubetypes"
"tailscale.com/types/ptr"
)
@ -175,6 +176,10 @@ func pgStatefulSet(pg *tsapi.ProxyGroup, namespace, image, tsFirewallMode string
Name: "TS_INTERNAL_APP",
Value: kubetypes.AppProxyGroupIngress,
},
corev1.EnvVar{
Name: "TS_INGRESS_PROXIES_CONFIG_PATH",
Value: fmt.Sprintf("/etc/proxies/%s", ingressservices.IngressConfigKey),
},
corev1.EnvVar{
Name: "TS_SERVE_CONFIG",
Value: fmt.Sprintf("/etc/proxies/%s", serveConfigKey),

View File

@ -325,17 +325,23 @@ func (r *HAServiceReconciler) maybeProvision(ctx context.Context, hostname strin
}
if ip.Is4() {
mak.Set(&cfg.IPv4Mapping, vipv4, ip)
cfg.IPv4Mapping = &ingressservices.Mapping{
ClusterIP: ip,
VIPServiceIP: vipv4,
}
} else if ip.Is6() {
mak.Set(&cfg.IPv6Mapping, vipv6, ip)
cfg.IPv6Mapping = &ingressservices.Mapping{
ClusterIP: ip,
VIPServiceIP: vipv6,
}
}
}
existingCfg := cfgs[serviceName.String()]
if !reflect.DeepEqual(existingCfg, cfg) {
logger.Infof("Updating ingress config")
logger.Infof("Updating ingress config adding %+#v", cfg)
mak.Set(&cfgs, serviceName.String(), cfg)
cfgBytes, err := json.Marshal(cfg)
cfgBytes, err := json.Marshal(cfgs)
if err != nil {
return false, fmt.Errorf("error marshaling ingress config: %w", err)
}
@ -347,9 +353,9 @@ func (r *HAServiceReconciler) maybeProvision(ctx context.Context, hostname strin
// 5. Update tailscaled's AdvertiseServices config, which should add the VIPService
// IPs to the ProxyGroup Pods' AllowedIPs in the next netmap update if approved.
// if err = r.maybeUpdateAdvertiseServicesConfig(ctx, pg.Name, serviceName, mode, logger); err != nil {
// return false, fmt.Errorf("failed to update tailscaled config: %w", err)
// }
if err = r.maybeUpdateAdvertiseServicesConfig(ctx, pg.Name, serviceName, mode, logger); err != nil {
return false, fmt.Errorf("failed to update tailscaled config: %w", err)
}
// 6. Update Ingress status if ProxyGroup Pods are ready.
// count, err := r.numberPodsAdvertising(ctx, pg.Name, serviceName)
@ -628,6 +634,7 @@ func (r *HAServiceReconciler) cleanupVIPService(ctx context.Context, name tailcf
func (a *HAServiceReconciler) maybeUpdateAdvertiseServicesConfig(ctx context.Context, pgName string, serviceName tailcfg.ServiceName, mode serviceAdvertisementMode, logger *zap.SugaredLogger) (err error) {
// Get all config Secrets for this ProxyGroup.
secrets := &corev1.SecretList{}
if err := a.List(ctx, secrets, client.InNamespace(a.tsNamespace), client.MatchingLabels(pgSecretLabels(pgName, "config"))); err != nil {
return fmt.Errorf("failed to list config Secrets: %w", err)

View File

@ -10,10 +10,29 @@ const (
// service name.
type Configs map[string]Config
type Mapping map[netip.Addr]netip.Addr
// GetConfig returns the config for the ingress service with the given name,
// or nil if none exists.
func (cfgs *Configs) GetConfig(name string) *Config {
if cfgs == nil {
return nil
}
if cfg, ok := (*cfgs)[name]; ok {
return &cfg
}
return nil
}
// Status is written to the proxy's state Secret and describes the ingress
// firewall configuration that a proxy Pod has currently applied.
type Status struct {
Configs Configs `json:"configs,omitempty"`
// PodIP is the IP address of the proxy Pod that wrote this status. It is
// used to determine whether a stored status applies to this Pod.
PodIP string `json:"podIP,omitempty"`
}
// Mapping describes the mapping between a VIPService IP address and the
// cluster Service IP address that traffic to it should be forwarded to.
type Mapping struct {
VIPServiceIP netip.Addr `json:"VIPServiceIP"`
ClusterIP netip.Addr `json:"ClusterIP"`
}
// Config is an ingress service configuration.
type Config struct {
IPv4Mapping Mapping `json:"IPv4Mapping"`
IPv6Mapping Mapping `json:"IPv6Mapping"`
IPv4Mapping *Mapping `json:"IPv4Mapping,omitempty"`
IPv6Mapping *Mapping `json:"IPv6Mapping,omitempty"`
}
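To illustrate the wire format: when the proxy records its applied config via setStatus, the Status above is marshalled into the state Secret. Assuming the same illustrative service and addresses as earlier (none of these concrete values appear in this commit), the stored JSON would look roughly like:

{
  "configs": {
    "example-ingress": {
      "IPv4Mapping": {
        "VIPServiceIP": "100.100.100.100",
        "ClusterIP": "10.0.153.30"
      }
    }
  },
  "podIP": "10.0.153.5"
}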

View File

@ -24,10 +24,10 @@ func (i *iptablesRunner) EnsurePortMapRuleForSvc(svc, tun string, targetIP netip
if err != nil {
return fmt.Errorf("error checking if rule exists: %w", err)
}
if !exists {
return table.Append("nat", "PREROUTING", args...)
if exists {
return nil
}
return nil
return table.Append("nat", "PREROUTING", args...)
}
// DeleteMapRuleForSvc constructs a prerouting rule as would be created by
@ -40,10 +40,38 @@ func (i *iptablesRunner) DeletePortMapRuleForSvc(svc, excludeI string, targetIP
if err != nil {
return fmt.Errorf("error checking if rule exists: %w", err)
}
if exists {
return table.Delete("nat", "PREROUTING", args...)
if !exists {
return nil
}
return nil
return table.Delete("nat", "PREROUTING", args...)
}
// origDst is the VIPService IP address, dst is cluster Service address.
func (i *iptablesRunner) EnsureDNATRuleForSvc(svcName string, origDst, dst netip.Addr) error {
table := i.getIPTByAddr(dst)
args := argsForIngressRule(svcName, origDst, dst)
exists, err := table.Exists("nat", "PREROUTING", args...)
if err != nil {
return fmt.Errorf("error checking if rule exists: %w", err)
}
if exists {
return nil
}
return table.Append("nat", "PREROUTING", args...)
}
// DeleteDNATRuleForSvc deletes the DNAT rule for the named service if it
// exists. origDst is the VIPService IP address, dst is the cluster Service IP address.
func (i *iptablesRunner) DeleteDNATRuleForSvc(svcName string, origDst, dst netip.Addr) error {
table := i.getIPTByAddr(dst)
args := argsForIngressRule(svcName, origDst, dst)
exists, err := table.Exists("nat", "PREROUTING", args...)
if err != nil {
return fmt.Errorf("error checking if rule exists: %w", err)
}
if !exists {
return nil
}
return table.Delete("nat", "PREROUTING", args...)
}
// DeleteSvc constructs all possible rules that would have been created by
@ -72,8 +100,24 @@ func argsForPortMapRule(svc, excludeI string, targetIP netip.Addr, pm PortMap) [
}
}
func argsForIngressRule(svcName string, origDst, targetIP netip.Addr) []string {
c := commentForIngressSvc(svcName, origDst, targetIP)
return []string{
"--destination", origDst.String(),
"-m", "comment", "--comment", c,
"-j", "DNAT",
"--to-destination", targetIP.String(),
}
}
// commentForSvc generates a comment to be added to an iptables DNAT rule for a
// service. This is for iptables debugging/readability purposes only.
func commentForSvc(svc string, pm PortMap) string {
return fmt.Sprintf("%s:%s:%d -> %s:%d", svc, pm.Protocol, pm.MatchPort, pm.Protocol, pm.TargetPort)
}
// commentForIngressSvc generates a comment to be added to an iptables DNAT rule for a
// service. This is for iptables debugging/readability purposes only.
func commentForIngressSvc(svc string, vip, clusterIP netip.Addr) string {
return fmt.Sprintf("svc: %s, %s -> %s", svc, vip.String(), clusterIP.String())
}
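Taken together, EnsureDNATRuleForSvc with argsForIngressRule and commentForIngressSvc appends something roughly equivalent to the following iptables rule (service name and addresses are, again, only illustrative):

iptables -t nat -A PREROUTING --destination 100.100.100.100 -m comment --comment "svc: example-ingress, 100.100.100.100 -> 10.0.153.30" -j DNAT --to-destination 10.0.153.30

For an IPv6 cluster Service IP, getIPTByAddr selects the IPv6 table, so the equivalent rule would be installed via ip6tables instead.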

View File

@ -555,6 +555,8 @@ type NetfilterRunner interface {
EnsurePortMapRuleForSvc(svc, tun string, targetIP netip.Addr, pm PortMap) error
DeletePortMapRuleForSvc(svc, tun string, targetIP netip.Addr, pm PortMap) error
EnsureDNATRuleForSvc(svcName string, origDst, dst netip.Addr) error
DeleteDNATRuleForSvc(svcName string, origDst, dst netip.Addr) error
DeleteSvc(svc, tun string, targetIPs []netip.Addr, pm []PortMap) error
@ -2053,3 +2055,10 @@ func snatRule(t *nftables.Table, ch *nftables.Chain, src, dst netip.Addr, meta [
UserData: meta,
}
}
// EnsureDNATRuleForSvc is currently a no-op for the nftables runner; in this
// commit DNAT rules for ingress services are only implemented for the
// iptables runner.
func (nfr *nftablesRunner) EnsureDNATRuleForSvc(svcName string, origDst, dst netip.Addr) error {
return nil
}
// DeleteDNATRuleForSvc is currently a no-op for the nftables runner.
func (nfr *nftablesRunner) DeleteDNATRuleForSvc(svcName string, origDst, dst netip.Addr) error {
return nil
}