mirror of
https://github.com/tailscale/tailscale.git
synced 2024-11-29 04:55:31 +00:00
ipn/ipnlocal: add IPN Bus NotifyRateLimit watch bit NotifyRateLimit
Limit spamming GUIs with boring updates to once in 3 seconds, unless the notification is relatively interesting and the GUI should update immediately. This is basically @barnstar's #14119 but with the logic moved to be per-watch-session (since the bit is per session), rather than globally. And this distinguishes notable Notify messages (such as state changes) and makes them send immediately. Updates tailscale/corp#24553 Change-Id: I79cac52cce85280ce351e65e76ea11e107b00b49 Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
This commit is contained in:
parent
c2a7f17f2b
commit
93db503565
@ -213,6 +213,7 @@
|
|||||||
fs := newFlagSet("watch-ipn")
|
fs := newFlagSet("watch-ipn")
|
||||||
fs.BoolVar(&watchIPNArgs.netmap, "netmap", true, "include netmap in messages")
|
fs.BoolVar(&watchIPNArgs.netmap, "netmap", true, "include netmap in messages")
|
||||||
fs.BoolVar(&watchIPNArgs.initial, "initial", false, "include initial status")
|
fs.BoolVar(&watchIPNArgs.initial, "initial", false, "include initial status")
|
||||||
|
fs.BoolVar(&watchIPNArgs.rateLimit, "rate-limit", true, "rate limit messags")
|
||||||
fs.BoolVar(&watchIPNArgs.showPrivateKey, "show-private-key", false, "include node private key in printed netmap")
|
fs.BoolVar(&watchIPNArgs.showPrivateKey, "show-private-key", false, "include node private key in printed netmap")
|
||||||
fs.IntVar(&watchIPNArgs.count, "count", 0, "exit after printing this many statuses, or 0 to keep going forever")
|
fs.IntVar(&watchIPNArgs.count, "count", 0, "exit after printing this many statuses, or 0 to keep going forever")
|
||||||
return fs
|
return fs
|
||||||
@ -500,6 +501,7 @@ func runPrefs(ctx context.Context, args []string) error {
|
|||||||
netmap bool
|
netmap bool
|
||||||
initial bool
|
initial bool
|
||||||
showPrivateKey bool
|
showPrivateKey bool
|
||||||
|
rateLimit bool
|
||||||
count int
|
count int
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -511,6 +513,9 @@ func runWatchIPN(ctx context.Context, args []string) error {
|
|||||||
if !watchIPNArgs.showPrivateKey {
|
if !watchIPNArgs.showPrivateKey {
|
||||||
mask |= ipn.NotifyNoPrivateKeys
|
mask |= ipn.NotifyNoPrivateKeys
|
||||||
}
|
}
|
||||||
|
if watchIPNArgs.rateLimit {
|
||||||
|
mask |= ipn.NotifyRateLimit
|
||||||
|
}
|
||||||
watcher, err := localClient.WatchIPNBus(ctx, mask)
|
watcher, err := localClient.WatchIPNBus(ctx, mask)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -73,6 +73,8 @@ type EngineStatus struct {
|
|||||||
NotifyInitialOutgoingFiles // if set, the first Notify message (sent immediately) will contain the current Taildrop OutgoingFiles
|
NotifyInitialOutgoingFiles // if set, the first Notify message (sent immediately) will contain the current Taildrop OutgoingFiles
|
||||||
|
|
||||||
NotifyInitialHealthState // if set, the first Notify message (sent immediately) will contain the current health.State of the client
|
NotifyInitialHealthState // if set, the first Notify message (sent immediately) will contain the current health.State of the client
|
||||||
|
|
||||||
|
NotifyRateLimit // if set, rate limit spammy netmap updates to every few seconds
|
||||||
)
|
)
|
||||||
|
|
||||||
// Notify is a communication from a backend (e.g. tailscaled) to a frontend
|
// Notify is a communication from a backend (e.g. tailscaled) to a frontend
|
||||||
|
161
ipn/ipnlocal/bus.go
Normal file
161
ipn/ipnlocal/bus.go
Normal file
@ -0,0 +1,161 @@
|
|||||||
|
// Copyright (c) Tailscale Inc & AUTHORS
|
||||||
|
// SPDX-License-Identifier: BSD-3-Clause
|
||||||
|
|
||||||
|
package ipnlocal
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"tailscale.com/ipn"
|
||||||
|
"tailscale.com/tstime"
|
||||||
|
)
|
||||||
|
|
||||||
|
type rateLimitingBusSender struct {
|
||||||
|
fn func(*ipn.Notify) (keepGoing bool)
|
||||||
|
lastFlush time.Time // last call to fn, or zero value if none
|
||||||
|
interval time.Duration // 0 to flush immediately; non-zero to rate limit sends
|
||||||
|
clock tstime.DefaultClock // non-nil for testing
|
||||||
|
didSendTestHook func() // non-nil for testing
|
||||||
|
|
||||||
|
// pending, if non-nil, is the pending notification that we
|
||||||
|
// haven't sent yet. We own this memory to mutate.
|
||||||
|
pending *ipn.Notify
|
||||||
|
|
||||||
|
// flushTimer is non-nil if the timer is armed.
|
||||||
|
flushTimer tstime.TimerController // effectively a *time.Timer
|
||||||
|
flushTimerC <-chan time.Time // ... said ~Timer's C chan
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *rateLimitingBusSender) close() {
|
||||||
|
if s.flushTimer != nil {
|
||||||
|
s.flushTimer.Stop()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *rateLimitingBusSender) flushChan() <-chan time.Time {
|
||||||
|
return s.flushTimerC
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *rateLimitingBusSender) flush() (keepGoing bool) {
|
||||||
|
if n := s.pending; n != nil {
|
||||||
|
s.pending = nil
|
||||||
|
return s.flushNotify(n)
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *rateLimitingBusSender) flushNotify(n *ipn.Notify) (keepGoing bool) {
|
||||||
|
s.lastFlush = s.clock.Now()
|
||||||
|
return s.fn(n)
|
||||||
|
}
|
||||||
|
|
||||||
|
// send conditionally sends n to the underlying fn, possibly rate
|
||||||
|
// limiting it, depending on whether s.interval is set, and whether
|
||||||
|
// n is a notable notification that the client (typically a GUI) would
|
||||||
|
// want to act on (render) immediately.
|
||||||
|
//
|
||||||
|
// It returns whether the caller should keep looping.
|
||||||
|
//
|
||||||
|
// The passed-in memory 'n' is owned by the caller and should
|
||||||
|
// not be mutated.
|
||||||
|
func (s *rateLimitingBusSender) send(n *ipn.Notify) (keepGoing bool) {
|
||||||
|
if s.interval <= 0 {
|
||||||
|
// No rate limiting case.
|
||||||
|
return s.fn(n)
|
||||||
|
}
|
||||||
|
if isNotableNotify(n) {
|
||||||
|
// Notable notifications are always sent immediately.
|
||||||
|
// But first send any boring one that was pending.
|
||||||
|
// TODO(bradfitz): there might be a boring one pending
|
||||||
|
// with a NetMap or Engine field that is redundant
|
||||||
|
// with the new one (n) with NetMap or Engine populated.
|
||||||
|
// We should clear the pending one's NetMap/Engine in
|
||||||
|
// that case. Or really, merge the two, but mergeBoringNotifies
|
||||||
|
// only handles the case of both sides being boring.
|
||||||
|
// So for now, flush both.
|
||||||
|
if !s.flush() {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return s.flushNotify(n)
|
||||||
|
}
|
||||||
|
s.pending = mergeBoringNotifies(s.pending, n)
|
||||||
|
d := s.clock.Now().Sub(s.lastFlush)
|
||||||
|
if d > s.interval {
|
||||||
|
return s.flush()
|
||||||
|
}
|
||||||
|
nextFlushIn := s.interval - d
|
||||||
|
if s.flushTimer == nil {
|
||||||
|
s.flushTimer, s.flushTimerC = s.clock.NewTimer(nextFlushIn)
|
||||||
|
} else {
|
||||||
|
s.flushTimer.Reset(nextFlushIn)
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *rateLimitingBusSender) Run(ctx context.Context, ch <-chan *ipn.Notify) {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case n, ok := <-ch:
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !s.send(n) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if f := s.didSendTestHook; f != nil {
|
||||||
|
f()
|
||||||
|
}
|
||||||
|
case <-s.flushChan():
|
||||||
|
if !s.flush() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// mergeBoringNotify merges new notify 'src' into possibly-nil 'dst',
|
||||||
|
// either mutating 'dst' or allocating a new one if 'dst' is nil,
|
||||||
|
// returning the merged result.
|
||||||
|
//
|
||||||
|
// dst and src must both be "boring" (i.e. not notable per isNotifiableNotify).
|
||||||
|
func mergeBoringNotifies(dst, src *ipn.Notify) *ipn.Notify {
|
||||||
|
if dst == nil {
|
||||||
|
dst = &ipn.Notify{Version: src.Version}
|
||||||
|
}
|
||||||
|
if src.NetMap != nil {
|
||||||
|
dst.NetMap = src.NetMap
|
||||||
|
}
|
||||||
|
if src.Engine != nil {
|
||||||
|
dst.Engine = src.Engine
|
||||||
|
}
|
||||||
|
return dst
|
||||||
|
}
|
||||||
|
|
||||||
|
// isNotableNotify reports whether n is a "notable" notification that
|
||||||
|
// should be sent on the IPN bus immediately (e.g. to GUIs) without
|
||||||
|
// rate limiting it for a few seconds.
|
||||||
|
//
|
||||||
|
// It effectively reports whether n contains any field set that's
|
||||||
|
// not NetMap or Engine.
|
||||||
|
func isNotableNotify(n *ipn.Notify) bool {
|
||||||
|
if n == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return n.State != nil ||
|
||||||
|
n.SessionID != "" ||
|
||||||
|
n.BackendLogID != nil ||
|
||||||
|
n.BrowseToURL != nil ||
|
||||||
|
n.LocalTCPPort != nil ||
|
||||||
|
n.ClientVersion != nil ||
|
||||||
|
n.Prefs != nil ||
|
||||||
|
n.ErrMessage != nil ||
|
||||||
|
n.LoginFinished != nil ||
|
||||||
|
!n.DriveShares.IsNil() ||
|
||||||
|
n.Health != nil ||
|
||||||
|
len(n.IncomingFiles) > 0 ||
|
||||||
|
len(n.OutgoingFiles) > 0 ||
|
||||||
|
n.FilesWaiting != nil
|
||||||
|
}
|
220
ipn/ipnlocal/bus_test.go
Normal file
220
ipn/ipnlocal/bus_test.go
Normal file
@ -0,0 +1,220 @@
|
|||||||
|
// Copyright (c) Tailscale Inc & AUTHORS
|
||||||
|
// SPDX-License-Identifier: BSD-3-Clause
|
||||||
|
|
||||||
|
package ipnlocal
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"reflect"
|
||||||
|
"slices"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"tailscale.com/drive"
|
||||||
|
"tailscale.com/ipn"
|
||||||
|
"tailscale.com/tstest"
|
||||||
|
"tailscale.com/tstime"
|
||||||
|
"tailscale.com/types/logger"
|
||||||
|
"tailscale.com/types/netmap"
|
||||||
|
"tailscale.com/types/views"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestIsNotableNotify(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
notify *ipn.Notify
|
||||||
|
want bool
|
||||||
|
}{
|
||||||
|
{"nil", nil, false},
|
||||||
|
{"empty", &ipn.Notify{}, false},
|
||||||
|
{"version", &ipn.Notify{Version: "foo"}, false},
|
||||||
|
{"netmap", &ipn.Notify{NetMap: new(netmap.NetworkMap)}, false},
|
||||||
|
{"engine", &ipn.Notify{Engine: new(ipn.EngineStatus)}, false},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Then for all other fields, assume they're notable.
|
||||||
|
// We use reflect to catch fields that might be added in the future without
|
||||||
|
// remembering to update the [isNotableNotify] function.
|
||||||
|
rt := reflect.TypeFor[ipn.Notify]()
|
||||||
|
for i := range rt.NumField() {
|
||||||
|
n := &ipn.Notify{}
|
||||||
|
sf := rt.Field(i)
|
||||||
|
switch sf.Name {
|
||||||
|
case "_", "NetMap", "Engine", "Version":
|
||||||
|
// Already covered above or not applicable.
|
||||||
|
continue
|
||||||
|
case "DriveShares":
|
||||||
|
n.DriveShares = views.SliceOfViews[*drive.Share, drive.ShareView](make([]*drive.Share, 1))
|
||||||
|
default:
|
||||||
|
rf := reflect.ValueOf(n).Elem().Field(i)
|
||||||
|
switch rf.Kind() {
|
||||||
|
case reflect.Pointer:
|
||||||
|
rf.Set(reflect.New(rf.Type().Elem()))
|
||||||
|
case reflect.String:
|
||||||
|
rf.SetString("foo")
|
||||||
|
case reflect.Slice:
|
||||||
|
rf.Set(reflect.MakeSlice(rf.Type(), 1, 1))
|
||||||
|
default:
|
||||||
|
t.Errorf("unhandled field kind %v for %q", rf.Kind(), sf.Name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tests = append(tests, struct {
|
||||||
|
name string
|
||||||
|
notify *ipn.Notify
|
||||||
|
want bool
|
||||||
|
}{
|
||||||
|
name: "field-" + rt.Field(i).Name,
|
||||||
|
notify: n,
|
||||||
|
want: true,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
if got := isNotableNotify(tt.notify); got != tt.want {
|
||||||
|
t.Errorf("%v: got %v; want %v", tt.name, got, tt.want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type rateLimitingBusSenderTester struct {
|
||||||
|
tb testing.TB
|
||||||
|
got []*ipn.Notify
|
||||||
|
clock *tstest.Clock
|
||||||
|
s *rateLimitingBusSender
|
||||||
|
}
|
||||||
|
|
||||||
|
func (st *rateLimitingBusSenderTester) init() {
|
||||||
|
if st.s != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
st.clock = tstest.NewClock(tstest.ClockOpts{
|
||||||
|
Start: time.Unix(1731777537, 0), // time I wrote this test :)
|
||||||
|
})
|
||||||
|
st.s = &rateLimitingBusSender{
|
||||||
|
clock: tstime.DefaultClock{Clock: st.clock},
|
||||||
|
fn: func(n *ipn.Notify) bool {
|
||||||
|
st.got = append(st.got, n)
|
||||||
|
return true
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (st *rateLimitingBusSenderTester) send(n *ipn.Notify) {
|
||||||
|
st.tb.Helper()
|
||||||
|
st.init()
|
||||||
|
if !st.s.send(n) {
|
||||||
|
st.tb.Fatal("unexpected send failed")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (st *rateLimitingBusSenderTester) advance(d time.Duration) {
|
||||||
|
st.tb.Helper()
|
||||||
|
st.clock.Advance(d)
|
||||||
|
select {
|
||||||
|
case <-st.s.flushChan():
|
||||||
|
if !st.s.flush() {
|
||||||
|
st.tb.Fatal("unexpected flush failed")
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRateLimitingBusSender(t *testing.T) {
|
||||||
|
nm1 := &ipn.Notify{NetMap: new(netmap.NetworkMap)}
|
||||||
|
nm2 := &ipn.Notify{NetMap: new(netmap.NetworkMap)}
|
||||||
|
eng1 := &ipn.Notify{Engine: new(ipn.EngineStatus)}
|
||||||
|
eng2 := &ipn.Notify{Engine: new(ipn.EngineStatus)}
|
||||||
|
|
||||||
|
t.Run("unbuffered", func(t *testing.T) {
|
||||||
|
st := &rateLimitingBusSenderTester{tb: t}
|
||||||
|
st.send(nm1)
|
||||||
|
st.send(nm2)
|
||||||
|
st.send(eng1)
|
||||||
|
st.send(eng2)
|
||||||
|
if !slices.Equal(st.got, []*ipn.Notify{nm1, nm2, eng1, eng2}) {
|
||||||
|
t.Errorf("got %d items; want 4 specific ones, unmodified", len(st.got))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("buffered", func(t *testing.T) {
|
||||||
|
st := &rateLimitingBusSenderTester{tb: t}
|
||||||
|
st.init()
|
||||||
|
st.s.interval = 1 * time.Second
|
||||||
|
st.send(&ipn.Notify{Version: "initial"})
|
||||||
|
if len(st.got) != 1 {
|
||||||
|
t.Fatalf("got %d items; expected 1 (first to flush immediately)", len(st.got))
|
||||||
|
}
|
||||||
|
st.send(nm1)
|
||||||
|
st.send(nm2)
|
||||||
|
st.send(eng1)
|
||||||
|
st.send(eng2)
|
||||||
|
if len(st.got) != 1 {
|
||||||
|
if len(st.got) != 1 {
|
||||||
|
t.Fatalf("got %d items; expected still just that first 1", len(st.got))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// But moving the clock should flush the rest, collasced into one new one.
|
||||||
|
st.advance(5 * time.Second)
|
||||||
|
if len(st.got) != 2 {
|
||||||
|
t.Fatalf("got %d items; want 2", len(st.got))
|
||||||
|
}
|
||||||
|
gotn := st.got[1]
|
||||||
|
if gotn.NetMap != nm2.NetMap {
|
||||||
|
t.Errorf("got wrong NetMap; got %p", gotn.NetMap)
|
||||||
|
}
|
||||||
|
if gotn.Engine != eng2.Engine {
|
||||||
|
t.Errorf("got wrong Engine; got %p", gotn.Engine)
|
||||||
|
}
|
||||||
|
if t.Failed() {
|
||||||
|
t.Logf("failed Notify was: %v", logger.AsJSON(gotn))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
// Test the Run method
|
||||||
|
t.Run("run", func(t *testing.T) {
|
||||||
|
st := &rateLimitingBusSenderTester{tb: t}
|
||||||
|
st.init()
|
||||||
|
st.s.interval = 1 * time.Second
|
||||||
|
st.s.lastFlush = st.clock.Now() // pretend we just flushed
|
||||||
|
|
||||||
|
flushc := make(chan *ipn.Notify, 1)
|
||||||
|
st.s.fn = func(n *ipn.Notify) bool {
|
||||||
|
flushc <- n
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
didSend := make(chan bool, 2)
|
||||||
|
st.s.didSendTestHook = func() { didSend <- true }
|
||||||
|
waitSend := func() {
|
||||||
|
select {
|
||||||
|
case <-didSend:
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
t.Error("timeout waiting for call to send")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
incoming := make(chan *ipn.Notify, 2)
|
||||||
|
go func() {
|
||||||
|
incoming <- nm1
|
||||||
|
waitSend()
|
||||||
|
incoming <- nm2
|
||||||
|
waitSend()
|
||||||
|
st.advance(5 * time.Second)
|
||||||
|
select {
|
||||||
|
case n := <-flushc:
|
||||||
|
if n.NetMap != nm2.NetMap {
|
||||||
|
t.Errorf("got wrong NetMap; got %p", n.NetMap)
|
||||||
|
}
|
||||||
|
case <-time.After(10 * time.Second):
|
||||||
|
t.Error("timeout")
|
||||||
|
}
|
||||||
|
cancel()
|
||||||
|
}()
|
||||||
|
|
||||||
|
st.s.Run(ctx, incoming)
|
||||||
|
})
|
||||||
|
}
|
@ -2780,20 +2780,17 @@ func (b *LocalBackend) WatchNotificationsAs(ctx context.Context, actor ipnauth.A
|
|||||||
go b.pollRequestEngineStatus(ctx)
|
go b.pollRequestEngineStatus(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(marwan-at-work): check err
|
|
||||||
// TODO(marwan-at-work): streaming background logs?
|
// TODO(marwan-at-work): streaming background logs?
|
||||||
defer b.DeleteForegroundSession(sessionID)
|
defer b.DeleteForegroundSession(sessionID)
|
||||||
|
|
||||||
for {
|
sender := &rateLimitingBusSender{fn: fn}
|
||||||
select {
|
defer sender.close()
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
if mask&ipn.NotifyRateLimit != 0 {
|
||||||
case n := <-ch:
|
sender.interval = 3 * time.Second
|
||||||
if !fn(n) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sender.Run(ctx, ch)
|
||||||
}
|
}
|
||||||
|
|
||||||
// pollRequestEngineStatus calls b.e.RequestStatus every 2 seconds until ctx
|
// pollRequestEngineStatus calls b.e.RequestStatus every 2 seconds until ctx
|
||||||
|
Loading…
Reference in New Issue
Block a user