From 853fe3b7132959fb99648df3d5d7aec47a6734c1 Mon Sep 17 00:00:00 2001 From: Irbe Krumina Date: Sat, 26 Oct 2024 09:33:47 -0500 Subject: [PATCH] ipn/store/kubestore: cache state in memory (#13918) Cache state in memory on writes, read from memory in reads. kubestore was previously always reading state from a Secret. This change should fix bugs caused by temporary loss of access to kube API server and imporove overall performance Fixes #7671 Updates tailscale/tailscale#12079,tailscale/tailscale#13900 Signed-off-by: Maisem Ali Signed-off-by: Irbe Krumina Co-authored-by: Maisem Ali --- ipn/store/kubestore/store_kube.go | 81 +++++++++++++++++++------------ ipn/store/mem/store_mem.go | 17 +++++++ 2 files changed, 67 insertions(+), 31 deletions(-) diff --git a/ipn/store/kubestore/store_kube.go b/ipn/store/kubestore/store_kube.go index 00950bd3b..1e0e01c7b 100644 --- a/ipn/store/kubestore/store_kube.go +++ b/ipn/store/kubestore/store_kube.go @@ -13,19 +13,27 @@ "time" "tailscale.com/ipn" + "tailscale.com/ipn/store/mem" "tailscale.com/kube/kubeapi" "tailscale.com/kube/kubeclient" "tailscale.com/types/logger" ) +// TODO(irbekrm): should we bump this? should we have retries? See tailscale/tailscale#13024 +const timeout = 5 * time.Second + // Store is an ipn.StateStore that uses a Kubernetes Secret for persistence. type Store struct { client kubeclient.Client canPatch bool secretName string + + // memory holds the latest tailscale state. Writes write state to a kube Secret and memory, Reads read from + // memory. + memory mem.Store } -// New returns a new Store that persists to the named secret. +// New returns a new Store that persists to the named Secret. func New(_ logger.Logf, secretName string) (*Store, error) { c, err := kubeclient.New() if err != nil { @@ -39,11 +47,16 @@ func New(_ logger.Logf, secretName string) (*Store, error) { if err != nil { return nil, err } - return &Store{ + s := &Store{ client: c, canPatch: canPatch, secretName: secretName, - }, nil + } + // Load latest state from kube Secret if it already exists. + if err := s.loadState(); err != nil { + return nil, fmt.Errorf("error loading state from kube Secret: %w", err) + } + return s, nil } func (s *Store) SetDialer(d func(ctx context.Context, network, address string) (net.Conn, error)) { @@ -54,37 +67,17 @@ func (s *Store) String() string { return "kube.Store" } // ReadState implements the StateStore interface. func (s *Store) ReadState(id ipn.StateKey) ([]byte, error) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - secret, err := s.client.GetSecret(ctx, s.secretName) - if err != nil { - if st, ok := err.(*kubeapi.Status); ok && st.Code == 404 { - return nil, ipn.ErrStateNotExist - } - return nil, err - } - b, ok := secret.Data[sanitizeKey(id)] - if !ok { - return nil, ipn.ErrStateNotExist - } - return b, nil -} - -func sanitizeKey(k ipn.StateKey) string { - // The only valid characters in a Kubernetes secret key are alphanumeric, -, - // _, and . - return strings.Map(func(r rune) rune { - if r >= 'a' && r <= 'z' || r >= 'A' && r <= 'Z' || r >= '0' && r <= '9' || r == '-' || r == '_' || r == '.' { - return r - } - return '_' - }, string(k)) + return s.memory.ReadState(ipn.StateKey(sanitizeKey(id))) } // WriteState implements the StateStore interface. -func (s *Store) WriteState(id ipn.StateKey, bs []byte) error { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) +func (s *Store) WriteState(id ipn.StateKey, bs []byte) (err error) { + defer func() { + if err == nil { + s.memory.WriteState(ipn.StateKey(sanitizeKey(id)), bs) + } + }() + ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() secret, err := s.client.GetSecret(ctx, s.secretName) @@ -137,3 +130,29 @@ func (s *Store) WriteState(id ipn.StateKey, bs []byte) error { } return err } + +func (s *Store) loadState() error { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + secret, err := s.client.GetSecret(ctx, s.secretName) + if err != nil { + if st, ok := err.(*kubeapi.Status); ok && st.Code == 404 { + return ipn.ErrStateNotExist + } + return err + } + s.memory.LoadFromMap(secret.Data) + return nil +} + +func sanitizeKey(k ipn.StateKey) string { + // The only valid characters in a Kubernetes secret key are alphanumeric, -, + // _, and . + return strings.Map(func(r rune) rune { + if r >= 'a' && r <= 'z' || r >= 'A' && r <= 'Z' || r >= '0' && r <= '9' || r == '-' || r == '_' || r == '.' { + return r + } + return '_' + }, string(k)) +} diff --git a/ipn/store/mem/store_mem.go b/ipn/store/mem/store_mem.go index f3a308ae5..6f474ce99 100644 --- a/ipn/store/mem/store_mem.go +++ b/ipn/store/mem/store_mem.go @@ -9,8 +9,10 @@ "encoding/json" "sync" + xmaps "golang.org/x/exp/maps" "tailscale.com/ipn" "tailscale.com/types/logger" + "tailscale.com/util/mak" ) // New returns a new Store. @@ -28,6 +30,7 @@ type Store struct { func (s *Store) String() string { return "mem.Store" } // ReadState implements the StateStore interface. +// It returns ipn.ErrStateNotExist if the state does not exist. func (s *Store) ReadState(id ipn.StateKey) ([]byte, error) { s.mu.Lock() defer s.mu.Unlock() @@ -39,6 +42,7 @@ func (s *Store) ReadState(id ipn.StateKey) ([]byte, error) { } // WriteState implements the StateStore interface. +// It never returns an error. func (s *Store) WriteState(id ipn.StateKey, bs []byte) error { s.mu.Lock() defer s.mu.Unlock() @@ -49,6 +53,19 @@ func (s *Store) WriteState(id ipn.StateKey, bs []byte) error { return nil } +// LoadFromMap loads the in-memory cache from the provided map. +// Any existing content is cleared, and the provided map is +// copied into the cache. +func (s *Store) LoadFromMap(m map[string][]byte) { + s.mu.Lock() + defer s.mu.Unlock() + xmaps.Clear(s.cache) + for k, v := range m { + mak.Set(&s.cache, ipn.StateKey(k), v) + } + return +} + // LoadFromJSON attempts to unmarshal json content into the // in-memory cache. func (s *Store) LoadFromJSON(data []byte) error {