diff --git a/cmd/tailscale/cli/debug.go b/cmd/tailscale/cli/debug.go index fdde9ef09..7f235e85c 100644 --- a/cmd/tailscale/cli/debug.go +++ b/cmd/tailscale/cli/debug.go @@ -213,6 +213,7 @@ fs := newFlagSet("watch-ipn") fs.BoolVar(&watchIPNArgs.netmap, "netmap", true, "include netmap in messages") fs.BoolVar(&watchIPNArgs.initial, "initial", false, "include initial status") + fs.BoolVar(&watchIPNArgs.rateLimit, "rate-limit", true, "rate limit messags") fs.BoolVar(&watchIPNArgs.showPrivateKey, "show-private-key", false, "include node private key in printed netmap") fs.IntVar(&watchIPNArgs.count, "count", 0, "exit after printing this many statuses, or 0 to keep going forever") return fs @@ -500,6 +501,7 @@ func runPrefs(ctx context.Context, args []string) error { netmap bool initial bool showPrivateKey bool + rateLimit bool count int } @@ -511,6 +513,9 @@ func runWatchIPN(ctx context.Context, args []string) error { if !watchIPNArgs.showPrivateKey { mask |= ipn.NotifyNoPrivateKeys } + if watchIPNArgs.rateLimit { + mask |= ipn.NotifyRateLimit + } watcher, err := localClient.WatchIPNBus(ctx, mask) if err != nil { return err diff --git a/ipn/backend.go b/ipn/backend.go index 5779727fe..91a35df0d 100644 --- a/ipn/backend.go +++ b/ipn/backend.go @@ -73,6 +73,8 @@ type EngineStatus struct { NotifyInitialOutgoingFiles // if set, the first Notify message (sent immediately) will contain the current Taildrop OutgoingFiles NotifyInitialHealthState // if set, the first Notify message (sent immediately) will contain the current health.State of the client + + NotifyRateLimit // if set, rate limit spammy netmap updates to every few seconds ) // Notify is a communication from a backend (e.g. tailscaled) to a frontend diff --git a/ipn/ipnlocal/bus.go b/ipn/ipnlocal/bus.go new file mode 100644 index 000000000..65cc2573a --- /dev/null +++ b/ipn/ipnlocal/bus.go @@ -0,0 +1,161 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package ipnlocal + +import ( + "context" + "time" + + "tailscale.com/ipn" + "tailscale.com/tstime" +) + +type rateLimitingBusSender struct { + fn func(*ipn.Notify) (keepGoing bool) + lastFlush time.Time // last call to fn, or zero value if none + interval time.Duration // 0 to flush immediately; non-zero to rate limit sends + clock tstime.DefaultClock // non-nil for testing + didSendTestHook func() // non-nil for testing + + // pending, if non-nil, is the pending notification that we + // haven't sent yet. We own this memory to mutate. + pending *ipn.Notify + + // flushTimer is non-nil if the timer is armed. + flushTimer tstime.TimerController // effectively a *time.Timer + flushTimerC <-chan time.Time // ... said ~Timer's C chan +} + +func (s *rateLimitingBusSender) close() { + if s.flushTimer != nil { + s.flushTimer.Stop() + } +} + +func (s *rateLimitingBusSender) flushChan() <-chan time.Time { + return s.flushTimerC +} + +func (s *rateLimitingBusSender) flush() (keepGoing bool) { + if n := s.pending; n != nil { + s.pending = nil + return s.flushNotify(n) + } + return true +} + +func (s *rateLimitingBusSender) flushNotify(n *ipn.Notify) (keepGoing bool) { + s.lastFlush = s.clock.Now() + return s.fn(n) +} + +// send conditionally sends n to the underlying fn, possibly rate +// limiting it, depending on whether s.interval is set, and whether +// n is a notable notification that the client (typically a GUI) would +// want to act on (render) immediately. +// +// It returns whether the caller should keep looping. +// +// The passed-in memory 'n' is owned by the caller and should +// not be mutated. +func (s *rateLimitingBusSender) send(n *ipn.Notify) (keepGoing bool) { + if s.interval <= 0 { + // No rate limiting case. + return s.fn(n) + } + if isNotableNotify(n) { + // Notable notifications are always sent immediately. + // But first send any boring one that was pending. + // TODO(bradfitz): there might be a boring one pending + // with a NetMap or Engine field that is redundant + // with the new one (n) with NetMap or Engine populated. + // We should clear the pending one's NetMap/Engine in + // that case. Or really, merge the two, but mergeBoringNotifies + // only handles the case of both sides being boring. + // So for now, flush both. + if !s.flush() { + return false + } + return s.flushNotify(n) + } + s.pending = mergeBoringNotifies(s.pending, n) + d := s.clock.Now().Sub(s.lastFlush) + if d > s.interval { + return s.flush() + } + nextFlushIn := s.interval - d + if s.flushTimer == nil { + s.flushTimer, s.flushTimerC = s.clock.NewTimer(nextFlushIn) + } else { + s.flushTimer.Reset(nextFlushIn) + } + return true +} + +func (s *rateLimitingBusSender) Run(ctx context.Context, ch <-chan *ipn.Notify) { + for { + select { + case <-ctx.Done(): + return + case n, ok := <-ch: + if !ok { + return + } + if !s.send(n) { + return + } + if f := s.didSendTestHook; f != nil { + f() + } + case <-s.flushChan(): + if !s.flush() { + return + } + } + } +} + +// mergeBoringNotify merges new notify 'src' into possibly-nil 'dst', +// either mutating 'dst' or allocating a new one if 'dst' is nil, +// returning the merged result. +// +// dst and src must both be "boring" (i.e. not notable per isNotifiableNotify). +func mergeBoringNotifies(dst, src *ipn.Notify) *ipn.Notify { + if dst == nil { + dst = &ipn.Notify{Version: src.Version} + } + if src.NetMap != nil { + dst.NetMap = src.NetMap + } + if src.Engine != nil { + dst.Engine = src.Engine + } + return dst +} + +// isNotableNotify reports whether n is a "notable" notification that +// should be sent on the IPN bus immediately (e.g. to GUIs) without +// rate limiting it for a few seconds. +// +// It effectively reports whether n contains any field set that's +// not NetMap or Engine. +func isNotableNotify(n *ipn.Notify) bool { + if n == nil { + return false + } + return n.State != nil || + n.SessionID != "" || + n.BackendLogID != nil || + n.BrowseToURL != nil || + n.LocalTCPPort != nil || + n.ClientVersion != nil || + n.Prefs != nil || + n.ErrMessage != nil || + n.LoginFinished != nil || + !n.DriveShares.IsNil() || + n.Health != nil || + len(n.IncomingFiles) > 0 || + len(n.OutgoingFiles) > 0 || + n.FilesWaiting != nil +} diff --git a/ipn/ipnlocal/bus_test.go b/ipn/ipnlocal/bus_test.go new file mode 100644 index 000000000..5c75ac54d --- /dev/null +++ b/ipn/ipnlocal/bus_test.go @@ -0,0 +1,220 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package ipnlocal + +import ( + "context" + "reflect" + "slices" + "testing" + "time" + + "tailscale.com/drive" + "tailscale.com/ipn" + "tailscale.com/tstest" + "tailscale.com/tstime" + "tailscale.com/types/logger" + "tailscale.com/types/netmap" + "tailscale.com/types/views" +) + +func TestIsNotableNotify(t *testing.T) { + tests := []struct { + name string + notify *ipn.Notify + want bool + }{ + {"nil", nil, false}, + {"empty", &ipn.Notify{}, false}, + {"version", &ipn.Notify{Version: "foo"}, false}, + {"netmap", &ipn.Notify{NetMap: new(netmap.NetworkMap)}, false}, + {"engine", &ipn.Notify{Engine: new(ipn.EngineStatus)}, false}, + } + + // Then for all other fields, assume they're notable. + // We use reflect to catch fields that might be added in the future without + // remembering to update the [isNotableNotify] function. + rt := reflect.TypeFor[ipn.Notify]() + for i := range rt.NumField() { + n := &ipn.Notify{} + sf := rt.Field(i) + switch sf.Name { + case "_", "NetMap", "Engine", "Version": + // Already covered above or not applicable. + continue + case "DriveShares": + n.DriveShares = views.SliceOfViews[*drive.Share, drive.ShareView](make([]*drive.Share, 1)) + default: + rf := reflect.ValueOf(n).Elem().Field(i) + switch rf.Kind() { + case reflect.Pointer: + rf.Set(reflect.New(rf.Type().Elem())) + case reflect.String: + rf.SetString("foo") + case reflect.Slice: + rf.Set(reflect.MakeSlice(rf.Type(), 1, 1)) + default: + t.Errorf("unhandled field kind %v for %q", rf.Kind(), sf.Name) + } + } + + tests = append(tests, struct { + name string + notify *ipn.Notify + want bool + }{ + name: "field-" + rt.Field(i).Name, + notify: n, + want: true, + }) + } + + for _, tt := range tests { + if got := isNotableNotify(tt.notify); got != tt.want { + t.Errorf("%v: got %v; want %v", tt.name, got, tt.want) + } + } +} + +type rateLimitingBusSenderTester struct { + tb testing.TB + got []*ipn.Notify + clock *tstest.Clock + s *rateLimitingBusSender +} + +func (st *rateLimitingBusSenderTester) init() { + if st.s != nil { + return + } + st.clock = tstest.NewClock(tstest.ClockOpts{ + Start: time.Unix(1731777537, 0), // time I wrote this test :) + }) + st.s = &rateLimitingBusSender{ + clock: tstime.DefaultClock{Clock: st.clock}, + fn: func(n *ipn.Notify) bool { + st.got = append(st.got, n) + return true + }, + } +} + +func (st *rateLimitingBusSenderTester) send(n *ipn.Notify) { + st.tb.Helper() + st.init() + if !st.s.send(n) { + st.tb.Fatal("unexpected send failed") + } +} + +func (st *rateLimitingBusSenderTester) advance(d time.Duration) { + st.tb.Helper() + st.clock.Advance(d) + select { + case <-st.s.flushChan(): + if !st.s.flush() { + st.tb.Fatal("unexpected flush failed") + } + default: + } +} + +func TestRateLimitingBusSender(t *testing.T) { + nm1 := &ipn.Notify{NetMap: new(netmap.NetworkMap)} + nm2 := &ipn.Notify{NetMap: new(netmap.NetworkMap)} + eng1 := &ipn.Notify{Engine: new(ipn.EngineStatus)} + eng2 := &ipn.Notify{Engine: new(ipn.EngineStatus)} + + t.Run("unbuffered", func(t *testing.T) { + st := &rateLimitingBusSenderTester{tb: t} + st.send(nm1) + st.send(nm2) + st.send(eng1) + st.send(eng2) + if !slices.Equal(st.got, []*ipn.Notify{nm1, nm2, eng1, eng2}) { + t.Errorf("got %d items; want 4 specific ones, unmodified", len(st.got)) + } + }) + + t.Run("buffered", func(t *testing.T) { + st := &rateLimitingBusSenderTester{tb: t} + st.init() + st.s.interval = 1 * time.Second + st.send(&ipn.Notify{Version: "initial"}) + if len(st.got) != 1 { + t.Fatalf("got %d items; expected 1 (first to flush immediately)", len(st.got)) + } + st.send(nm1) + st.send(nm2) + st.send(eng1) + st.send(eng2) + if len(st.got) != 1 { + if len(st.got) != 1 { + t.Fatalf("got %d items; expected still just that first 1", len(st.got)) + } + } + + // But moving the clock should flush the rest, collasced into one new one. + st.advance(5 * time.Second) + if len(st.got) != 2 { + t.Fatalf("got %d items; want 2", len(st.got)) + } + gotn := st.got[1] + if gotn.NetMap != nm2.NetMap { + t.Errorf("got wrong NetMap; got %p", gotn.NetMap) + } + if gotn.Engine != eng2.Engine { + t.Errorf("got wrong Engine; got %p", gotn.Engine) + } + if t.Failed() { + t.Logf("failed Notify was: %v", logger.AsJSON(gotn)) + } + }) + + // Test the Run method + t.Run("run", func(t *testing.T) { + st := &rateLimitingBusSenderTester{tb: t} + st.init() + st.s.interval = 1 * time.Second + st.s.lastFlush = st.clock.Now() // pretend we just flushed + + flushc := make(chan *ipn.Notify, 1) + st.s.fn = func(n *ipn.Notify) bool { + flushc <- n + return true + } + didSend := make(chan bool, 2) + st.s.didSendTestHook = func() { didSend <- true } + waitSend := func() { + select { + case <-didSend: + case <-time.After(5 * time.Second): + t.Error("timeout waiting for call to send") + } + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + incoming := make(chan *ipn.Notify, 2) + go func() { + incoming <- nm1 + waitSend() + incoming <- nm2 + waitSend() + st.advance(5 * time.Second) + select { + case n := <-flushc: + if n.NetMap != nm2.NetMap { + t.Errorf("got wrong NetMap; got %p", n.NetMap) + } + case <-time.After(10 * time.Second): + t.Error("timeout") + } + cancel() + }() + + st.s.Run(ctx, incoming) + }) +} diff --git a/ipn/ipnlocal/local.go b/ipn/ipnlocal/local.go index 33025ed40..cbbea32aa 100644 --- a/ipn/ipnlocal/local.go +++ b/ipn/ipnlocal/local.go @@ -2780,20 +2780,17 @@ func (b *LocalBackend) WatchNotificationsAs(ctx context.Context, actor ipnauth.A go b.pollRequestEngineStatus(ctx) } - // TODO(marwan-at-work): check err // TODO(marwan-at-work): streaming background logs? defer b.DeleteForegroundSession(sessionID) - for { - select { - case <-ctx.Done(): - return - case n := <-ch: - if !fn(n) { - return - } - } + sender := &rateLimitingBusSender{fn: fn} + defer sender.close() + + if mask&ipn.NotifyRateLimit != 0 { + sender.interval = 3 * time.Second } + + sender.Run(ctx, ch) } // pollRequestEngineStatus calls b.e.RequestStatus every 2 seconds until ctx