From 8250af287cdc318a8473940ee13617ba6cc0571f Mon Sep 17 00:00:00 2001 From: Tom Proctor Date: Mon, 7 Jul 2025 13:01:22 +0100 Subject: [PATCH] {,cmd/}k8s-operator: simplify logic, tighten error handling Change-Id: I0b10d1921ebf29c59c40f1c7c14828dd80167b45 Signed-off-by: Tom Proctor --- cmd/k8s-operator/proxygroup.go | 20 ++++---- cmd/k8s-proxy/k8s-proxy.go | 84 ++++++++++++++++----------------- k8s-operator/api-proxy/proxy.go | 3 ++ 3 files changed, 54 insertions(+), 53 deletions(-) diff --git a/cmd/k8s-operator/proxygroup.go b/cmd/k8s-operator/proxygroup.go index 12a4334ca..28535a67d 100644 --- a/cmd/k8s-operator/proxygroup.go +++ b/cmd/k8s-operator/proxygroup.go @@ -214,14 +214,16 @@ func (r *ProxyGroupReconciler) validate(ctx context.Context, pg *tsapi.ProxyGrou return fmt.Errorf("error validating that ServiceAccount %q exists: %w", authAPIServerProxySAName, err) } - } else { - // Validate that the ServiceAccount we create won't overwrite the static one. - // TODO(tomhjp): This doesn't cover other controllers that could create a - // ServiceAccount. Perhaps should have some guards to ensure that an update - // would never change the ownership of a resource we expect to already be owned. - if pgServiceAccountName(pg) == authAPIServerProxySAName { - return fmt.Errorf("the name of the ProxyGroup %q conflicts with the static ServiceAccount used for the API server proxy in auth mode", pg.Name) - } + + return nil + } + + // Validate that the ServiceAccount we create won't overwrite the static one. + // TODO(tomhjp): This doesn't cover other controllers that could create a + // ServiceAccount. Perhaps should have some guards to ensure that an update + // would never change the ownership of a resource we expect to already be owned. + if pgServiceAccountName(pg) == authAPIServerProxySAName { + return fmt.Errorf("the name of the ProxyGroup %q conflicts with the static ServiceAccount used for the API server proxy in auth mode", pg.Name) } return nil @@ -776,7 +778,7 @@ func (r *ProxyGroupReconciler) ensureConfigSecretsCreated(ctx context.Context, p cfg := conf.VersionedConfig{ Version: "v1alpha1", ConfigV1Alpha1: &conf.ConfigV1Alpha1{ - Hostname: ptr.To(hostname), + Hostname: &hostname, State: ptr.To(fmt.Sprintf("kube:%s", pgPodName(pg.Name, i))), App: ptr.To(kubetypes.AppProxyGroupKubeAPIServer), AuthKey: authKey, diff --git a/cmd/k8s-proxy/k8s-proxy.go b/cmd/k8s-proxy/k8s-proxy.go index 5958eb00a..6e7eadb73 100644 --- a/cmd/k8s-proxy/k8s-proxy.go +++ b/cmd/k8s-proxy/k8s-proxy.go @@ -19,6 +19,7 @@ import ( "go.uber.org/zap" "go.uber.org/zap/zapcore" + "golang.org/x/sync/errgroup" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" "tailscale.com/hostinfo" @@ -27,7 +28,6 @@ import ( apiproxy "tailscale.com/k8s-operator/api-proxy" "tailscale.com/kube/k8s-proxy/conf" "tailscale.com/kube/state" - "tailscale.com/syncs" "tailscale.com/tsnet" ) @@ -48,10 +48,6 @@ func run(logger *zap.SugaredLogger) error { return errors.New("TS_K8S_PROXY_CONFIG unset") } - // ctx to live for the lifetime of the process. - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - // TODO(tomhjp): Support reloading config. // TODO(tomhjp): Support reading config from a Secret. cfg, err := conf.Load(configFile) @@ -99,6 +95,10 @@ func run(logger *zap.SugaredLogger) error { ts.Hostname = *cfg.Parsed.Hostname } + // ctx to live for the lifetime of the process. + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer cancel() + // Make sure we crash loop if Up doesn't complete in reasonable time. upCtx, upCancel := context.WithTimeout(ctx, time.Minute) defer upCancel() @@ -107,29 +107,31 @@ func run(logger *zap.SugaredLogger) error { } defer ts.Close() - lc, err := ts.LocalClient() - if err != nil { - return fmt.Errorf("error getting local client: %w", err) - } + group, groupCtx := errgroup.WithContext(ctx) - w, err := lc.WatchIPNBus(ctx, ipn.NotifyInitialNetMap) - if err != nil { - return fmt.Errorf("error watching IPN bus: %w", err) - } - defer w.Close() - - errs := make(chan error) - wg := syncs.WaitGroup{} + // Setup for updating state keys. if podUID != "" { - wg.Go(func() { - err := state.KeepKeysUpdated(st, w.Next) - if err != nil && err != ctx.Err() { - errs <- fmt.Errorf("error keeping state keys updated: %w", err) + lc, err := ts.LocalClient() + if err != nil { + return fmt.Errorf("error getting local client: %w", err) + } + w, err := lc.WatchIPNBus(groupCtx, ipn.NotifyInitialNetMap) + if err != nil { + return fmt.Errorf("error watching IPN bus: %w", err) + } + defer w.Close() + + group.Go(func() error { + if err := state.KeepKeysUpdated(st, w.Next); err != nil && err != groupCtx.Err() { + return fmt.Errorf("error keeping state keys updated: %w", err) } + + return nil }) } - restConfig, err := getRestConfig() + // Setup for the API server proxy. + restConfig, err := getRestConfig(logger) if err != nil { return fmt.Errorf("error getting rest config: %w", err) } @@ -147,25 +149,15 @@ func run(logger *zap.SugaredLogger) error { // TODO(tomhjp): Work out whether we should use TS_CERT_SHARE_MODE or not, // and possibly issue certs upfront here before serving. - wg.Go(func() { - if err := ap.Run(ctx); err != nil { - errs <- fmt.Errorf("error running api server proxy: %w", err) + group.Go(func() error { + if err := ap.Run(groupCtx); err != nil { + return fmt.Errorf("error running API server proxy: %w", err) } + + return nil }) - sig := make(chan os.Signal, 1) - signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) - select { - case err = <-errs: - logger.Errorf("Shutting down due to error: %v", err) - case s := <-sig: - logger.Infof("Received %s, shutting down", s) - } - - cancel() - wg.Wait() - - return err + return group.Wait() } func getStateStore(path *string, logger *zap.SugaredLogger) (ipn.StateStore, error) { @@ -183,19 +175,23 @@ func getStateStore(path *string, logger *zap.SugaredLogger) (ipn.StateStore, err return st, nil } -func getRestConfig() (*rest.Config, error) { +func getRestConfig(logger *zap.SugaredLogger) (*rest.Config, error) { restConfig, err := rest.InClusterConfig() - if err == nil { + switch err { + case nil: return restConfig, nil + case rest.ErrNotInCluster: + logger.Info("Not running in-cluster, falling back to kubeconfig") + default: + return nil, fmt.Errorf("error getting in-cluster config: %w", err) } - inClusterErr := fmt.Errorf("could not use in-cluster config: %w", err) loadingRules := clientcmd.NewDefaultClientConfigLoadingRules() clientConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, nil) restConfig, err = clientConfig.ClientConfig() - if err == nil { - return restConfig, nil + if err != nil { + return nil, fmt.Errorf("error loading kubeconfig: %w", err) } - return nil, errors.Join(inClusterErr, fmt.Errorf("could not use kubeconfig: %w", err)) + return restConfig, nil } diff --git a/k8s-operator/api-proxy/proxy.go b/k8s-operator/api-proxy/proxy.go index ba30f8096..c3c13e784 100644 --- a/k8s-operator/api-proxy/proxy.go +++ b/k8s-operator/api-proxy/proxy.go @@ -70,6 +70,9 @@ func NewAPIServerProxy(zlog *zap.SugaredLogger, restConfig *rest.Config, ts *tsn if err != nil { return nil, fmt.Errorf("failed to parse URL %w", err) } + if u.Scheme == "" || u.Host == "" { + return nil, fmt.Errorf("the API server proxy requires host and scheme but got: %q", restConfig.Host) + } lc, err := ts.LocalClient() if err != nil {