cmd/containerboot,kube/ingressservices: proxy VIPService TCP/UDP traffic to cluster Services

This PR is part of the work to implement HA for the Kubernetes Operator's
network layer proxy.
Adds logic to containerboot to monitor the mounted ingress firewall configuration
and update iptables/nftables rules as the config changes.
Also adds new shared types for the ingress configuration.
The implementation is intentionally similar to that of the HA egress proxy.

Updates tailscale/tailscale#15895

Signed-off-by: chaosinthecrd <tom@tmlabs.co.uk>
Signed-off-by: Irbe Krumina <irbe@tailscale.com>
Irbe Krumina 2025-05-07 11:14:38 +01:00
parent 67ecebd9e2
commit 06bbcb1c7d
9 changed files with 1446 additions and 796 deletions
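For orientation, the ingress configuration that containerboot consumes maps a VIPService name to per-family VIPService IP to cluster Service IP pairs (the ingressservices types used in this commit). Below is a minimal, illustrative sketch of building and serializing such a config; the service name and IPs are made up, and the exact JSON key names are whatever the ingressservices types marshal to.

package main

import (
	"encoding/json"
	"fmt"
	"net/netip"

	"tailscale.com/kube/ingressservices"
)

func main() {
	// One entry per VIPService: tailnet VIPService IP -> cluster Service IP,
	// per address family. The operator writes this to the proxy's mounted
	// config file; containerboot's ingress proxy reads it and installs DNAT rules.
	cfgs := ingressservices.Configs{
		"svc:foo": {
			IPv4Mapping: &ingressservices.Mapping{
				VIPServiceIP: netip.MustParseAddr("100.64.0.1"),
				ClusterIP:    netip.MustParseAddr("10.0.0.1"),
			},
		},
	}
	b, err := json.Marshal(cfgs)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b)) // roughly the payload of the mounted config file
}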


@ -0,0 +1,766 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build linux
package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"net/http"
"net/netip"
"os"
"path/filepath"
"reflect"
"strconv"
"strings"
"time"
"github.com/fsnotify/fsnotify"
"tailscale.com/client/local"
"tailscale.com/ipn"
"tailscale.com/kube/egressservices"
"tailscale.com/kube/kubeclient"
"tailscale.com/kube/kubetypes"
"tailscale.com/syncs"
"tailscale.com/tailcfg"
"tailscale.com/util/httpm"
"tailscale.com/util/linuxfw"
"tailscale.com/util/mak"
)
const tailscaleTunInterface = "tailscale0"
// Modified using a build flag to speed up tests.
var testSleepDuration string
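// For illustration only (assumption, not shown in this commit): one way to set an
// unexported package-level string like testSleepDuration at build time is the Go
// linker's -X flag, e.g.:
//
//	go test -ldflags="-X main.testSleepDuration=10ms" ./cmd/containerboot
//
// The exact mechanism this repo's tests use is not part of this diff.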
// This file contains functionality to run containerboot as a proxy that can
// route cluster traffic to one or more tailnet targets, based on portmapping
// rules read from a configfile. Currently (9/2024) this is only used for the
// Kubernetes operator egress proxies.
// egressProxy knows how to configure firewall rules to route cluster traffic to
// one or more tailnet services.
type egressProxy struct {
cfgPath string // path to a directory with egress services config files
nfr linuxfw.NetfilterRunner // never nil
kc kubeclient.Client // never nil
stateSecret string // name of the kube state Secret
tsClient *local.Client // never nil
netmapChan chan ipn.Notify // chan to receive netmap updates on
podIPv4 string // never empty string, currently only IPv4 is supported
// targetFQDNs maps egress service FQDNs to the tailnet IPs that
// were last used to configure firewall rules for this proxy.
// TODO(irbekrm): target addresses are also stored in the state Secret.
// Evaluate whether we should retrieve them from there and not store in
// memory at all.
targetFQDNs map[string][]netip.Prefix
tailnetAddrs []netip.Prefix // tailnet IPs of this tailnet device
// shortSleep is the backoff sleep between healthcheck endpoint calls - can be overridden in tests.
shortSleep time.Duration
// longSleep is the time to sleep after the routing rules are updated to increase the chance that kube
// proxies on all nodes have updated their routing configuration. It can be configured to 0 in
// tests.
longSleep time.Duration
// client is a client that can send HTTP requests.
client httpClient
}
// httpClient is a client that can send HTTP requests and can be mocked in tests.
type httpClient interface {
Do(*http.Request) (*http.Response, error)
}
// run configures egress proxy firewall rules and ensures that the firewall rules are reconfigured when:
// - the mounted egress config has changed
// - the proxy's tailnet IP addresses have changed
// - tailnet IPs have changed for any backend targets specified by tailnet FQDN
func (ep *egressProxy) run(ctx context.Context, n ipn.Notify, opts egressProxyRunOpts) error {
ep.configure(opts)
var tickChan <-chan time.Time
var eventChan <-chan fsnotify.Event
// TODO (irbekrm): take a look if this can be pulled into a single func
// shared with serve config loader.
if w, err := fsnotify.NewWatcher(); err != nil {
log.Printf("failed to create fsnotify watcher, timer-only mode: %v", err)
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
tickChan = ticker.C
} else {
defer w.Close()
if err := w.Add(ep.cfgPath); err != nil {
return fmt.Errorf("failed to add fsnotify watch: %w", err)
}
eventChan = w.Events
}
if err := ep.sync(ctx, n); err != nil {
return err
}
for {
select {
case <-ctx.Done():
return nil
case <-tickChan:
log.Printf("periodic sync, ensuring firewall config is up to date...")
case <-eventChan:
log.Printf("config file change detected, ensuring firewall config is up to date...")
case n = <-ep.netmapChan:
shouldResync := ep.shouldResync(n)
if !shouldResync {
continue
}
log.Printf("netmap change detected, ensuring firewall config is up to date...")
}
if err := ep.sync(ctx, n); err != nil {
return fmt.Errorf("error syncing egress service config: %w", err)
}
}
}
type egressProxyRunOpts struct {
cfgPath string
nfr linuxfw.NetfilterRunner
kc kubeclient.Client
tsClient *local.Client
stateSecret string
netmapChan chan ipn.Notify
podIPv4 string
tailnetAddrs []netip.Prefix
}
// configure configures the egress proxy using the provided options.
func (ep *egressProxy) configure(opts egressProxyRunOpts) {
ep.cfgPath = opts.cfgPath
ep.nfr = opts.nfr
ep.kc = opts.kc
ep.tsClient = opts.tsClient
ep.stateSecret = opts.stateSecret
ep.netmapChan = opts.netmapChan
ep.podIPv4 = opts.podIPv4
ep.tailnetAddrs = opts.tailnetAddrs
ep.client = &http.Client{} // default HTTP client
sleepDuration := time.Second
if d, err := time.ParseDuration(testSleepDuration); err == nil && d > 0 {
log.Printf("using test sleep duration %v", d)
sleepDuration = d
}
ep.shortSleep = sleepDuration
ep.longSleep = sleepDuration * 10
}
// sync triggers an egress proxy config resync. The resync calculates the diff between config and status to determine if
// any firewall rules need to be updated. Using the status stored in the state Secret as a reference for the current
// firewall configuration is good enough because the status is keyed by the Pod IP and we crash the Pod on errors such
// as a failed firewall update.
func (ep *egressProxy) sync(ctx context.Context, n ipn.Notify) error {
cfgs, err := ep.getConfigs()
if err != nil {
return fmt.Errorf("error retrieving egress service configs: %w", err)
}
status, err := ep.getStatus(ctx)
if err != nil {
return fmt.Errorf("error retrieving current egress proxy status: %w", err)
}
newStatus, err := ep.syncEgressConfigs(cfgs, status, n)
if err != nil {
return fmt.Errorf("error syncing egress service configs: %w", err)
}
if !servicesStatusIsEqual(newStatus, status) {
if err := ep.setStatus(ctx, newStatus, n); err != nil {
return fmt.Errorf("error setting egress proxy status: %w", err)
}
}
return nil
}
// addrsHaveChanged returns true if the provided netmap update contains a tailnet address change for this proxy node.
// Netmap must not be nil.
func (ep *egressProxy) addrsHaveChanged(n ipn.Notify) bool {
return !reflect.DeepEqual(ep.tailnetAddrs, n.NetMap.SelfNode.Addresses())
}
// syncEgressConfigs adds and deletes firewall rules to match the desired
// configuration. It uses the provided status to determine what is currently
// applied and updates the status after a successful sync.
func (ep *egressProxy) syncEgressConfigs(cfgs *egressservices.Configs, status *egressservices.Status, n ipn.Notify) (*egressservices.Status, error) {
if !(wantsServicesConfigured(cfgs) || hasServicesConfigured(status)) {
return nil, nil
}
// Delete unnecessary services.
if err := ep.deleteUnnecessaryServices(cfgs, status); err != nil {
return nil, fmt.Errorf("error deleting services: %w", err)
}
newStatus := &egressservices.Status{}
if !wantsServicesConfigured(cfgs) {
return newStatus, nil
}
// Add new services, update rules for any that have changed.
rulesPerSvcToAdd := make(map[string][]rule, 0)
rulesPerSvcToDelete := make(map[string][]rule, 0)
for svcName, cfg := range *cfgs {
tailnetTargetIPs, err := ep.tailnetTargetIPsForSvc(cfg, n)
if err != nil {
return nil, fmt.Errorf("error determining tailnet target IPs: %w", err)
}
rulesToAdd, rulesToDelete, err := updatesForCfg(svcName, cfg, status, tailnetTargetIPs)
if err != nil {
return nil, fmt.Errorf("error validating service changes: %v", err)
}
log.Printf("syncegressservices: looking at svc %s rulesToAdd %d rulesToDelete %d", svcName, len(rulesToAdd), len(rulesToDelete))
if len(rulesToAdd) != 0 {
mak.Set(&rulesPerSvcToAdd, svcName, rulesToAdd)
}
if len(rulesToDelete) != 0 {
mak.Set(&rulesPerSvcToDelete, svcName, rulesToDelete)
}
if len(rulesToAdd) != 0 || ep.addrsHaveChanged(n) {
// For each tailnet target, set up SNAT from the local tailnet device address of the matching
// family.
for _, t := range tailnetTargetIPs {
var local netip.Addr
for _, pfx := range n.NetMap.SelfNode.Addresses().All() {
if !pfx.IsSingleIP() {
continue
}
if pfx.Addr().Is4() != t.Is4() {
continue
}
local = pfx.Addr()
break
}
if !local.IsValid() {
return nil, fmt.Errorf("no valid local IP: %v", local)
}
if err := ep.nfr.EnsureSNATForDst(local, t); err != nil {
return nil, fmt.Errorf("error setting up SNAT rule: %w", err)
}
}
}
// Update the status. Status will be written back to the state Secret by the caller.
mak.Set(&newStatus.Services, svcName, &egressservices.ServiceStatus{TailnetTargetIPs: tailnetTargetIPs, TailnetTarget: cfg.TailnetTarget, Ports: cfg.Ports})
}
// Actually apply the firewall rules.
if err := ensureRulesAdded(rulesPerSvcToAdd, ep.nfr); err != nil {
return nil, fmt.Errorf("error adding rules: %w", err)
}
if err := ensureRulesDeleted(rulesPerSvcToDelete, ep.nfr); err != nil {
return nil, fmt.Errorf("error deleting rules: %w", err)
}
return newStatus, nil
}
// updatesForCfg calculates any rules that need to be added or deleted for an individual egress service config.
func updatesForCfg(svcName string, cfg egressservices.Config, status *egressservices.Status, tailnetTargetIPs []netip.Addr) ([]rule, []rule, error) {
rulesToAdd := make([]rule, 0)
rulesToDelete := make([]rule, 0)
currentConfig, ok := lookupCurrentConfig(svcName, status)
// If no rules for service are present yet, add them all.
if !ok {
for _, t := range tailnetTargetIPs {
for ports := range cfg.Ports {
log.Printf("syncegressservices: svc %s adding port %v", svcName, ports)
rulesToAdd = append(rulesToAdd, rule{tailnetPort: ports.TargetPort, containerPort: ports.MatchPort, protocol: ports.Protocol, tailnetIP: t})
}
}
return rulesToAdd, rulesToDelete, nil
}
// If there are no backend targets available, delete any currently configured rules.
if len(tailnetTargetIPs) == 0 {
log.Printf("tailnet target for egress service %s does not have any backend addresses, deleting all rules", svcName)
for _, ip := range currentConfig.TailnetTargetIPs {
for ports := range currentConfig.Ports {
rulesToDelete = append(rulesToDelete, rule{tailnetPort: ports.TargetPort, containerPort: ports.MatchPort, protocol: ports.Protocol, tailnetIP: ip})
}
}
return rulesToAdd, rulesToDelete, nil
}
// If there are rules present for backend targets that no longer match, delete them.
for _, ip := range currentConfig.TailnetTargetIPs {
var found bool
for _, wantsIP := range tailnetTargetIPs {
if reflect.DeepEqual(ip, wantsIP) {
found = true
break
}
}
if !found {
for ports := range currentConfig.Ports {
rulesToDelete = append(rulesToDelete, rule{tailnetPort: ports.TargetPort, containerPort: ports.MatchPort, protocol: ports.Protocol, tailnetIP: ip})
}
}
}
// Sync rules for the currently wanted backend targets.
for _, ip := range tailnetTargetIPs {
// If the backend target is not yet present in status, add all rules.
var found bool
for _, gotIP := range currentConfig.TailnetTargetIPs {
if reflect.DeepEqual(ip, gotIP) {
found = true
break
}
}
if !found {
for ports := range cfg.Ports {
rulesToAdd = append(rulesToAdd, rule{tailnetPort: ports.TargetPort, containerPort: ports.MatchPort, protocol: ports.Protocol, tailnetIP: ip})
}
continue
}
// If the backend target is present in status, check that the
// currently applied rules are up to date.
// Delete any current portmappings that are no longer present in config.
for port := range currentConfig.Ports {
if _, ok := cfg.Ports[port]; ok {
continue
}
rulesToDelete = append(rulesToDelete, rule{tailnetPort: port.TargetPort, containerPort: port.MatchPort, protocol: port.Protocol, tailnetIP: ip})
}
// Add any new portmappings.
for port := range cfg.Ports {
if _, ok := currentConfig.Ports[port]; ok {
continue
}
rulesToAdd = append(rulesToAdd, rule{tailnetPort: port.TargetPort, containerPort: port.MatchPort, protocol: port.Protocol, tailnetIP: ip})
}
}
return rulesToAdd, rulesToDelete, nil
}
// deleteUnnecessaryServices ensures that any services found in the status, but not
// present in the config, are deleted.
func (ep *egressProxy) deleteUnnecessaryServices(cfgs *egressservices.Configs, status *egressservices.Status) error {
if !hasServicesConfigured(status) {
return nil
}
if !wantsServicesConfigured(cfgs) {
for svcName, svc := range status.Services {
log.Printf("service %s is no longer required, deleting", svcName)
if err := ensureServiceDeleted(svcName, svc, ep.nfr); err != nil {
return fmt.Errorf("error deleting service %s: %w", svcName, err)
}
}
return nil
}
for svcName, svc := range status.Services {
if _, ok := (*cfgs)[svcName]; !ok {
log.Printf("service %s is no longer required, deleting", svcName)
if err := ensureServiceDeleted(svcName, svc, ep.nfr); err != nil {
return fmt.Errorf("error deleting service %s: %w", svcName, err)
}
// TODO (irbekrm): also delete the SNAT rule here
}
}
return nil
}
// getConfigs gets the mounted egress service configuration.
func (ep *egressProxy) getConfigs() (*egressservices.Configs, error) {
svcsCfg := filepath.Join(ep.cfgPath, egressservices.KeyEgressServices)
j, err := os.ReadFile(svcsCfg)
if os.IsNotExist(err) {
return nil, nil
}
if err != nil {
return nil, err
}
if len(j) == 0 || string(j) == "" {
return nil, nil
}
cfg := &egressservices.Configs{}
if err := json.Unmarshal(j, &cfg); err != nil {
return nil, err
}
return cfg, nil
}
// getStatus gets the current status of the configured firewall. The current
// status is stored in the state Secret. Returns a nil status if no status that
// applies to the current proxy Pod was found. Uses the Pod IP to determine if a
// status found in the state Secret applies to this proxy Pod.
func (ep *egressProxy) getStatus(ctx context.Context) (*egressservices.Status, error) {
secret, err := ep.kc.GetSecret(ctx, ep.stateSecret)
if err != nil {
return nil, fmt.Errorf("error retrieving state secret: %w", err)
}
status := &egressservices.Status{}
raw, ok := secret.Data[egressservices.KeyEgressServices]
if !ok {
return nil, nil
}
if err := json.Unmarshal([]byte(raw), status); err != nil {
return nil, fmt.Errorf("error unmarshalling previous config: %w", err)
}
if reflect.DeepEqual(status.PodIPv4, ep.podIPv4) {
return status, nil
}
return nil, nil
}
// setStatus writes the egress proxy's currently configured firewall status to the
// state Secret and updates the proxy's tailnet addresses.
func (ep *egressProxy) setStatus(ctx context.Context, status *egressservices.Status, n ipn.Notify) error {
// Pod IP is used to determine if a stored status applies to THIS proxy Pod.
if status == nil {
status = &egressservices.Status{}
}
status.PodIPv4 = ep.podIPv4
secret, err := ep.kc.GetSecret(ctx, ep.stateSecret)
if err != nil {
return fmt.Errorf("error retrieving state Secret: %w", err)
}
bs, err := json.Marshal(status)
if err != nil {
return fmt.Errorf("error marshalling service config: %w", err)
}
secret.Data[egressservices.KeyEgressServices] = bs
patch := kubeclient.JSONPatch{
Op: "replace",
Path: fmt.Sprintf("/data/%s", egressservices.KeyEgressServices),
Value: bs,
}
if err := ep.kc.JSONPatchResource(ctx, ep.stateSecret, kubeclient.TypeSecrets, []kubeclient.JSONPatch{patch}); err != nil {
return fmt.Errorf("error patching state Secret: %w", err)
}
ep.tailnetAddrs = n.NetMap.SelfNode.Addresses().AsSlice()
return nil
}
// tailnetTargetIPsForSvc returns the tailnet IPs to which traffic for this
// egress service should be proxied. The egress service can be configured by IP
// or by FQDN. If it's configured by IP, just return that. If it's configured by
// FQDN, resolve the FQDN and return the resolved IPs. It checks if the
// netfilter runner supports IPv6 NAT and skips any IPv6 addresses if it
// doesn't.
func (ep *egressProxy) tailnetTargetIPsForSvc(svc egressservices.Config, n ipn.Notify) (addrs []netip.Addr, err error) {
if svc.TailnetTarget.IP != "" {
addr, err := netip.ParseAddr(svc.TailnetTarget.IP)
if err != nil {
return nil, fmt.Errorf("error parsing tailnet target IP: %w", err)
}
if addr.Is6() && !ep.nfr.HasIPV6NAT() {
log.Printf("tailnet target is an IPv6 address, but this host does not support IPv6 in the chosen firewall mode. This will probably not work.")
return addrs, nil
}
return []netip.Addr{addr}, nil
}
if svc.TailnetTarget.FQDN == "" {
return nil, errors.New("unexpected egress service config- neither tailnet target IP nor FQDN is set")
}
if n.NetMap == nil {
log.Printf("netmap is not available, unable to determine backend addresses for %s", svc.TailnetTarget.FQDN)
return addrs, nil
}
var (
node tailcfg.NodeView
nodeFound bool
)
for _, nn := range n.NetMap.Peers {
if equalFQDNs(nn.Name(), svc.TailnetTarget.FQDN) {
node = nn
nodeFound = true
break
}
}
if nodeFound {
for _, addr := range node.Addresses().AsSlice() {
if addr.Addr().Is6() && !ep.nfr.HasIPV6NAT() {
log.Printf("tailnet target %v is an IPv6 address, but this host does not support IPv6 in the chosen firewall mode, skipping.", addr.Addr().String())
continue
}
addrs = append(addrs, addr.Addr())
}
// Egress target endpoints configured via FQDN are stored, so
// that we can determine if a netmap update should trigger a
// resync.
mak.Set(&ep.targetFQDNs, svc.TailnetTarget.FQDN, node.Addresses().AsSlice())
}
return addrs, nil
}
// shouldResync parses a netmap update and returns true if the update contains
// changes for which the egress proxy's firewall should be reconfigured.
func (ep *egressProxy) shouldResync(n ipn.Notify) bool {
if n.NetMap == nil {
return false
}
// If proxy's tailnet addresses have changed, resync.
if !reflect.DeepEqual(n.NetMap.SelfNode.Addresses().AsSlice(), ep.tailnetAddrs) {
log.Printf("node addresses have changed, trigger egress config resync")
ep.tailnetAddrs = n.NetMap.SelfNode.Addresses().AsSlice()
return true
}
// If the IPs for any of the egress services configured via FQDN have
// changed, resync.
for fqdn, ips := range ep.targetFQDNs {
for _, nn := range n.NetMap.Peers {
if equalFQDNs(nn.Name(), fqdn) {
if !reflect.DeepEqual(ips, nn.Addresses().AsSlice()) {
log.Printf("backend addresses for egress target %q have changed old IPs %v, new IPs %v trigger egress config resync", nn.Name(), ips, nn.Addresses().AsSlice())
return true
}
break
}
}
}
return false
}
// ensureServiceDeleted ensures that any rules for an egress service are removed
// from the firewall configuration.
func ensureServiceDeleted(svcName string, svc *egressservices.ServiceStatus, nfr linuxfw.NetfilterRunner) error {
// Note that the portmap is needed for iptables-based firewalls only.
// Nftables groups rules for a service in a chain, so there is no need to
// specify individual portmapping-based rules.
pms := make([]linuxfw.PortMap, 0)
for pm := range svc.Ports {
pms = append(pms, linuxfw.PortMap{MatchPort: pm.MatchPort, TargetPort: pm.TargetPort, Protocol: pm.Protocol})
}
if err := nfr.DeleteSvc(svcName, tailscaleTunInterface, svc.TailnetTargetIPs, pms); err != nil {
return fmt.Errorf("error deleting service %s: %w", svcName, err)
}
return nil
}
// ensureRulesAdded ensures that all portmapping rules are added to the firewall
// configuration. For any rules that already exist, calling this function is a
// no-op. In the case of nftables, a service consists of one or two chains (one per
// IP family) that contain the portmapping rules for the service; the chains are
// created as needed when this function is called.
func ensureRulesAdded(rulesPerSvc map[string][]rule, nfr linuxfw.NetfilterRunner) error {
for svc, rules := range rulesPerSvc {
for _, rule := range rules {
log.Printf("ensureRulesAdded svc %s tailnetTarget %s container port %d tailnet port %d protocol %s", svc, rule.tailnetIP, rule.containerPort, rule.tailnetPort, rule.protocol)
if err := nfr.EnsurePortMapRuleForSvc(svc, tailscaleTunInterface, rule.tailnetIP, linuxfw.PortMap{MatchPort: rule.containerPort, TargetPort: rule.tailnetPort, Protocol: rule.protocol}); err != nil {
return fmt.Errorf("error ensuring rule: %w", err)
}
}
}
return nil
}
// ensureRulesDeleted ensures that the given rules are deleted from the firewall
// configuration. For any rules that do not exist, calling this function is a
// no-op.
func ensureRulesDeleted(rulesPerSvc map[string][]rule, nfr linuxfw.NetfilterRunner) error {
for svc, rules := range rulesPerSvc {
for _, rule := range rules {
log.Printf("ensureRulesDeleted svc %s tailnetTarget %s container port %d tailnet port %d protocol %s", svc, rule.tailnetIP, rule.containerPort, rule.tailnetPort, rule.protocol)
if err := nfr.DeletePortMapRuleForSvc(svc, tailscaleTunInterface, rule.tailnetIP, linuxfw.PortMap{MatchPort: rule.containerPort, TargetPort: rule.tailnetPort, Protocol: rule.protocol}); err != nil {
return fmt.Errorf("error deleting rule: %w", err)
}
}
}
return nil
}
func lookupCurrentConfig(svcName string, status *egressservices.Status) (*egressservices.ServiceStatus, bool) {
if status == nil || len(status.Services) == 0 {
return nil, false
}
c, ok := status.Services[svcName]
return c, ok
}
func equalFQDNs(s, s1 string) bool {
s, _ = strings.CutSuffix(s, ".")
s1, _ = strings.CutSuffix(s1, ".")
return strings.EqualFold(s, s1)
}
// rule contains configuration for an egress proxy firewall rule.
type rule struct {
containerPort uint16 // port to match incoming traffic
tailnetPort uint16 // tailnet service port
tailnetIP netip.Addr // tailnet service IP
protocol string
}
func wantsServicesConfigured(cfgs *egressservices.Configs) bool {
return cfgs != nil && len(*cfgs) != 0
}
func hasServicesConfigured(status *egressservices.Status) bool {
return status != nil && len(status.Services) != 0
}
func servicesStatusIsEqual(st, st1 *egressservices.Status) bool {
if st == nil && st1 == nil {
return true
}
if st == nil || st1 == nil {
return false
}
st.PodIPv4 = ""
st1.PodIPv4 = ""
return reflect.DeepEqual(*st, *st1)
}
// registerHandlers adds a new handler to the provided ServeMux that can be called as a Kubernetes prestop hook to
// delay shutdown until it's safe to do so.
func (ep *egressProxy) registerHandlers(mux *http.ServeMux) {
mux.Handle(fmt.Sprintf("GET %s", kubetypes.EgessServicesPreshutdownEP), ep)
}
// ServeHTTP serves the /internal-egress-services-preshutdown endpoint. When it receives a request, it periodically polls
// the configured health check endpoint for each egress service until the health check endpoint no longer hits this
// proxy Pod. It uses the Pod-IPv4 header to verify whether a health check response was received from this Pod.
func (ep *egressProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
cfgs, err := ep.getConfigs()
if err != nil {
http.Error(w, fmt.Sprintf("error retrieving egress services configs: %v", err), http.StatusInternalServerError)
return
}
if cfgs == nil {
if _, err := w.Write([]byte("safe to terminate")); err != nil {
http.Error(w, fmt.Sprintf("error writing termination status: %v", err), http.StatusInternalServerError)
}
return
}
hp, err := ep.getHEPPings()
if err != nil {
http.Error(w, fmt.Sprintf("error determining the number of times health check endpoint should be pinged: %v", err), http.StatusInternalServerError)
return
}
ep.waitTillSafeToShutdown(r.Context(), cfgs, hp)
}
// waitTillSafeToShutdown looks up all egress targets configured to be proxied via this instance and, for each target
// whose configuration includes a healthcheck endpoint, pings the endpoint until none of the responses
// are returned by this instance or until the HTTP request times out. In practice, the endpoint will be a Kubernetes Service for which one of the backends
// would normally be this Pod. When this Pod is being deleted, the operator should have removed it from the Service
// backends, and eventually kube-proxy routing rules should be updated to no longer route traffic for the Service to this
// Pod.
func (ep *egressProxy) waitTillSafeToShutdown(ctx context.Context, cfgs *egressservices.Configs, hp int) {
if cfgs == nil || len(*cfgs) == 0 { // avoid sleeping if no services are configured
return
}
log.Printf("Ensuring that cluster traffic for egress targets is no longer routed via this Pod...")
wg := syncs.WaitGroup{}
for s, cfg := range *cfgs {
hep := cfg.HealthCheckEndpoint
if hep == "" {
log.Printf("Tailnet target %q does not have a cluster healthcheck specified, unable to verify if cluster traffic for the target is still routed via this Pod", s)
continue
}
svc := s
wg.Go(func() {
log.Printf("Ensuring that cluster traffic is no longer routed to %q via this Pod...", svc)
for {
if ctx.Err() != nil { // kubelet's HTTP request timeout
log.Printf("Cluster traffic for %s did not stop being routed to this Pod.", svc)
return
}
found, err := lookupPodRoute(ctx, hep, ep.podIPv4, hp, ep.client)
if err != nil {
log.Printf("unable to reach endpoint %q, assuming the routing rules for this Pod have been deleted: %v", hep, err)
break
}
if !found {
log.Printf("service %q is no longer routed through this Pod", svc)
break
}
log.Printf("service %q is still routed through this Pod, waiting...", svc)
time.Sleep(ep.shortSleep)
}
})
}
wg.Wait()
// The check above really only checked that the routing rules are updated on this node. Sleep for a bit to
// ensure that the routing rules are updated on other nodes. TODO(irbekrm): this may or may not be good enough.
// If it's not good enough, we'd probably want to do something more complex, where the proxies check each other.
log.Printf("Sleeping for %s before shutdown to ensure that kube proxies on all nodes have updated routing configuration", ep.longSleep)
time.Sleep(ep.longSleep)
}
// lookupPodRoute calls the healthcheck endpoint repeat times and returns true if at least one response
// contains the podIP header.
func lookupPodRoute(ctx context.Context, hep, podIP string, repeat int, client httpClient) (bool, error) {
for range repeat {
f, err := lookup(ctx, hep, podIP, client)
if err != nil {
return false, err
}
if f {
return true, nil
}
}
return false, nil
}
// lookup calls the healthcheck endpoint and returns true if the response contains the podIP header.
func lookup(ctx context.Context, hep, podIP string, client httpClient) (bool, error) {
req, err := http.NewRequestWithContext(ctx, httpm.GET, hep, nil)
if err != nil {
return false, fmt.Errorf("error creating new HTTP request: %v", err)
}
// Close the TCP connection to ensure that the next request is routed to a different backend.
req.Close = true
resp, err := client.Do(req)
if err != nil {
log.Printf("Endpoint %q can not be reached: %v, likely because there are no (more) healthy backends", hep, err)
return true, nil
}
defer resp.Body.Close()
gotIP := resp.Header.Get(kubetypes.PodIPv4Header)
return strings.EqualFold(podIP, gotIP), nil
}
// getHEPPings gets the number of pings that should be sent to a health check endpoint to ensure that each configured
// backend is hit. This assumes that a health check endpoint is a Kubernetes Service and traffic to backend Pods is
// round robin load balanced.
func (ep *egressProxy) getHEPPings() (int, error) {
hepPingsPath := filepath.Join(ep.cfgPath, egressservices.KeyHEPPings)
j, err := os.ReadFile(hepPingsPath)
if os.IsNotExist(err) {
return 0, nil
}
if err != nil {
return -1, err
}
if len(j) == 0 || string(j) == "" {
return 0, nil
}
hp, err := strconv.Atoi(string(j))
if err != nil {
return -1, fmt.Errorf("error parsing hep pings as int: %v", err)
}
if hp < 0 {
log.Printf("[unexpected] hep pings is negative: %d", hp)
return 0, nil
}
return hp, nil
}
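The preshutdown flow above depends on the egress health check endpoint reporting which proxy Pod actually served each request, via the kubetypes.PodIPv4Header response header that lookup inspects. As a point of reference only, here is a minimal sketch of what such a backend handler could look like; the handler name, path, and port are hypothetical and not part of this commit.

package main

import (
	"net/http"

	"tailscale.com/kube/kubetypes"
)

// hypotheticalHealthHandler stamps every response with this Pod's IPv4 address
// so that a caller polling the health check Service can tell whether its
// requests are still being routed to this Pod.
func hypotheticalHealthHandler(podIPv4 string) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set(kubetypes.PodIPv4Header, podIPv4)
		w.WriteHeader(http.StatusOK)
	})
}

func main() {
	// Illustrative values only.
	http.Handle("/healthz", hypotheticalHealthHandler("10.0.0.2"))
	_ = http.ListenAndServe(":9002", nil)
}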


@ -0,0 +1,329 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build linux
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"net/netip"
"os"
"path/filepath"
"reflect"
"time"
"github.com/fsnotify/fsnotify"
"tailscale.com/kube/ingressservices"
"tailscale.com/kube/kubeclient"
"tailscale.com/util/linuxfw"
"tailscale.com/util/mak"
)
// ingressProxy corresponds to a Kubernetes Operator's network layer ingress
// proxy. It configures firewall rules (iptables or nftables) to proxy tailnet
// traffic to Kubernetes Services. Currently this is only used for network
// layer proxies in HA mode.
type ingressProxy struct {
cfgPath string // path to ingress configfile.
// nfr is the netfilter runner used to configure firewall rules.
// This is going to be either iptables or nftables based runner.
// Never nil.
nfr linuxfw.NetfilterRunner
kc kubeclient.Client // never nil
stateSecret string // Secret that holds Tailscale state
// Pod's IP addresses are used as an identifier of this particular Pod.
podIPv4 string // empty if Pod does not have IPv4 address
podIPv6 string // empty if Pod does not have IPv6 address
}
// run starts the ingress proxy and ensures that firewall rules are set on start
// and refreshed as ingress config changes.
func (ep *ingressProxy) run(ctx context.Context, opts ingressProxyOpts) error {
log.Printf("starting ingress proxy...")
ep.configure(opts)
var tickChan <-chan time.Time
var eventChan <-chan fsnotify.Event
if w, err := fsnotify.NewWatcher(); err != nil {
log.Printf("failed to create fsnotify watcher, timer-only mode: %v", err)
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
tickChan = ticker.C
} else {
defer w.Close()
dir := filepath.Dir(ep.cfgPath)
if err := w.Add(dir); err != nil {
return fmt.Errorf("failed to add fsnotify watch for %v: %w", dir, err)
}
eventChan = w.Events
}
if err := ep.sync(ctx); err != nil {
return err
}
for {
select {
case <-ctx.Done():
return nil
case <-tickChan:
log.Printf("periodic sync, ensuring firewall config is up to date...")
case <-eventChan:
log.Printf("config file change detected, ensuring firewall config is up to date...")
}
if err := ep.sync(ctx); err != nil {
return fmt.Errorf("error syncing ingress service config: %w", err)
}
}
}
// sync reconciles proxy's firewall rules (iptables or nftables) on ingress config changes:
// - ensures that new firewall rules are added
// - ensures that old firewall rules are deleted
// - updates ingress proxy's status in the state Secret
func (ep *ingressProxy) sync(ctx context.Context) error {
// 1. Get the desired firewall configuration
cfgs, err := ep.getConfigs()
if err != nil {
return fmt.Errorf("ingress proxy: error retrieving configs: %w", err)
}
// 2. Get the recorded firewall status
status, err := ep.getStatus(ctx)
if err != nil {
return fmt.Errorf("ingress proxy: error retrieving current status: %w", err)
}
// 3. Ensure that firewall configuration is up to date
if err := ep.syncIngressConfigs(cfgs, status); err != nil {
return fmt.Errorf("ingress proxy: error syncing configs: %w", err)
}
var existingConfigs *ingressservices.Configs
if status != nil {
existingConfigs = &status.Configs
}
// 4. Update the recorded firewall status
if !(ingressServicesStatusIsEqual(cfgs, existingConfigs) && ep.isCurrentStatus(status)) {
if err := ep.recordStatus(ctx, cfgs); err != nil {
return fmt.Errorf("ingress proxy: error setting status: %w", err)
}
}
return nil
}
// getConfigs returns the desired ingress service configuration from the mounted
// configfile.
func (ep *ingressProxy) getConfigs() (*ingressservices.Configs, error) {
j, err := os.ReadFile(ep.cfgPath)
if os.IsNotExist(err) {
return nil, nil
}
if err != nil {
return nil, err
}
if len(j) == 0 || string(j) == "" {
return nil, nil
}
cfg := &ingressservices.Configs{}
if err := json.Unmarshal(j, &cfg); err != nil {
return nil, err
}
return cfg, nil
}
// getStatus gets the recorded status of the configured firewall. The status is
// stored in the proxy's state Secret. Note that the recorded status might not
// be the current status of the firewall if it belongs to a previous Pod - we
// take that into account further down the line when determining if the desired
// rules are actually present.
func (ep *ingressProxy) getStatus(ctx context.Context) (*ingressservices.Status, error) {
secret, err := ep.kc.GetSecret(ctx, ep.stateSecret)
if err != nil {
return nil, fmt.Errorf("error retrieving state secret: %w", err)
}
status := &ingressservices.Status{}
raw, ok := secret.Data[ingressservices.IngressConfigKey]
if !ok {
return nil, nil
}
if err := json.Unmarshal([]byte(raw), status); err != nil {
return nil, fmt.Errorf("error unmarshalling previous config: %w", err)
}
return status, nil
}
// syncIngressConfigs takes the desired firewall configuration and the recorded
// status and ensures that any missing rules are added and no longer needed
// rules are deleted.
func (ep *ingressProxy) syncIngressConfigs(cfgs *ingressservices.Configs, status *ingressservices.Status) error {
rulesToAdd := ep.getRulesToAdd(cfgs, status)
rulesToDelete := ep.getRulesToDelete(cfgs, status)
if err := ensureIngressRulesDeleted(rulesToDelete, ep.nfr); err != nil {
return fmt.Errorf("error deleting ingress rules: %w", err)
}
if err := ensureIngressRulesAdded(rulesToAdd, ep.nfr); err != nil {
return fmt.Errorf("error adding ingress rules: %w", err)
}
return nil
}
// recordStatus writes the configured firewall status to the proxy's state
// Secret. This allows the Kubernetes Operator to determine whether this proxy
// Pod has set up firewall rules to route traffic for an ingress service.
func (ep *ingressProxy) recordStatus(ctx context.Context, newCfg *ingressservices.Configs) error {
status := &ingressservices.Status{}
if newCfg != nil {
status.Configs = *newCfg
}
// Pod IPs are used to determine if recorded status applies to THIS proxy Pod.
status.PodIPv4 = ep.podIPv4
status.PodIPv6 = ep.podIPv6
secret, err := ep.kc.GetSecret(ctx, ep.stateSecret)
if err != nil {
return fmt.Errorf("error retrieving state Secret: %w", err)
}
bs, err := json.Marshal(status)
if err != nil {
return fmt.Errorf("error marshalling status: %w", err)
}
secret.Data[ingressservices.IngressConfigKey] = bs
patch := kubeclient.JSONPatch{
Op: "replace",
Path: fmt.Sprintf("/data/%s", ingressservices.IngressConfigKey),
Value: bs,
}
if err := ep.kc.JSONPatchResource(ctx, ep.stateSecret, kubeclient.TypeSecrets, []kubeclient.JSONPatch{patch}); err != nil {
return fmt.Errorf("error patching state Secret: %w", err)
}
return nil
}
// getRulesToAdd takes the desired firewall configuration and the recorded
// firewall status and returns a map of missing VIPServices and rules.
func (ep *ingressProxy) getRulesToAdd(cfgs *ingressservices.Configs, status *ingressservices.Status) map[string]ingressservices.Config {
if cfgs == nil {
return nil
}
var rulesToAdd map[string]ingressservices.Config
for vipSvc, wantsCfg := range *cfgs {
if status == nil || !ep.isCurrentStatus(status) {
mak.Set(&rulesToAdd, vipSvc, wantsCfg)
continue
}
gotCfg := status.Configs.GetConfig(vipSvc)
if gotCfg == nil || !reflect.DeepEqual(wantsCfg, *gotCfg) {
mak.Set(&rulesToAdd, vipSvc, wantsCfg)
}
}
return rulesToAdd
}
// getRulesToDelete takes the desired firewall configuration and the recorded
// status and returns a map of VIPServices and rules that need to be deleted.
func (ep *ingressProxy) getRulesToDelete(cfgs *ingressservices.Configs, status *ingressservices.Status) map[string]ingressservices.Config {
if status == nil || !ep.isCurrentStatus(status) {
return nil
}
var rulesToDelete map[string]ingressservices.Config
for vipSvc, gotCfg := range status.Configs {
if cfgs == nil {
mak.Set(&rulesToDelete, vipSvc, gotCfg)
continue
}
wantsCfg := cfgs.GetConfig(vipSvc)
if wantsCfg != nil && reflect.DeepEqual(*wantsCfg, gotCfg) {
continue
}
mak.Set(&rulesToDelete, vipSvc, gotCfg)
}
return rulesToDelete
}
// ensureIngressRulesAdded takes a map of VIPServices and rules and ensures that the firewall rules are added.
func ensureIngressRulesAdded(cfgs map[string]ingressservices.Config, nfr linuxfw.NetfilterRunner) error {
for serviceName, cfg := range cfgs {
f := func(svcName string, vipIP, clusterIP netip.Addr) error {
log.Printf("ensureIngressRulesAdded VIPService %s with IP %s to cluster IP %s", serviceName, vipIP, clusterIP)
return nfr.EnsureDNATRuleForSvc(svcName, vipIP, clusterIP)
}
if cfg.IPv4Mapping != nil {
if err := f(serviceName, cfg.IPv4Mapping.VIPServiceIP, cfg.IPv4Mapping.ClusterIP); err != nil {
return fmt.Errorf("error adding ingress rule for %s: %w", serviceName, err)
}
}
if cfg.IPv6Mapping != nil {
if err := f(serviceName, cfg.IPv6Mapping.VIPServiceIP, cfg.IPv6Mapping.ClusterIP); err != nil {
return fmt.Errorf("error adding ingress rule for %s: %w", serviceName, err)
}
}
}
return nil
}
// ensureIngressRulesDeleted takes a map of VIPServices and rules and ensures that the firewall rules are deleted.
func ensureIngressRulesDeleted(cfgs map[string]ingressservices.Config, nfr linuxfw.NetfilterRunner) error {
for serviceName, cfg := range cfgs {
f := func(svcName string, vipIP, clusterIP netip.Addr) error {
log.Printf("ensureIngressRulesDeleted VIPService %s with IP %s to cluster IP %s", serviceName, vipIP, clusterIP)
return nfr.DeleteDNATRuleForSvc(svcName, vipIP, clusterIP)
}
if cfg.IPv4Mapping != nil {
if err := f(serviceName, cfg.IPv4Mapping.VIPServiceIP, cfg.IPv4Mapping.ClusterIP); err != nil {
return fmt.Errorf("error deleting ingress rule for %s: %w", serviceName, err)
}
}
if cfg.IPv6Mapping != nil {
if err := f(serviceName, cfg.IPv6Mapping.VIPServiceIP, cfg.IPv6Mapping.ClusterIP); err != nil {
return fmt.Errorf("error deleting ingress rule for %s: %w", serviceName, err)
}
}
}
return nil
}
// isCurrentStatus returns true if the status of an ingress proxy as read from
// the proxy's state Secret is the status of the current proxy Pod. We use the
// Pod's IP addresses to determine whether the status is for this Pod.
func (ep *ingressProxy) isCurrentStatus(status *ingressservices.Status) bool {
if status == nil {
return true
}
return status.PodIPv4 == ep.podIPv4 && status.PodIPv6 == ep.podIPv6
}
type ingressProxyOpts struct {
cfgPath string
nfr linuxfw.NetfilterRunner // never nil
kc kubeclient.Client // never nil
stateSecret string
podIPv4 string
podIPv6 string
}
// configure sets the ingress proxy's configuration. It is called once on start
// so we don't care about concurrent access to fields.
func (ep *ingressProxy) configure(opts ingressProxyOpts) {
ep.cfgPath = opts.cfgPath
ep.nfr = opts.nfr
ep.kc = opts.kc
ep.stateSecret = opts.stateSecret
ep.podIPv4 = opts.podIPv4
ep.podIPv6 = opts.podIPv6
}
func ingressServicesStatusIsEqual(st, st1 *ingressservices.Configs) bool {
if st == nil && st1 == nil {
return true
}
if st == nil || st1 == nil {
return false
}
return reflect.DeepEqual(*st, *st1)
}
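To make the add/delete split above concrete, the following same-package sketch (mirroring the add_remove_modify case in the test file below, and relying on the imports already present in this file) shows how a desired config and a recorded status combine; the service names and IPs are illustrative only.

// Sketch only (same package as ingressProxy); values are illustrative.
func exampleDiff() {
	desired := &ingressservices.Configs{
		// svc:foo keeps its VIPService IP but moves to a new cluster IP.
		"svc:foo": {IPv4Mapping: &ingressservices.Mapping{
			VIPServiceIP: netip.MustParseAddr("100.64.0.1"),
			ClusterIP:    netip.MustParseAddr("10.0.0.2"),
		}},
		// svc:new is not in the recorded status yet.
		"svc:new": {IPv4Mapping: &ingressservices.Mapping{
			VIPServiceIP: netip.MustParseAddr("100.64.0.4"),
			ClusterIP:    netip.MustParseAddr("10.0.0.4"),
		}},
	}
	recorded := &ingressservices.Status{
		Configs: ingressservices.Configs{
			"svc:foo": {IPv4Mapping: &ingressservices.Mapping{
				VIPServiceIP: netip.MustParseAddr("100.64.0.1"),
				ClusterIP:    netip.MustParseAddr("10.0.0.1"),
			}},
			"svc:bar": {IPv4Mapping: &ingressservices.Mapping{
				VIPServiceIP: netip.MustParseAddr("100.64.0.2"),
				ClusterIP:    netip.MustParseAddr("10.0.0.2"),
			}},
		},
		PodIPv4: "10.0.0.2", // matches this Pod, so the recorded status is trusted
	}
	ep := &ingressProxy{podIPv4: "10.0.0.2"}
	toAdd := ep.getRulesToAdd(desired, recorded)       // svc:foo (new mapping) and svc:new
	toDelete := ep.getRulesToDelete(desired, recorded) // svc:foo (old mapping) and svc:bar
	_, _ = toAdd, toDelete
}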


@ -0,0 +1,223 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build linux
package main
import (
"net/netip"
"testing"
"tailscale.com/kube/ingressservices"
"tailscale.com/util/linuxfw"
)
func TestSyncIngressConfigs(t *testing.T) {
tests := []struct {
name string
currentConfigs *ingressservices.Configs
currentStatus *ingressservices.Status
wantServices map[string]struct {
VIPServiceIP netip.Addr
ClusterIP netip.Addr
}
}{
{
name: "add_new_rules_when_no_existing_config",
currentConfigs: &ingressservices.Configs{
"svc:foo": makeServiceConfig("100.64.0.1", "10.0.0.1", "", ""),
},
currentStatus: nil,
wantServices: map[string]struct {
VIPServiceIP netip.Addr
ClusterIP netip.Addr
}{
"svc:foo": makeWantService("100.64.0.1", "10.0.0.1"),
},
},
{
name: "add_multiple_services",
currentConfigs: &ingressservices.Configs{
"svc:foo": makeServiceConfig("100.64.0.1", "10.0.0.1", "", ""),
"svc:bar": makeServiceConfig("100.64.0.2", "10.0.0.2", "", ""),
"svc:baz": makeServiceConfig("100.64.0.3", "10.0.0.3", "", ""),
},
currentStatus: nil,
wantServices: map[string]struct {
VIPServiceIP netip.Addr
ClusterIP netip.Addr
}{
"svc:foo": makeWantService("100.64.0.1", "10.0.0.1"),
"svc:bar": makeWantService("100.64.0.2", "10.0.0.2"),
"svc:baz": makeWantService("100.64.0.3", "10.0.0.3"),
},
},
{
name: "add_both_ipv4_and_ipv6_rules",
currentConfigs: &ingressservices.Configs{
"svc:foo": makeServiceConfig("100.64.0.1", "10.0.0.1", "2001:db8::1", "2001:db8::2"),
},
currentStatus: nil,
wantServices: map[string]struct {
VIPServiceIP netip.Addr
ClusterIP netip.Addr
}{
"svc:foo": makeWantService("2001:db8::1", "2001:db8::2"),
},
},
{
name: "add_ipv6_only_rules",
currentConfigs: &ingressservices.Configs{
"svc:ipv6": makeServiceConfig("", "", "2001:db8::10", "2001:db8::20"),
},
currentStatus: nil,
wantServices: map[string]struct {
VIPServiceIP netip.Addr
ClusterIP netip.Addr
}{
"svc:ipv6": makeWantService("2001:db8::10", "2001:db8::20"),
},
},
{
name: "delete_all_rules_when_config_removed",
currentConfigs: nil,
currentStatus: &ingressservices.Status{
Configs: ingressservices.Configs{
"svc:foo": makeServiceConfig("100.64.0.1", "10.0.0.1", "", ""),
"svc:bar": makeServiceConfig("100.64.0.2", "10.0.0.2", "", ""),
},
PodIPv4: "10.0.0.2", // Current pod IPv4
PodIPv6: "2001:db8::2", // Current pod IPv6
},
wantServices: map[string]struct {
VIPServiceIP netip.Addr
ClusterIP netip.Addr
}{},
},
{
name: "add_remove_modify",
currentConfigs: &ingressservices.Configs{
"svc:foo": makeServiceConfig("100.64.0.1", "10.0.0.2", "", ""), // Changed cluster IP
"svc:new": makeServiceConfig("100.64.0.4", "10.0.0.4", "", ""),
},
currentStatus: &ingressservices.Status{
Configs: ingressservices.Configs{
"svc:foo": makeServiceConfig("100.64.0.1", "10.0.0.1", "", ""),
"svc:bar": makeServiceConfig("100.64.0.2", "10.0.0.2", "", ""),
"svc:baz": makeServiceConfig("100.64.0.3", "10.0.0.3", "", ""),
},
PodIPv4: "10.0.0.2", // Current pod IPv4
PodIPv6: "2001:db8::2", // Current pod IPv6
},
wantServices: map[string]struct {
VIPServiceIP netip.Addr
ClusterIP netip.Addr
}{
"svc:foo": makeWantService("100.64.0.1", "10.0.0.2"),
"svc:new": makeWantService("100.64.0.4", "10.0.0.4"),
},
},
{
name: "update_with_outdated_status",
currentConfigs: &ingressservices.Configs{
"svc:web": makeServiceConfig("100.64.0.10", "10.0.0.10", "", ""),
"svc:web-ipv6": {
IPv6Mapping: &ingressservices.Mapping{
VIPServiceIP: netip.MustParseAddr("2001:db8::10"),
ClusterIP: netip.MustParseAddr("2001:db8::20"),
},
},
"svc:api": makeServiceConfig("100.64.0.20", "10.0.0.20", "", ""),
},
currentStatus: &ingressservices.Status{
Configs: ingressservices.Configs{
"svc:web": makeServiceConfig("100.64.0.10", "10.0.0.10", "", ""),
"svc:web-ipv6": {
IPv6Mapping: &ingressservices.Mapping{
VIPServiceIP: netip.MustParseAddr("2001:db8::10"),
ClusterIP: netip.MustParseAddr("2001:db8::20"),
},
},
"svc:old": makeServiceConfig("100.64.0.30", "10.0.0.30", "", ""),
},
PodIPv4: "10.0.0.1", // Outdated pod IP
PodIPv6: "2001:db8::1", // Outdated pod IP
},
wantServices: map[string]struct {
VIPServiceIP netip.Addr
ClusterIP netip.Addr
}{
"svc:web": makeWantService("100.64.0.10", "10.0.0.10"),
"svc:web-ipv6": makeWantService("2001:db8::10", "2001:db8::20"),
"svc:api": makeWantService("100.64.0.20", "10.0.0.20"),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var nfr linuxfw.NetfilterRunner = linuxfw.NewFakeNetfilterRunner()
ep := &ingressProxy{
nfr: nfr,
podIPv4: "10.0.0.2", // Current pod IPv4
podIPv6: "2001:db8::2", // Current pod IPv6
}
err := ep.syncIngressConfigs(tt.currentConfigs, tt.currentStatus)
if err != nil {
t.Fatalf("syncIngressConfigs failed: %v", err)
}
fake := nfr.(*linuxfw.FakeNetfilterRunner)
gotServices := fake.GetServiceState()
if len(gotServices) != len(tt.wantServices) {
t.Errorf("got %d services, want %d", len(gotServices), len(tt.wantServices))
}
for svc, want := range tt.wantServices {
got, ok := gotServices[svc]
if !ok {
t.Errorf("service %s not found", svc)
continue
}
if got.VIPServiceIP != want.VIPServiceIP {
t.Errorf("service %s: got VIPServiceIP %v, want %v", svc, got.VIPServiceIP, want.VIPServiceIP)
}
if got.ClusterIP != want.ClusterIP {
t.Errorf("service %s: got ClusterIP %v, want %v", svc, got.ClusterIP, want.ClusterIP)
}
}
})
}
}
func makeServiceConfig(vipIP, clusterIP string, vipIP6, clusterIP6 string) ingressservices.Config {
cfg := ingressservices.Config{}
if vipIP != "" && clusterIP != "" {
cfg.IPv4Mapping = &ingressservices.Mapping{
VIPServiceIP: netip.MustParseAddr(vipIP),
ClusterIP: netip.MustParseAddr(clusterIP),
}
}
if vipIP6 != "" && clusterIP6 != "" {
cfg.IPv6Mapping = &ingressservices.Mapping{
VIPServiceIP: netip.MustParseAddr(vipIP6),
ClusterIP: netip.MustParseAddr(clusterIP6),
}
}
return cfg
}
func makeWantService(vipIP, clusterIP string) struct {
VIPServiceIP netip.Addr
ClusterIP netip.Addr
} {
return struct {
VIPServiceIP netip.Addr
ClusterIP netip.Addr
}{
VIPServiceIP: netip.MustParseAddr(vipIP),
ClusterIP: netip.MustParseAddr(clusterIP),
}
}


@ -441,6 +441,7 @@ authLoop:
// egressSvcsErrorChan will get an error sent to it if this containerboot instance is configured to expose 1+
// egress services in HA mode and errored.
var egressSvcsErrorChan = make(chan error)
var ingressSvcsErrorChan = make(chan error)
defer t.Stop()
// resetTimer resets timer for when to next attempt to resolve the DNS
// name for the proxy configured with TS_EXPERIMENTAL_DEST_DNS_NAME. The
@ -694,6 +695,23 @@ runLoop:
}
}()
}
ip := ingressProxy{}
if cfg.IngressProxiesCfgPath != "" {
log.Printf("configuring ingress proxy using configuration file at %s", cfg.IngressProxiesCfgPath)
opts := ingressProxyOpts{
cfgPath: cfg.IngressProxiesCfgPath,
nfr: nfr,
kc: kc,
stateSecret: cfg.KubeSecret,
podIPv4: cfg.PodIPv4,
podIPv6: cfg.PodIPv6,
}
go func() {
if err := ip.run(ctx, opts); err != nil {
ingressSvcsErrorChan <- err
}
}()
}
// Wait on tailscaled process. It won't be cleaned up by default when the
// container exits as it is not PID1. TODO (irbekrm): perhaps we can replace the
@ -738,6 +756,8 @@ runLoop:
resetTimer(false)
case e := <-egressSvcsErrorChan:
return fmt.Errorf("egress proxy failed: %v", e)
case e := <-ingressSvcsErrorChan:
return fmt.Errorf("ingress proxy failed: %v", e)
}
}
wg.Wait()


@ -9,7 +9,6 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"log"
"os"
"path/filepath"
@ -170,46 +169,3 @@ func readServeConfig(path, certDomain string) (*ipn.ServeConfig, error) {
}
return &sc, nil
}
func ensureServicesNotAdvertised(ctx context.Context, lc *local.Client) error {
prefs, err := lc.GetPrefs(ctx)
if err != nil {
return fmt.Errorf("error getting prefs: %w", err)
}
if len(prefs.AdvertiseServices) == 0 {
return nil
}
log.Printf("serve proxy: unadvertising services: %v", prefs.AdvertiseServices)
if _, err := lc.EditPrefs(ctx, &ipn.MaskedPrefs{
AdvertiseServicesSet: true,
Prefs: ipn.Prefs{
AdvertiseServices: nil,
},
}); err != nil {
// EditPrefs only returns an error if it fails to _set_ its local prefs.
// If it fails to _persist_ the prefs in state, we don't get an error
// and we continue waiting below, as control will failover as usual.
return fmt.Errorf("error setting prefs AdvertiseServices: %w", err)
}
// Services use the same (failover XOR regional routing) mechanism that
// HA subnet routers use. Unfortunately we don't yet get a reliable signal
// from control that it's responded to our unadvertisement, so the best we
// can do is wait for 20 seconds, where 15s is the approximate maximum time
// it should take for control to choose a new primary, and 5s is for buffer.
//
// Note: There is no guarantee that clients have been _informed_ of the new
// primary no matter how long we wait. We would need a mechanism to await
// netmap updates for peers to know for sure.
//
// See https://tailscale.com/kb/1115/high-availability for more details.
// TODO(tomhjp): Wait for a netmap update instead of sleeping when control
// supports that.
select {
case <-ctx.Done():
return nil
case <-time.After(20 * time.Second):
return nil
}
}


@ -7,759 +7,57 @@ package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"net/http"
"net/netip"
"os"
"path/filepath"
"reflect"
"strconv"
"strings"
"time"
"github.com/fsnotify/fsnotify"
"tailscale.com/client/local"
"tailscale.com/ipn"
"tailscale.com/kube/egressservices"
"tailscale.com/kube/kubeclient"
"tailscale.com/kube/kubetypes"
"tailscale.com/syncs"
"tailscale.com/tailcfg"
"tailscale.com/util/httpm"
"tailscale.com/util/linuxfw"
"tailscale.com/util/mak"
)
const tailscaleTunInterface = "tailscale0"
// Modified using a build flag to speed up tests.
var testSleepDuration string
// This file contains functionality to run containerboot as a proxy that can
// route cluster traffic to one or more tailnet targets, based on portmapping
// rules read from a configfile. Currently (9/2024) this is only used for the
// Kubernetes operator egress proxies.
// egressProxy knows how to configure firewall rules to route cluster traffic to
// one or more tailnet services.
type egressProxy struct {
cfgPath string // path to a directory with egress services config files
nfr linuxfw.NetfilterRunner // never nil
kc kubeclient.Client // never nil
stateSecret string // name of the kube state Secret
tsClient *local.Client // never nil
netmapChan chan ipn.Notify // chan to receive netmap updates on
podIPv4 string // never empty string, currently only IPv4 is supported
// targetFQDNs maps egress service FQDNs to the tailnet IPs that
// were last used to configure firewall rules for this proxy.
// TODO(irbekrm): target addresses are also stored in the state Secret.
// Evaluate whether we should retrieve them from there and not store in
// memory at all.
targetFQDNs map[string][]netip.Prefix
tailnetAddrs []netip.Prefix // tailnet IPs of this tailnet device
// shortSleep is the backoff sleep between healthcheck endpoint calls - can be overridden in tests.
shortSleep time.Duration
// longSleep is the time to sleep after the routing rules are updated to increase the chance that kube
// proxies on all nodes have updated their routing configuration. It can be configured to 0 in
// tests.
longSleep time.Duration
// client is a client that can send HTTP requests.
client httpClient
}
// httpClient is a client that can send HTTP requests and can be mocked in tests.
type httpClient interface {
Do(*http.Request) (*http.Response, error)
}
// run configures egress proxy firewall rules and ensures that the firewall rules are reconfigured when:
// - the mounted egress config has changed
// - the proxy's tailnet IP addresses have changed
// - tailnet IPs have changed for any backend targets specified by tailnet FQDN
func (ep *egressProxy) run(ctx context.Context, n ipn.Notify, opts egressProxyRunOpts) error {
ep.configure(opts)
var tickChan <-chan time.Time
var eventChan <-chan fsnotify.Event
// TODO (irbekrm): take a look if this can be pulled into a single func
// shared with serve config loader.
if w, err := fsnotify.NewWatcher(); err != nil {
log.Printf("failed to create fsnotify watcher, timer-only mode: %v", err)
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
tickChan = ticker.C
} else {
defer w.Close()
if err := w.Add(ep.cfgPath); err != nil {
return fmt.Errorf("failed to add fsnotify watch: %w", err)
}
eventChan = w.Events
}
if err := ep.sync(ctx, n); err != nil {
return err
}
for {
select {
case <-ctx.Done():
return nil
case <-tickChan:
log.Printf("periodic sync, ensuring firewall config is up to date...")
case <-eventChan:
log.Printf("config file change detected, ensuring firewall config is up to date...")
case n = <-ep.netmapChan:
shouldResync := ep.shouldResync(n)
if !shouldResync {
continue
}
log.Printf("netmap change detected, ensuring firewall config is up to date...")
}
if err := ep.sync(ctx, n); err != nil {
return fmt.Errorf("error syncing egress service config: %w", err)
}
}
}
type egressProxyRunOpts struct {
cfgPath string
nfr linuxfw.NetfilterRunner
kc kubeclient.Client
tsClient *local.Client
stateSecret string
netmapChan chan ipn.Notify
podIPv4 string
tailnetAddrs []netip.Prefix
}
// configure configures the egress proxy using the provided options.
func (ep *egressProxy) configure(opts egressProxyRunOpts) {
ep.cfgPath = opts.cfgPath
ep.nfr = opts.nfr
ep.kc = opts.kc
ep.tsClient = opts.tsClient
ep.stateSecret = opts.stateSecret
ep.netmapChan = opts.netmapChan
ep.podIPv4 = opts.podIPv4
ep.tailnetAddrs = opts.tailnetAddrs
ep.client = &http.Client{} // default HTTP client
sleepDuration := time.Second
if d, err := time.ParseDuration(testSleepDuration); err == nil && d > 0 {
log.Printf("using test sleep duration %v", d)
sleepDuration = d
}
ep.shortSleep = sleepDuration
ep.longSleep = sleepDuration * 10
}
// sync triggers an egress proxy config resync. The resync calculates the diff between config and status to determine if
// any firewall rules need to be updated. Using the status stored in the state Secret as a reference for the current
// firewall configuration is good enough because the status is keyed by the Pod IP and we crash the Pod on errors such
// as a failed firewall update.
func (ep *egressProxy) sync(ctx context.Context, n ipn.Notify) error {
cfgs, err := ep.getConfigs()
// ensureServicesNotAdvertised is a function that gets called on containerboot
// termination and ensures that any currently advertised VIPServices get
// unadvertised to give clients time to switch to another node before this one
// is shut down.
func ensureServicesNotAdvertised(ctx context.Context, lc *local.Client) error {
prefs, err := lc.GetPrefs(ctx)
if err != nil {
return fmt.Errorf("error retrieving egress service configs: %w", err)
return fmt.Errorf("error getting prefs: %w", err)
}
status, err := ep.getStatus(ctx)
if err != nil {
return fmt.Errorf("error retrieving current egress proxy status: %w", err)
}
newStatus, err := ep.syncEgressConfigs(cfgs, status, n)
if err != nil {
return fmt.Errorf("error syncing egress service configs: %w", err)
}
if !servicesStatusIsEqual(newStatus, status) {
if err := ep.setStatus(ctx, newStatus, n); err != nil {
return fmt.Errorf("error setting egress proxy status: %w", err)
}
}
return nil
}
// addrsHaveChanged returns true if the provided netmap update contains a tailnet address change for this proxy node.
// Netmap must not be nil.
func (ep *egressProxy) addrsHaveChanged(n ipn.Notify) bool {
return !reflect.DeepEqual(ep.tailnetAddrs, n.NetMap.SelfNode.Addresses())
}
// syncEgressConfigs adds and deletes firewall rules to match the desired
// configuration. It uses the provided status to determine what is currently
// applied and updates the status after a successful sync.
func (ep *egressProxy) syncEgressConfigs(cfgs *egressservices.Configs, status *egressservices.Status, n ipn.Notify) (*egressservices.Status, error) {
if !(wantsServicesConfigured(cfgs) || hasServicesConfigured(status)) {
return nil, nil
}
// Delete unnecessary services.
if err := ep.deleteUnnecessaryServices(cfgs, status); err != nil {
return nil, fmt.Errorf("error deleting services: %w", err)
}
newStatus := &egressservices.Status{}
if !wantsServicesConfigured(cfgs) {
return newStatus, nil
}
// Add new services, update rules for any that have changed.
rulesPerSvcToAdd := make(map[string][]rule, 0)
rulesPerSvcToDelete := make(map[string][]rule, 0)
for svcName, cfg := range *cfgs {
tailnetTargetIPs, err := ep.tailnetTargetIPsForSvc(cfg, n)
if err != nil {
return nil, fmt.Errorf("error determining tailnet target IPs: %w", err)
}
rulesToAdd, rulesToDelete, err := updatesForCfg(svcName, cfg, status, tailnetTargetIPs)
if err != nil {
return nil, fmt.Errorf("error validating service changes: %v", err)
}
log.Printf("syncegressservices: looking at svc %s rulesToAdd %d rulesToDelete %d", svcName, len(rulesToAdd), len(rulesToDelete))
if len(rulesToAdd) != 0 {
mak.Set(&rulesPerSvcToAdd, svcName, rulesToAdd)
}
if len(rulesToDelete) != 0 {
mak.Set(&rulesPerSvcToDelete, svcName, rulesToDelete)
}
if len(rulesToAdd) != 0 || ep.addrsHaveChanged(n) {
// For each tailnet target, set up SNAT from the local tailnet device address of the matching
// family.
for _, t := range tailnetTargetIPs {
var local netip.Addr
for _, pfx := range n.NetMap.SelfNode.Addresses().All() {
if !pfx.IsSingleIP() {
continue
}
if pfx.Addr().Is4() != t.Is4() {
continue
}
local = pfx.Addr()
break
}
if !local.IsValid() {
return nil, fmt.Errorf("no valid local IP: %v", local)
}
if err := ep.nfr.EnsureSNATForDst(local, t); err != nil {
return nil, fmt.Errorf("error setting up SNAT rule: %w", err)
}
}
}
// Update the status. Status will be written back to the state Secret by the caller.
mak.Set(&newStatus.Services, svcName, &egressservices.ServiceStatus{TailnetTargetIPs: tailnetTargetIPs, TailnetTarget: cfg.TailnetTarget, Ports: cfg.Ports})
}
// Actually apply the firewall rules.
if err := ensureRulesAdded(rulesPerSvcToAdd, ep.nfr); err != nil {
return nil, fmt.Errorf("error adding rules: %w", err)
}
if err := ensureRulesDeleted(rulesPerSvcToDelete, ep.nfr); err != nil {
return nil, fmt.Errorf("error deleting rules: %w", err)
}
return newStatus, nil
}
// updatesForCfg calculates any rules that need to be added or deleted for an individual egress service config.
func updatesForCfg(svcName string, cfg egressservices.Config, status *egressservices.Status, tailnetTargetIPs []netip.Addr) ([]rule, []rule, error) {
rulesToAdd := make([]rule, 0)
rulesToDelete := make([]rule, 0)
currentConfig, ok := lookupCurrentConfig(svcName, status)
// If no rules for service are present yet, add them all.
if !ok {
for _, t := range tailnetTargetIPs {
for ports := range cfg.Ports {
log.Printf("syncegressservices: svc %s adding port %v", svcName, ports)
rulesToAdd = append(rulesToAdd, rule{tailnetPort: ports.TargetPort, containerPort: ports.MatchPort, protocol: ports.Protocol, tailnetIP: t})
}
}
return rulesToAdd, rulesToDelete, nil
}
// If there are no backend targets available, delete any currently configured rules.
if len(tailnetTargetIPs) == 0 {
log.Printf("tailnet target for egress service %s does not have any backend addresses, deleting all rules", svcName)
for _, ip := range currentConfig.TailnetTargetIPs {
for ports := range currentConfig.Ports {
				rulesToDelete = append(rulesToDelete, rule{tailnetPort: ports.TargetPort, containerPort: ports.MatchPort, protocol: ports.Protocol, tailnetIP: ip})
}
}
return rulesToAdd, rulesToDelete, nil
}
// If there are rules present for backend targets that no longer match, delete them.
for _, ip := range currentConfig.TailnetTargetIPs {
var found bool
for _, wantsIP := range tailnetTargetIPs {
if reflect.DeepEqual(ip, wantsIP) {
found = true
break
}
}
if !found {
for ports := range currentConfig.Ports {
rulesToDelete = append(rulesToDelete, rule{tailnetPort: ports.TargetPort, containerPort: ports.MatchPort, protocol: ports.Protocol, tailnetIP: ip})
}
}
}
// Sync rules for the currently wanted backend targets.
for _, ip := range tailnetTargetIPs {
// If the backend target is not yet present in status, add all rules.
var found bool
for _, gotIP := range currentConfig.TailnetTargetIPs {
if reflect.DeepEqual(ip, gotIP) {
found = true
break
}
}
if !found {
for ports := range cfg.Ports {
rulesToAdd = append(rulesToAdd, rule{tailnetPort: ports.TargetPort, containerPort: ports.MatchPort, protocol: ports.Protocol, tailnetIP: ip})
}
continue
}
// If the backend target is present in status, check that the
// currently applied rules are up to date.
// Delete any current portmappings that are no longer present in config.
for port := range currentConfig.Ports {
if _, ok := cfg.Ports[port]; ok {
continue
}
rulesToDelete = append(rulesToDelete, rule{tailnetPort: port.TargetPort, containerPort: port.MatchPort, protocol: port.Protocol, tailnetIP: ip})
}
// Add any new portmappings.
for port := range cfg.Ports {
if _, ok := currentConfig.Ports[port]; ok {
continue
}
rulesToAdd = append(rulesToAdd, rule{tailnetPort: port.TargetPort, containerPort: port.MatchPort, protocol: port.Protocol, tailnetIP: ip})
}
}
return rulesToAdd, rulesToDelete, nil
}
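// updatesForCfgExample is a minimal, illustrative sketch (not used by the
// proxy) of the diff calculation above: the recorded status points at one
// tailnet IP while the config now resolves to a different IP, so updatesForCfg
// returns delete rules for the old target and add rules for the new one. The
// service name, port numbers and addresses below are hypothetical, and the
// egressservices field and type names are assumed from their usage in this
// file.
func updatesForCfgExample() {
	oldIP := netip.MustParseAddr("100.64.0.1")
	newIP := netip.MustParseAddr("100.64.0.2")
	ports := map[egressservices.PortMap]struct{}{
		{Protocol: "tcp", MatchPort: 8080, TargetPort: 80}: {},
	}
	status := &egressservices.Status{
		Services: map[string]*egressservices.ServiceStatus{
			"example-svc": {TailnetTargetIPs: []netip.Addr{oldIP}, Ports: ports},
		},
	}
	toAdd, toDelete, err := updatesForCfg("example-svc", egressservices.Config{Ports: ports}, status, []netip.Addr{newIP})
	if err != nil {
		log.Printf("unexpected error: %v", err)
		return
	}
	// Expect one rule to add (for 100.64.0.2) and one to delete (for 100.64.0.1).
	log.Printf("rules to add: %d, rules to delete: %d", len(toAdd), len(toDelete))
}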
// deleteUnnecessaryServices ensures that any services found in status, but not
// present in config, are deleted.
func (ep *egressProxy) deleteUnnecessaryServices(cfgs *egressservices.Configs, status *egressservices.Status) error {
	if !hasServicesConfigured(status) {
		return nil
	}
	if !wantsServicesConfigured(cfgs) {
		for svcName, svc := range status.Services {
			log.Printf("service %s is no longer required, deleting", svcName)
			if err := ensureServiceDeleted(svcName, svc, ep.nfr); err != nil {
				return fmt.Errorf("error deleting service %s: %w", svcName, err)
			}
		}
		return nil
	}
	for svcName, svc := range status.Services {
		if _, ok := (*cfgs)[svcName]; !ok {
			log.Printf("service %s is no longer required, deleting", svcName)
			if err := ensureServiceDeleted(svcName, svc, ep.nfr); err != nil {
				return fmt.Errorf("error deleting service %s: %w", svcName, err)
			}
			// TODO (irbekrm): also delete the SNAT rule here
		}
	}
	return nil
}
// ensureServicesNotAdvertised ensures that any currently advertised VIPServices
// are unadvertised, giving clients time to switch to another proxy before this
// one is shut down.
func ensureServicesNotAdvertised(ctx context.Context, lc *local.Client) error {
	prefs, err := lc.GetPrefs(ctx)
	if err != nil {
		return fmt.Errorf("error getting prefs: %w", err)
	}
	if len(prefs.AdvertiseServices) == 0 {
		return nil
	}
	log.Printf("unadvertising services: %v", prefs.AdvertiseServices)
	if _, err := lc.EditPrefs(ctx, &ipn.MaskedPrefs{
		AdvertiseServicesSet: true,
		Prefs: ipn.Prefs{
			AdvertiseServices: nil,
		},
	}); err != nil {
		// EditPrefs only returns an error if it fails to _set_ its local prefs.
		// If it fails to _persist_ the prefs in state, we don't get an error
		// and we continue waiting below, as control will failover as usual.
		return fmt.Errorf("error setting prefs AdvertiseServices: %w", err)
	}
	// Services use the same (failover XOR regional routing) mechanism that
	// HA subnet routers use. Unfortunately we don't yet get a reliable signal
	// from control that it's responded to our unadvertisement, so the best we
	// can do is wait for 20 seconds, where 15s is the approximate maximum time
	// it should take for control to choose a new primary, and 5s is for buffer.
	//
	// Note: There is no guarantee that clients have been _informed_ of the new
	// primary no matter how long we wait. We would need a mechanism to await
	// netmap updates for peers to know for sure.
	//
	// See https://tailscale.com/kb/1115/high-availability for more details.
	// TODO(tomhjp): Wait for a netmap update instead of sleeping when control
	// supports that.
	select {
	case <-ctx.Done():
		return nil
	case <-time.After(20 * time.Second):
		return nil
	}
}
// getConfigs gets the mounted egress service configuration.
func (ep *egressProxy) getConfigs() (*egressservices.Configs, error) {
svcsCfg := filepath.Join(ep.cfgPath, egressservices.KeyEgressServices)
j, err := os.ReadFile(svcsCfg)
if os.IsNotExist(err) {
return nil, nil
}
if err != nil {
return nil, err
}
if len(j) == 0 || string(j) == "" {
return nil, nil
}
cfg := &egressservices.Configs{}
if err := json.Unmarshal(j, &cfg); err != nil {
return nil, err
}
return cfg, nil
}
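// For illustration only: the file read above is expected to contain JSON
// roughly of the shape below, keyed by egress service name. The service name
// and values are hypothetical, and the exact JSON field names are defined by
// the egressservices package, so treat this as an approximation rather than a
// schema.
//
//	{
//	  "example-svc": {
//	    "tailnetTarget": {"fqdn": "db.example.ts.net"},
//	    "ports": [{"protocol": "tcp", "matchPort": 8080, "targetPort": 80}]
//	  }
//	}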
// getStatus gets the current status of the configured firewall. The current
// status is stored in state Secret. Returns nil status if no status that
// applies to the current proxy Pod was found. Uses the Pod IP to determine if a
// status found in the state Secret applies to this proxy Pod.
func (ep *egressProxy) getStatus(ctx context.Context) (*egressservices.Status, error) {
secret, err := ep.kc.GetSecret(ctx, ep.stateSecret)
if err != nil {
return nil, fmt.Errorf("error retrieving state secret: %w", err)
}
status := &egressservices.Status{}
raw, ok := secret.Data[egressservices.KeyEgressServices]
if !ok {
return nil, nil
}
if err := json.Unmarshal([]byte(raw), status); err != nil {
return nil, fmt.Errorf("error unmarshalling previous config: %w", err)
}
if reflect.DeepEqual(status.PodIPv4, ep.podIPv4) {
return status, nil
}
return nil, nil
}
// setStatus writes egress proxy's currently configured firewall to the state
// Secret and updates proxy's tailnet addresses.
func (ep *egressProxy) setStatus(ctx context.Context, status *egressservices.Status, n ipn.Notify) error {
// Pod IP is used to determine if a stored status applies to THIS proxy Pod.
if status == nil {
status = &egressservices.Status{}
}
status.PodIPv4 = ep.podIPv4
secret, err := ep.kc.GetSecret(ctx, ep.stateSecret)
if err != nil {
return fmt.Errorf("error retrieving state Secret: %w", err)
}
bs, err := json.Marshal(status)
if err != nil {
return fmt.Errorf("error marshalling service config: %w", err)
}
secret.Data[egressservices.KeyEgressServices] = bs
patch := kubeclient.JSONPatch{
Op: "replace",
Path: fmt.Sprintf("/data/%s", egressservices.KeyEgressServices),
Value: bs,
}
if err := ep.kc.JSONPatchResource(ctx, ep.stateSecret, kubeclient.TypeSecrets, []kubeclient.JSONPatch{patch}); err != nil {
return fmt.Errorf("error patching state Secret: %w", err)
}
ep.tailnetAddrs = n.NetMap.SelfNode.Addresses().AsSlice()
return nil
}
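// For illustration: the call above results in a single JSON Patch operation of
// roughly the following shape, where the key under /data/ is the value of
// egressservices.KeyEgressServices and the []byte value is serialized by
// encoding/json as a base64 string (matching how Kubernetes stores Secret
// data):
//
//	[{"op": "replace", "path": "/data/<egress-services-key>", "value": "<base64-encoded status JSON>"}]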
// tailnetTargetIPsForSvc returns the tailnet IPs to which traffic for this
// egress service should be proxied. The egress service can be configured by IP
// or by FQDN. If it's configured by IP, just return that. If it's configured by
// FQDN, resolve the FQDN and return the resolved IPs. It checks if the
// netfilter runner supports IPv6 NAT and skips any IPv6 addresses if it
// doesn't.
func (ep *egressProxy) tailnetTargetIPsForSvc(svc egressservices.Config, n ipn.Notify) (addrs []netip.Addr, err error) {
if svc.TailnetTarget.IP != "" {
addr, err := netip.ParseAddr(svc.TailnetTarget.IP)
if err != nil {
return nil, fmt.Errorf("error parsing tailnet target IP: %w", err)
}
if addr.Is6() && !ep.nfr.HasIPV6NAT() {
log.Printf("tailnet target is an IPv6 address, but this host does not support IPv6 in the chosen firewall mode. This will probably not work.")
return addrs, nil
}
return []netip.Addr{addr}, nil
}
if svc.TailnetTarget.FQDN == "" {
return nil, errors.New("unexpected egress service config- neither tailnet target IP nor FQDN is set")
}
if n.NetMap == nil {
log.Printf("netmap is not available, unable to determine backend addresses for %s", svc.TailnetTarget.FQDN)
return addrs, nil
}
var (
node tailcfg.NodeView
nodeFound bool
)
for _, nn := range n.NetMap.Peers {
if equalFQDNs(nn.Name(), svc.TailnetTarget.FQDN) {
node = nn
nodeFound = true
break
}
}
if nodeFound {
for _, addr := range node.Addresses().AsSlice() {
if addr.Addr().Is6() && !ep.nfr.HasIPV6NAT() {
log.Printf("tailnet target %v is an IPv6 address, but this host does not support IPv6 in the chosen firewall mode, skipping.", addr.Addr().String())
continue
}
addrs = append(addrs, addr.Addr())
}
// Egress target endpoints configured via FQDN are stored, so
// that we can determine if a netmap update should trigger a
// resync.
mak.Set(&ep.targetFQDNs, svc.TailnetTarget.FQDN, node.Addresses().AsSlice())
}
return addrs, nil
}
// shouldResync parses netmap update and returns true if the update contains
// changes for which the egress proxy's firewall should be reconfigured.
func (ep *egressProxy) shouldResync(n ipn.Notify) bool {
if n.NetMap == nil {
return false
}
// If proxy's tailnet addresses have changed, resync.
if !reflect.DeepEqual(n.NetMap.SelfNode.Addresses().AsSlice(), ep.tailnetAddrs) {
log.Printf("node addresses have changed, trigger egress config resync")
ep.tailnetAddrs = n.NetMap.SelfNode.Addresses().AsSlice()
return true
}
// If the IPs for any of the egress services configured via FQDN have
// changed, resync.
for fqdn, ips := range ep.targetFQDNs {
for _, nn := range n.NetMap.Peers {
if equalFQDNs(nn.Name(), fqdn) {
				if !reflect.DeepEqual(ips, nn.Addresses().AsSlice()) {
					log.Printf("backend addresses for egress target %q have changed old IPs %v, new IPs %v, trigger egress config resync", nn.Name(), ips, nn.Addresses().AsSlice())
					return true
				}
}
}
}
return false
}
// ensureServiceDeleted ensures that any rules for an egress service are removed
// from the firewall configuration.
func ensureServiceDeleted(svcName string, svc *egressservices.ServiceStatus, nfr linuxfw.NetfilterRunner) error {
// Note that the portmap is needed for iptables based firewall only.
	// Nftables groups rules for a service in a chain, so there is no need to
// specify individual portmapping based rules.
pms := make([]linuxfw.PortMap, 0)
for pm := range svc.Ports {
pms = append(pms, linuxfw.PortMap{MatchPort: pm.MatchPort, TargetPort: pm.TargetPort, Protocol: pm.Protocol})
}
if err := nfr.DeleteSvc(svcName, tailscaleTunInterface, svc.TailnetTargetIPs, pms); err != nil {
return fmt.Errorf("error deleting service %s: %w", svcName, err)
}
return nil
}
// ensureRulesAdded ensures that all portmapping rules are added to the firewall
// configuration. For any rules that already exist, calling this function is a
// no-op. In case of nftables, a service consists of one or two (one per IP
// family) chains that contain the portmapping rules for the service; the
// chains are created as needed when this function is called.
func ensureRulesAdded(rulesPerSvc map[string][]rule, nfr linuxfw.NetfilterRunner) error {
for svc, rules := range rulesPerSvc {
for _, rule := range rules {
log.Printf("ensureRulesAdded svc %s tailnetTarget %s container port %d tailnet port %d protocol %s", svc, rule.tailnetIP, rule.containerPort, rule.tailnetPort, rule.protocol)
if err := nfr.EnsurePortMapRuleForSvc(svc, tailscaleTunInterface, rule.tailnetIP, linuxfw.PortMap{MatchPort: rule.containerPort, TargetPort: rule.tailnetPort, Protocol: rule.protocol}); err != nil {
return fmt.Errorf("error ensuring rule: %w", err)
}
}
}
return nil
}
// ensureRulesDeleted ensures that the given rules are deleted from the firewall
// configuration. For any rules that do not exist, calling this function is a
// no-op.
func ensureRulesDeleted(rulesPerSvc map[string][]rule, nfr linuxfw.NetfilterRunner) error {
for svc, rules := range rulesPerSvc {
for _, rule := range rules {
log.Printf("ensureRulesDeleted svc %s tailnetTarget %s container port %d tailnet port %d protocol %s", svc, rule.tailnetIP, rule.containerPort, rule.tailnetPort, rule.protocol)
if err := nfr.DeletePortMapRuleForSvc(svc, tailscaleTunInterface, rule.tailnetIP, linuxfw.PortMap{MatchPort: rule.containerPort, TargetPort: rule.tailnetPort, Protocol: rule.protocol}); err != nil {
return fmt.Errorf("error deleting rule: %w", err)
}
}
}
return nil
}
func lookupCurrentConfig(svcName string, status *egressservices.Status) (*egressservices.ServiceStatus, bool) {
if status == nil || len(status.Services) == 0 {
return nil, false
}
c, ok := status.Services[svcName]
return c, ok
}
func equalFQDNs(s, s1 string) bool {
s, _ = strings.CutSuffix(s, ".")
s1, _ = strings.CutSuffix(s1, ".")
return strings.EqualFold(s, s1)
}
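// For example, trailing dots and case are ignored, so the following two names
// refer to the same tailnet node:
//
//	equalFQDNs("svc.example.ts.net.", "SVC.example.ts.net") // true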
// rule contains configuration for an egress proxy firewall rule.
type rule struct {
containerPort uint16 // port to match incoming traffic
tailnetPort uint16 // tailnet service port
tailnetIP netip.Addr // tailnet service IP
protocol string
}
func wantsServicesConfigured(cfgs *egressservices.Configs) bool {
return cfgs != nil && len(*cfgs) != 0
}
func hasServicesConfigured(status *egressservices.Status) bool {
return status != nil && len(status.Services) != 0
}
func servicesStatusIsEqual(st, st1 *egressservices.Status) bool {
if st == nil && st1 == nil {
return true
}
if st == nil || st1 == nil {
return false
}
st.PodIPv4 = ""
st1.PodIPv4 = ""
return reflect.DeepEqual(*st, *st1)
}
// registerHandlers adds a new handler to the provided ServeMux that can be called as a Kubernetes prestop hook to
// delay shutdown till it's safe to do so.
func (ep *egressProxy) registerHandlers(mux *http.ServeMux) {
mux.Handle(fmt.Sprintf("GET %s", kubetypes.EgessServicesPreshutdownEP), ep)
}
// ServeHTTP serves the /internal-egress-services-preshutdown endpoint. When it receives a request, it periodically polls
// the configured health check endpoint for each egress service till responses from the health check endpoint no longer
// hit this proxy Pod. It uses the Pod-IPv4 header to verify whether a health check response was served by this Pod.
func (ep *egressProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
cfgs, err := ep.getConfigs()
if err != nil {
http.Error(w, fmt.Sprintf("error retrieving egress services configs: %v", err), http.StatusInternalServerError)
return
}
	if cfgs == nil {
		if _, err := w.Write([]byte("safe to terminate")); err != nil {
			http.Error(w, fmt.Sprintf("error writing termination status: %v", err), http.StatusInternalServerError)
		}
		return
	}
hp, err := ep.getHEPPings()
if err != nil {
http.Error(w, fmt.Sprintf("error determining the number of times health check endpoint should be pinged: %v", err), http.StatusInternalServerError)
return
}
ep.waitTillSafeToShutdown(r.Context(), cfgs, hp)
}
// waitTillSafeToShutdown looks up all egress targets configured to be proxied via this instance and, for each target
// whose configuration includes a healthcheck endpoint, pings the endpoint till none of the responses
// are returned by this instance or till the HTTP request times out. In practice, the endpoint will be a Kubernetes Service for which one of the backends
// would normally be this Pod. When this Pod is being deleted, the operator should have removed it from the Service
// backends and eventually kube proxy routing rules should be updated to no longer route traffic for the Service to this
// Pod.
func (ep *egressProxy) waitTillSafeToShutdown(ctx context.Context, cfgs *egressservices.Configs, hp int) {
if cfgs == nil || len(*cfgs) == 0 { // avoid sleeping if no services are configured
return
}
log.Printf("Ensuring that cluster traffic for egress targets is no longer routed via this Pod...")
wg := syncs.WaitGroup{}
for s, cfg := range *cfgs {
hep := cfg.HealthCheckEndpoint
if hep == "" {
log.Printf("Tailnet target %q does not have a cluster healthcheck specified, unable to verify if cluster traffic for the target is still routed via this Pod", s)
continue
}
svc := s
wg.Go(func() {
log.Printf("Ensuring that cluster traffic is no longer routed to %q via this Pod...", svc)
for {
if ctx.Err() != nil { // kubelet's HTTP request timeout
log.Printf("Cluster traffic for %s did not stop being routed to this Pod.", svc)
return
}
found, err := lookupPodRoute(ctx, hep, ep.podIPv4, hp, ep.client)
if err != nil {
log.Printf("unable to reach endpoint %q, assuming the routing rules for this Pod have been deleted: %v", hep, err)
break
}
if !found {
log.Printf("service %q is no longer routed through this Pod", svc)
break
}
log.Printf("service %q is still routed through this Pod, waiting...", svc)
time.Sleep(ep.shortSleep)
}
})
}
wg.Wait()
// The check above really only checked that the routing rules are updated on this node. Sleep for a bit to
// ensure that the routing rules are updated on other nodes. TODO(irbekrm): this may or may not be good enough.
// If it's not good enough, we'd probably want to do something more complex, where the proxies check each other.
log.Printf("Sleeping for %s before shutdown to ensure that kube proxies on all nodes have updated routing configuration", ep.longSleep)
time.Sleep(ep.longSleep)
}
// lookupPodRoute calls the healthcheck endpoint repeat times and returns true if the endpoint returns with the podIP
// header at least once.
func lookupPodRoute(ctx context.Context, hep, podIP string, repeat int, client httpClient) (bool, error) {
for range repeat {
f, err := lookup(ctx, hep, podIP, client)
if err != nil {
return false, err
}
if f {
return true, nil
}
}
return false, nil
}
// lookup calls the healthcheck endpoint and returns true if the response contains the podIP header.
func lookup(ctx context.Context, hep, podIP string, client httpClient) (bool, error) {
req, err := http.NewRequestWithContext(ctx, httpm.GET, hep, nil)
if err != nil {
return false, fmt.Errorf("error creating new HTTP request: %v", err)
}
// Close the TCP connection to ensure that the next request is routed to a different backend.
req.Close = true
resp, err := client.Do(req)
if err != nil {
log.Printf("Endpoint %q can not be reached: %v, likely because there are no (more) healthy backends", hep, err)
return true, nil
}
defer resp.Body.Close()
gotIP := resp.Header.Get(kubetypes.PodIPv4Header)
return strings.EqualFold(podIP, gotIP), nil
}
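// fakeHEPClient is a minimal sketch (for illustration only, not used by the
// proxy) of how the httpClient dependency of lookup could be stubbed in a
// test: it always answers with a fixed Pod IPv4 header, so
// lookup(ctx, "http://example", "10.0.0.1", fakeHEPClient{podIP: "10.0.0.1"})
// would report true. The type name is hypothetical.
type fakeHEPClient struct {
	podIP string
}
func (c fakeHEPClient) Do(r *http.Request) (*http.Response, error) {
	h := http.Header{}
	h.Set(kubetypes.PodIPv4Header, c.podIP)
	// http.NoBody satisfies lookup's deferred resp.Body.Close().
	return &http.Response{StatusCode: http.StatusOK, Header: h, Body: http.NoBody}, nil
}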
// getHEPPings gets the number of pings that should be sent to a health check endpoint to ensure that each configured
// backend is hit. This assumes that a health check endpoint is a Kubernetes Service and traffic to backend Pods is
// round robin load balanced.
func (ep *egressProxy) getHEPPings() (int, error) {
hepPingsPath := filepath.Join(ep.cfgPath, egressservices.KeyHEPPings)
j, err := os.ReadFile(hepPingsPath)
if os.IsNotExist(err) {
return 0, nil
}
if err != nil {
return -1, err
}
if len(j) == 0 || string(j) == "" {
return 0, nil
}
hp, err := strconv.Atoi(string(j))
if err != nil {
return -1, fmt.Errorf("error parsing hep pings as int: %v", err)
}
if hp < 0 {
log.Printf("[unexpected] hep pings is negative: %d", hp)
return 0, nil
}
return hp, nil
}
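// For illustration (values hypothetical): if the health check endpoint is
// backed by three Pods, the mounted file is expected to contain the single
// number "3", so each polling round in waitTillSafeToShutdown pings the
// endpoint three times to give every backend a chance to respond.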

View File

@ -64,16 +64,17 @@ type settings struct {
// when setting up rules to proxy cluster traffic to cluster ingress
// target.
// Deprecated: use PodIPv4, PodIPv6 instead to support dual stack clusters
PodIP string
PodIPv4 string
PodIPv6 string
PodUID string
HealthCheckAddrPort string
LocalAddrPort string
MetricsEnabled bool
HealthCheckEnabled bool
DebugAddrPort string
EgressProxiesCfgPath string
PodIP string
PodIPv4 string
PodIPv6 string
PodUID string
HealthCheckAddrPort string
LocalAddrPort string
MetricsEnabled bool
HealthCheckEnabled bool
DebugAddrPort string
EgressProxiesCfgPath string
IngressProxiesCfgPath string
// CertShareMode is set for Kubernetes Pods running cert share mode.
// Possible values are empty (containerboot doesn't run any certs
	// logic), 'ro' (for Pods that should never attempt to issue/renew
@ -114,6 +115,7 @@ func configFromEnv() (*settings, error) {
HealthCheckEnabled: defaultBool("TS_ENABLE_HEALTH_CHECK", false),
DebugAddrPort: defaultEnv("TS_DEBUG_ADDR_PORT", ""),
EgressProxiesCfgPath: defaultEnv("TS_EGRESS_PROXIES_CONFIG_PATH", ""),
IngressProxiesCfgPath: defaultEnv("TS_INGRESS_PROXIES_CONFIG_PATH", ""),
PodUID: defaultEnv("POD_UID", ""),
}
podIPs, ok := os.LookupEnv("POD_IPS")
@ -219,6 +221,9 @@ func (s *settings) validate() error {
if s.EgressProxiesCfgPath != "" && !(s.InKubernetes && s.KubeSecret != "") {
return errors.New("TS_EGRESS_PROXIES_CONFIG_PATH is only supported for Tailscale running on Kubernetes")
}
if s.IngressProxiesCfgPath != "" && !(s.InKubernetes && s.KubeSecret != "") {
return errors.New("TS_INGRESS_PROXIES_CONFIG_PATH is only supported for Tailscale running on Kubernetes")
}
return nil
}
@ -308,7 +313,7 @@ func isOneStepConfig(cfg *settings) bool {
// as an L3 proxy, proxying to an endpoint provided via one of the config env
// vars.
func isL3Proxy(cfg *settings) bool {
return cfg.ProxyTargetIP != "" || cfg.ProxyTargetDNSName != "" || cfg.TailnetTargetIP != "" || cfg.TailnetTargetFQDN != "" || cfg.AllowProxyingClusterTrafficViaIngress || cfg.EgressProxiesCfgPath != ""
return cfg.ProxyTargetIP != "" || cfg.ProxyTargetDNSName != "" || cfg.TailnetTargetIP != "" || cfg.TailnetTargetFQDN != "" || cfg.AllowProxyingClusterTrafficViaIngress || cfg.EgressProxiesCfgPath != "" || cfg.IngressProxiesCfgPath != ""
}
// hasKubeStateStore returns true if the state must be stored in a Kubernetes

View File

@ -0,0 +1,53 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
// Package ingressservices contains shared types for exposing Kubernetes Services to tailnet.
// These are split into a separate package for consumption by
// non-Kubernetes shared libraries and binaries. Be mindful of not increasing
// dependency size for those consumers when adding anything new here.
package ingressservices
import "net/netip"
// IngressConfigKey is the key at which both the desired ingress firewall
// configuration is stored in the ingress proxies' ConfigMap and at which the
// recorded firewall configuration status is stored in the proxies' state
// Secrets.
const IngressConfigKey = "ingress-config.json"
// Configs contains the desired configuration for the ingress proxies' firewall. Map
// keys are VIPService names.
type Configs map[string]Config
// GetConfig returns the desired configuration for the given VIPService name.
func (cfgs *Configs) GetConfig(name string) *Config {
if cfgs == nil {
return nil
}
if cfg, ok := (*cfgs)[name]; ok {
return &cfg
}
return nil
}
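// For illustration, a caller that has unmarshalled the proxies' ConfigMap can
// look up a single VIPService by name (the name below is hypothetical):
//
//	cfgs := Configs{"svc:web": {}}
//	cfg := cfgs.GetConfig("svc:web") // returns a non-nil copy of the entry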
// Status contains the recorded firewall configuration status for a specific
// ingress proxy Pod.
// Pod IPs are used to identify the ingress proxy Pod.
type Status struct {
Configs Configs `json:"configs,omitempty"`
PodIPv4 string `json:"podIPv4,omitempty"`
PodIPv6 string `json:"podIPv6,omitempty"`
}
// Config is an ingress service configuration.
type Config struct {
IPv4Mapping *Mapping `json:"IPv4Mapping,omitempty"`
IPv6Mapping *Mapping `json:"IPv6Mapping,omitempty"`
}
// Mapping describes a rule that forwards traffic from a VIPService IP to a
// Kubernetes Service IP.
type Mapping struct {
VIPServiceIP netip.Addr `json:"VIPServiceIP"`
ClusterIP netip.Addr `json:"ClusterIP"`
}
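// For illustration, a Configs value for a single hypothetical VIPService named
// "svc:web", mapping its IPv4 address to a cluster Service IP, serializes to
// JSON as follows (netip.Addr values marshal as strings):
//
//	{
//	  "svc:web": {
//	    "IPv4Mapping": {
//	      "VIPServiceIP": "100.100.100.100",
//	      "ClusterIP": "10.43.0.10"
//	    }
//	  }
//	}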