WIP:cmd/k8s-operator: run tailscaled and api server proxy on for leader replicas

Signed-off-by: Irbe Krumina <irbe@tailscale.com>
This commit is contained in:
Irbe Krumina 2023-11-14 07:25:03 +01:00
parent b015021747
commit 1c802077bc
3 changed files with 211 additions and 135 deletions

View File

@ -37,6 +37,7 @@ import (
"tailscale.com/ipn"
"tailscale.com/ipn/store/kubestore"
"tailscale.com/tsnet"
"tailscale.com/types/lazy"
"tailscale.com/types/logger"
"tailscale.com/version"
)
@ -83,123 +84,158 @@ func main() {
hostinfo.SetApp("k8s-operator-proxy")
}
s, tsClient := initTSNet(zlog)
defer s.Close()
restConfig := config.GetConfigOrDie()
maybeLaunchAPIServerProxy(zlog, restConfig, s, mode)
runReconcilers(zlog, s, tsNamespace, restConfig, tsClient, image, priorityClassName, tags, tsFirewallMode)
c := managerConfig{
apiServerProxyMode: mode,
tsNamespace: tsNamespace,
restConfig: restConfig,
zlog: zlog,
tsFirewallMode: tsFirewallMode,
proxyImage: image,
priorityClassName: priorityClassName,
tags: tags,
ts: initTSNet(zlog),
}
runReconcilers(c)
}
type managerConfig struct {
// apiserver proxy mode
apiServerProxyMode apiServerProxyMode
tsNamespace string
restConfig *rest.Config
zlog *zap.SugaredLogger
tsFirewallMode string
proxyImage string
priorityClassName string
tags string
ts tsSetupFunc
}
type tsSetup struct {
server *tsnet.Server
client *tailscale.Client
}
// initTSNet initializes the tsnet.Server and logs in to Tailscale. It uses the
// CLIENT_ID_FILE and CLIENT_SECRET_FILE environment variables to authenticate
// with Tailscale.
func initTSNet(zlog *zap.SugaredLogger) (*tsnet.Server, *tailscale.Client) {
var (
clientIDPath = defaultEnv("CLIENT_ID_FILE", "")
clientSecretPath = defaultEnv("CLIENT_SECRET_FILE", "")
hostname = defaultEnv("OPERATOR_HOSTNAME", "tailscale-operator")
kubeSecret = defaultEnv("OPERATOR_SECRET", "")
operatorTags = defaultEnv("OPERATOR_INITIAL_TAGS", "tag:k8s-operator")
)
startlog := zlog.Named("startup")
if clientIDPath == "" || clientSecretPath == "" {
startlog.Fatalf("CLIENT_ID_FILE and CLIENT_SECRET_FILE must be set")
}
clientID, err := os.ReadFile(clientIDPath)
if err != nil {
startlog.Fatalf("reading client ID %q: %v", clientIDPath, err)
}
clientSecret, err := os.ReadFile(clientSecretPath)
if err != nil {
startlog.Fatalf("reading client secret %q: %v", clientSecretPath, err)
}
credentials := clientcredentials.Config{
ClientID: string(clientID),
ClientSecret: string(clientSecret),
TokenURL: "https://login.tailscale.com/api/v2/oauth/token",
}
tsClient := tailscale.NewClient("-", nil)
tsClient.HTTPClient = credentials.Client(context.Background())
s := &tsnet.Server{
Hostname: hostname,
Logf: zlog.Named("tailscaled").Debugf,
}
if kubeSecret != "" {
st, err := kubestore.New(logger.Discard, kubeSecret)
if err != nil {
startlog.Fatalf("creating kube store: %v", err)
func initTSNet(zlog *zap.SugaredLogger) tsSetupFunc {
ts := lazy.SyncFunc(func() tsSetup {
var (
clientIDPath = defaultEnv("CLIENT_ID_FILE", "")
clientSecretPath = defaultEnv("CLIENT_SECRET_FILE", "")
hostname = defaultEnv("OPERATOR_HOSTNAME", "tailscale-operator")
kubeSecret = defaultEnv("OPERATOR_SECRET", "")
operatorTags = defaultEnv("OPERATOR_INITIAL_TAGS", "tag:k8s-operator")
)
startlog := zlog.Named("startup")
if clientIDPath == "" || clientSecretPath == "" {
startlog.Fatalf("CLIENT_ID_FILE and CLIENT_SECRET_FILE must be set")
}
s.Store = st
}
if err := s.Start(); err != nil {
startlog.Fatalf("starting tailscale server: %v", err)
}
lc, err := s.LocalClient()
if err != nil {
startlog.Fatalf("getting local client: %v", err)
}
ctx := context.Background()
loginDone := false
machineAuthShown := false
waitOnline:
for {
startlog.Debugf("querying tailscaled status")
st, err := lc.StatusWithoutPeers(ctx)
clientID, err := os.ReadFile(clientIDPath)
if err != nil {
startlog.Fatalf("getting status: %v", err)
startlog.Fatalf("reading client ID %q: %v", clientIDPath, err)
}
switch st.BackendState {
case "Running":
break waitOnline
case "NeedsLogin":
if loginDone {
break
}
caps := tailscale.KeyCapabilities{
Devices: tailscale.KeyDeviceCapabilities{
Create: tailscale.KeyDeviceCreateCapabilities{
Reusable: false,
Preauthorized: true,
Tags: strings.Split(operatorTags, ","),
},
},
}
authkey, _, err := tsClient.CreateKey(ctx, caps)
clientSecret, err := os.ReadFile(clientSecretPath)
if err != nil {
startlog.Fatalf("reading client secret %q: %v", clientSecretPath, err)
}
credentials := clientcredentials.Config{
ClientID: string(clientID),
ClientSecret: string(clientSecret),
TokenURL: "https://login.tailscale.com/api/v2/oauth/token",
}
tsClient := tailscale.NewClient("-", nil)
tsClient.HTTPClient = credentials.Client(context.Background())
s := &tsnet.Server{
Hostname: hostname,
Logf: zlog.Named("tailscaled").Debugf,
}
if kubeSecret != "" {
st, err := kubestore.New(logger.Discard, kubeSecret)
if err != nil {
startlog.Fatalf("creating operator authkey: %v", err)
startlog.Fatalf("creating kube store: %v", err)
}
if err := lc.Start(ctx, ipn.Options{
AuthKey: authkey,
}); err != nil {
startlog.Fatalf("starting tailscale: %v", err)
}
if err := lc.StartLoginInteractive(ctx); err != nil {
startlog.Fatalf("starting login: %v", err)
}
startlog.Debugf("requested login by authkey")
loginDone = true
case "NeedsMachineAuth":
if !machineAuthShown {
startlog.Infof("Machine approval required, please visit the admin panel to approve")
machineAuthShown = true
}
default:
startlog.Debugf("waiting for tailscale to start: %v", st.BackendState)
s.Store = st
}
time.Sleep(time.Second)
}
return s, tsClient
if err := s.Start(); err != nil {
startlog.Fatalf("starting tailscale server: %v", err)
}
lc, err := s.LocalClient()
if err != nil {
startlog.Fatalf("getting local client: %v", err)
}
ctx := context.Background()
loginDone := false
machineAuthShown := false
waitOnline:
for {
startlog.Debugf("querying tailscaled status")
st, err := lc.StatusWithoutPeers(ctx)
if err != nil {
startlog.Fatalf("getting status: %v", err)
}
switch st.BackendState {
case "Running":
break waitOnline
case "NeedsLogin":
if loginDone {
break
}
caps := tailscale.KeyCapabilities{
Devices: tailscale.KeyDeviceCapabilities{
Create: tailscale.KeyDeviceCreateCapabilities{
Reusable: false,
Preauthorized: true,
Tags: strings.Split(operatorTags, ","),
},
},
}
authkey, _, err := tsClient.CreateKey(ctx, caps)
if err != nil {
startlog.Fatalf("creating operator authkey: %v", err)
}
if err := lc.Start(ctx, ipn.Options{
AuthKey: authkey,
}); err != nil {
startlog.Fatalf("starting tailscale: %v", err)
}
if err := lc.StartLoginInteractive(ctx); err != nil {
startlog.Fatalf("starting login: %v", err)
}
startlog.Debugf("requested login by authkey")
loginDone = true
case "NeedsMachineAuth":
if !machineAuthShown {
startlog.Infof("Machine approval required, please visit the admin panel to approve")
machineAuthShown = true
}
default:
startlog.Debugf("waiting for tailscale to start: %v", st.BackendState)
}
time.Sleep(time.Second)
}
return tsSetup{
// TODO: server needs closing
server: s,
client: tsClient,
}
})
return ts
}
// runReconcilers starts the controller-runtime manager and registers the
// ServiceReconciler. It blocks forever.
func runReconcilers(zlog *zap.SugaredLogger, s *tsnet.Server, tsNamespace string, restConfig *rest.Config, tsClient *tailscale.Client, image, priorityClassName, tags, tsFirewallMode string) {
func runReconcilers(c managerConfig) {
var (
isDefaultLoadBalancer = defaultBool("OPERATOR_DEFAULT_LOAD_BALANCER", false)
)
startlog := zlog.Named("startReconcilers")
startlog := c.zlog.Named("startReconcilers")
// For secrets and statefulsets, we only get permission to touch the objects
// in the controller's own namespace. This cannot be expressed by
// .Watches(...) below, instead you have to add a per-type field selector to
@ -207,7 +243,7 @@ func runReconcilers(zlog *zap.SugaredLogger, s *tsnet.Server, tsNamespace string
// implicitly filter what parts of the world the builder code gets to see at
// all.
nsFilter := cache.ByObject{
Field: client.InNamespace(tsNamespace).AsSelector(),
Field: client.InNamespace(c.tsNamespace).AsSelector(),
}
mgr, err := manager.New(c.restConfig, manager.Options{
LeaderElectionNamespace: "kube-system",
@ -230,13 +266,12 @@ func runReconcilers(zlog *zap.SugaredLogger, s *tsnet.Server, tsNamespace string
eventRecorder := mgr.GetEventRecorderFor("tailscale-operator")
ssr := &tailscaleSTSReconciler{
Client: mgr.GetClient(),
tsnetServer: s,
tsClient: tsClient,
defaultTags: strings.Split(tags, ","),
operatorNamespace: tsNamespace,
proxyImage: image,
proxyPriorityClassName: priorityClassName,
tsFirewallMode: tsFirewallMode,
ts: c.ts,
defaultTags: strings.Split(c.tags, ","),
operatorNamespace: c.tsNamespace,
proxyImage: c.proxyImage,
proxyPriorityClassName: c.priorityClassName,
tsFirewallMode: c.tsFirewallMode,
}
err = builder.
ControllerManagedBy(mgr).
@ -247,7 +282,7 @@ func runReconcilers(zlog *zap.SugaredLogger, s *tsnet.Server, tsNamespace string
Complete(&ServiceReconciler{
ssr: ssr,
Client: mgr.GetClient(),
logger: zlog.Named("service-reconciler"),
logger: c.zlog.Named("service-reconciler"),
isDefaultLoadBalancer: isDefaultLoadBalancer,
recorder: eventRecorder,
})
@ -265,12 +300,15 @@ func runReconcilers(zlog *zap.SugaredLogger, s *tsnet.Server, tsNamespace string
ssr: ssr,
recorder: eventRecorder,
Client: mgr.GetClient(),
logger: zlog.Named("ingress-reconciler"),
logger: c.zlog.Named("ingress-reconciler"),
})
if err != nil {
startlog.Fatalf("could not create controller: %v", err)
}
if err := maybeConfigureAPIServerProxy(c, mgr); err != nil {
startlog.Fatalf("error configuring api server proxy: %v", err)
}
startlog.Infof("Startup complete, operator running, version: %s", version.Long())
if err := mgr.Start(signals.SetupSignalHandler()); err != nil {
startlog.Fatalf("could not start manager: %v", err)

View File

@ -16,13 +16,12 @@ import (
"os"
"strings"
"go.uber.org/zap"
"k8s.io/client-go/rest"
"k8s.io/client-go/transport"
"sigs.k8s.io/controller-runtime/pkg/manager"
"tailscale.com/client/tailscale"
"tailscale.com/client/tailscale/apitype"
"tailscale.com/tailcfg"
"tailscale.com/tsnet"
"tailscale.com/types/logger"
"tailscale.com/util/clientmetric"
"tailscale.com/util/set"
@ -80,20 +79,23 @@ func parseAPIProxyMode() apiServerProxyMode {
return apiserverProxyModeDisabled
}
// TODO: cleanup, return errors etc
// maybeLaunchAPIServerProxy launches the auth proxy, which is a small HTTP server
// that authenticates requests using the Tailscale LocalAPI and then proxies
// them to the kube-apiserver.
func maybeLaunchAPIServerProxy(zlog *zap.SugaredLogger, restConfig *rest.Config, s *tsnet.Server, mode apiServerProxyMode) {
if mode == apiserverProxyModeDisabled {
return
func maybeConfigureAPIServerProxy(c managerConfig, mgr manager.Manager) error {
if c.apiServerProxyMode == apiserverProxyModeDisabled {
return nil
}
startlog := zlog.Named("launchAPIProxy")
if mode == apiserverProxyModeNoAuth {
restConfig = rest.AnonymousClientConfig(restConfig)
startlog := mgr.GetLogger().WithName("lauchAPIProxy")
startlog.Info("configuring api-server proxy")
restConfig := c.restConfig
if c.apiServerProxyMode == apiserverProxyModeNoAuth {
restConfig = rest.AnonymousClientConfig(c.restConfig)
}
cfg, err := restConfig.TransportConfig()
if err != nil {
startlog.Fatalf("could not get rest.TransportConfig(): %v", err)
return fmt.Errorf("error retrieving transport config: %v", err)
}
// Kubernetes uses SPDY for exec and port-forward, however SPDY is
@ -101,15 +103,38 @@ func maybeLaunchAPIServerProxy(zlog *zap.SugaredLogger, restConfig *rest.Config,
tr := http.DefaultTransport.(*http.Transport).Clone()
tr.TLSClientConfig, err = transport.TLSConfigFor(cfg)
if err != nil {
startlog.Fatalf("could not get transport.TLSConfigFor(): %v", err)
return fmt.Errorf("error getting transport config: %v", err)
}
tr.TLSNextProto = make(map[string]func(authority string, c *tls.Conn) http.RoundTripper)
rt, err := transport.HTTPWrappersForConfig(cfg, tr)
if err != nil {
startlog.Fatalf("could not get rest.TransportConfig(): %v", err)
return fmt.Errorf("error getting http wrapper: %v", err)
}
go runAPIServerProxy(s, rt, zlog.Named("apiserver-proxy").Infof, mode)
pr := &proxyRunnable{
ts: c.ts,
rt: rt,
logf: mgr.GetLogger().WithName("proxyRunnable").Info,
mode: c.apiServerProxyMode,
}
if err := mgr.Add(pr); err != nil {
return fmt.Errorf("error adding proxy runnable:%v", err)
}
return nil
}
var _ manager.LeaderElectionRunnable = &proxyRunnable{}
var _ manager.Runnable = &proxyRunnable{}
type proxyRunnable struct {
ts tsSetupFunc
rt http.RoundTripper
mode apiServerProxyMode
logf logger.Logf
}
func (pr *proxyRunnable) NeedLeaderElection() bool {
return true
}
// apiserverProxy is an http.Handler that authenticates requests using the Tailscale
@ -145,25 +170,31 @@ func (h *apiserverProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// are passed through to the Kubernetes API.
//
// It never returns.
func runAPIServerProxy(s *tsnet.Server, rt http.RoundTripper, logf logger.Logf, mode apiServerProxyMode) {
if mode == apiserverProxyModeDisabled {
return
// func runAPIServerProxy(s *tsnet.Server, rt http.RoundTripper, logf logger.Logf, mode apiServerProxyMode) {
func (p *proxyRunnable) Start(ctx context.Context) error {
p.logf("starting proxy runnable")
if p.mode == apiserverProxyModeDisabled {
return nil
}
ln, err := s.Listen("tcp", ":443")
server := p.ts().server
ln, err := server.Listen("tcp", ":443")
if err != nil {
log.Fatalf("could not listen on :443: %v", err)
}
p.logf("listening on", "addr", ln.Addr())
u, err := url.Parse(fmt.Sprintf("https://%s:%s", os.Getenv("KUBERNETES_SERVICE_HOST"), os.Getenv("KUBERNETES_SERVICE_PORT_HTTPS")))
if err != nil {
log.Fatalf("runAPIServerProxy: failed to parse URL %v", err)
}
p.logf("will be forwarding requests to", "url", u)
lc, err := s.LocalClient()
lc, err := server.LocalClient()
if err != nil {
log.Fatalf("could not get local client: %v", err)
}
ap := &apiserverProxy{
logf: logf,
logf: p.logf,
lc: lc,
rp: &httputil.ReverseProxy{
Rewrite: func(r *httputil.ProxyRequest) {
@ -171,7 +202,7 @@ func runAPIServerProxy(s *tsnet.Server, rt http.RoundTripper, logf logger.Logf,
r.Out.URL.Scheme = u.Scheme
r.Out.URL.Host = u.Host
if mode == apiserverProxyModeNoAuth {
if p.mode == apiserverProxyModeNoAuth {
// If we are not providing authentication, then we are just
// proxying to the Kubernetes API, so we don't need to do
// anything else.
@ -199,8 +230,10 @@ func runAPIServerProxy(s *tsnet.Server, rt http.RoundTripper, logf logger.Logf,
if err := addImpersonationHeaders(r.Out); err != nil {
panic("failed to add impersonation headers: " + err.Error())
}
p.logf("will be forwarding with headers", "user", r.Out.Header.Get("Impersonate-User"))
p.logf("will be forwarding with headers", "group", r.Out.Header.Get("Impersonate-Group"))
},
Transport: rt,
Transport: p.rt,
},
}
hs := &http.Server{
@ -213,9 +246,12 @@ func runAPIServerProxy(s *tsnet.Server, rt http.RoundTripper, logf logger.Logf,
TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)),
Handler: ap,
}
p.logf("about to serve TLS")
if err := hs.ServeTLS(ln, "", ""); err != nil {
log.Fatalf("runAPIServerProxy: failed to serve %v", err)
p.logf("error serving: %v", err)
return fmt.Errorf("runAPIServerProxy: failed to serve %v", err)
}
return nil
}
const capabilityName = "https://tailscale.com/cap/kubernetes"

View File

@ -26,7 +26,6 @@ import (
"tailscale.com/client/tailscale"
"tailscale.com/ipn"
"tailscale.com/tailcfg"
"tailscale.com/tsnet"
"tailscale.com/types/opt"
"tailscale.com/util/dnsname"
"tailscale.com/util/mak"
@ -75,8 +74,7 @@ type tailscaleSTSConfig struct {
type tailscaleSTSReconciler struct {
client.Client
tsnetServer *tsnet.Server
tsClient tsClient
ts tsSetupFunc
defaultTags []string
operatorNamespace string
proxyImage string
@ -93,7 +91,8 @@ func (sts tailscaleSTSReconciler) validate() error {
// IsHTTPSEnabledOnTailnet reports whether HTTPS is enabled on the tailnet.
func (a *tailscaleSTSReconciler) IsHTTPSEnabledOnTailnet() bool {
return len(a.tsnetServer.CertDomains()) > 0
server := a.ts().server
return len(server.CertDomains()) > 0
}
// Provision ensures that the StatefulSet for the given service is running and
@ -151,8 +150,9 @@ func (a *tailscaleSTSReconciler) Cleanup(ctx context.Context, logger *zap.Sugare
return false, fmt.Errorf("getting device info: %w", err)
}
if id != "" {
client := a.ts().client
logger.Debugf("deleting device %s from control", string(id))
if err := a.tsClient.DeleteDevice(ctx, string(id)); err != nil {
if err := client.DeleteDevice(ctx, string(id)); err != nil {
errResp := &tailscale.ErrResponse{}
if ok := errors.As(err, errResp); ok && errResp.Status == http.StatusNotFound {
logger.Debugf("device %s not found, likely because it has already been deleted from control", string(id))
@ -300,7 +300,9 @@ func (a *tailscaleSTSReconciler) newAuthKey(ctx context.Context, tags []string)
},
}
key, _, err := a.tsClient.CreateKey(ctx, caps)
client := a.ts().client
key, _, err := client.CreateKey(ctx, caps)
if err != nil {
return "", err
}