mirror of
https://github.com/tailscale/tailscale.git
synced 2025-08-24 12:19:07 +00:00
cmd/k8s-proxy: fix error on watch connection killed
k8s-proxy was reliably crashing after a long but variable period of time. On debugging, this was due to the watcher getting stopped when the connection was severed (for example if my laptop was asleep for a while), and the switch-case failed to account for the fact the result chan may have been closed. Fix by checking for channel being closed, and restarting the watcher in that case. Also set a timeout on the watch so we're not relying on the watcher being quick to react to severed connections and regularly exercise the logic to re-watch. Finally, pull the loadConfigFromSecret call inside the added/created case to ensure it's never possible to call it with a nil secret. Change-Id: Ibf0cf971cbe41524cf6db3c0644c29803ad69fc4 Signed-off-by: Tom Proctor <tomhjp@users.noreply.github.com>
This commit is contained in:
@@ -26,6 +26,7 @@ import (
|
|||||||
clientcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
|
clientcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||||
"tailscale.com/kube/k8s-proxy/conf"
|
"tailscale.com/kube/k8s-proxy/conf"
|
||||||
"tailscale.com/kube/kubetypes"
|
"tailscale.com/kube/kubetypes"
|
||||||
|
"tailscale.com/types/ptr"
|
||||||
"tailscale.com/util/testenv"
|
"tailscale.com/util/testenv"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -180,13 +181,21 @@ func (l *configLoader) watchConfigSecretChanges(ctx context.Context, secretName
|
|||||||
Kind: "Secret",
|
Kind: "Secret",
|
||||||
APIVersion: "v1",
|
APIVersion: "v1",
|
||||||
},
|
},
|
||||||
FieldSelector: fmt.Sprintf("metadata.name=%s", secretName),
|
// Re-watch regularly to avoid relying on long-lived connections.
|
||||||
Watch: true,
|
// See https://github.com/kubernetes-client/javascript/issues/596#issuecomment-786419380
|
||||||
|
TimeoutSeconds: ptr.To(int64(600)),
|
||||||
|
FieldSelector: fmt.Sprintf("metadata.name=%s", secretName),
|
||||||
|
Watch: true,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to watch config Secret %q: %w", secretName, err)
|
return fmt.Errorf("failed to watch config Secret %q: %w", secretName, err)
|
||||||
}
|
}
|
||||||
defer w.Stop()
|
defer func() {
|
||||||
|
// May not be the original watcher by the time we exit.
|
||||||
|
if w != nil {
|
||||||
|
w.Stop()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
// Get the initial config Secret now we've got the watcher set up.
|
// Get the initial config Secret now we've got the watcher set up.
|
||||||
secret, err := secrets.Get(ctx, secretName, metav1.GetOptions{})
|
secret, err := secrets.Get(ctx, secretName, metav1.GetOptions{})
|
||||||
@@ -204,7 +213,24 @@ func (l *configLoader) watchConfigSecretChanges(ctx context.Context, secretName
|
|||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return ctx.Err()
|
return ctx.Err()
|
||||||
case ev := <-w.ResultChan():
|
case ev, ok := <-w.ResultChan():
|
||||||
|
if !ok {
|
||||||
|
w.Stop()
|
||||||
|
w, err = secrets.Watch(ctx, metav1.ListOptions{
|
||||||
|
TypeMeta: metav1.TypeMeta{
|
||||||
|
Kind: "Secret",
|
||||||
|
APIVersion: "v1",
|
||||||
|
},
|
||||||
|
TimeoutSeconds: ptr.To(int64(600)),
|
||||||
|
FieldSelector: fmt.Sprintf("metadata.name=%s", secretName),
|
||||||
|
Watch: true,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to re-watch config Secret %q: %w", secretName, err)
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
switch ev.Type {
|
switch ev.Type {
|
||||||
case watch.Added, watch.Modified:
|
case watch.Added, watch.Modified:
|
||||||
// New config available to load.
|
// New config available to load.
|
||||||
@@ -216,16 +242,16 @@ func (l *configLoader) watchConfigSecretChanges(ctx context.Context, secretName
|
|||||||
if secret == nil || secret.Data == nil {
|
if secret == nil || secret.Data == nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
case watch.Deleted, watch.Bookmark:
|
if err := l.configFromSecret(ctx, secret); err != nil {
|
||||||
// Ignore, no action required.
|
return fmt.Errorf("error reloading config Secret %q: %v", secret.Name, err)
|
||||||
continue
|
}
|
||||||
case watch.Error:
|
case watch.Error:
|
||||||
return fmt.Errorf("error watching config Secret %q: %v", secretName, ev.Object)
|
return fmt.Errorf("error watching config Secret %q: %v", secretName, ev.Object)
|
||||||
|
default:
|
||||||
|
// Ignore, no action required.
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err := l.configFromSecret(ctx, secret); err != nil {
|
|
||||||
return fmt.Errorf("error reloading config Secret %q: %v", secret.Name, err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -15,7 +15,9 @@ import (
|
|||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
corev1 "k8s.io/api/core/v1"
|
corev1 "k8s.io/api/core/v1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
"k8s.io/client-go/kubernetes/fake"
|
"k8s.io/client-go/kubernetes/fake"
|
||||||
|
ktesting "k8s.io/client-go/testing"
|
||||||
"tailscale.com/kube/k8s-proxy/conf"
|
"tailscale.com/kube/k8s-proxy/conf"
|
||||||
"tailscale.com/kube/kubetypes"
|
"tailscale.com/kube/kubetypes"
|
||||||
"tailscale.com/types/ptr"
|
"tailscale.com/types/ptr"
|
||||||
@@ -125,19 +127,8 @@ func TestWatchConfig(t *testing.T) {
|
|||||||
t.Fatalf("error writing namespace file: %v", err)
|
t.Fatalf("error writing namespace file: %v", err)
|
||||||
}
|
}
|
||||||
writeFile = func(t *testing.T, content string) {
|
writeFile = func(t *testing.T, content string) {
|
||||||
s := &corev1.Secret{
|
s := secretFrom(content)
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
mustCreateOrUpdate(t, cl, s)
|
||||||
Name: "config-secret",
|
|
||||||
},
|
|
||||||
Data: map[string][]byte{
|
|
||||||
kubetypes.KubeAPIServerConfigFile: []byte(content),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
if _, err := cl.CoreV1().Secrets("default").Create(t.Context(), s, metav1.CreateOptions{}); err != nil {
|
|
||||||
if _, updateErr := cl.CoreV1().Secrets("default").Update(t.Context(), s, metav1.UpdateOptions{}); updateErr != nil {
|
|
||||||
t.Fatalf("error writing config Secret %q: %v", cfgPath, updateErr)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
configChan := make(chan *conf.Config)
|
configChan := make(chan *conf.Config)
|
||||||
@@ -189,3 +180,83 @@ func TestWatchConfig(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestWatchConfigSecret_Rewatches(t *testing.T) {
|
||||||
|
root := t.TempDir()
|
||||||
|
cl := fake.NewClientset()
|
||||||
|
var watchCount int
|
||||||
|
var watcher *watch.RaceFreeFakeWatcher
|
||||||
|
expected := []string{
|
||||||
|
`{"version": "v1alpha1", "authKey": "abc123"}`,
|
||||||
|
`{"version": "v1alpha1", "authKey": "def456"}`,
|
||||||
|
`{"version": "v1alpha1", "authKey": "ghi789"}`,
|
||||||
|
}
|
||||||
|
cl.PrependWatchReactor("secrets", func(action ktesting.Action) (handled bool, ret watch.Interface, err error) {
|
||||||
|
watcher = watch.NewRaceFreeFake()
|
||||||
|
watcher.Add(secretFrom(expected[watchCount]))
|
||||||
|
if action.GetVerb() == "watch" && action.GetResource().Resource == "secrets" {
|
||||||
|
watchCount++
|
||||||
|
}
|
||||||
|
return true, watcher, nil
|
||||||
|
})
|
||||||
|
|
||||||
|
nsFilePath := filepath.Join(root, namespacePath)
|
||||||
|
if err := os.MkdirAll(filepath.Dir(nsFilePath), 0o755); err != nil {
|
||||||
|
t.Fatalf("error creating namespace directory: %v", err)
|
||||||
|
}
|
||||||
|
if err := os.WriteFile(nsFilePath, []byte("default"), 0o644); err != nil {
|
||||||
|
t.Fatalf("error writing namespace file: %v", err)
|
||||||
|
}
|
||||||
|
configChan := make(chan *conf.Config)
|
||||||
|
l := NewConfigLoader(zap.Must(zap.NewDevelopment()).Sugar(), cl.CoreV1(), configChan)
|
||||||
|
l.root = root
|
||||||
|
|
||||||
|
mustCreateOrUpdate(t, cl, secretFrom(expected[0]))
|
||||||
|
|
||||||
|
errs := make(chan error)
|
||||||
|
go func() {
|
||||||
|
errs <- l.watchConfigSecretChanges(t.Context(), "config-secret")
|
||||||
|
}()
|
||||||
|
|
||||||
|
for i := range 2 {
|
||||||
|
select {
|
||||||
|
case cfg := <-configChan:
|
||||||
|
if exp := expected[i]; cfg.Parsed.AuthKey == nil || !strings.Contains(exp, *cfg.Parsed.AuthKey) {
|
||||||
|
t.Fatalf("expected config to have authKey %q, got: %v", exp, cfg.Parsed.AuthKey)
|
||||||
|
}
|
||||||
|
if i == 0 {
|
||||||
|
watcher.Stop()
|
||||||
|
}
|
||||||
|
case err := <-errs:
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
case <-l.cfgIgnored:
|
||||||
|
t.Fatalf("expected config to be reloaded, but got ignored signal")
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
t.Fatalf("timed out waiting for expected event")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if watchCount != 2 {
|
||||||
|
t.Fatalf("expected 2 watch API calls, got %d", watchCount)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func secretFrom(content string) *corev1.Secret {
|
||||||
|
return &corev1.Secret{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "config-secret",
|
||||||
|
},
|
||||||
|
Data: map[string][]byte{
|
||||||
|
kubetypes.KubeAPIServerConfigFile: []byte(content),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func mustCreateOrUpdate(t *testing.T, cl *fake.Clientset, s *corev1.Secret) {
|
||||||
|
t.Helper()
|
||||||
|
if _, err := cl.CoreV1().Secrets("default").Create(t.Context(), s, metav1.CreateOptions{}); err != nil {
|
||||||
|
if _, updateErr := cl.CoreV1().Secrets("default").Update(t.Context(), s, metav1.UpdateOptions{}); updateErr != nil {
|
||||||
|
t.Fatalf("error writing config Secret %q: %v", s.Name, updateErr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user