Mirror of https://github.com/tailscale/tailscale.git
{,cmd/}k8s-operator: simplify logic, tighten error handling
Change-Id: I0b10d1921ebf29c59c40f1c7c14828dd80167b45
Signed-off-by: Tom Proctor <tomhjp@users.noreply.github.com>
parent 68b56d4b48
commit 8250af287c
@@ -214,14 +214,16 @@ func (r *ProxyGroupReconciler) validate(ctx context.Context, pg *tsapi.ProxyGrou
 
 			return fmt.Errorf("error validating that ServiceAccount %q exists: %w", authAPIServerProxySAName, err)
 		}
-	} else {
-		// Validate that the ServiceAccount we create won't overwrite the static one.
-		// TODO(tomhjp): This doesn't cover other controllers that could create a
-		// ServiceAccount. Perhaps should have some guards to ensure that an update
-		// would never change the ownership of a resource we expect to already be owned.
-		if pgServiceAccountName(pg) == authAPIServerProxySAName {
-			return fmt.Errorf("the name of the ProxyGroup %q conflicts with the static ServiceAccount used for the API server proxy in auth mode", pg.Name)
-		}
+
+		return nil
+	}
+
+	// Validate that the ServiceAccount we create won't overwrite the static one.
+	// TODO(tomhjp): This doesn't cover other controllers that could create a
+	// ServiceAccount. Perhaps should have some guards to ensure that an update
+	// would never change the ownership of a resource we expect to already be owned.
+	if pgServiceAccountName(pg) == authAPIServerProxySAName {
+		return fmt.Errorf("the name of the ProxyGroup %q conflicts with the static ServiceAccount used for the API server proxy in auth mode", pg.Name)
 	}
 
 	return nil
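The restructuring above trades the if/else for an early return: once the static ServiceAccount has been validated, that branch returns immediately, and the conflict check reads as straight-line code. A minimal, generic sketch of the same guard-clause shape (the names here are illustrative, not taken from the operator code):

package main

import "fmt"

// validateName mirrors the shape of the refactored validate: handle the
// static-account case first and return early, then run the conflict check.
func validateName(usesStaticSA bool, name, reserved string) error {
	if usesStaticSA {
		return nil
	}

	if name == reserved {
		return fmt.Errorf("name %q conflicts with the static ServiceAccount %q", name, reserved)
	}
	return nil
}

func main() {
	fmt.Println(validateName(false, "kube-apiserver-auth-proxy", "kube-apiserver-auth-proxy"))
}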
@@ -776,7 +778,7 @@ func (r *ProxyGroupReconciler) ensureConfigSecretsCreated(ctx context.Context, p
 	cfg := conf.VersionedConfig{
 		Version: "v1alpha1",
 		ConfigV1Alpha1: &conf.ConfigV1Alpha1{
-			Hostname: ptr.To(hostname),
+			Hostname: &hostname,
 			State:    ptr.To(fmt.Sprintf("kube:%s", pgPodName(pg.Name, i))),
 			App:      ptr.To(kubetypes.AppProxyGroupKubeAPIServer),
 			AuthKey:  authKey,
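For context, ptr.To here is presumably Tailscale's generic pointer helper; assuming hostname is a local variable (as the surrounding code suggests), swapping ptr.To(hostname) for &hostname is behaviour-preserving, the only difference being that &hostname aliases the variable rather than a copy. A tiny sketch of the relationship, with To written out locally rather than imported:

package main

import "fmt"

// To mirrors the assumed helper: return a pointer to a copy of its argument.
func To[T any](v T) *T { return &v }

func main() {
	hostname := "proxygroup-0"
	a := To(hostname) // pointer to a copy of the value
	b := &hostname    // pointer to the variable itself
	fmt.Println(*a == *b, a == b) // true false: same value, different addresses
}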
@@ -19,6 +19,7 @@ import (
 
 	"go.uber.org/zap"
 	"go.uber.org/zap/zapcore"
+	"golang.org/x/sync/errgroup"
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/clientcmd"
 	"tailscale.com/hostinfo"
@@ -27,7 +28,6 @@ import (
 	apiproxy "tailscale.com/k8s-operator/api-proxy"
 	"tailscale.com/kube/k8s-proxy/conf"
 	"tailscale.com/kube/state"
-	"tailscale.com/syncs"
 	"tailscale.com/tsnet"
 )
 
@@ -48,10 +48,6 @@ func run(logger *zap.SugaredLogger) error {
 		return errors.New("TS_K8S_PROXY_CONFIG unset")
 	}
 
-	// ctx to live for the lifetime of the process.
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-
 	// TODO(tomhjp): Support reloading config.
 	// TODO(tomhjp): Support reading config from a Secret.
 	cfg, err := conf.Load(configFile)
@@ -99,6 +95,10 @@ func run(logger *zap.SugaredLogger) error {
 		ts.Hostname = *cfg.Parsed.Hostname
 	}
 
+	// ctx to live for the lifetime of the process.
+	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
+	defer cancel()
+
 	// Make sure we crash loop if Up doesn't complete in reasonable time.
 	upCtx, upCancel := context.WithTimeout(ctx, time.Minute)
 	defer upCancel()
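Taken together with the earlier removal of context.WithCancel, this moves the process-lifetime context later in run and ties it directly to SIGINT/SIGTERM via signal.NotifyContext, which replaces the hand-rolled signal channel that a later hunk deletes. A minimal sketch of the pattern on its own:

package main

import (
	"context"
	"fmt"
	"os/signal"
	"syscall"
)

func main() {
	// ctx is canceled when SIGINT or SIGTERM arrives (or when cancel is called).
	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer cancel() // stop listening for signals and release resources

	<-ctx.Done() // downstream work can simply watch ctx.Done()
	fmt.Println("shutting down:", ctx.Err())
}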
@@ -107,29 +107,31 @@ func run(logger *zap.SugaredLogger) error {
 	}
 	defer ts.Close()
 
-	lc, err := ts.LocalClient()
-	if err != nil {
-		return fmt.Errorf("error getting local client: %w", err)
-	}
-
-	w, err := lc.WatchIPNBus(ctx, ipn.NotifyInitialNetMap)
-	if err != nil {
-		return fmt.Errorf("error watching IPN bus: %w", err)
-	}
-	defer w.Close()
-
-	errs := make(chan error)
-	wg := syncs.WaitGroup{}
+	group, groupCtx := errgroup.WithContext(ctx)
+
+	// Setup for updating state keys.
 	if podUID != "" {
-		wg.Go(func() {
-			err := state.KeepKeysUpdated(st, w.Next)
-			if err != nil && err != ctx.Err() {
-				errs <- fmt.Errorf("error keeping state keys updated: %w", err)
+		lc, err := ts.LocalClient()
+		if err != nil {
+			return fmt.Errorf("error getting local client: %w", err)
+		}
+		w, err := lc.WatchIPNBus(groupCtx, ipn.NotifyInitialNetMap)
+		if err != nil {
+			return fmt.Errorf("error watching IPN bus: %w", err)
+		}
+		defer w.Close()
+
+		group.Go(func() error {
+			if err := state.KeepKeysUpdated(st, w.Next); err != nil && err != groupCtx.Err() {
+				return fmt.Errorf("error keeping state keys updated: %w", err)
 			}
+
+			return nil
 		})
 	}
 
-	restConfig, err := getRestConfig()
+	// Setup for the API server proxy.
+	restConfig, err := getRestConfig(logger)
 	if err != nil {
 		return fmt.Errorf("error getting rest config: %w", err)
 	}
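The rewrite above replaces the errs channel and syncs.WaitGroup with golang.org/x/sync/errgroup: each goroutine returns its error, the context derived from errgroup.WithContext is canceled as soon as any goroutine fails, and Wait (used at the end of run in the next hunk) reports the first non-nil error. A standalone sketch of that pattern, with illustrative workers:

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	group, groupCtx := errgroup.WithContext(context.Background())

	group.Go(func() error {
		<-groupCtx.Done() // a long-running worker that honours cancellation
		return groupCtx.Err()
	})
	group.Go(func() error {
		return fmt.Errorf("worker failed") // failure cancels groupCtx for the other worker
	})

	// Wait blocks until both goroutines return and yields the first error: "worker failed".
	fmt.Println(group.Wait())
}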
@@ -147,25 +149,15 @@ func run(logger *zap.SugaredLogger) error {
 
 	// TODO(tomhjp): Work out whether we should use TS_CERT_SHARE_MODE or not,
 	// and possibly issue certs upfront here before serving.
-	wg.Go(func() {
-		if err := ap.Run(ctx); err != nil {
-			errs <- fmt.Errorf("error running api server proxy: %w", err)
+	group.Go(func() error {
+		if err := ap.Run(groupCtx); err != nil {
+			return fmt.Errorf("error running API server proxy: %w", err)
 		}
+
+		return nil
 	})
 
-	sig := make(chan os.Signal, 1)
-	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
-	select {
-	case err = <-errs:
-		logger.Errorf("Shutting down due to error: %v", err)
-	case s := <-sig:
-		logger.Infof("Received %s, shutting down", s)
-	}
-
-	cancel()
-	wg.Wait()
-
-	return err
+	return group.Wait()
 }
 
 func getStateStore(path *string, logger *zap.SugaredLogger) (ipn.StateStore, error) {
@@ -183,19 +175,23 @@ func getStateStore(path *string, logger *zap.SugaredLogger) (ipn.StateStore, err
 	return st, nil
 }
 
-func getRestConfig() (*rest.Config, error) {
+func getRestConfig(logger *zap.SugaredLogger) (*rest.Config, error) {
 	restConfig, err := rest.InClusterConfig()
-	if err == nil {
+	switch err {
+	case nil:
 		return restConfig, nil
+	case rest.ErrNotInCluster:
+		logger.Info("Not running in-cluster, falling back to kubeconfig")
+	default:
+		return nil, fmt.Errorf("error getting in-cluster config: %w", err)
 	}
-	inClusterErr := fmt.Errorf("could not use in-cluster config: %w", err)
 
 	loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
 	clientConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, nil)
 	restConfig, err = clientConfig.ClientConfig()
-	if err == nil {
-		return restConfig, nil
+	if err != nil {
+		return nil, fmt.Errorf("error loading kubeconfig: %w", err)
 	}
 
-	return nil, errors.Join(inClusterErr, fmt.Errorf("could not use kubeconfig: %w", err))
+	return restConfig, nil
 }
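getRestConfig now distinguishes the one error that legitimately means "use a kubeconfig instead" (rest.ErrNotInCluster, which rest.InClusterConfig returns as a sentinel value, hence the plain switch on err) from every other in-cluster failure, which now aborts immediately instead of still attempting the kubeconfig fallback. A self-contained sketch of the same fallback for reference; the log line stands in for the zap logger:

package main

import (
	"fmt"
	"log"

	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

func loadRestConfig() (*rest.Config, error) {
	cfg, err := rest.InClusterConfig()
	switch err {
	case nil:
		return cfg, nil
	case rest.ErrNotInCluster:
		log.Print("not running in-cluster, falling back to kubeconfig")
	default:
		return nil, fmt.Errorf("error getting in-cluster config: %w", err)
	}

	// Default loading rules honour $KUBECONFIG and fall back to ~/.kube/config.
	rules := clientcmd.NewDefaultClientConfigLoadingRules()
	cfg, err = clientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, nil).ClientConfig()
	if err != nil {
		return nil, fmt.Errorf("error loading kubeconfig: %w", err)
	}
	return cfg, nil
}

func main() {
	cfg, err := loadRestConfig()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("API server host:", cfg.Host)
}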
@@ -70,6 +70,9 @@ func NewAPIServerProxy(zlog *zap.SugaredLogger, restConfig *rest.Config, ts *tsn
 	if err != nil {
 		return nil, fmt.Errorf("failed to parse URL %w", err)
 	}
+	if u.Scheme == "" || u.Host == "" {
+		return nil, fmt.Errorf("the API server proxy requires host and scheme but got: %q", restConfig.Host)
+	}
 
 	lc, err := ts.LocalClient()
 	if err != nil {
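The added scheme/host check guards against a quirk of url.Parse: it rarely returns an error, so a restConfig.Host value without a scheme parses "successfully" with empty Scheme and Host fields and could otherwise slip through to the reverse proxy with an unusable target URL. A small illustration:

package main

import (
	"fmt"
	"net/url"
)

func main() {
	u, err := url.Parse("kubernetes.default.svc") // no scheme
	fmt.Println(err)                                    // <nil>: parsing succeeds anyway
	fmt.Printf("%q %q %q\n", u.Scheme, u.Host, u.Path) // "" "" "kubernetes.default.svc"
}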
|
Loading…
x
Reference in New Issue
Block a user