cmd/{k8s-operator,containerboot},kube: ensure egress ProxyGroup proxies don't terminate while cluster traffic is still routed to them (#14436)
cmd/{containerboot,k8s-operator},kube: add preshutdown hook for egress PG proxies

This change is part of work towards minimizing downtime during update rollouts of egress ProxyGroup replicas. It:

- updates the containerboot health check logic to return the Pod IP in response headers, if set
- always runs the health check for egress PG proxies
- updates ClusterIP Services created for PG egress endpoints to include the health check endpoint
- implements a preshutdown endpoint in proxies. The preshutdown endpoint logic waits until, for all currently configured egress services, the ClusterIP Service health check endpoint is no longer answered by the shutting-down Pod (determined by looking at the new Pod IP header)
- ensures that kubelet is configured to call the preshutdown endpoint

This reduces the possibility that, as replicas are terminated during an update, a replica to which cluster traffic is still being routed via the ClusterIP Service gets terminated because kube proxy has not yet updated its routing rules. This is not a perfect check: in practice it only verifies that the kube proxy on the node where the proxy runs has updated its rules. However, overall this might be good enough.

The preshutdown logic is disabled if users have configured a custom health check port via the TS_LOCAL_ADDR_PORT env var. This change logs a warning if so, and in future setting that env var for operator proxies might be disallowed (users shouldn't need to configure this for a Pod directly).

This is backwards compatible with earlier proxy versions.

Updates tailscale/tailscale#14326

Signed-off-by: Irbe Krumina <irbe@tailscale.com>
parent eb299302ba
commit b406f209c3
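The preshutdown mechanism described in the commit message boils down to a polling loop: the terminating replica repeatedly calls each egress service's ClusterIP health check endpoint and inspects the Pod IP header in the response to decide whether kube proxy is still routing that Service to it. Below is a minimal, hedged sketch of that idea; the URL, header name, and timings are illustrative placeholders, and the real implementation is the egress proxy code further down in this diff (which additionally pings each endpoint several times per check, since traffic to Service backends is roughly round-robin load balanced).

package main

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"time"
)

// waitUntilNotRouted polls healthCheckURL until a response no longer carries
// this Pod's IP in the given header, i.e. until kube proxy has stopped routing
// the ClusterIP Service to this replica. If the endpoint becomes unreachable,
// it assumes there are no healthy backends left and returns.
func waitUntilNotRouted(ctx context.Context, healthCheckURL, podIP, header string) error {
	client := &http.Client{Timeout: 5 * time.Second}
	for {
		if err := ctx.Err(); err != nil { // e.g. kubelet's hook timeout expired
			return err
		}
		req, err := http.NewRequestWithContext(ctx, http.MethodGet, healthCheckURL, nil)
		if err != nil {
			return fmt.Errorf("building request: %w", err)
		}
		req.Close = true // force a fresh connection so another backend may answer next time
		resp, err := client.Do(req)
		if err != nil {
			log.Printf("endpoint unreachable, assuming traffic is drained: %v", err)
			return nil
		}
		got := resp.Header.Get(header)
		resp.Body.Close()
		if got != podIP {
			return nil // another replica answered; safe to shut down
		}
		time.Sleep(time.Second)
	}
}

func main() {
	// Illustrative values only; the real proxy reads these from its mounted
	// config files and environment.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	if err := waitUntilNotRouted(ctx, "http://egress-svc.tailscale.svc.cluster.local:9002/healthz", "10.0.0.1", "Pod-IPv4"); err != nil {
		log.Printf("giving up waiting, shutting down anyway: %v", err)
	}
}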
@@ -6,9 +6,12 @@
package main

import (
    "fmt"
    "log"
    "net/http"
    "sync"

    "tailscale.com/kube/kubetypes"
)

// healthz is a simple health check server, if enabled it returns 200 OK if
@@ -17,6 +20,7 @@ import (
type healthz struct {
    sync.Mutex
    hasAddrs bool
    podIPv4  string
}

func (h *healthz) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@@ -24,7 +28,10 @@ func (h *healthz) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    defer h.Unlock()

    if h.hasAddrs {
        w.Write([]byte("ok"))
        w.Header().Add(kubetypes.PodIPv4Header, h.podIPv4)
        if _, err := w.Write([]byte("ok")); err != nil {
            http.Error(w, fmt.Sprintf("error writing status: %v", err), http.StatusInternalServerError)
        }
    } else {
        http.Error(w, "node currently has no tailscale IPs", http.StatusServiceUnavailable)
    }
@@ -43,8 +50,8 @@ func (h *healthz) update(healthy bool) {
// healthHandlers registers a simple health handler at /healthz.
// A containerized tailscale instance is considered healthy if
// it has at least one tailnet IP address.
func healthHandlers(mux *http.ServeMux) *healthz {
    h := &healthz{}
func healthHandlers(mux *http.ServeMux, podIPv4 string) *healthz {
    h := &healthz{podIPv4: podIPv4}
    mux.Handle("GET /healthz", h)
    return h
}
@@ -191,17 +191,18 @@ func main() {
    defer killTailscaled()

    var healthCheck *healthz
    ep := &egressProxy{}
    if cfg.HealthCheckAddrPort != "" {
        mux := http.NewServeMux()

        log.Printf("Running healthcheck endpoint at %s/healthz", cfg.HealthCheckAddrPort)
        healthCheck = healthHandlers(mux)
        healthCheck = healthHandlers(mux, cfg.PodIPv4)

        close := runHTTPServer(mux, cfg.HealthCheckAddrPort)
        defer close()
    }

    if cfg.localMetricsEnabled() || cfg.localHealthEnabled() {
    if cfg.localMetricsEnabled() || cfg.localHealthEnabled() || cfg.egressSvcsTerminateEPEnabled() {
        mux := http.NewServeMux()

        if cfg.localMetricsEnabled() {
@@ -211,7 +212,11 @@ func main() {

        if cfg.localHealthEnabled() {
            log.Printf("Running healthcheck endpoint at %s/healthz", cfg.LocalAddrPort)
            healthCheck = healthHandlers(mux)
            healthCheck = healthHandlers(mux, cfg.PodIPv4)
        }
        if cfg.EgressProxiesCfgPath != "" {
            log.Printf("Running preshutdown hook at %s%s", cfg.LocalAddrPort, kubetypes.EgessServicesPreshutdownEP)
            ep.registerHandlers(mux)
        }

        close := runHTTPServer(mux, cfg.LocalAddrPort)
@@ -639,20 +644,21 @@ runLoop:
    // will then continuously monitor the config file and netmap updates and
    // reconfigure the firewall rules as needed. If any of its operations fail, it
    // will crash this node.
    if cfg.EgressSvcsCfgPath != "" {
        log.Printf("configuring egress proxy using configuration file at %s", cfg.EgressSvcsCfgPath)
    if cfg.EgressProxiesCfgPath != "" {
        log.Printf("configuring egress proxy using configuration file at %s", cfg.EgressProxiesCfgPath)
        egressSvcsNotify = make(chan ipn.Notify)
        ep := egressProxy{
            cfgPath: cfg.EgressSvcsCfgPath,
        opts := egressProxyRunOpts{
            cfgPath:      cfg.EgressProxiesCfgPath,
            nfr:          nfr,
            kc:           kc,
            tsClient:     client,
            stateSecret:  cfg.KubeSecret,
            netmapChan:   egressSvcsNotify,
            podIPv4:      cfg.PodIPv4,
            tailnetAddrs: addrs,
        }
        go func() {
            if err := ep.run(ctx, n); err != nil {
            if err := ep.run(ctx, n, opts); err != nil {
                egressSvcsErrorChan <- err
            }
        }()
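The other half of the mechanism is making kubelet call this preshutdown endpoint before terminating the container; in the operator this happens when building the ProxyGroup StatefulSet (see the pre-stop hook changes towards the end of this diff). As a rough, non-authoritative sketch of what wiring such a hook up with the Kubernetes API looks like: the container name is made up, and while the path and port mirror values mentioned elsewhere in this diff, they are shown here only for illustration and may not match the operator's actual wiring.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
)

// withPreStopHook returns a copy of the container with an HTTP preStop hook,
// so kubelet calls the preshutdown endpoint before sending SIGTERM.
func withPreStopHook(c corev1.Container) corev1.Container {
	c.Lifecycle = &corev1.Lifecycle{
		PreStop: &corev1.LifecycleHandler{
			HTTPGet: &corev1.HTTPGetAction{
				// Placeholder values for illustration.
				Path: "/internal-egress-services-preshutdown",
				Port: intstr.FromInt(9002),
			},
		},
	}
	return c
}

func main() {
	c := withPreStopHook(corev1.Container{Name: "tailscale"})
	fmt.Printf("preStop hook: %+v\n", c.Lifecycle.PreStop.HTTPGet)
}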
@ -32,6 +32,8 @@ import (
|
||||
"golang.org/x/sys/unix"
|
||||
"tailscale.com/ipn"
|
||||
"tailscale.com/kube/egressservices"
|
||||
"tailscale.com/kube/kubeclient"
|
||||
"tailscale.com/kube/kubetypes"
|
||||
"tailscale.com/tailcfg"
|
||||
"tailscale.com/tstest"
|
||||
"tailscale.com/types/netmap"
|
||||
@ -54,20 +56,9 @@ func TestContainerBoot(t *testing.T) {
|
||||
defer kube.Close()
|
||||
|
||||
tailscaledConf := &ipn.ConfigVAlpha{AuthKey: ptr.To("foo"), Version: "alpha0"}
|
||||
tailscaledConfBytes, err := json.Marshal(tailscaledConf)
|
||||
if err != nil {
|
||||
t.Fatalf("error unmarshaling tailscaled config: %v", err)
|
||||
}
|
||||
serveConf := ipn.ServeConfig{TCP: map[uint16]*ipn.TCPPortHandler{80: {HTTP: true}}}
|
||||
serveConfBytes, err := json.Marshal(serveConf)
|
||||
if err != nil {
|
||||
t.Fatalf("error unmarshaling serve config: %v", err)
|
||||
}
|
||||
egressSvcsCfg := egressservices.Configs{"foo": {TailnetTarget: egressservices.TailnetTarget{FQDN: "foo.tailnetxyx.ts.net"}}}
|
||||
egressSvcsCfgBytes, err := json.Marshal(egressSvcsCfg)
|
||||
if err != nil {
|
||||
t.Fatalf("error unmarshaling egress services config: %v", err)
|
||||
}
|
||||
egressCfg := egressSvcConfig("foo", "foo.tailnetxyz.ts.net")
|
||||
egressStatus := egressSvcStatus("foo", "foo.tailnetxyz.ts.net")
|
||||
|
||||
dirs := []string{
|
||||
"var/lib",
|
||||
@ -84,16 +75,17 @@ func TestContainerBoot(t *testing.T) {
|
||||
}
|
||||
}
|
||||
files := map[string][]byte{
|
||||
"usr/bin/tailscaled": fakeTailscaled,
|
||||
"usr/bin/tailscale": fakeTailscale,
|
||||
"usr/bin/iptables": fakeTailscale,
|
||||
"usr/bin/ip6tables": fakeTailscale,
|
||||
"dev/net/tun": []byte(""),
|
||||
"proc/sys/net/ipv4/ip_forward": []byte("0"),
|
||||
"proc/sys/net/ipv6/conf/all/forwarding": []byte("0"),
|
||||
"etc/tailscaled/cap-95.hujson": tailscaledConfBytes,
|
||||
"etc/tailscaled/serve-config.json": serveConfBytes,
|
||||
"etc/tailscaled/egress-services-config.json": egressSvcsCfgBytes,
|
||||
"usr/bin/tailscaled": fakeTailscaled,
|
||||
"usr/bin/tailscale": fakeTailscale,
|
||||
"usr/bin/iptables": fakeTailscale,
|
||||
"usr/bin/ip6tables": fakeTailscale,
|
||||
"dev/net/tun": []byte(""),
|
||||
"proc/sys/net/ipv4/ip_forward": []byte("0"),
|
||||
"proc/sys/net/ipv6/conf/all/forwarding": []byte("0"),
|
||||
"etc/tailscaled/cap-95.hujson": mustJSON(t, tailscaledConf),
|
||||
"etc/tailscaled/serve-config.json": mustJSON(t, serveConf),
|
||||
filepath.Join("etc/tailscaled/", egressservices.KeyEgressServices): mustJSON(t, egressCfg),
|
||||
filepath.Join("etc/tailscaled/", egressservices.KeyHEPPings): []byte("4"),
|
||||
}
|
||||
resetFiles := func() {
|
||||
for path, content := range files {
|
||||
@ -132,6 +124,9 @@ func TestContainerBoot(t *testing.T) {
|
||||
healthURL := func(port int) string {
|
||||
return fmt.Sprintf("http://127.0.0.1:%d/healthz", port)
|
||||
}
|
||||
egressSvcTerminateURL := func(port int) string {
|
||||
return fmt.Sprintf("http://127.0.0.1:%d%s", port, kubetypes.EgessServicesPreshutdownEP)
|
||||
}
|
||||
|
||||
capver := fmt.Sprintf("%d", tailcfg.CurrentCapabilityVersion)
|
||||
|
||||
@ -896,9 +891,10 @@ func TestContainerBoot(t *testing.T) {
|
||||
{
|
||||
Name: "egress_svcs_config_kube",
|
||||
Env: map[string]string{
|
||||
"KUBERNETES_SERVICE_HOST": kube.Host,
|
||||
"KUBERNETES_SERVICE_PORT_HTTPS": kube.Port,
|
||||
"TS_EGRESS_SERVICES_CONFIG_PATH": filepath.Join(d, "etc/tailscaled/egress-services-config.json"),
|
||||
"KUBERNETES_SERVICE_HOST": kube.Host,
|
||||
"KUBERNETES_SERVICE_PORT_HTTPS": kube.Port,
|
||||
"TS_EGRESS_PROXIES_CONFIG_PATH": filepath.Join(d, "etc/tailscaled"),
|
||||
"TS_LOCAL_ADDR_PORT": fmt.Sprintf("[::]:%d", localAddrPort),
|
||||
},
|
||||
KubeSecret: map[string]string{
|
||||
"authkey": "tskey-key",
|
||||
@ -912,28 +908,35 @@ func TestContainerBoot(t *testing.T) {
|
||||
WantKubeSecret: map[string]string{
|
||||
"authkey": "tskey-key",
|
||||
},
|
||||
EndpointStatuses: map[string]int{
|
||||
egressSvcTerminateURL(localAddrPort): 200,
|
||||
},
|
||||
},
|
||||
{
|
||||
Notify: runningNotify,
|
||||
WantKubeSecret: map[string]string{
|
||||
"egress-services": mustBase64(t, egressStatus),
|
||||
"authkey": "tskey-key",
|
||||
"device_fqdn": "test-node.test.ts.net",
|
||||
"device_id": "myID",
|
||||
"device_ips": `["100.64.0.1"]`,
|
||||
"tailscale_capver": capver,
|
||||
},
|
||||
EndpointStatuses: map[string]int{
|
||||
egressSvcTerminateURL(localAddrPort): 200,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "egress_svcs_config_no_kube",
|
||||
Env: map[string]string{
|
||||
"TS_EGRESS_SERVICES_CONFIG_PATH": filepath.Join(d, "etc/tailscaled/egress-services-config.json"),
|
||||
"TS_AUTHKEY": "tskey-key",
|
||||
"TS_EGRESS_PROXIES_CONFIG_PATH": filepath.Join(d, "etc/tailscaled"),
|
||||
"TS_AUTHKEY": "tskey-key",
|
||||
},
|
||||
Phases: []phase{
|
||||
{
|
||||
WantFatalLog: "TS_EGRESS_SERVICES_CONFIG_PATH is only supported for Tailscale running on Kubernetes",
|
||||
WantFatalLog: "TS_EGRESS_PROXIES_CONFIG_PATH is only supported for Tailscale running on Kubernetes",
|
||||
},
|
||||
},
|
||||
},
|
||||
@ -1394,13 +1397,31 @@ func (k *kubeServer) serveSecret(w http.ResponseWriter, r *http.Request) {
|
||||
panic(fmt.Sprintf("json decode failed: %v. Body:\n\n%s", err, string(bs)))
|
||||
}
|
||||
for _, op := range req {
|
||||
if op.Op != "remove" {
|
||||
if op.Op == "remove" {
|
||||
if !strings.HasPrefix(op.Path, "/data/") {
|
||||
panic(fmt.Sprintf("unsupported json-patch path %q", op.Path))
|
||||
}
|
||||
delete(k.secret, strings.TrimPrefix(op.Path, "/data/"))
|
||||
} else if op.Op == "replace" {
|
||||
path, ok := strings.CutPrefix(op.Path, "/data/")
|
||||
if !ok {
|
||||
panic(fmt.Sprintf("unsupported json-patch path %q", op.Path))
|
||||
}
|
||||
req := make([]kubeclient.JSONPatch, 0)
|
||||
if err := json.Unmarshal(bs, &req); err != nil {
|
||||
panic(fmt.Sprintf("json decode failed: %v. Body:\n\n%s", err, string(bs)))
|
||||
}
|
||||
|
||||
for _, patch := range req {
|
||||
val, ok := patch.Value.(string)
|
||||
if !ok {
|
||||
panic(fmt.Sprintf("unsupported json patch value %v: cannot be converted to string", patch.Value))
|
||||
}
|
||||
k.secret[path] = val
|
||||
}
|
||||
} else {
|
||||
panic(fmt.Sprintf("unsupported json-patch op %q", op.Op))
|
||||
}
|
||||
if !strings.HasPrefix(op.Path, "/data/") {
|
||||
panic(fmt.Sprintf("unsupported json-patch path %q", op.Path))
|
||||
}
|
||||
delete(k.secret, strings.TrimPrefix(op.Path, "/data/"))
|
||||
}
|
||||
case "application/strategic-merge-patch+json":
|
||||
req := struct {
|
||||
@ -1419,3 +1440,41 @@ func (k *kubeServer) serveSecret(w http.ResponseWriter, r *http.Request) {
|
||||
panic(fmt.Sprintf("unhandled HTTP method %q", r.Method))
|
||||
}
|
||||
}
|
||||
|
||||
func mustBase64(t *testing.T, v any) string {
|
||||
b := mustJSON(t, v)
|
||||
s := base64.StdEncoding.WithPadding('=').EncodeToString(b)
|
||||
return s
|
||||
}
|
||||
|
||||
func mustJSON(t *testing.T, v any) []byte {
|
||||
b, err := json.Marshal(v)
|
||||
if err != nil {
|
||||
t.Fatalf("error converting %v to json: %v", v, err)
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
// egress services status given one named tailnet target specified by FQDN. As written by the proxy to its state Secret.
|
||||
func egressSvcStatus(name, fqdn string) egressservices.Status {
|
||||
return egressservices.Status{
|
||||
Services: map[string]*egressservices.ServiceStatus{
|
||||
name: {
|
||||
TailnetTarget: egressservices.TailnetTarget{
|
||||
FQDN: fqdn,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// egress config given one named tailnet target specified by FQDN.
|
||||
func egressSvcConfig(name, fqdn string) egressservices.Configs {
|
||||
return egressservices.Configs{
|
||||
name: egressservices.Config{
|
||||
TailnetTarget: egressservices.TailnetTarget{
|
||||
FQDN: fqdn,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
@ -11,18 +11,24 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"net/netip"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/fsnotify/fsnotify"
|
||||
"tailscale.com/client/tailscale"
|
||||
"tailscale.com/ipn"
|
||||
"tailscale.com/kube/egressservices"
|
||||
"tailscale.com/kube/kubeclient"
|
||||
"tailscale.com/kube/kubetypes"
|
||||
"tailscale.com/syncs"
|
||||
"tailscale.com/tailcfg"
|
||||
"tailscale.com/util/httpm"
|
||||
"tailscale.com/util/linuxfw"
|
||||
"tailscale.com/util/mak"
|
||||
)
|
||||
@ -37,13 +43,15 @@ const tailscaleTunInterface = "tailscale0"
|
||||
// egressProxy knows how to configure firewall rules to route cluster traffic to
|
||||
// one or more tailnet services.
|
||||
type egressProxy struct {
|
||||
cfgPath string // path to egress service config file
|
||||
cfgPath string // path to a directory with egress services config files
|
||||
|
||||
nfr linuxfw.NetfilterRunner // never nil
|
||||
|
||||
kc kubeclient.Client // never nil
|
||||
stateSecret string // name of the kube state Secret
|
||||
|
||||
tsClient *tailscale.LocalClient // never nil
|
||||
|
||||
netmapChan chan ipn.Notify // chan to receive netmap updates on
|
||||
|
||||
podIPv4 string // never empty string, currently only IPv4 is supported
|
||||
@ -55,15 +63,29 @@ type egressProxy struct {
|
||||
// memory at all.
|
||||
targetFQDNs map[string][]netip.Prefix
|
||||
|
||||
// used to configure firewall rules.
|
||||
tailnetAddrs []netip.Prefix
|
||||
tailnetAddrs []netip.Prefix // tailnet IPs of this tailnet device
|
||||
|
||||
// shortSleep is the backoff sleep between healthcheck endpoint calls - can be overridden in tests.
|
||||
shortSleep time.Duration
|
||||
// longSleep is the time to sleep after the routing rules are updated to increase the chance that kube
|
||||
// proxies on all nodes have updated their routing configuration. It can be configured to 0 in
|
||||
// tests.
|
||||
longSleep time.Duration
|
||||
// client is a client that can send HTTP requests.
|
||||
client httpClient
|
||||
}
|
||||
|
||||
// httpClient is a client that can send HTTP requests and can be mocked in tests.
|
||||
type httpClient interface {
|
||||
Do(*http.Request) (*http.Response, error)
|
||||
}
|
||||
|
||||
// run configures egress proxy firewall rules and ensures that the firewall rules are reconfigured when:
|
||||
// - the mounted egress config has changed
|
||||
// - the proxy's tailnet IP addresses have changed
|
||||
// - tailnet IPs have changed for any backend targets specified by tailnet FQDN
|
||||
func (ep *egressProxy) run(ctx context.Context, n ipn.Notify) error {
|
||||
func (ep *egressProxy) run(ctx context.Context, n ipn.Notify, opts egressProxyRunOpts) error {
|
||||
ep.configure(opts)
|
||||
var tickChan <-chan time.Time
|
||||
var eventChan <-chan fsnotify.Event
|
||||
// TODO (irbekrm): take a look if this can be pulled into a single func
|
||||
@ -75,7 +97,7 @@ func (ep *egressProxy) run(ctx context.Context, n ipn.Notify) error {
|
||||
tickChan = ticker.C
|
||||
} else {
|
||||
defer w.Close()
|
||||
if err := w.Add(filepath.Dir(ep.cfgPath)); err != nil {
|
||||
if err := w.Add(ep.cfgPath); err != nil {
|
||||
return fmt.Errorf("failed to add fsnotify watch: %w", err)
|
||||
}
|
||||
eventChan = w.Events
|
||||
@ -85,28 +107,52 @@ func (ep *egressProxy) run(ctx context.Context, n ipn.Notify) error {
|
||||
return err
|
||||
}
|
||||
for {
|
||||
var err error
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case <-tickChan:
|
||||
err = ep.sync(ctx, n)
|
||||
log.Printf("periodic sync, ensuring firewall config is up to date...")
|
||||
case <-eventChan:
|
||||
log.Printf("config file change detected, ensuring firewall config is up to date...")
|
||||
err = ep.sync(ctx, n)
|
||||
case n = <-ep.netmapChan:
|
||||
shouldResync := ep.shouldResync(n)
|
||||
if shouldResync {
|
||||
log.Printf("netmap change detected, ensuring firewall config is up to date...")
|
||||
err = ep.sync(ctx, n)
|
||||
if !shouldResync {
|
||||
continue
|
||||
}
|
||||
log.Printf("netmap change detected, ensuring firewall config is up to date...")
|
||||
}
|
||||
if err != nil {
|
||||
if err := ep.sync(ctx, n); err != nil {
|
||||
return fmt.Errorf("error syncing egress service config: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type egressProxyRunOpts struct {
|
||||
cfgPath string
|
||||
nfr linuxfw.NetfilterRunner
|
||||
kc kubeclient.Client
|
||||
tsClient *tailscale.LocalClient
|
||||
stateSecret string
|
||||
netmapChan chan ipn.Notify
|
||||
podIPv4 string
|
||||
tailnetAddrs []netip.Prefix
|
||||
}
|
||||
|
||||
// configure configures the egress proxy using the provided options.
|
||||
func (ep *egressProxy) configure(opts egressProxyRunOpts) {
|
||||
ep.cfgPath = opts.cfgPath
|
||||
ep.nfr = opts.nfr
|
||||
ep.kc = opts.kc
|
||||
ep.tsClient = opts.tsClient
|
||||
ep.stateSecret = opts.stateSecret
|
||||
ep.netmapChan = opts.netmapChan
|
||||
ep.podIPv4 = opts.podIPv4
|
||||
ep.tailnetAddrs = opts.tailnetAddrs
|
||||
ep.client = &http.Client{} // default HTTP client
|
||||
ep.shortSleep = time.Second
|
||||
ep.longSleep = time.Second * 10
|
||||
}
|
||||
|
||||
// sync triggers an egress proxy config resync. The resync calculates the diff between config and status to determine if
|
||||
// any firewall rules need to be updated. Currently using status in state Secret as a reference for what is the current
|
||||
// firewall configuration is good enough because - the status is keyed by the Pod IP - we crash the Pod on errors such
|
||||
@ -327,7 +373,8 @@ func (ep *egressProxy) deleteUnnecessaryServices(cfgs *egressservices.Configs, s
|
||||
|
||||
// getConfigs gets the mounted egress service configuration.
|
||||
func (ep *egressProxy) getConfigs() (*egressservices.Configs, error) {
|
||||
j, err := os.ReadFile(ep.cfgPath)
|
||||
svcsCfg := filepath.Join(ep.cfgPath, egressservices.KeyEgressServices)
|
||||
j, err := os.ReadFile(svcsCfg)
|
||||
if os.IsNotExist(err) {
|
||||
return nil, nil
|
||||
}
|
||||
@ -569,3 +616,142 @@ func servicesStatusIsEqual(st, st1 *egressservices.Status) bool {
|
||||
st1.PodIPv4 = ""
|
||||
return reflect.DeepEqual(*st, *st1)
|
||||
}
|
||||
|
||||
// registerHandlers adds a new handler to the provided ServeMux that can be called as a Kubernetes prestop hook to
|
||||
// delay shutdown till it's safe to do so.
|
||||
func (ep *egressProxy) registerHandlers(mux *http.ServeMux) {
|
||||
mux.Handle(fmt.Sprintf("GET %s", kubetypes.EgessServicesPreshutdownEP), ep)
|
||||
}
|
||||
|
||||
// ServeHTTP serves /internal-egress-services-preshutdown endpoint, when it receives a request, it periodically polls
|
||||
// the configured health check endpoint for each egress service until the health check endpoint no longer hits this
|
||||
// proxy Pod. It uses the Pod-IPv4 header to verify if health check response is received from this Pod.
|
||||
func (ep *egressProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
cfgs, err := ep.getConfigs()
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("error retrieving egress services configs: %v", err), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
if cfgs == nil {
|
||||
if _, err := w.Write([]byte("safe to terminate")); err != nil {
|
||||
http.Error(w, fmt.Sprintf("error writing termination status: %v", err), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
}
|
||||
hp, err := ep.getHEPPings()
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("error determining the number of times health check endpoint should be pinged: %v", err), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
ep.waitTillSafeToShutdown(r.Context(), cfgs, hp)
|
||||
}
|
||||
|
||||
// waitTillSafeToShutdown looks up all egress targets configured to be proxied via this instance and, for each target
|
||||
// whose configuration includes a healthcheck endpoint, pings the endpoint till none of the responses
|
||||
// are returned by this instance or till the HTTP request times out. In practice, the endpoint will be a Kubernetes Service for whom one of the backends
|
||||
// would normally be this Pod. When this Pod is being deleted, the operator should have removed it from the Service
|
||||
// backends and eventually kube proxy routing rules should be updated to no longer route traffic for the Service to this
|
||||
// Pod.
|
||||
func (ep *egressProxy) waitTillSafeToShutdown(ctx context.Context, cfgs *egressservices.Configs, hp int) {
|
||||
if cfgs == nil || len(*cfgs) == 0 { // avoid sleeping if no services are configured
|
||||
return
|
||||
}
|
||||
log.Printf("Ensuring that cluster traffic for egress targets is no longer routed via this Pod...")
|
||||
wg := syncs.WaitGroup{}
|
||||
|
||||
for s, cfg := range *cfgs {
|
||||
hep := cfg.HealthCheckEndpoint
|
||||
if hep == "" {
|
||||
log.Printf("Tailnet target %q does not have a cluster healthcheck specified, unable to verify if cluster traffic for the target is still routed via this Pod", s)
|
||||
continue
|
||||
}
|
||||
svc := s
|
||||
wg.Go(func() {
|
||||
log.Printf("Ensuring that cluster traffic is no longer routed to %q via this Pod...", svc)
|
||||
for {
|
||||
if ctx.Err() != nil { // kubelet's HTTP request timeout
|
||||
log.Printf("Cluster traffic for %s did not stop being routed to this Pod.", svc)
|
||||
return
|
||||
}
|
||||
found, err := lookupPodRoute(ctx, hep, ep.podIPv4, hp, ep.client)
|
||||
if err != nil {
|
||||
log.Printf("unable to reach endpoint %q, assuming the routing rules for this Pod have been deleted: %v", hep, err)
|
||||
break
|
||||
}
|
||||
if !found {
|
||||
log.Printf("service %q is no longer routed through this Pod", svc)
|
||||
break
|
||||
}
|
||||
log.Printf("service %q is still routed through this Pod, waiting...", svc)
|
||||
time.Sleep(ep.shortSleep)
|
||||
}
|
||||
})
|
||||
}
|
||||
wg.Wait()
|
||||
// The check above really only checked that the routing rules are updated on this node. Sleep for a bit to
|
||||
// ensure that the routing rules are updated on other nodes. TODO(irbekrm): this may or may not be good enough.
|
||||
// If it's not good enough, we'd probably want to do something more complex, where the proxies check each other.
|
||||
log.Printf("Sleeping for %s before shutdown to ensure that kube proxies on all nodes have updated routing configuration", ep.longSleep)
|
||||
time.Sleep(ep.longSleep)
|
||||
}
|
||||
|
||||
// lookupPodRoute calls the healthcheck endpoint repeat times and returns true if the endpoint returns with the podIP
|
||||
// header at least once.
|
||||
func lookupPodRoute(ctx context.Context, hep, podIP string, repeat int, client httpClient) (bool, error) {
|
||||
for range repeat {
|
||||
f, err := lookup(ctx, hep, podIP, client)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if f {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// lookup calls the healthcheck endpoint and returns true if the response contains the podIP header.
|
||||
func lookup(ctx context.Context, hep, podIP string, client httpClient) (bool, error) {
|
||||
req, err := http.NewRequestWithContext(ctx, httpm.GET, hep, nil)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("error creating new HTTP request: %v", err)
|
||||
}
|
||||
|
||||
// Close the TCP connection to ensure that the next request is routed to a different backend.
|
||||
req.Close = true
|
||||
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
log.Printf("Endpoint %q can not be reached: %v, likely because there are no (more) healthy backends", hep, err)
|
||||
return true, nil
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
gotIP := resp.Header.Get(kubetypes.PodIPv4Header)
|
||||
return strings.EqualFold(podIP, gotIP), nil
|
||||
}
|
||||
|
||||
// getHEPPings gets the number of pings that should be sent to a health check endpoint to ensure that each configured
|
||||
// backend is hit. This assumes that a health check endpoint is a Kubernetes Service and traffic to backend Pods is
|
||||
// round robin load balanced.
|
||||
func (ep *egressProxy) getHEPPings() (int, error) {
|
||||
hepPingsPath := filepath.Join(ep.cfgPath, egressservices.KeyHEPPings)
|
||||
j, err := os.ReadFile(hepPingsPath)
|
||||
if os.IsNotExist(err) {
|
||||
return 0, nil
|
||||
}
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
if len(j) == 0 || string(j) == "" {
|
||||
return 0, nil
|
||||
}
|
||||
hp, err := strconv.Atoi(string(j))
|
||||
if err != nil {
|
||||
return -1, fmt.Errorf("error parsing hep pings as int: %v", err)
|
||||
}
|
||||
if hp < 0 {
|
||||
log.Printf("[unexpected] hep pings is negative: %d", hp)
|
||||
return 0, nil
|
||||
}
|
||||
return hp, nil
|
||||
}
|
||||
|
@ -6,11 +6,18 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/netip"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"tailscale.com/kube/egressservices"
|
||||
"tailscale.com/kube/kubetypes"
|
||||
)
|
||||
|
||||
func Test_updatesForSvc(t *testing.T) {
|
||||
@ -173,3 +180,145 @@ func Test_updatesForSvc(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// A failure of this test will most likely look like a timeout.
|
||||
func TestWaitTillSafeToShutdown(t *testing.T) {
|
||||
podIP := "10.0.0.1"
|
||||
anotherIP := "10.0.0.2"
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
// services is a map of service name to the number of calls to make to the healthcheck endpoint before
|
||||
// returning a response that does NOT contain this Pod's IP in headers.
|
||||
services map[string]int
|
||||
replicas int
|
||||
healthCheckSet bool
|
||||
}{
|
||||
{
|
||||
name: "no_configs",
|
||||
},
|
||||
{
|
||||
name: "one_service_immediately_safe_to_shutdown",
|
||||
services: map[string]int{
|
||||
"svc1": 0,
|
||||
},
|
||||
replicas: 2,
|
||||
healthCheckSet: true,
|
||||
},
|
||||
{
|
||||
name: "multiple_services_immediately_safe_to_shutdown",
|
||||
services: map[string]int{
|
||||
"svc1": 0,
|
||||
"svc2": 0,
|
||||
"svc3": 0,
|
||||
},
|
||||
replicas: 2,
|
||||
healthCheckSet: true,
|
||||
},
|
||||
{
|
||||
name: "multiple_services_no_healthcheck_endpoints",
|
||||
services: map[string]int{
|
||||
"svc1": 0,
|
||||
"svc2": 0,
|
||||
"svc3": 0,
|
||||
},
|
||||
replicas: 2,
|
||||
},
|
||||
{
|
||||
name: "one_service_eventually_safe_to_shutdown",
|
||||
services: map[string]int{
|
||||
"svc1": 3, // After 3 calls to health check endpoint, no longer returns this Pod's IP
|
||||
},
|
||||
replicas: 2,
|
||||
healthCheckSet: true,
|
||||
},
|
||||
{
|
||||
name: "multiple_services_eventually_safe_to_shutdown",
|
||||
services: map[string]int{
|
||||
"svc1": 1, // After 1 call to health check endpoint, no longer returns this Pod's IP
|
||||
"svc2": 3, // After 3 calls to health check endpoint, no longer returns this Pod's IP
|
||||
"svc3": 5, // After 5 calls to the health check endpoint, no longer returns this Pod's IP
|
||||
},
|
||||
replicas: 2,
|
||||
healthCheckSet: true,
|
||||
},
|
||||
{
|
||||
name: "multiple_services_eventually_safe_to_shutdown_with_higher_replica_count",
|
||||
services: map[string]int{
|
||||
"svc1": 7,
|
||||
"svc2": 10,
|
||||
},
|
||||
replicas: 5,
|
||||
healthCheckSet: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
cfgs := &egressservices.Configs{}
|
||||
switches := make(map[string]int)
|
||||
|
||||
for svc, callsToSwitch := range tt.services {
|
||||
endpoint := fmt.Sprintf("http://%s.local", svc)
|
||||
if tt.healthCheckSet {
|
||||
(*cfgs)[svc] = egressservices.Config{
|
||||
HealthCheckEndpoint: endpoint,
|
||||
}
|
||||
}
|
||||
switches[endpoint] = callsToSwitch
|
||||
}
|
||||
|
||||
ep := &egressProxy{
|
||||
podIPv4: podIP,
|
||||
client: &mockHTTPClient{
|
||||
podIP: podIP,
|
||||
anotherIP: anotherIP,
|
||||
switches: switches,
|
||||
},
|
||||
}
|
||||
|
||||
ep.waitTillSafeToShutdown(context.Background(), cfgs, tt.replicas)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// mockHTTPClient is a client that receives an HTTP call for an egress service endpoint and returns a response with an
|
||||
// IP address in a 'Pod-IPv4' header. It can be configured to return one IP address for N calls, then switch to another
|
||||
// IP address to simulate a scenario where an IP is eventually no longer a backend for an endpoint.
|
||||
// TODO(irbekrm): to test this more thoroughly, we should have the client take into account the number of replicas and
|
||||
// return as if traffic was round robin load balanced across different Pods.
|
||||
type mockHTTPClient struct {
|
||||
// podIP - initial IP address to return, that matches the current proxy's IP address.
|
||||
podIP string
|
||||
anotherIP string
|
||||
// after how many calls to an endpoint, the client should start returning 'anotherIP' instead of 'podIP'.
|
||||
switches map[string]int
|
||||
mu sync.Mutex // protects the following
|
||||
// calls tracks the number of calls received.
|
||||
calls map[string]int
|
||||
}
|
||||
|
||||
func (m *mockHTTPClient) Do(req *http.Request) (*http.Response, error) {
|
||||
m.mu.Lock()
|
||||
if m.calls == nil {
|
||||
m.calls = make(map[string]int)
|
||||
}
|
||||
|
||||
endpoint := req.URL.String()
|
||||
m.calls[endpoint]++
|
||||
calls := m.calls[endpoint]
|
||||
m.mu.Unlock()
|
||||
|
||||
resp := &http.Response{
|
||||
StatusCode: http.StatusOK,
|
||||
Header: make(http.Header),
|
||||
Body: io.NopCloser(strings.NewReader("")),
|
||||
}
|
||||
|
||||
if calls <= m.switches[endpoint] {
|
||||
resp.Header.Set(kubetypes.PodIPv4Header, m.podIP) // Pod is still routable
|
||||
} else {
|
||||
resp.Header.Set(kubetypes.PodIPv4Header, m.anotherIP) // Pod is no longer routable
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
@ -64,16 +64,16 @@ type settings struct {
|
||||
// when setting up rules to proxy cluster traffic to cluster ingress
|
||||
// target.
|
||||
// Deprecated: use PodIPv4, PodIPv6 instead to support dual stack clusters
|
||||
PodIP string
|
||||
PodIPv4 string
|
||||
PodIPv6 string
|
||||
PodUID string
|
||||
HealthCheckAddrPort string
|
||||
LocalAddrPort string
|
||||
MetricsEnabled bool
|
||||
HealthCheckEnabled bool
|
||||
DebugAddrPort string
|
||||
EgressSvcsCfgPath string
|
||||
PodIP string
|
||||
PodIPv4 string
|
||||
PodIPv6 string
|
||||
PodUID string
|
||||
HealthCheckAddrPort string
|
||||
LocalAddrPort string
|
||||
MetricsEnabled bool
|
||||
HealthCheckEnabled bool
|
||||
DebugAddrPort string
|
||||
EgressProxiesCfgPath string
|
||||
}
|
||||
|
||||
func configFromEnv() (*settings, error) {
|
||||
@ -107,7 +107,7 @@ func configFromEnv() (*settings, error) {
|
||||
MetricsEnabled: defaultBool("TS_ENABLE_METRICS", false),
|
||||
HealthCheckEnabled: defaultBool("TS_ENABLE_HEALTH_CHECK", false),
|
||||
DebugAddrPort: defaultEnv("TS_DEBUG_ADDR_PORT", ""),
|
||||
EgressSvcsCfgPath: defaultEnv("TS_EGRESS_SERVICES_CONFIG_PATH", ""),
|
||||
EgressProxiesCfgPath: defaultEnv("TS_EGRESS_PROXIES_CONFIG_PATH", ""),
|
||||
PodUID: defaultEnv("POD_UID", ""),
|
||||
}
|
||||
podIPs, ok := os.LookupEnv("POD_IPS")
|
||||
@ -186,7 +186,7 @@ func (s *settings) validate() error {
|
||||
return fmt.Errorf("error parsing TS_HEALTHCHECK_ADDR_PORT value %q: %w", s.HealthCheckAddrPort, err)
|
||||
}
|
||||
}
|
||||
if s.localMetricsEnabled() || s.localHealthEnabled() {
|
||||
if s.localMetricsEnabled() || s.localHealthEnabled() || s.EgressProxiesCfgPath != "" {
|
||||
if _, err := netip.ParseAddrPort(s.LocalAddrPort); err != nil {
|
||||
return fmt.Errorf("error parsing TS_LOCAL_ADDR_PORT value %q: %w", s.LocalAddrPort, err)
|
||||
}
|
||||
@ -199,8 +199,8 @@ func (s *settings) validate() error {
|
||||
if s.HealthCheckEnabled && s.HealthCheckAddrPort != "" {
|
||||
return errors.New("TS_HEALTHCHECK_ADDR_PORT is deprecated and will be removed in 1.82.0, use TS_ENABLE_HEALTH_CHECK and optionally TS_LOCAL_ADDR_PORT")
|
||||
}
|
||||
if s.EgressSvcsCfgPath != "" && !(s.InKubernetes && s.KubeSecret != "") {
|
||||
return errors.New("TS_EGRESS_SERVICES_CONFIG_PATH is only supported for Tailscale running on Kubernetes")
|
||||
if s.EgressProxiesCfgPath != "" && !(s.InKubernetes && s.KubeSecret != "") {
|
||||
return errors.New("TS_EGRESS_PROXIES_CONFIG_PATH is only supported for Tailscale running on Kubernetes")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -291,7 +291,7 @@ func isOneStepConfig(cfg *settings) bool {
|
||||
// as an L3 proxy, proxying to an endpoint provided via one of the config env
|
||||
// vars.
|
||||
func isL3Proxy(cfg *settings) bool {
|
||||
return cfg.ProxyTargetIP != "" || cfg.ProxyTargetDNSName != "" || cfg.TailnetTargetIP != "" || cfg.TailnetTargetFQDN != "" || cfg.AllowProxyingClusterTrafficViaIngress || cfg.EgressSvcsCfgPath != ""
|
||||
return cfg.ProxyTargetIP != "" || cfg.ProxyTargetDNSName != "" || cfg.TailnetTargetIP != "" || cfg.TailnetTargetFQDN != "" || cfg.AllowProxyingClusterTrafficViaIngress || cfg.EgressProxiesCfgPath != ""
|
||||
}
|
||||
|
||||
// hasKubeStateStore returns true if the state must be stored in a Kubernetes
|
||||
@ -308,6 +308,10 @@ func (cfg *settings) localHealthEnabled() bool {
|
||||
return cfg.LocalAddrPort != "" && cfg.HealthCheckEnabled
|
||||
}
|
||||
|
||||
func (cfg *settings) egressSvcsTerminateEPEnabled() bool {
|
||||
return cfg.LocalAddrPort != "" && cfg.EgressProxiesCfgPath != ""
|
||||
}
|
||||
|
||||
// defaultEnv returns the value of the given envvar name, or defVal if
|
||||
// unset.
|
||||
func defaultEnv(name, defVal string) string {
|
||||
|
@ -20,7 +20,6 @@ import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/reconcile"
|
||||
tsoperator "tailscale.com/k8s-operator"
|
||||
"tailscale.com/kube/egressservices"
|
||||
"tailscale.com/types/ptr"
|
||||
)
|
||||
@ -71,25 +70,27 @@ func (er *egressEpsReconciler) Reconcile(ctx context.Context, req reconcile.Requ
|
||||
if err != nil {
|
||||
return res, fmt.Errorf("error retrieving ExternalName Service: %w", err)
|
||||
}
|
||||
if !tsoperator.EgressServiceIsValidAndConfigured(svc) {
|
||||
l.Infof("Cluster resources for ExternalName Service %s/%s are not yet configured", svc.Namespace, svc.Name)
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// TODO(irbekrm): currently this reconcile loop runs all the checks every time it's triggered, which is
|
||||
// wasteful. Once we have a Ready condition for ExternalName Services for ProxyGroup, use the condition to
|
||||
// determine if a reconcile is needed.
|
||||
|
||||
oldEps := eps.DeepCopy()
|
||||
proxyGroupName := eps.Labels[labelProxyGroup]
|
||||
tailnetSvc := tailnetSvcName(svc)
|
||||
l = l.With("tailnet-service-name", tailnetSvc)
|
||||
|
||||
// Retrieve the desired tailnet service configuration from the ConfigMap.
|
||||
proxyGroupName := eps.Labels[labelProxyGroup]
|
||||
_, cfgs, err := egressSvcsConfigs(ctx, er.Client, proxyGroupName, er.tsNamespace)
|
||||
if err != nil {
|
||||
return res, fmt.Errorf("error retrieving tailnet services configuration: %w", err)
|
||||
}
|
||||
if cfgs == nil {
|
||||
// TODO(irbekrm): this path would be hit if egress service was once exposed on a ProxyGroup that later
|
||||
// got deleted. Probably the EndpointSlices then need to be deleted too- need to rethink this flow.
|
||||
l.Debugf("No egress config found, likely because ProxyGroup has not been created")
|
||||
return res, nil
|
||||
}
|
||||
cfg, ok := (*cfgs)[tailnetSvc]
|
||||
if !ok {
|
||||
l.Infof("[unexpected] configuration for tailnet service %s not found", tailnetSvc)
|
||||
|
@ -59,6 +59,8 @@ const (
|
||||
maxPorts = 1000
|
||||
|
||||
indexEgressProxyGroup = ".metadata.annotations.egress-proxy-group"
|
||||
|
||||
tsHealthCheckPortName = "tailscale-health-check"
|
||||
)
|
||||
|
||||
var gaugeEgressServices = clientmetric.NewGauge(kubetypes.MetricEgressServiceCount)
|
||||
@ -229,15 +231,16 @@ func (esr *egressSvcsReconciler) provision(ctx context.Context, proxyGroupName s
|
||||
found := false
|
||||
for _, wantsPM := range svc.Spec.Ports {
|
||||
if wantsPM.Port == pm.Port && strings.EqualFold(string(wantsPM.Protocol), string(pm.Protocol)) {
|
||||
// We don't use the port name to distinguish this port internally, but Kubernetes
|
||||
// require that, for Service ports with more than one name each port is uniquely named.
|
||||
// So we can always pick the port name from the ExternalName Service as at this point we
|
||||
// know that those are valid names because Kuberentes already validated it once. Note
|
||||
// that users could have changed an unnamed port to a named port and might have changed
|
||||
// port names- this should still work.
|
||||
// We want to both preserve the user set port names for ease of debugging, but also
|
||||
// ensure that we name all unnamed ports as the ClusterIP Service that we create will
|
||||
// always have at least two ports.
|
||||
// https://kubernetes.io/docs/concepts/services-networking/service/#multi-port-services
|
||||
// See also https://github.com/tailscale/tailscale/issues/13406#issuecomment-2507230388
|
||||
clusterIPSvc.Spec.Ports[i].Name = wantsPM.Name
|
||||
if wantsPM.Name != "" {
|
||||
clusterIPSvc.Spec.Ports[i].Name = wantsPM.Name
|
||||
} else {
|
||||
clusterIPSvc.Spec.Ports[i].Name = "tailscale-unnamed"
|
||||
}
|
||||
found = true
|
||||
break
|
||||
}
|
||||
@ -252,6 +255,12 @@ func (esr *egressSvcsReconciler) provision(ctx context.Context, proxyGroupName s
|
||||
// ClusterIP Service produce new target port and add a portmapping to
|
||||
// the ClusterIP Service.
|
||||
for _, wantsPM := range svc.Spec.Ports {
|
||||
// Because we add a healthcheck port of our own, we will always have at least two ports. That
|
||||
// means that we cannot have ports with name not set.
|
||||
// https://kubernetes.io/docs/concepts/services-networking/service/#multi-port-services
|
||||
if wantsPM.Name == "" {
|
||||
wantsPM.Name = "tailscale-unnamed"
|
||||
}
|
||||
found := false
|
||||
for _, gotPM := range clusterIPSvc.Spec.Ports {
|
||||
if wantsPM.Port == gotPM.Port && strings.EqualFold(string(wantsPM.Protocol), string(gotPM.Protocol)) {
|
||||
@ -278,6 +287,25 @@ func (esr *egressSvcsReconciler) provision(ctx context.Context, proxyGroupName s
|
||||
})
|
||||
}
|
||||
}
|
||||
var healthCheckPort int32 = defaultLocalAddrPort
|
||||
|
||||
for {
|
||||
if !slices.ContainsFunc(svc.Spec.Ports, func(p corev1.ServicePort) bool {
|
||||
return p.Port == healthCheckPort
|
||||
}) {
|
||||
break
|
||||
}
|
||||
healthCheckPort++
|
||||
if healthCheckPort > 10002 {
|
||||
return nil, false, fmt.Errorf("unable to find a free port for internal health check in range [9002, 10002]")
|
||||
}
|
||||
}
|
||||
clusterIPSvc.Spec.Ports = append(clusterIPSvc.Spec.Ports, corev1.ServicePort{
|
||||
Name: tsHealthCheckPortName,
|
||||
Port: healthCheckPort,
|
||||
TargetPort: intstr.FromInt(defaultLocalAddrPort),
|
||||
Protocol: "TCP",
|
||||
})
|
||||
if !reflect.DeepEqual(clusterIPSvc, oldClusterIPSvc) {
|
||||
if clusterIPSvc, err = createOrUpdate(ctx, esr.Client, esr.tsNamespace, clusterIPSvc, func(svc *corev1.Service) {
|
||||
svc.Labels = clusterIPSvc.Labels
|
||||
@ -320,7 +348,7 @@ func (esr *egressSvcsReconciler) provision(ctx context.Context, proxyGroupName s
|
||||
}
|
||||
tailnetSvc := tailnetSvcName(svc)
|
||||
gotCfg := (*cfgs)[tailnetSvc]
|
||||
wantsCfg := egressSvcCfg(svc, clusterIPSvc)
|
||||
wantsCfg := egressSvcCfg(svc, clusterIPSvc, esr.tsNamespace, l)
|
||||
if !reflect.DeepEqual(gotCfg, wantsCfg) {
|
||||
l.Debugf("updating egress services ConfigMap %s", cm.Name)
|
||||
mak.Set(cfgs, tailnetSvc, wantsCfg)
|
||||
@ -504,10 +532,8 @@ func (esr *egressSvcsReconciler) validateClusterResources(ctx context.Context, s
|
||||
return false, nil
|
||||
}
|
||||
if !tsoperator.ProxyGroupIsReady(pg) {
|
||||
l.Infof("ProxyGroup %s is not ready, waiting...", proxyGroupName)
|
||||
tsoperator.SetServiceCondition(svc, tsapi.EgressSvcValid, metav1.ConditionUnknown, reasonProxyGroupNotReady, reasonProxyGroupNotReady, esr.clock, l)
|
||||
tsoperator.RemoveServiceCondition(svc, tsapi.EgressSvcConfigured)
|
||||
return false, nil
|
||||
}
|
||||
|
||||
l.Debugf("egress service is valid")
|
||||
@ -515,6 +541,24 @@ func (esr *egressSvcsReconciler) validateClusterResources(ctx context.Context, s
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func egressSvcCfg(externalNameSvc, clusterIPSvc *corev1.Service, ns string, l *zap.SugaredLogger) egressservices.Config {
|
||||
d := retrieveClusterDomain(ns, l)
|
||||
tt := tailnetTargetFromSvc(externalNameSvc)
|
||||
hep := healthCheckForSvc(clusterIPSvc, d)
|
||||
cfg := egressservices.Config{
|
||||
TailnetTarget: tt,
|
||||
HealthCheckEndpoint: hep,
|
||||
}
|
||||
for _, svcPort := range clusterIPSvc.Spec.Ports {
|
||||
if svcPort.Name == tsHealthCheckPortName {
|
||||
continue // exclude healthcheck from egress svcs configs
|
||||
}
|
||||
pm := portMap(svcPort)
|
||||
mak.Set(&cfg.Ports, pm, struct{}{})
|
||||
}
|
||||
return cfg
|
||||
}
|
||||
|
||||
func validateEgressService(svc *corev1.Service, pg *tsapi.ProxyGroup) []string {
|
||||
violations := validateService(svc)
|
||||
|
||||
@ -584,16 +628,6 @@ func tailnetTargetFromSvc(svc *corev1.Service) egressservices.TailnetTarget {
|
||||
}
|
||||
}
|
||||
|
||||
func egressSvcCfg(externalNameSvc, clusterIPSvc *corev1.Service) egressservices.Config {
|
||||
tt := tailnetTargetFromSvc(externalNameSvc)
|
||||
cfg := egressservices.Config{TailnetTarget: tt}
|
||||
for _, svcPort := range clusterIPSvc.Spec.Ports {
|
||||
pm := portMap(svcPort)
|
||||
mak.Set(&cfg.Ports, pm, struct{}{})
|
||||
}
|
||||
return cfg
|
||||
}
|
||||
|
||||
func portMap(p corev1.ServicePort) egressservices.PortMap {
|
||||
// TODO (irbekrm): out of bounds check?
|
||||
return egressservices.PortMap{Protocol: string(p.Protocol), MatchPort: uint16(p.TargetPort.IntVal), TargetPort: uint16(p.Port)}
|
||||
@ -618,7 +652,11 @@ func egressSvcsConfigs(ctx context.Context, cl client.Client, proxyGroupName, ts
|
||||
Namespace: tsNamespace,
|
||||
},
|
||||
}
|
||||
if err := cl.Get(ctx, client.ObjectKeyFromObject(cm), cm); err != nil {
|
||||
err = cl.Get(ctx, client.ObjectKeyFromObject(cm), cm)
|
||||
if apierrors.IsNotFound(err) { // ProxyGroup resources have not been created (yet)
|
||||
return nil, nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("error retrieving egress services ConfigMap %s: %v", name, err)
|
||||
}
|
||||
cfgs = &egressservices.Configs{}
|
||||
@ -740,3 +778,17 @@ func (esr *egressSvcsReconciler) updateSvcSpec(ctx context.Context, svc *corev1.
|
||||
svc.Status = *st
|
||||
return err
|
||||
}
|
||||
|
||||
// healthCheckForSvc return the URL of the containerboot's health check endpoint served by this Service or empty string.
|
||||
func healthCheckForSvc(svc *corev1.Service, clusterDomain string) string {
|
||||
// This version of the operator always sets health check port on the egress Services. However, it is possible
|
||||
// that this reconcile loops runs during a proxy upgrade from a version that did not set the health check port
|
||||
// and parses a Service that does not have the port set yet.
|
||||
i := slices.IndexFunc(svc.Spec.Ports, func(port corev1.ServicePort) bool {
|
||||
return port.Name == tsHealthCheckPortName
|
||||
})
|
||||
if i == -1 {
|
||||
return ""
|
||||
}
|
||||
return fmt.Sprintf("http://%s.%s.svc.%s:%d/healthz", svc.Name, svc.Namespace, clusterDomain, svc.Spec.Ports[i].Port)
|
||||
}
|
||||
|
@ -18,6 +18,7 @@ import (
|
||||
discoveryv1 "k8s.io/api/discovery/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client/fake"
|
||||
tsapi "tailscale.com/k8s-operator/apis/v1alpha1"
|
||||
@ -78,42 +79,16 @@ func TestTailscaleEgressServices(t *testing.T) {
|
||||
Selector: nil,
|
||||
Ports: []corev1.ServicePort{
|
||||
{
|
||||
Name: "http",
|
||||
Protocol: "TCP",
|
||||
Port: 80,
|
||||
},
|
||||
{
|
||||
Name: "https",
|
||||
Protocol: "TCP",
|
||||
Port: 443,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
t.Run("proxy_group_not_ready", func(t *testing.T) {
|
||||
t.Run("service_one_unnamed_port", func(t *testing.T) {
|
||||
mustCreate(t, fc, svc)
|
||||
expectReconciled(t, esr, "default", "test")
|
||||
// Service should have EgressSvcValid condition set to Unknown.
|
||||
svc.Status.Conditions = []metav1.Condition{condition(tsapi.EgressSvcValid, metav1.ConditionUnknown, reasonProxyGroupNotReady, reasonProxyGroupNotReady, clock)}
|
||||
expectEqual(t, fc, svc)
|
||||
})
|
||||
|
||||
t.Run("proxy_group_ready", func(t *testing.T) {
|
||||
mustUpdateStatus(t, fc, "", "foo", func(pg *tsapi.ProxyGroup) {
|
||||
pg.Status.Conditions = []metav1.Condition{
|
||||
condition(tsapi.ProxyGroupReady, metav1.ConditionTrue, "", "", clock),
|
||||
}
|
||||
})
|
||||
expectReconciled(t, esr, "default", "test")
|
||||
validateReadyService(t, fc, esr, svc, clock, zl, cm)
|
||||
})
|
||||
t.Run("service_retain_one_unnamed_port", func(t *testing.T) {
|
||||
svc.Spec.Ports = []corev1.ServicePort{{Protocol: "TCP", Port: 80}}
|
||||
mustUpdate(t, fc, "default", "test", func(s *corev1.Service) {
|
||||
s.Spec.Ports = svc.Spec.Ports
|
||||
})
|
||||
expectReconciled(t, esr, "default", "test")
|
||||
validateReadyService(t, fc, esr, svc, clock, zl, cm)
|
||||
})
|
||||
t.Run("service_add_two_named_ports", func(t *testing.T) {
|
||||
@ -164,7 +139,7 @@ func validateReadyService(t *testing.T, fc client.WithWatch, esr *egressSvcsReco
|
||||
// Verify that an EndpointSlice has been created.
|
||||
expectEqual(t, fc, endpointSlice(name, svc, clusterSvc))
|
||||
// Verify that ConfigMap contains configuration for the new egress service.
|
||||
mustHaveConfigForSvc(t, fc, svc, clusterSvc, cm)
|
||||
mustHaveConfigForSvc(t, fc, svc, clusterSvc, cm, zl)
|
||||
r := svcConfiguredReason(svc, true, zl.Sugar())
|
||||
// Verify that the user-created ExternalName Service has Configured set to true and ExternalName pointing to the
|
||||
// CluterIP Service.
|
||||
@ -203,6 +178,23 @@ func findGenNameForEgressSvcResources(t *testing.T, client client.Client, svc *c
|
||||
|
||||
func clusterIPSvc(name string, extNSvc *corev1.Service) *corev1.Service {
|
||||
labels := egressSvcChildResourceLabels(extNSvc)
|
||||
ports := make([]corev1.ServicePort, len(extNSvc.Spec.Ports))
|
||||
for i, port := range extNSvc.Spec.Ports {
|
||||
ports[i] = corev1.ServicePort{ // Copy the port to avoid modifying the original.
|
||||
Name: port.Name,
|
||||
Port: port.Port,
|
||||
Protocol: port.Protocol,
|
||||
}
|
||||
if port.Name == "" {
|
||||
ports[i].Name = "tailscale-unnamed"
|
||||
}
|
||||
}
|
||||
ports = append(ports, corev1.ServicePort{
|
||||
Name: "tailscale-health-check",
|
||||
Port: 9002,
|
||||
TargetPort: intstr.FromInt(9002),
|
||||
Protocol: "TCP",
|
||||
})
|
||||
return &corev1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: name,
|
||||
@ -212,7 +204,7 @@ func clusterIPSvc(name string, extNSvc *corev1.Service) *corev1.Service {
|
||||
},
|
||||
Spec: corev1.ServiceSpec{
|
||||
Type: corev1.ServiceTypeClusterIP,
|
||||
Ports: extNSvc.Spec.Ports,
|
||||
Ports: ports,
|
||||
},
|
||||
}
|
||||
}
|
||||
@ -257,9 +249,9 @@ func portsForEndpointSlice(svc *corev1.Service) []discoveryv1.EndpointPort {
|
||||
return ports
|
||||
}
|
||||
|
||||
func mustHaveConfigForSvc(t *testing.T, cl client.Client, extNSvc, clusterIPSvc *corev1.Service, cm *corev1.ConfigMap) {
|
||||
func mustHaveConfigForSvc(t *testing.T, cl client.Client, extNSvc, clusterIPSvc *corev1.Service, cm *corev1.ConfigMap, l *zap.Logger) {
|
||||
t.Helper()
|
||||
wantsCfg := egressSvcCfg(extNSvc, clusterIPSvc)
|
||||
wantsCfg := egressSvcCfg(extNSvc, clusterIPSvc, clusterIPSvc.Namespace, l.Sugar())
|
||||
if err := cl.Get(context.Background(), client.ObjectKeyFromObject(cm), cm); err != nil {
|
||||
t.Fatalf("Error retrieving ConfigMap: %v", err)
|
||||
}
|
||||
|
@ -777,7 +777,7 @@ func proxyClassHandlerForConnector(cl client.Client, logger *zap.SugaredLogger)
|
||||
}
|
||||
}
|
||||
|
||||
// proxyClassHandlerForConnector returns a handler that, for a given ProxyClass,
|
||||
// proxyClassHandlerForProxyGroup returns a handler that, for a given ProxyClass,
|
||||
// returns a list of reconcile requests for all Connectors that have
|
||||
// .spec.proxyClass set.
|
||||
func proxyClassHandlerForProxyGroup(cl client.Client, logger *zap.SugaredLogger) handler.MapFunc {
|
||||
@ -998,7 +998,7 @@ func reconcileRequestsForPG(pg string, cl client.Client, ns string) []reconcile.
|
||||
// egressSvcsFromEgressProxyGroup is an event handler for egress ProxyGroups. It returns reconcile requests for all
|
||||
// user-created ExternalName Services that should be exposed on this ProxyGroup.
|
||||
func egressSvcsFromEgressProxyGroup(cl client.Client, logger *zap.SugaredLogger) handler.MapFunc {
|
||||
return func(_ context.Context, o client.Object) []reconcile.Request {
|
||||
return func(ctx context.Context, o client.Object) []reconcile.Request {
|
||||
pg, ok := o.(*tsapi.ProxyGroup)
|
||||
if !ok {
|
||||
logger.Infof("[unexpected] ProxyGroup handler triggered for an object that is not a ProxyGroup")
|
||||
@ -1008,7 +1008,7 @@ func egressSvcsFromEgressProxyGroup(cl client.Client, logger *zap.SugaredLogger)
|
||||
return nil
|
||||
}
|
||||
svcList := &corev1.ServiceList{}
|
||||
if err := cl.List(context.Background(), svcList, client.MatchingFields{indexEgressProxyGroup: pg.Name}); err != nil {
|
||||
if err := cl.List(ctx, svcList, client.MatchingFields{indexEgressProxyGroup: pg.Name}); err != nil {
|
||||
logger.Infof("error listing Services: %v, skipping a reconcile for event on ProxyGroup %s", err, pg.Name)
|
||||
return nil
|
||||
}
|
||||
@ -1028,7 +1028,7 @@ func egressSvcsFromEgressProxyGroup(cl client.Client, logger *zap.SugaredLogger)
|
||||
// epsFromExternalNameService is an event handler for ExternalName Services that define a Tailscale egress service that
|
||||
// should be exposed on a ProxyGroup. It returns reconcile requests for EndpointSlices created for this Service.
|
||||
func epsFromExternalNameService(cl client.Client, logger *zap.SugaredLogger, ns string) handler.MapFunc {
|
||||
return func(_ context.Context, o client.Object) []reconcile.Request {
|
||||
return func(ctx context.Context, o client.Object) []reconcile.Request {
|
||||
svc, ok := o.(*corev1.Service)
|
||||
if !ok {
|
||||
logger.Infof("[unexpected] Service handler triggered for an object that is not a Service")
|
||||
@ -1038,7 +1038,7 @@ func epsFromExternalNameService(cl client.Client, logger *zap.SugaredLogger, ns
|
||||
return nil
|
||||
}
|
||||
epsList := &discoveryv1.EndpointSliceList{}
|
||||
if err := cl.List(context.Background(), epsList, client.InNamespace(ns),
|
||||
if err := cl.List(ctx, epsList, client.InNamespace(ns),
|
||||
client.MatchingLabels(egressSvcChildResourceLabels(svc))); err != nil {
|
||||
logger.Infof("error listing EndpointSlices: %v, skipping a reconcile for event on Service %s", err, svc.Name)
|
||||
return nil
|
||||
|
@ -32,6 +32,7 @@ import (
|
||||
"tailscale.com/ipn"
|
||||
tsoperator "tailscale.com/k8s-operator"
|
||||
tsapi "tailscale.com/k8s-operator/apis/v1alpha1"
|
||||
"tailscale.com/kube/egressservices"
|
||||
"tailscale.com/kube/kubetypes"
|
||||
"tailscale.com/tailcfg"
|
||||
"tailscale.com/tstime"
|
||||
@ -166,6 +167,7 @@ func (r *ProxyGroupReconciler) Reconcile(ctx context.Context, req reconcile.Requ
|
||||
r.recorder.Eventf(pg, corev1.EventTypeWarning, reasonProxyGroupCreationFailed, err.Error())
|
||||
return setStatusReady(pg, metav1.ConditionFalse, reasonProxyGroupCreationFailed, err.Error())
|
||||
}
|
||||
validateProxyClassForPG(logger, pg, proxyClass)
|
||||
if !tsoperator.ProxyClassIsReady(proxyClass) {
|
||||
message := fmt.Sprintf("the ProxyGroup's ProxyClass %s is not yet in a ready state, waiting...", proxyClassName)
|
||||
logger.Info(message)
|
||||
@ -204,6 +206,31 @@ func (r *ProxyGroupReconciler) Reconcile(ctx context.Context, req reconcile.Requ
|
||||
return setStatusReady(pg, metav1.ConditionTrue, reasonProxyGroupReady, reasonProxyGroupReady)
|
||||
}
|
||||
|
||||
// validateProxyClassForPG applies custom validation logic for ProxyClass applied to ProxyGroup.
|
||||
func validateProxyClassForPG(logger *zap.SugaredLogger, pg *tsapi.ProxyGroup, pc *tsapi.ProxyClass) {
|
||||
if pg.Spec.Type == tsapi.ProxyGroupTypeIngress {
|
||||
return
|
||||
}
|
||||
// Our custom logic for ensuring minimum downtime ProxyGroup update rollouts relies on the local health check
|
||||
// being accessible on the replica Pod IP:9002. This address can also be modified by users, via
|
||||
// TS_LOCAL_ADDR_PORT env var.
|
||||
//
|
||||
// Currently TS_LOCAL_ADDR_PORT controls Pod's health check and metrics address. _Probably_ there is no need for
|
||||
// users to set this to a custom value. Users who want to consume metrics, should integrate with the metrics
|
||||
// Service and/or ServiceMonitor, rather than Pods directly. The health check is likely not useful to integrate
|
||||
// directly with for operator proxies (and we should aim for unified lifecycle logic in the operator, users
|
||||
// shouldn't need to set their own).
|
||||
//
|
||||
// TODO(irbekrm): maybe disallow configuring this env var in future (in Tailscale 1.84 or later).
|
||||
if hasLocalAddrPortSet(pc) {
|
||||
msg := fmt.Sprintf("ProxyClass %s applied to an egress ProxyGroup has TS_LOCAL_ADDR_PORT env var set to a custom value."+
|
||||
"This will disable the ProxyGroup graceful failover mechanism, so you might experience downtime when ProxyGroup pods are restarted."+
|
||||
"In future we will remove the ability to set custom TS_LOCAL_ADDR_PORT for egress ProxyGroups."+
|
||||
"Please raise an issue if you expect that this will cause issues for your workflow.", pc.Name)
|
||||
logger.Warn(msg)
|
||||
}
|
||||
}

func (r *ProxyGroupReconciler) maybeProvision(ctx context.Context, pg *tsapi.ProxyGroup, proxyClass *tsapi.ProxyClass) error {
logger := r.logger(pg.Name)
r.mu.Lock()
@ -253,10 +280,11 @@ func (r *ProxyGroupReconciler) maybeProvision(ctx context.Context, pg *tsapi.Pro
return fmt.Errorf("error provisioning RoleBinding: %w", err)
}
if pg.Spec.Type == tsapi.ProxyGroupTypeEgress {
cm := pgEgressCM(pg, r.tsNamespace)
cm, hp := pgEgressCM(pg, r.tsNamespace)
if _, err := createOrUpdate(ctx, r.Client, r.tsNamespace, cm, func(existing *corev1.ConfigMap) {
existing.ObjectMeta.Labels = cm.ObjectMeta.Labels
existing.ObjectMeta.OwnerReferences = cm.ObjectMeta.OwnerReferences
mak.Set(&existing.BinaryData, egressservices.KeyHEPPings, hp)
}); err != nil {
return fmt.Errorf("error provisioning egress ConfigMap %q: %w", cm.Name, err)
}
@ -270,7 +298,7 @@ func (r *ProxyGroupReconciler) maybeProvision(ctx context.Context, pg *tsapi.Pro
return fmt.Errorf("error provisioning ingress ConfigMap %q: %w", cm.Name, err)
}
}
ss, err := pgStatefulSet(pg, r.tsNamespace, r.proxyImage, r.tsFirewallMode)
ss, err := pgStatefulSet(pg, r.tsNamespace, r.proxyImage, r.tsFirewallMode, proxyClass)
if err != nil {
return fmt.Errorf("error generating StatefulSet spec: %w", err)
}

@ -7,11 +7,14 @@ package main

import (
"fmt"
"slices"
"strconv"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"sigs.k8s.io/yaml"
tsapi "tailscale.com/k8s-operator/apis/v1alpha1"
"tailscale.com/kube/egressservices"
@ -19,9 +22,12 @@ import (
"tailscale.com/types/ptr"
)

// deletionGracePeriodSeconds is set to 6 minutes to ensure that the pre-stop hook of these proxies has enough time to complete, so that they can terminate gracefully.
const deletionGracePeriodSeconds int64 = 360

// Returns the base StatefulSet definition for a ProxyGroup. A ProxyClass may be
// applied over the top after.
func pgStatefulSet(pg *tsapi.ProxyGroup, namespace, image, tsFirewallMode string) (*appsv1.StatefulSet, error) {
func pgStatefulSet(pg *tsapi.ProxyGroup, namespace, image, tsFirewallMode string, proxyClass *tsapi.ProxyClass) (*appsv1.StatefulSet, error) {
ss := new(appsv1.StatefulSet)
if err := yaml.Unmarshal(proxyYaml, &ss); err != nil {
return nil, fmt.Errorf("failed to unmarshal proxy spec: %w", err)
@ -145,15 +151,25 @@ func pgStatefulSet(pg *tsapi.ProxyGroup, namespace, image, tsFirewallMode string
}

if pg.Spec.Type == tsapi.ProxyGroupTypeEgress {
envs = append(envs, corev1.EnvVar{
Name: "TS_EGRESS_SERVICES_CONFIG_PATH",
Value: fmt.Sprintf("/etc/proxies/%s", egressservices.KeyEgressServices),
},
envs = append(envs,
// TODO(irbekrm): in 1.80 we deprecated TS_EGRESS_SERVICES_CONFIG_PATH in favour of
// TS_EGRESS_PROXIES_CONFIG_PATH. Remove it in 1.84.
corev1.EnvVar{
Name: "TS_EGRESS_SERVICES_CONFIG_PATH",
Value: fmt.Sprintf("/etc/proxies/%s", egressservices.KeyEgressServices),
},
corev1.EnvVar{
Name: "TS_EGRESS_PROXIES_CONFIG_PATH",
Value: "/etc/proxies",
},
corev1.EnvVar{
Name: "TS_INTERNAL_APP",
Value: kubetypes.AppProxyGroupEgress,
},
)
corev1.EnvVar{
Name: "TS_ENABLE_HEALTH_CHECK",
Value: "true",
})
} else { // ingress
envs = append(envs, corev1.EnvVar{
Name: "TS_INTERNAL_APP",
@ -167,6 +183,25 @@ func pgStatefulSet(pg *tsapi.ProxyGroup, namespace, image, tsFirewallMode string
return append(c.Env, envs...)
}()

// The pre-stop hook is used to ensure that a replica does not get terminated while cluster traffic for egress
// services is still being routed to it.
//
// This mechanism currently (2025-01-26) relies on the local health check being accessible on the Pod's
// IP, so it is not supported for ProxyGroups where users have configured TS_LOCAL_ADDR_PORT to a custom
// value.
if pg.Spec.Type == tsapi.ProxyGroupTypeEgress && !hasLocalAddrPortSet(proxyClass) {
c.Lifecycle = &corev1.Lifecycle{
PreStop: &corev1.LifecycleHandler{
HTTPGet: &corev1.HTTPGetAction{
Path: kubetypes.EgessServicesPreshutdownEP,
Port: intstr.FromInt(defaultLocalAddrPort),
},
},
}
// Set the deletion grace period to 6 minutes to ensure that the pre-stop hook has enough
// time to complete before the Pod is terminated.
ss.Spec.Template.DeletionGracePeriodSeconds = ptr.To(deletionGracePeriodSeconds)
}
return ss, nil
}

@ -258,7 +293,9 @@ func pgStateSecrets(pg *tsapi.ProxyGroup, namespace string) (secrets []*corev1.S
return secrets
}

func pgEgressCM(pg *tsapi.ProxyGroup, namespace string) *corev1.ConfigMap {
func pgEgressCM(pg *tsapi.ProxyGroup, namespace string) (*corev1.ConfigMap, []byte) {
hp := hepPings(pg)
hpBs := []byte(strconv.Itoa(hp))
return &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: pgEgressCMName(pg.Name),
@ -266,8 +303,10 @@ func pgEgressCM(pg *tsapi.ProxyGroup, namespace string) *corev1.ConfigMap {
Labels: pgLabels(pg.Name, nil),
OwnerReferences: pgOwnerReference(pg),
},
}
BinaryData: map[string][]byte{egressservices.KeyHEPPings: hpBs},
}, hpBs
}
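
The ping count lands in the ConfigMap's BinaryData under egressservices.KeyHEPPings, so once the ConfigMap is mounted it is just a small file next to the egress services config. A minimal sketch (not part of this commit) of how the proxy side could parse it, assuming the mount directory is the one named by TS_EGRESS_PROXIES_CONFIG_PATH; the helper name is made up.

	// Assumes imports: "os", "path/filepath", "strconv", "strings",
	// "tailscale.com/kube/egressservices".
	func readHEPPings() (int, error) {
		dir := os.Getenv("TS_EGRESS_PROXIES_CONFIG_PATH") // "/etc/proxies" for egress ProxyGroups
		raw, err := os.ReadFile(filepath.Join(dir, egressservices.KeyHEPPings))
		if err != nil {
			return 0, err
		}
		return strconv.Atoi(strings.TrimSpace(string(raw)))
	}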

func pgIngressCM(pg *tsapi.ProxyGroup, namespace string) *corev1.ConfigMap {
return &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
@ -313,3 +352,23 @@ func pgReplicas(pg *tsapi.ProxyGroup) int32 {
func pgEgressCMName(pg string) string {
return fmt.Sprintf("%s-egress-config", pg)
}

// hasLocalAddrPortSet returns true if the ProxyClass has the TS_LOCAL_ADDR_PORT env var set. For egress ProxyGroups,
// currently (2025-01-26) this means that the ProxyGroup does not support graceful failover.
func hasLocalAddrPortSet(proxyClass *tsapi.ProxyClass) bool {
if proxyClass == nil || proxyClass.Spec.StatefulSet == nil || proxyClass.Spec.StatefulSet.Pod == nil || proxyClass.Spec.StatefulSet.Pod.TailscaleContainer == nil {
return false
}
return slices.ContainsFunc(proxyClass.Spec.StatefulSet.Pod.TailscaleContainer.Env, func(env tsapi.Env) bool {
return env.Name == envVarTSLocalAddrPort
})
}

// hepPings returns the number of times a health check endpoint exposed by a Service fronting ProxyGroup replicas should
// be pinged to ensure that all currently configured backend replicas are hit.
func hepPings(pg *tsapi.ProxyGroup) int {
rc := pgReplicas(pg)
// Assuming a Service implemented using round-robin load balancing, pinging number-of-replicas times should be
// enough, but in practice we cannot assume that requests will be load balanced perfectly.
return int(rc) * 3
}
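
To show how these pieces fit together, here is an illustrative sketch (not the commit's containerboot code) of the preshutdown wait described in the commit message: probe an egress service's ClusterIP health check endpoint the hep-pings number of times and only return once none of the responses carry this Pod's IP in the Pod-IPv4 header, i.e. once Service traffic is no longer reaching this replica. The function name and parameters are hypothetical; it assumes imports "context", "net/http", "time" and "tailscale.com/kube/kubetypes".

	// waitUntilNotABackend polls hcEndpoint until none of `pings` consecutive probes are
	// answered by this Pod (identified by its IPv4 address in the Pod-IPv4 response header).
	func waitUntilNotABackend(ctx context.Context, hcEndpoint, podIPv4 string, pings int) error {
		for {
			seenSelf := false
			for i := 0; i < pings; i++ {
				req, err := http.NewRequestWithContext(ctx, http.MethodGet, hcEndpoint, nil)
				if err != nil {
					return err
				}
				resp, err := http.DefaultClient.Do(req)
				if err != nil {
					return err
				}
				resp.Body.Close()
				if resp.Header.Get(kubetypes.PodIPv4Header) == podIPv4 {
					seenSelf = true
					break
				}
			}
			if !seenSelf {
				return nil // Service traffic is no longer being routed to this Pod
			}
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(time.Second):
			}
		}
	}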

@ -19,13 +19,13 @@ import (
rbacv1 "k8s.io/api/rbac/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"tailscale.com/client/tailscale"
tsoperator "tailscale.com/k8s-operator"
tsapi "tailscale.com/k8s-operator/apis/v1alpha1"
"tailscale.com/kube/egressservices"
"tailscale.com/kube/kubetypes"
"tailscale.com/tstest"
"tailscale.com/types/ptr"
@ -97,7 +97,7 @@ func TestProxyGroup(t *testing.T) {

tsoperator.SetProxyGroupCondition(pg, tsapi.ProxyGroupReady, metav1.ConditionFalse, reasonProxyGroupCreating, "the ProxyGroup's ProxyClass default-pc is not yet in a ready state, waiting...", 0, cl, zl.Sugar())
expectEqual(t, fc, pg)
expectProxyGroupResources(t, fc, pg, false, "")
expectProxyGroupResources(t, fc, pg, false, "", pc)
})

t.Run("observe_ProxyGroupCreating_status_reason", func(t *testing.T) {
@ -118,11 +118,11 @@ func TestProxyGroup(t *testing.T) {

tsoperator.SetProxyGroupCondition(pg, tsapi.ProxyGroupReady, metav1.ConditionFalse, reasonProxyGroupCreating, "0/2 ProxyGroup pods running", 0, cl, zl.Sugar())
expectEqual(t, fc, pg)
expectProxyGroupResources(t, fc, pg, true, "")
expectProxyGroupResources(t, fc, pg, true, "", pc)
if expected := 1; reconciler.egressProxyGroups.Len() != expected {
t.Fatalf("expected %d egress ProxyGroups, got %d", expected, reconciler.egressProxyGroups.Len())
}
expectProxyGroupResources(t, fc, pg, true, "")
expectProxyGroupResources(t, fc, pg, true, "", pc)
keyReq := tailscale.KeyCapabilities{
Devices: tailscale.KeyDeviceCapabilities{
Create: tailscale.KeyDeviceCreateCapabilities{
@ -154,7 +154,7 @@ func TestProxyGroup(t *testing.T) {
}
tsoperator.SetProxyGroupCondition(pg, tsapi.ProxyGroupReady, metav1.ConditionTrue, reasonProxyGroupReady, reasonProxyGroupReady, 0, cl, zl.Sugar())
expectEqual(t, fc, pg)
expectProxyGroupResources(t, fc, pg, true, initialCfgHash)
expectProxyGroupResources(t, fc, pg, true, initialCfgHash, pc)
})

t.Run("scale_up_to_3", func(t *testing.T) {
@ -165,7 +165,7 @@ func TestProxyGroup(t *testing.T) {
expectReconciled(t, reconciler, "", pg.Name)
tsoperator.SetProxyGroupCondition(pg, tsapi.ProxyGroupReady, metav1.ConditionFalse, reasonProxyGroupCreating, "2/3 ProxyGroup pods running", 0, cl, zl.Sugar())
expectEqual(t, fc, pg)
expectProxyGroupResources(t, fc, pg, true, initialCfgHash)
expectProxyGroupResources(t, fc, pg, true, initialCfgHash, pc)

addNodeIDToStateSecrets(t, fc, pg)
expectReconciled(t, reconciler, "", pg.Name)
@ -175,7 +175,7 @@ func TestProxyGroup(t *testing.T) {
TailnetIPs: []string{"1.2.3.4", "::1"},
})
expectEqual(t, fc, pg)
expectProxyGroupResources(t, fc, pg, true, initialCfgHash)
expectProxyGroupResources(t, fc, pg, true, initialCfgHash, pc)
})

t.Run("scale_down_to_1", func(t *testing.T) {
@ -188,7 +188,7 @@ func TestProxyGroup(t *testing.T) {

pg.Status.Devices = pg.Status.Devices[:1] // truncate to only the first device.
expectEqual(t, fc, pg)
expectProxyGroupResources(t, fc, pg, true, initialCfgHash)
expectProxyGroupResources(t, fc, pg, true, initialCfgHash, pc)
})

t.Run("trigger_config_change_and_observe_new_config_hash", func(t *testing.T) {
@ -202,7 +202,7 @@ func TestProxyGroup(t *testing.T) {
expectReconciled(t, reconciler, "", pg.Name)

expectEqual(t, fc, pg)
expectProxyGroupResources(t, fc, pg, true, "518a86e9fae64f270f8e0ec2a2ea6ca06c10f725035d3d6caca132cd61e42a74")
expectProxyGroupResources(t, fc, pg, true, "518a86e9fae64f270f8e0ec2a2ea6ca06c10f725035d3d6caca132cd61e42a74", pc)
})

t.Run("enable_metrics", func(t *testing.T) {
@ -246,12 +246,29 @@ func TestProxyGroup(t *testing.T) {
// The fake client does not clean up objects whose owner has been
// deleted, so we can't test for the owned resources getting deleted.
})

}

func TestProxyGroupTypes(t *testing.T) {
pc := &tsapi.ProxyClass{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Generation: 1,
},
Spec: tsapi.ProxyClassSpec{},
}
fc := fake.NewClientBuilder().
WithScheme(tsapi.GlobalScheme).
WithObjects(pc).
WithStatusSubresource(pc).
Build()
mustUpdateStatus(t, fc, "", pc.Name, func(p *tsapi.ProxyClass) {
p.Status.Conditions = []metav1.Condition{{
Type: string(tsapi.ProxyClassReady),
Status: metav1.ConditionTrue,
ObservedGeneration: 1,
}}
})

zl, _ := zap.NewDevelopment()
reconciler := &ProxyGroupReconciler{
@ -274,9 +291,7 @@ func TestProxyGroupTypes(t *testing.T) {
Replicas: ptr.To[int32](0),
},
}
if err := fc.Create(context.Background(), pg); err != nil {
t.Fatal(err)
}
mustCreate(t, fc, pg)

expectReconciled(t, reconciler, "", pg.Name)
verifyProxyGroupCounts(t, reconciler, 0, 1)
@ -286,7 +301,8 @@ func TestProxyGroupTypes(t *testing.T) {
t.Fatalf("failed to get StatefulSet: %v", err)
}
verifyEnvVar(t, sts, "TS_INTERNAL_APP", kubetypes.AppProxyGroupEgress)
verifyEnvVar(t, sts, "TS_EGRESS_SERVICES_CONFIG_PATH", fmt.Sprintf("/etc/proxies/%s", egressservices.KeyEgressServices))
verifyEnvVar(t, sts, "TS_EGRESS_PROXIES_CONFIG_PATH", "/etc/proxies")
verifyEnvVar(t, sts, "TS_ENABLE_HEALTH_CHECK", "true")

// Verify that egress configuration has been set up.
cm := &corev1.ConfigMap{}
@ -323,6 +339,57 @@ func TestProxyGroupTypes(t *testing.T) {
if diff := cmp.Diff(expectedVolumeMounts, sts.Spec.Template.Spec.Containers[0].VolumeMounts); diff != "" {
t.Errorf("unexpected volume mounts (-want +got):\n%s", diff)
}

expectedLifecycle := corev1.Lifecycle{
PreStop: &corev1.LifecycleHandler{
HTTPGet: &corev1.HTTPGetAction{
Path: kubetypes.EgessServicesPreshutdownEP,
Port: intstr.FromInt(defaultLocalAddrPort),
},
},
}
if diff := cmp.Diff(expectedLifecycle, *sts.Spec.Template.Spec.Containers[0].Lifecycle); diff != "" {
t.Errorf("unexpected lifecycle (-want +got):\n%s", diff)
}
if *sts.Spec.Template.DeletionGracePeriodSeconds != deletionGracePeriodSeconds {
t.Errorf("unexpected deletion grace period seconds %d, want %d", *sts.Spec.Template.DeletionGracePeriodSeconds, deletionGracePeriodSeconds)
}
})
t.Run("egress_type_no_lifecycle_hook_when_local_addr_port_set", func(t *testing.T) {
pg := &tsapi.ProxyGroup{
ObjectMeta: metav1.ObjectMeta{
Name: "test-egress-no-lifecycle",
UID: "test-egress-no-lifecycle-uid",
},
Spec: tsapi.ProxyGroupSpec{
Type: tsapi.ProxyGroupTypeEgress,
Replicas: ptr.To[int32](0),
ProxyClass: "test",
},
}
mustCreate(t, fc, pg)
mustUpdate(t, fc, "", pc.Name, func(p *tsapi.ProxyClass) {
p.Spec.StatefulSet = &tsapi.StatefulSet{
Pod: &tsapi.Pod{
TailscaleContainer: &tsapi.Container{
Env: []tsapi.Env{{
Name: "TS_LOCAL_ADDR_PORT",
Value: "127.0.0.1:8080",
}},
},
},
}
})
expectReconciled(t, reconciler, "", pg.Name)

sts := &appsv1.StatefulSet{}
if err := fc.Get(context.Background(), client.ObjectKey{Namespace: tsNamespace, Name: pg.Name}, sts); err != nil {
t.Fatalf("failed to get StatefulSet: %v", err)
}

if sts.Spec.Template.Spec.Containers[0].Lifecycle != nil {
t.Error("lifecycle hook was set when TS_LOCAL_ADDR_PORT was configured via ProxyClass")
}
})

t.Run("ingress_type", func(t *testing.T) {
@ -341,7 +408,7 @@ func TestProxyGroupTypes(t *testing.T) {
}

expectReconciled(t, reconciler, "", pg.Name)
verifyProxyGroupCounts(t, reconciler, 1, 1)
verifyProxyGroupCounts(t, reconciler, 1, 2)

sts := &appsv1.StatefulSet{}
if err := fc.Get(context.Background(), client.ObjectKey{Namespace: tsNamespace, Name: pg.Name}, sts); err != nil {
@ -402,13 +469,13 @@ func verifyEnvVar(t *testing.T, sts *appsv1.StatefulSet, name, expectedValue str
t.Errorf("%s environment variable not found", name)
}

func expectProxyGroupResources(t *testing.T, fc client.WithWatch, pg *tsapi.ProxyGroup, shouldExist bool, cfgHash string) {
func expectProxyGroupResources(t *testing.T, fc client.WithWatch, pg *tsapi.ProxyGroup, shouldExist bool, cfgHash string, proxyClass *tsapi.ProxyClass) {
t.Helper()

role := pgRole(pg, tsNamespace)
roleBinding := pgRoleBinding(pg, tsNamespace)
serviceAccount := pgServiceAccount(pg, tsNamespace)
statefulSet, err := pgStatefulSet(pg, tsNamespace, testProxyImage, "auto")
statefulSet, err := pgStatefulSet(pg, tsNamespace, testProxyImage, "auto", proxyClass)
if err != nil {
t.Fatal(err)
}

@ -101,6 +101,9 @@ const (
proxyTypeIngressResource = "ingress_resource"
proxyTypeConnector = "connector"
proxyTypeProxyGroup = "proxygroup"

envVarTSLocalAddrPort = "TS_LOCAL_ADDR_PORT"
defaultLocalAddrPort = 9002 // metrics and health check port
)

var (

@ -75,16 +75,6 @@ func RemoveServiceCondition(svc *corev1.Service, conditionType tsapi.ConditionTy
})
}

func EgressServiceIsValidAndConfigured(svc *corev1.Service) bool {
for _, typ := range []tsapi.ConditionType{tsapi.EgressSvcValid, tsapi.EgressSvcConfigured} {
cond := GetServiceCondition(svc, typ)
if cond == nil || cond.Status != metav1.ConditionTrue {
return false
}
}
return true
}

// SetRecorderCondition ensures that Recorder status has a condition with the
// given attributes. LastTransitionTime gets set every time condition's status
// changes.

@ -13,9 +13,15 @@ import (
"net/netip"
)

// KeyEgressServices is name of the proxy state Secret field that contains the
// currently applied egress proxy config.
const KeyEgressServices = "egress-services"
const (
// KeyEgressServices is the name of the proxy state Secret field that contains the
// currently applied egress proxy config.
KeyEgressServices = "egress-services"

// KeyHEPPings is the number of times an egress service health check endpoint needs to be pinged to ensure that
// each currently configured backend is hit. In practice, it depends on the number of ProxyGroup replicas.
KeyHEPPings = "hep-pings"
)

// Configs contains the desired configuration for egress services keyed by
// service name.
@ -24,6 +30,7 @@ type Configs map[string]Config
// Config is an egress service configuration.
// TODO(irbekrm): version this?
type Config struct {
// HealthCheckEndpoint is the health check endpoint of the egress service.
HealthCheckEndpoint string `json:"healthCheckEndpoint"`
// TailnetTarget is the target to which cluster traffic for this service
// should be proxied.
TailnetTarget TailnetTarget `json:"tailnetTarget"`

@ -55,7 +55,7 @@ func Test_jsonMarshalConfig(t *testing.T) {
protocol: "tcp",
matchPort: 4003,
targetPort: 80,
wantsBs: []byte(`{"tailnetTarget":{"ip":"","fqdn":""},"ports":[{"protocol":"tcp","matchPort":4003,"targetPort":80}]}`),
wantsBs: []byte(`{"healthCheckEndpoint":"","tailnetTarget":{"ip":"","fqdn":""},"ports":[{"protocol":"tcp","matchPort":4003,"targetPort":80}]}`),
},
}
for _, tt := range tests {

@ -43,4 +43,9 @@ const (
// that cluster workloads behind the Ingress can now be accessed via the given DNS name over HTTPS.
KeyHTTPSEndpoint string = "https_endpoint"
ValueNoHTTPS string = "no-https"

// PodIPv4Header is the name of the header containing the Pod's IPv4 address, returned by the
// containerboot health check endpoint.
PodIPv4Header string = "Pod-IPv4"

// EgessServicesPreshutdownEP is the preshutdown endpoint served by egress ProxyGroup proxies; kubelet
// calls it via the pre-stop hook before terminating a replica.
EgessServicesPreshutdownEP = "/internal-egress-services-preshutdown"
)
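
For context, a sketch of the assumed wiring on the proxy side (not necessarily the commit's exact code): containerboot serves the preshutdown endpoint on the same local mux and port as the health check, which is what the kubelet preStop HTTPGet added in pgStatefulSet points at. Here mux is assumed to be the proxy's local *http.ServeMux.

	mux.HandleFunc("GET "+kubetypes.EgessServicesPreshutdownEP, func(w http.ResponseWriter, r *http.Request) {
		// Real logic elided: the handler would block here until this Pod's IP no longer
		// appears in any configured egress service's ClusterIP health check responses
		// (see the wait-loop sketch after hepPings above), then respond 200 so kubelet
		// proceeds with terminating the container.
		w.WriteHeader(http.StatusOK)
	})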