From 232d2b9c69fce6338c5f963145324d31dcaf010a Mon Sep 17 00:00:00 2001 From: Tom Proctor Date: Wed, 16 Jul 2025 21:38:38 +0100 Subject: [PATCH] cmd/k8s-proxy: fix error on watch connection killed k8s-proxy was reliably crashing after a long but variable period of time. On debugging, this was due to the watcher getting stopped when the connection was severed (for example if my laptop was asleep for a while), and the switch-case failed to account for the fact that the result chan may have been closed. Fix by checking for channel being closed, and restarting the watcher in that case. Also set a timeout on the watch so we're not relying on the watcher being quick to react to severed connections and regularly exercise the logic to re-watch. Finally, pull the configFromSecret call inside the added/modified case to ensure it's never possible to call it with a nil secret. Change-Id: Ibf0cf971cbe41524cf6db3c0644c29803ad69fc4 Signed-off-by: Tom Proctor --- cmd/k8s-proxy/internal/config/config.go | 46 ++++++++-- cmd/k8s-proxy/internal/config/config_test.go | 97 +++++++++++++++++--- 2 files changed, 120 insertions(+), 23 deletions(-) diff --git a/cmd/k8s-proxy/internal/config/config.go b/cmd/k8s-proxy/internal/config/config.go index d8bb1b04e..61e7513c6 100644 --- a/cmd/k8s-proxy/internal/config/config.go +++ b/cmd/k8s-proxy/internal/config/config.go @@ -26,6 +26,7 @@ import ( clientcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" "tailscale.com/kube/k8s-proxy/conf" "tailscale.com/kube/kubetypes" + "tailscale.com/types/ptr" "tailscale.com/util/testenv" ) @@ -180,13 +181,21 @@ func (l *configLoader) watchConfigSecretChanges(ctx context.Context, secretName Kind: "Secret", APIVersion: "v1", }, - FieldSelector: fmt.Sprintf("metadata.name=%s", secretName), - Watch: true, + // Re-watch regularly to avoid relying on long-lived connections. 
+ // See https://github.com/kubernetes-client/javascript/issues/596#issuecomment-786419380 + TimeoutSeconds: ptr.To(int64(600)), + FieldSelector: fmt.Sprintf("metadata.name=%s", secretName), + Watch: true, }) if err != nil { return fmt.Errorf("failed to watch config Secret %q: %w", secretName, err) } - defer w.Stop() + defer func() { + // May not be the original watcher by the time we exit. + if w != nil { + w.Stop() + } + }() // Get the initial config Secret now we've got the watcher set up. secret, err := secrets.Get(ctx, secretName, metav1.GetOptions{}) @@ -204,7 +213,24 @@ func (l *configLoader) watchConfigSecretChanges(ctx context.Context, secretName select { case <-ctx.Done(): return ctx.Err() - case ev := <-w.ResultChan(): + case ev, ok := <-w.ResultChan(): + if !ok { + w.Stop() + w, err = secrets.Watch(ctx, metav1.ListOptions{ + TypeMeta: metav1.TypeMeta{ + Kind: "Secret", + APIVersion: "v1", + }, + TimeoutSeconds: ptr.To(int64(600)), + FieldSelector: fmt.Sprintf("metadata.name=%s", secretName), + Watch: true, + }) + if err != nil { + return fmt.Errorf("failed to re-watch config Secret %q: %w", secretName, err) + } + continue + } + switch ev.Type { case watch.Added, watch.Modified: // New config available to load. @@ -216,16 +242,16 @@ func (l *configLoader) watchConfigSecretChanges(ctx context.Context, secretName if secret == nil || secret.Data == nil { continue } - case watch.Deleted, watch.Bookmark: - // Ignore, no action required. - continue + if err := l.configFromSecret(ctx, secret); err != nil { + return fmt.Errorf("error reloading config Secret %q: %v", secret.Name, err) + } case watch.Error: return fmt.Errorf("error watching config Secret %q: %v", secretName, ev.Object) + default: + // Ignore, no action required. 
+ continue } } - if err := l.configFromSecret(ctx, secret); err != nil { - return fmt.Errorf("error reloading config Secret %q: %v", secret.Name, err) - } } } diff --git a/cmd/k8s-proxy/internal/config/config_test.go b/cmd/k8s-proxy/internal/config/config_test.go index 589afffa6..cf0ee4b28 100644 --- a/cmd/k8s-proxy/internal/config/config_test.go +++ b/cmd/k8s-proxy/internal/config/config_test.go @@ -15,7 +15,9 @@ import ( "go.uber.org/zap" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes/fake" + ktesting "k8s.io/client-go/testing" "tailscale.com/kube/k8s-proxy/conf" "tailscale.com/kube/kubetypes" "tailscale.com/types/ptr" @@ -125,19 +127,8 @@ func TestWatchConfig(t *testing.T) { t.Fatalf("error writing namespace file: %v", err) } writeFile = func(t *testing.T, content string) { - s := &corev1.Secret{ - ObjectMeta: metav1.ObjectMeta{ - Name: "config-secret", - }, - Data: map[string][]byte{ - kubetypes.KubeAPIServerConfigFile: []byte(content), - }, - } - if _, err := cl.CoreV1().Secrets("default").Create(t.Context(), s, metav1.CreateOptions{}); err != nil { - if _, updateErr := cl.CoreV1().Secrets("default").Update(t.Context(), s, metav1.UpdateOptions{}); updateErr != nil { - t.Fatalf("error writing config Secret %q: %v", cfgPath, updateErr) - } - } + s := secretFrom(content) + mustCreateOrUpdate(t, cl, s) } } configChan := make(chan *conf.Config) @@ -189,3 +180,83 @@ func TestWatchConfig(t *testing.T) { }) } } + +func TestWatchConfigSecret_Rewatches(t *testing.T) { + root := t.TempDir() + cl := fake.NewClientset() + var watchCount int + var watcher *watch.RaceFreeFakeWatcher + expected := []string{ + `{"version": "v1alpha1", "authKey": "abc123"}`, + `{"version": "v1alpha1", "authKey": "def456"}`, + `{"version": "v1alpha1", "authKey": "ghi789"}`, + } + cl.PrependWatchReactor("secrets", func(action ktesting.Action) (handled bool, ret watch.Interface, err error) { + watcher = 
watch.NewRaceFreeFake() + watcher.Add(secretFrom(expected[watchCount])) + if action.GetVerb() == "watch" && action.GetResource().Resource == "secrets" { + watchCount++ + } + return true, watcher, nil + }) + + nsFilePath := filepath.Join(root, namespacePath) + if err := os.MkdirAll(filepath.Dir(nsFilePath), 0o755); err != nil { + t.Fatalf("error creating namespace directory: %v", err) + } + if err := os.WriteFile(nsFilePath, []byte("default"), 0o644); err != nil { + t.Fatalf("error writing namespace file: %v", err) + } + configChan := make(chan *conf.Config) + l := NewConfigLoader(zap.Must(zap.NewDevelopment()).Sugar(), cl.CoreV1(), configChan) + l.root = root + + mustCreateOrUpdate(t, cl, secretFrom(expected[0])) + + errs := make(chan error) + go func() { + errs <- l.watchConfigSecretChanges(t.Context(), "config-secret") + }() + + for i := range 2 { + select { + case cfg := <-configChan: + if exp := expected[i]; cfg.Parsed.AuthKey == nil || !strings.Contains(exp, *cfg.Parsed.AuthKey) { + t.Fatalf("expected config to have authKey %q, got: %v", exp, cfg.Parsed.AuthKey) + } + if i == 0 { + watcher.Stop() + } + case err := <-errs: + t.Fatalf("unexpected error: %v", err) + case <-l.cfgIgnored: + t.Fatalf("expected config to be reloaded, but got ignored signal") + case <-time.After(5 * time.Second): + t.Fatalf("timed out waiting for expected event") + } + } + + if watchCount != 2 { + t.Fatalf("expected 2 watch API calls, got %d", watchCount) + } +} + +func secretFrom(content string) *corev1.Secret { + return &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "config-secret", + }, + Data: map[string][]byte{ + kubetypes.KubeAPIServerConfigFile: []byte(content), + }, + } +} + +func mustCreateOrUpdate(t *testing.T, cl *fake.Clientset, s *corev1.Secret) { + t.Helper() + if _, err := cl.CoreV1().Secrets("default").Create(t.Context(), s, metav1.CreateOptions{}); err != nil { + if _, updateErr := cl.CoreV1().Secrets("default").Update(t.Context(), s, 
metav1.UpdateOptions{}); updateErr != nil { + t.Fatalf("error writing config Secret %q: %v", s.Name, updateErr) + } + } +}