const (
	// DefaultNetmapRateLimit is the minimum time between netmap updates sent
	// on the IPN bus when NotifyRateLimitNetmaps is included in the Notify
	// opts. 3 seconds should be sufficient to avoid flooding the UI with
	// netmap updates on large/chatty tailnets without causing noticeable
	// issues with the UI being out of date.
	//
	// 3 * time.Second is already a typed time.Duration constant, so no
	// explicit conversion is needed.
	DefaultNetmapRateLimit = 3 * time.Second
)
+ deferredNetmapCancel context.CancelFunc } // HealthTracker returns the health tracker for the backend. @@ -475,6 +486,7 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo captiveCtx: captiveCtx, captiveCancel: nil, // so that we start checkCaptivePortalLoop when Running needsCaptiveDetection: make(chan bool), + deferredNetmapCancel: nil, } mConn.SetNetInfoCallback(b.setNetInfo) @@ -965,6 +977,11 @@ func (b *LocalBackend) Shutdown() { if b.notifyCancel != nil { b.notifyCancel() } + + if b.deferredNetmapCancel != nil { + b.deferredNetmapCancel() + } + b.mu.Unlock() b.webClientShutdown() @@ -1591,8 +1608,7 @@ func (b *LocalBackend) SetControlClientStatus(c controlclient.Client, st control // Update the DERP map in the health package, which uses it for health notifications b.health.SetDERPMap(st.NetMap.DERPMap) - - b.send(ipn.Notify{NetMap: st.NetMap}) + b.sendNetmap(st.NetMap) } if st.URL != "" { b.logf("Received auth URL: %.20v...", st.URL) @@ -1677,20 +1693,91 @@ func applySysPolicy(prefs *ipn.Prefs) (anyChange bool) { return anyChange } +// setNetmapRateLimit Sets the minimum interval between netmap updates on the IPN Bus (in seconds) +// If interval is 0 or negative, the rate limiter is disabled. Netmap rate limiting is +// disabled by default +// b.mu must be held +func (b *LocalBackend) setNetmapRateLimit(interval time.Duration) { + if interval > 0 { + b.netmapRateLimiter = rate.NewLimiter(rate.Every(interval), 1) + } else { + b.netmapRateLimiter = nil + } +} + +// sendNetmap sends a netmap update to the IPN bus respecting the rate limiter. This function +// should be used for all netmap updates on the IPN bus unless there is some critical reason that +// a netmap update be sent immediately. +// +// A non-nil channel will be returned if the netmap update was deferred due to rate limiting. The channel will be closed +// when the netmap update is handled. 
true or false will be sent to the channel to indicate whether +// or not the netmap was sent or cancelled respectively. A nil return value indicates that the netmap +// was sent immediately. The returned value is primarily useful for testing and you can safely ignore +// it and just call this method at will. +func (b *LocalBackend) sendNetmap(nm *netmap.NetworkMap) chan bool { + notify := ipn.Notify{NetMap: nm} + + b.mu.Lock() + + // Cancel all pending netmap updates, they're stale and we have something newer + if b.deferredNetmapCancel != nil { + b.deferredNetmapCancel() + } + + // No rate limiter? Send it. + // Rate limiter allows the send? Send it. + if b.netmapRateLimiter == nil || b.netmapRateLimiter.Allow() { + b.mu.Unlock() + b.send(notify) + return nil + } + + // We're rate limited. Defer the netmap update + var ctx context.Context + ctx, cancel := context.WithCancel(b.ctx) + b.deferredNetmapCancel = cancel + // The rate limiter is set to Limit() events per second. Convert that back to + // the time interval we need to wait + delay := b.netmapRateLimiter.Delay() + b.mu.Unlock() + + c := make(chan bool) + + // Send the netmap update once the rate limiter allows it + go func() { + select { + case <-time.After(delay): + b.send(notify) + c <- true + case <-ctx.Done(): + c <- false + } + close(c) + }() + return c +} + var _ controlclient.NetmapDeltaUpdater = (*LocalBackend)(nil) -// UpdateNetmapDelta implements controlclient.NetmapDeltaUpdater. 
func (b *LocalBackend) UpdateNetmapDelta(muts []netmap.NodeMutation) (handled bool) { if !b.MagicConn().UpdateNetmapDelta(muts) { return false } - var notify *ipn.Notify // non-nil if we need to send a Notify + // Will be sent if non nil + var netmap *netmap.NetworkMap + // Will send an empty notify if true and netmap is nil (for tests - see below) + var sendEmpty = false + defer func() { - if notify != nil { + if netmap != nil { + b.sendNetmap(netmap) + } else if sendEmpty { + notify := new(ipn.Notify) b.send(*notify) } }() + unlock := b.lockAndGetUnlock() defer unlock() if !b.updateNetmapDeltaLocked(muts) { @@ -1712,13 +1799,14 @@ func (b *LocalBackend) UpdateNetmapDelta(muts []netmap.NodeMutation) (handled bo slices.SortFunc(nm.Peers, func(a, b tailcfg.NodeView) int { return cmp.Compare(a.ID(), b.ID()) }) - notify = &ipn.Notify{NetMap: nm} + netmap = nm } else if testenv.InTest() { // In tests, send an empty Notify as a wake-up so end-to-end // integration tests in another repo can check on the status of // LocalBackend after processing deltas. 
- notify = new(ipn.Notify) + sendEmpty = true } + return true } @@ -2749,6 +2837,12 @@ func (b *LocalBackend) WatchNotificationsAs(ctx context.Context, actor ipnauth.A cancel: cancel, } mak.Set(&b.notifyWatchers, sessionID, session) + if mask&ipn.NotifyRateLimitNetmaps != 0 { + b.setNetmapRateLimit(ipn.DefaultNetmapRateLimit) + } else { + b.setNetmapRateLimit(0) + } + b.mu.Unlock() defer func() { @@ -4989,6 +5083,9 @@ func (b *LocalBackend) enterStateLockedOnEntry(newState ipn.State, unlock unlock if authURL == "" { systemd.Status("Stopped; run 'tailscale up' to log in") } + if b.deferredNetmapCancel != nil { + b.deferredNetmapCancel() + } case ipn.Starting, ipn.NeedsMachineAuth: b.authReconfig() // Needed so that UpdateEndpoints can run @@ -5001,7 +5098,9 @@ func (b *LocalBackend) enterStateLockedOnEntry(newState ipn.State, unlock unlock } systemd.Status("Connected; %s; %s", activeLogin, strings.Join(addrStrs, " ")) case ipn.NoState: - // Do nothing. + if b.deferredNetmapCancel != nil { + b.deferredNetmapCancel() + } default: b.logf("[unexpected] unknown newState %#v", newState) } diff --git a/ipn/ipnlocal/local_test.go b/ipn/ipnlocal/local_test.go index 6dad2dba4..090e45609 100644 --- a/ipn/ipnlocal/local_test.go +++ b/ipn/ipnlocal/local_test.go @@ -572,6 +572,164 @@ func TestSetUseExitNodeEnabled(t *testing.T) { } } +func TestNetmapRateLimiting(t *testing.T) { + b := new(LocalBackend) + var cancel context.CancelFunc + b.ctx, cancel = context.WithCancel(context.Background()) + b.logf = t.Logf + b.setNetmapRateLimit(time.Duration(100 * time.Millisecond)) + + if b.netmapRateLimiter == nil { + t.Fatalf("no netmapRateLimiter") + } + + now := time.Now() + + b.netMap = new(netmap.NetworkMap) + if g := b.sendNetmap(b.netMap); g != nil { + t.Errorf("First should be immediately sent immediately") + } + + // We just sent a netmap, so these should all be rate limited. c1 should get cancelled. + // c2 should be cancelled. c3 should be sent after 100ms. 
+ c1 := b.sendNetmap(b.netMap) + c2 := b.sendNetmap(b.netMap) + + // Let's spam a bunch more we won't track, just for fun + for i := 0; i < 10; i++ { + if g := b.sendNetmap(b.netMap); g == nil { + t.Errorf("should have been deferred") + } + } + + // This is our last netmap send. It should be deferred and sent after 100ms. + c3 := b.sendNetmap(b.netMap) + + // The first onnetmape should be cancelled + select { + case sent := <-c1: + if sent { + t.Errorf("Second netmap update was not cacncelled; sent got %v, want %v", sent, false) + } + } + + // The second netmap should be cancelled + select { + case sent := <-c2: + if sent { + t.Errorf("Second netmap update was not cacncelled; got %v, sent want %v", sent, false) + } + } + + // The last netmap should be sent after about 100ms + select { + case sent := <-c3: + if !sent { + t.Errorf("Fourth netmap update was deferred but not sent; sent got %v, want %v", sent, true) + } + } + + elapsed := time.Since(now) + if elapsed < 90*time.Millisecond { + t.Errorf("elapsed time %v is too short", elapsed) + } + if elapsed > 110*time.Millisecond { + t.Errorf("elapsed time %v is too long", elapsed) + } + + // The rate limiter should be reset at this point and the next netmap should be sent immediately. + if g := b.sendNetmap(b.netMap); g != nil { + t.Errorf("netmap should be immediately sent immediately") + } + + // We're rate limited - becuase we just sent a netmap. + // Lower the rate limit and make sure we can send again once the rate limit is up. + b.setNetmapRateLimit(time.Duration(10 * time.Millisecond)) + time.Sleep(12 * time.Millisecond) + if g := b.sendNetmap(b.netMap); g != nil { + t.Errorf("netmap should be immediately sent immediately") + } + + // Check to make sure the cancellation function is properly set and does what it's + // supposed to do. 
+ c4 := b.sendNetmap(b.netMap) + b.deferredNetmapCancel() + select { + case sent := <-c4: + if sent { + t.Errorf("Fourth netmap should have been cancelled; sent got %v, want %v", sent, true) + } + } + + cancel() +} + +func TestNetmapDeferral(t *testing.T) { + b := new(LocalBackend) + var cancel context.CancelFunc + b.ctx, cancel = context.WithCancel(context.Background()) + b.logf = t.Logf + + w := 40 * time.Millisecond + + // Ensure that a deferred netmap gets sent with the correct delay + b.setNetmapRateLimit(time.Duration(w)) + start := time.Now() + b.sendNetmap(b.netMap) + + // Snooze for 20ms + time.Sleep(20 * time.Millisecond) + + // This one should be deferred and sent after ~40-20ms + c := b.sendNetmap(b.netMap) + select { + case sent := <-c: + if !sent { + t.Errorf("Fourth netmap update was deferred but not sent; sent got %v, want %v", sent, true) + } + } + + slop := 5 * time.Millisecond + g := time.Since(start) * time.Millisecond + + // The difference between the elapsed time and the expected time should be within the slop + // and our total time should always be slightly greater than the expected time. 
+ if w-g > slop || w > g { + t.Errorf("elapsed time is too incorrect w:%v g:%v", w, g) + } + + cancel() +} + +func TestNetmapNoRateLimiting(t *testing.T) { + b := new(LocalBackend) + var cancel context.CancelFunc + b.ctx, cancel = context.WithCancel(context.Background()) + b.logf = t.Logf + + // A zero rate limit means send-at-will + b.setNetmapRateLimit(0) + + b.netMap = new(netmap.NetworkMap) + + for i := 0; i < 10; i++ { + if g := b.sendNetmap(b.netMap); g != nil { + t.Errorf("should be immediately sent immediately") + } + } + + // A negative rate limit also means send-at-will + b.setNetmapRateLimit(-1) + + for i := 0; i < 10; i++ { + if g := b.sendNetmap(b.netMap); g != nil { + t.Errorf("should be immediately sent immediately") + } + } + + cancel() +} + func TestFileTargets(t *testing.T) { b := new(LocalBackend) _, err := b.FileTargets() diff --git a/tstime/rate/rate.go b/tstime/rate/rate.go index f0473862a..75ebcfd8e 100644 --- a/tstime/rate/rate.go +++ b/tstime/rate/rate.go @@ -56,6 +56,29 @@ func NewLimiter(r Limit, b int) *Limiter { return &Limiter{limit: r, burst: float64(b)} } +// Limit returns the maximum overall event rate. +func (lim *Limiter) Limit() Limit { + return lim.limit +} + +// Delay returns the approximate minimum duration before sufficient tokens +// will be available to permit another event. +func (lim *Limiter) Delay() time.Duration { + lim.mu.Lock() + defer lim.mu.Unlock() + + // Calculate the new number of tokens available due to the passage of time. + elapsed := mono.Now().Sub(lim.last) + tokens := lim.tokens + float64(lim.limit)*elapsed.Seconds() + if tokens > lim.burst { + tokens = lim.burst + } + + // Calculate the time until the next token is available. + wait := time.Duration((1-tokens)/float64(lim.limit)*1e9) * time.Nanosecond + return wait +} + // Allow reports whether an event may happen now. 
func (lim *Limiter) Allow() bool { return lim.allow(mono.Now()) diff --git a/tstime/rate/rate_test.go b/tstime/rate/rate_test.go index dc3f9e84b..0fb9344da 100644 --- a/tstime/rate/rate_test.go +++ b/tstime/rate/rate_test.go @@ -145,6 +145,31 @@ func TestSimultaneousRequests(t *testing.T) { } } +func TestDelay(t *testing.T) { + lim := NewLimiter(Every(1*time.Second), 1) + // We'll allow for 10 ms of slop to avoid flakiness. + // w should always be just slightly greater than d + slop := int64(10) + + lim.Allow() + + d := lim.Delay().Milliseconds() + w := int64(1000) + + if w-d > slop || d > w { + t.Errorf("Delay() = %v want 1000", d) + } + + time.Sleep(50 * time.Millisecond) + w = 950 + d = lim.Delay().Milliseconds() + + // ~50 milliseconds will have passed, + if w-d > slop || d > w { + t.Errorf("Delay() = %v want 950", d) + } +} + func BenchmarkAllowN(b *testing.B) { lim := NewLimiter(Every(1*time.Second), 1) now := mono.Now()