// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause

//go:build !plan9

// k8s-proxy proxies between tailnet and Kubernetes cluster traffic.
// Currently, it only supports proxying tailnet clients to the Kubernetes API
// server.
package main

import (
	"context"
	"errors"
	"fmt"
	"net"
	"net/http"
	"os"
	"os/signal"
	"reflect"
	"strconv"
	"strings"
	"syscall"
	"time"

	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
	"golang.org/x/sync/errgroup"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/utils/strings/slices"
	"tailscale.com/client/local"
	"tailscale.com/cmd/k8s-proxy/internal/config"
	"tailscale.com/hostinfo"
	"tailscale.com/ipn"
	"tailscale.com/ipn/store"
	apiproxy "tailscale.com/k8s-operator/api-proxy"
	"tailscale.com/kube/certs"
	healthz "tailscale.com/kube/health"
	"tailscale.com/kube/k8s-proxy/conf"
	"tailscale.com/kube/kubetypes"
	klc "tailscale.com/kube/localclient"
	"tailscale.com/kube/metrics"
	"tailscale.com/kube/services"
	"tailscale.com/kube/state"
	"tailscale.com/tailcfg"
	"tailscale.com/tsnet"
)

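// main configures a JSON-encoded zap logger writing to stderr, then hands
// control to run, exiting the process with a fatal log if it returns an error.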
func main() {
	encoderCfg := zap.NewProductionEncoderConfig()
	encoderCfg.EncodeTime = zapcore.RFC3339TimeEncoder
	logger := zap.Must(zap.Config{
		Level:            zap.NewAtomicLevelAt(zap.DebugLevel),
		Encoding:         "json",
		OutputPaths:      []string{"stderr"},
		ErrorOutputPaths: []string{"stderr"},
		EncoderConfig:    encoderCfg,
	}.Build()).Sugar()
	defer logger.Sync()

	if err := run(logger); err != nil {
		logger.Fatal(err.Error())
	}
}

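// run loads and watches the proxy's config file, brings up a tsnet server,
// optionally exposes health check and metrics endpoints, runs the Kubernetes
// API server proxy, and applies config changes until a shutdown signal is
// received or a fatal error occurs.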
func run(logger *zap.SugaredLogger) error {
	var (
		configPath = os.Getenv("TS_K8S_PROXY_CONFIG")
		podUID     = os.Getenv("POD_UID")
		podIP      = os.Getenv("POD_IP")
	)
	if configPath == "" {
		return errors.New("TS_K8S_PROXY_CONFIG unset")
	}

	// serveCtx lives for the lifetime of the process; it only gets cancelled
	// once the Tailscale Service has been drained.
	serveCtx, serveCancel := context.WithCancel(context.Background())
	defer serveCancel()

	// ctx to cancel to start the shutdown process.
	ctx, cancel := context.WithCancel(serveCtx)
	defer cancel()

	sigsChan := make(chan os.Signal, 1)
	signal.Notify(sigsChan, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		select {
		case <-ctx.Done():
		case s := <-sigsChan:
			logger.Infof("Received shutdown signal %s, exiting", s)
			cancel()
		}
	}()

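	// All long-running goroutines are run via a single errgroup so that the
	// first error to occur cancels ctx and is returned from run.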
	var group *errgroup.Group
	group, ctx = errgroup.WithContext(ctx)

	restConfig, err := getRestConfig(logger)
	if err != nil {
		return fmt.Errorf("error getting rest config: %w", err)
	}
	clientset, err := kubernetes.NewForConfig(restConfig)
	if err != nil {
		return fmt.Errorf("error creating Kubernetes clientset: %w", err)
	}

	// Load and watch config.
	cfgChan := make(chan *conf.Config)
	cfgLoader := config.NewConfigLoader(logger, clientset.CoreV1(), cfgChan)
	group.Go(func() error {
		return cfgLoader.WatchConfig(ctx, configPath)
	})

	// Get initial config.
	var cfg *conf.Config
	select {
	case <-ctx.Done():
		return group.Wait()
	case cfg = <-cfgChan:
	}

	if cfg.Parsed.LogLevel != nil {
		level, err := zapcore.ParseLevel(*cfg.Parsed.LogLevel)
		if err != nil {
			return fmt.Errorf("error parsing log level %q: %w", *cfg.Parsed.LogLevel, err)
		}
		logger = logger.WithOptions(zap.IncreaseLevel(level))
	}

	// TODO(ChaosInTheCRD): This is a temporary workaround until we can set
	// static endpoints using prefs.
	if se := cfg.Parsed.StaticEndpoints; len(se) > 0 {
		logger.Debugf("setting static endpoints '%v' via TS_DEBUG_PRETENDPOINT environment variable", cfg.Parsed.StaticEndpoints)
		ses := make([]string, len(se))
		for i, e := range se {
			ses[i] = e.String()
		}

		err := os.Setenv("TS_DEBUG_PRETENDPOINT", strings.Join(ses, ","))
		if err != nil {
			return err
		}
	}

	if cfg.Parsed.App != nil {
		hostinfo.SetApp(*cfg.Parsed.App)
	}

	// TODO(tomhjp): Pass this setting directly into the store instead of using
	// environment variables.
	if cfg.Parsed.APIServerProxy != nil && cfg.Parsed.APIServerProxy.IssueCerts.EqualBool(true) {
		os.Setenv("TS_CERT_SHARE_MODE", "rw")
	} else {
		os.Setenv("TS_CERT_SHARE_MODE", "ro")
	}

	st, err := getStateStore(cfg.Parsed.State, logger)
	if err != nil {
		return err
	}

	// If the Pod UID is unset, assume we're running outside of a cluster/not
	// managed by the operator, so there's no need to set additional state keys.
	if podUID != "" {
		if err := state.SetInitialKeys(st, podUID); err != nil {
			return fmt.Errorf("error setting initial state: %w", err)
		}
	}

	var authKey string
	if cfg.Parsed.AuthKey != nil {
		authKey = *cfg.Parsed.AuthKey
	}

	ts := &tsnet.Server{
		Logf:     logger.Named("tsnet").Debugf,
		UserLogf: logger.Named("tsnet").Infof,
		Store:    st,
		AuthKey:  authKey,
	}

	if cfg.Parsed.ServerURL != nil {
		ts.ControlURL = *cfg.Parsed.ServerURL
	}

	if cfg.Parsed.Hostname != nil {
		ts.Hostname = *cfg.Parsed.Hostname
	}

	// Make sure we crash loop if Up doesn't complete in reasonable time.
	upCtx, upCancel := context.WithTimeout(ctx, time.Minute)
	defer upCancel()
	if _, err := ts.Up(upCtx); err != nil {
		return fmt.Errorf("error starting tailscale server: %w", err)
	}
	defer ts.Close()
	lc, err := ts.LocalClient()
	if err != nil {
		return fmt.Errorf("error getting local client: %w", err)
	}

	// Setup for updating state keys.
	if podUID != "" {
		group.Go(func() error {
			return state.KeepKeysUpdated(ctx, st, klc.New(lc))
		})
	}

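	// Expose the health check and metrics endpoints on a single local HTTP
	// server, preferring the Pod's IP as the bind address when running under
	// the Kubernetes operator.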
	if cfg.Parsed.HealthCheckEnabled.EqualBool(true) || cfg.Parsed.MetricsEnabled.EqualBool(true) {
		addr := podIP
		if addr == "" {
			addr = cfg.GetLocalAddr()
		}

		addrPort := getLocalAddrPort(addr, cfg.GetLocalPort())
		mux := http.NewServeMux()
		localSrv := &http.Server{Addr: addrPort, Handler: mux}

		if cfg.Parsed.MetricsEnabled.EqualBool(true) {
			logger.Infof("Running metrics endpoint at %s/metrics", addrPort)
			metrics.RegisterMetricsHandlers(mux, lc, "")
		}

		if cfg.Parsed.HealthCheckEnabled.EqualBool(true) {
			ipV4, _ := ts.TailscaleIPs()
			hz := healthz.RegisterHealthHandlers(mux, ipV4.String(), logger.Infof)
			group.Go(func() error {
				err := hz.MonitorHealth(ctx, lc)
				if err == nil || errors.Is(err, context.Canceled) {
					return nil
				}
				return err
			})
		}

		group.Go(func() error {
			errChan := make(chan error)
			go func() {
				if err := localSrv.ListenAndServe(); err != nil {
					errChan <- err
				}
				close(errChan)
			}()

			select {
			case <-ctx.Done():
				sCtx, scancel := context.WithTimeout(serveCtx, 10*time.Second)
				defer scancel()
				return localSrv.Shutdown(sCtx)
			case err := <-errChan:
				return err
			}
		})
	}

	if v, ok := cfg.Parsed.AcceptRoutes.Get(); ok {
		_, err = lc.EditPrefs(ctx, &ipn.MaskedPrefs{
			RouteAllSet: true,
			Prefs:       ipn.Prefs{RouteAll: v},
		})
		if err != nil {
			return fmt.Errorf("error editing prefs: %w", err)
		}
	}

	// TODO(tomhjp): There seems to be a bug where, on restart, the device does
	// not get reassigned its already working Service IPs unless we clear and
	// reset the serve config.
	if err := lc.SetServeConfig(ctx, &ipn.ServeConfig{}); err != nil {
		return fmt.Errorf("error clearing existing ServeConfig: %w", err)
	}

	var cm *certs.CertManager
	if shouldIssueCerts(cfg) {
		logger.Infof("Will issue TLS certs for Tailscale Service")
		cm = certs.NewCertManager(klc.New(lc), logger.Infof)
	}
	if err := setServeConfig(ctx, lc, cm, apiServerProxyService(cfg)); err != nil {
		return err
	}

	if cfg.Parsed.AdvertiseServices != nil {
		if _, err := lc.EditPrefs(ctx, &ipn.MaskedPrefs{
			AdvertiseServicesSet: true,
			Prefs: ipn.Prefs{
				AdvertiseServices: cfg.Parsed.AdvertiseServices,
			},
		}); err != nil {
			return fmt.Errorf("error setting prefs AdvertiseServices: %w", err)
		}
	}

	// Setup for the API server proxy.
	mode := kubetypes.APIServerProxyModeAuth
	if cfg.Parsed.APIServerProxy != nil && cfg.Parsed.APIServerProxy.Mode != nil {
		mode = *cfg.Parsed.APIServerProxy.Mode
	}
	ap, err := apiproxy.NewAPIServerProxy(logger.Named("apiserver-proxy"), restConfig, ts, mode, false)
	if err != nil {
		return fmt.Errorf("error creating api server proxy: %w", err)
	}

	group.Go(func() error {
		if err := ap.Run(serveCtx); err != nil {
			return fmt.Errorf("error running API server proxy: %w", err)
		}

		return nil
	})

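	// Main loop: react to config reloads until shutdown is requested, then
	// unadvertise Tailscale Services and drain before exiting.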
	for {
		select {
		case <-ctx.Done():
			// Context cancelled, exit.
			logger.Info("Context cancelled, exiting")
			shutdownCtx, shutdownCancel := context.WithTimeout(serveCtx, 20*time.Second)
			unadvertiseErr := services.EnsureServicesNotAdvertised(shutdownCtx, lc, logger.Infof)
			shutdownCancel()
			serveCancel()
			return errors.Join(unadvertiseErr, group.Wait())
		case cfg = <-cfgChan:
			// Handle config reload.
			// TODO(tomhjp): Make auth mode reloadable.
			var prefs ipn.MaskedPrefs
			cfgLogger := logger
			currentPrefs, err := lc.GetPrefs(ctx)
			if err != nil {
				return fmt.Errorf("error getting current prefs: %w", err)
			}
			if !slices.Equal(currentPrefs.AdvertiseServices, cfg.Parsed.AdvertiseServices) {
				cfgLogger = cfgLogger.With("AdvertiseServices", fmt.Sprintf("%v -> %v", currentPrefs.AdvertiseServices, cfg.Parsed.AdvertiseServices))
				prefs.AdvertiseServicesSet = true
				prefs.Prefs.AdvertiseServices = cfg.Parsed.AdvertiseServices
			}
			if cfg.Parsed.Hostname != nil && *cfg.Parsed.Hostname != currentPrefs.Hostname {
				cfgLogger = cfgLogger.With("Hostname", fmt.Sprintf("%s -> %s", currentPrefs.Hostname, *cfg.Parsed.Hostname))
				prefs.HostnameSet = true
				prefs.Hostname = *cfg.Parsed.Hostname
			}
			if v, ok := cfg.Parsed.AcceptRoutes.Get(); ok && v != currentPrefs.RouteAll {
				cfgLogger = cfgLogger.With("AcceptRoutes", fmt.Sprintf("%v -> %v", currentPrefs.RouteAll, v))
				prefs.RouteAllSet = true
				prefs.Prefs.RouteAll = v
			}
			if !prefs.IsEmpty() {
				if _, err := lc.EditPrefs(ctx, &prefs); err != nil {
					return fmt.Errorf("error editing prefs: %w", err)
				}
			}
			if err := setServeConfig(ctx, lc, cm, apiServerProxyService(cfg)); err != nil {
				return fmt.Errorf("error setting serve config: %w", err)
			}

			cfgLogger.Infof("Config reloaded")
		}
	}
}

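// getLocalAddrPort joins addr and port into a dialable "host:port" string,
// bracketing IPv6 addresses as needed.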
func getLocalAddrPort(addr string, port uint16) string {
	return net.JoinHostPort(addr, strconv.FormatUint(uint64(port), 10))
}

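// getStateStore returns the state store for the given state path, falling
// back to an in-memory store (with a warning) if no path is configured.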
func getStateStore(path *string, logger *zap.SugaredLogger) (ipn.StateStore, error) {
	p := "mem:"
	if path != nil {
		p = *path
	} else {
		logger.Warn("No state Secret provided; using in-memory store, which will lose state on restart")
	}
	st, err := store.New(logger.Errorf, p)
	if err != nil {
		return nil, fmt.Errorf("error creating state store: %w", err)
	}

	return st, nil
}

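// getRestConfig returns a Kubernetes REST config, preferring in-cluster
// config and falling back to the default kubeconfig loading rules when not
// running inside a cluster.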
func getRestConfig(logger *zap.SugaredLogger) (*rest.Config, error) {
	restConfig, err := rest.InClusterConfig()
	switch err {
	case nil:
		return restConfig, nil
	case rest.ErrNotInCluster:
		logger.Info("Not running in-cluster, falling back to kubeconfig")
	default:
		return nil, fmt.Errorf("error getting in-cluster config: %w", err)
	}

	loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
	clientConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, nil)
	restConfig, err = clientConfig.ClientConfig()
	if err != nil {
		return nil, fmt.Errorf("error loading kubeconfig: %w", err)
	}

	return restConfig, nil
}

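// apiServerProxyService returns the name of the Tailscale Service the API
// server proxy should serve, or "" if the proxy is disabled or no Service
// name is configured.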
func apiServerProxyService(cfg *conf.Config) tailcfg.ServiceName {
	if cfg.Parsed.APIServerProxy != nil &&
		cfg.Parsed.APIServerProxy.Enabled.EqualBool(true) &&
		cfg.Parsed.APIServerProxy.ServiceName != nil &&
		*cfg.Parsed.APIServerProxy.ServiceName != "" {
		return tailcfg.ServiceName(*cfg.Parsed.APIServerProxy.ServiceName)
	}

	return ""
}

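// shouldIssueCerts reports whether this proxy instance is configured to issue
// TLS certs for the Tailscale Service.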
func shouldIssueCerts(cfg *conf.Config) bool {
	return cfg.Parsed.APIServerProxy != nil &&
		cfg.Parsed.APIServerProxy.IssueCerts.EqualBool(true)
}

// setServeConfig sets up serve config such that it's serving for the passed in
// Tailscale Service, and does nothing if it's already up to date.
func setServeConfig(ctx context.Context, lc *local.Client, cm *certs.CertManager, name tailcfg.ServiceName) error {
	existingServeConfig, err := lc.GetServeConfig(ctx)
	if err != nil {
		return fmt.Errorf("error getting existing serve config: %w", err)
	}

	// Ensure serve config is cleared if no Tailscale Service.
	if name == "" {
		if reflect.DeepEqual(*existingServeConfig, ipn.ServeConfig{}) {
			// Already up to date.
			return nil
		}

		if cm != nil {
			cm.EnsureCertLoops(ctx, &ipn.ServeConfig{})
		}
		return lc.SetServeConfig(ctx, &ipn.ServeConfig{})
	}

	status, err := lc.StatusWithoutPeers(ctx)
	if err != nil {
		return fmt.Errorf("error getting local client status: %w", err)
	}
	serviceHostPort := ipn.HostPort(fmt.Sprintf("%s.%s:443", name.WithoutPrefix(), status.CurrentTailnet.MagicDNSSuffix))

	serveConfig := ipn.ServeConfig{
		// Configure for the Service hostname.
		Services: map[tailcfg.ServiceName]*ipn.ServiceConfig{
			name: {
				TCP: map[uint16]*ipn.TCPPortHandler{
					443: {
						HTTPS: true,
					},
				},
				Web: map[ipn.HostPort]*ipn.WebServerConfig{
					serviceHostPort: {
						Handlers: map[string]*ipn.HTTPHandler{
							"/": {
								Proxy: fmt.Sprintf("http://%s:80", strings.TrimSuffix(status.Self.DNSName, ".")),
							},
						},
					},
				},
			},
		},
	}

	if reflect.DeepEqual(*existingServeConfig, serveConfig) {
		// Already up to date.
		return nil
	}

	if cm != nil {
		cm.EnsureCertLoops(ctx, &serveConfig)
	}
	return lc.SetServeConfig(ctx, &serveConfig)
}