ipn, tstime: add opt-in rate limiting for netmap updates on the IPN bus

updates tailscale/corp#24553

Adds opt-in rate limiting that restricts netmap updates to at most one every
3 seconds when the client includes the NotifyRateLimitNetmaps option
in the IPN bus watcher opts. This should mitigate issues with excessive
memory and CPU usage in clients on large, busy tailnets.

Signed-off-by: Jonathan Nobels <jonathan@tailscale.com>
This commit is contained in:
Jonathan Nobels 2024-11-13 16:01:56 -05:00
parent 8fd471ce57
commit b24b36fcc8
5 changed files with 322 additions and 8 deletions

View File

@ -73,6 +73,15 @@ type EngineStatus struct {
NotifyInitialOutgoingFiles // if set, the first Notify message (sent immediately) will contain the current Taildrop OutgoingFiles
NotifyInitialHealthState // if set, the first Notify message (sent immediately) will contain the current health.State of the client
NotifyRateLimitNetmaps // if set, rate limit netmap updates to once every DefaultNetmapRateLimit seconds
)
const (
	// DefaultNetmapRateLimit is the minimum time between netmap updates when
	// NotifyRateLimitNetmaps is included in the Notify opts.
	//
	// 3 seconds should be sufficient to avoid flooding the UI with netmap
	// updates on large/chatty tailnets without causing noticeable issues with
	// the UI being out of date.
	DefaultNetmapRateLimit = 3 * time.Second
)
// Notify is a communication from a backend (e.g. tailscaled) to a frontend

View File

@ -82,6 +82,7 @@
"tailscale.com/tka"
"tailscale.com/tsd"
"tailscale.com/tstime"
"tailscale.com/tstime/rate"
"tailscale.com/types/appctype"
"tailscale.com/types/dnstype"
"tailscale.com/types/empty"
@ -370,6 +371,16 @@ type LocalBackend struct {
// backend is healthy and captive portal detection is not required
// (sending false).
needsCaptiveDetection chan bool
// netmapRateLimiter rate limits netmap updates to the IPN bus.
// It should be nil if the ipn bus options do not include the rate limiting flag.
// It is automatically created via setNetmapRateLimit.
netmapRateLimiter *rate.Limiter
// deferredNetmapCancel is used to cancel deferred netmap updates which
// were initially blocked due to rate limiting. We always attempt to send the latest
// netmap once the rate limiter allows it, discarding any pending netmaps.
deferredNetmapCancel context.CancelFunc
}
// HealthTracker returns the health tracker for the backend.
@ -475,6 +486,7 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo
captiveCtx: captiveCtx,
captiveCancel: nil, // so that we start checkCaptivePortalLoop when Running
needsCaptiveDetection: make(chan bool),
deferredNetmapCancel: nil,
}
mConn.SetNetInfoCallback(b.setNetInfo)
@ -965,6 +977,11 @@ func (b *LocalBackend) Shutdown() {
if b.notifyCancel != nil {
b.notifyCancel()
}
if b.deferredNetmapCancel != nil {
b.deferredNetmapCancel()
}
b.mu.Unlock()
b.webClientShutdown()
@ -1591,8 +1608,7 @@ func (b *LocalBackend) SetControlClientStatus(c controlclient.Client, st control
// Update the DERP map in the health package, which uses it for health notifications
b.health.SetDERPMap(st.NetMap.DERPMap)
b.send(ipn.Notify{NetMap: st.NetMap})
b.sendNetmap(st.NetMap)
}
if st.URL != "" {
b.logf("Received auth URL: %.20v...", st.URL)
@ -1677,20 +1693,91 @@ func applySysPolicy(prefs *ipn.Prefs) (anyChange bool) {
return anyChange
}
// setNetmapRateLimit configures the minimum interval between netmap updates
// sent on the IPN bus.
//
// A positive interval installs a fresh limiter that permits one update per
// interval with a burst of one. A zero or negative interval removes the
// limiter, which disables rate limiting (the default).
//
// b.mu must be held.
func (b *LocalBackend) setNetmapRateLimit(interval time.Duration) {
	if interval <= 0 {
		b.netmapRateLimiter = nil
		return
	}
	b.netmapRateLimiter = rate.NewLimiter(rate.Every(interval), 1)
}
// sendNetmap sends a netmap update to the IPN bus, respecting the rate
// limiter. It should be used for all netmap updates on the IPN bus unless
// there is some critical reason that a netmap update be sent immediately.
//
// If no rate limiter is configured, or the limiter allows the event, the
// netmap is sent synchronously and nil is returned.
//
// Otherwise the update is deferred until the limiter's Delay has elapsed and
// a non-nil channel is returned. Exactly one value is delivered on that
// channel before it is closed: true if the deferred netmap was sent, false if
// it was cancelled (superseded by a newer call, or b.ctx was cancelled). The
// channel is buffered, so callers may safely ignore it; it is primarily
// useful for tests.
func (b *LocalBackend) sendNetmap(nm *netmap.NetworkMap) chan bool {
	notify := ipn.Notify{NetMap: nm}

	b.mu.Lock()

	// Cancel any pending deferred update: it is stale now that we have
	// something newer.
	if b.deferredNetmapCancel != nil {
		b.deferredNetmapCancel()
	}

	// No rate limiter, or the rate limiter allows the send: send it now.
	if b.netmapRateLimiter == nil || b.netmapRateLimiter.Allow() {
		b.mu.Unlock()
		b.send(notify)
		return nil
	}

	// We're rate limited. Defer the netmap update until the limiter would
	// permit another event.
	ctx, cancel := context.WithCancel(b.ctx)
	b.deferredNetmapCancel = cancel
	delay := b.netmapRateLimiter.Delay()
	b.mu.Unlock()

	// Buffered with capacity one so the goroutine below can always deliver
	// its result and exit, even when the caller discards the channel.
	c := make(chan bool, 1)

	go func() {
		defer close(c)
		// Release the context's resources once this deferral is resolved,
		// whichever way it ends.
		defer cancel()

		timer := time.NewTimer(delay)
		defer timer.Stop()

		select {
		case <-timer.C:
			b.send(notify)
			c <- true
		case <-ctx.Done():
			c <- false
		}
	}()
	return c
}
// Compile-time assertion that *LocalBackend implements controlclient.NetmapDeltaUpdater.
var _ controlclient.NetmapDeltaUpdater = (*LocalBackend)(nil)
// UpdateNetmapDelta implements controlclient.NetmapDeltaUpdater.
func (b *LocalBackend) UpdateNetmapDelta(muts []netmap.NodeMutation) (handled bool) {
if !b.MagicConn().UpdateNetmapDelta(muts) {
return false
}
var notify *ipn.Notify // non-nil if we need to send a Notify
// Will be sent if non nil
var netmap *netmap.NetworkMap
// Will send an empty notify if true and netmap is nil (for tests - see below)
var sendEmpty = false
defer func() {
if notify != nil {
if netmap != nil {
b.sendNetmap(netmap)
} else if sendEmpty {
notify := new(ipn.Notify)
b.send(*notify)
}
}()
unlock := b.lockAndGetUnlock()
defer unlock()
if !b.updateNetmapDeltaLocked(muts) {
@ -1712,13 +1799,14 @@ func (b *LocalBackend) UpdateNetmapDelta(muts []netmap.NodeMutation) (handled bo
slices.SortFunc(nm.Peers, func(a, b tailcfg.NodeView) int {
return cmp.Compare(a.ID(), b.ID())
})
notify = &ipn.Notify{NetMap: nm}
netmap = nm
} else if testenv.InTest() {
// In tests, send an empty Notify as a wake-up so end-to-end
// integration tests in another repo can check on the status of
// LocalBackend after processing deltas.
notify = new(ipn.Notify)
sendEmpty = true
}
return true
}
@ -2749,6 +2837,12 @@ func (b *LocalBackend) WatchNotificationsAs(ctx context.Context, actor ipnauth.A
cancel: cancel,
}
mak.Set(&b.notifyWatchers, sessionID, session)
if mask&ipn.NotifyRateLimitNetmaps != 0 {
b.setNetmapRateLimit(ipn.DefaultNetmapRateLimit)
} else {
b.setNetmapRateLimit(0)
}
b.mu.Unlock()
defer func() {
@ -4989,6 +5083,9 @@ func (b *LocalBackend) enterStateLockedOnEntry(newState ipn.State, unlock unlock
if authURL == "" {
systemd.Status("Stopped; run 'tailscale up' to log in")
}
if b.deferredNetmapCancel != nil {
b.deferredNetmapCancel()
}
case ipn.Starting, ipn.NeedsMachineAuth:
b.authReconfig()
// Needed so that UpdateEndpoints can run
@ -5001,7 +5098,9 @@ func (b *LocalBackend) enterStateLockedOnEntry(newState ipn.State, unlock unlock
}
systemd.Status("Connected; %s; %s", activeLogin, strings.Join(addrStrs, " "))
case ipn.NoState:
// Do nothing.
if b.deferredNetmapCancel != nil {
b.deferredNetmapCancel()
}
default:
b.logf("[unexpected] unknown newState %#v", newState)
}

View File

@ -572,6 +572,164 @@ func TestSetUseExitNodeEnabled(t *testing.T) {
}
}
// TestNetmapRateLimiting verifies that sendNetmap honors the configured rate
// limit: the first netmap is sent immediately, subsequent netmaps inside the
// rate-limit window are deferred, every deferred netmap except the newest is
// cancelled, and the newest is delivered once the window elapses.
func TestNetmapRateLimiting(t *testing.T) {
	b := new(LocalBackend)
	var cancel context.CancelFunc
	b.ctx, cancel = context.WithCancel(context.Background())
	defer cancel()
	b.logf = t.Logf

	b.setNetmapRateLimit(100 * time.Millisecond)
	if b.netmapRateLimiter == nil {
		t.Fatalf("no netmapRateLimiter")
	}

	now := time.Now()
	b.netMap = new(netmap.NetworkMap)

	if g := b.sendNetmap(b.netMap); g != nil {
		t.Errorf("first netmap should be sent immediately")
	}

	// We just sent a netmap, so these should all be rate limited. c1 and c2
	// should be cancelled; c3 should be sent after 100ms.
	c1 := b.sendNetmap(b.netMap)
	c2 := b.sendNetmap(b.netMap)

	// Spam a bunch more that we won't track.
	for i := 0; i < 10; i++ {
		if g := b.sendNetmap(b.netMap); g == nil {
			t.Errorf("should have been deferred")
		}
	}

	// This is our last netmap send. It should be deferred and sent after 100ms.
	c3 := b.sendNetmap(b.netMap)

	// Receiving from a nil channel blocks forever; fail fast instead of
	// hanging if any of these were unexpectedly sent immediately.
	if c1 == nil || c2 == nil || c3 == nil {
		t.Fatalf("netmaps sent inside the rate-limit window should have been deferred")
	}

	// The first deferred netmap should be cancelled.
	if sent := <-c1; sent {
		t.Errorf("second netmap update was not cancelled; sent got %v, want %v", sent, false)
	}

	// The second deferred netmap should be cancelled.
	if sent := <-c2; sent {
		t.Errorf("third netmap update was not cancelled; sent got %v, want %v", sent, false)
	}

	// The last netmap should be sent after about 100ms.
	if sent := <-c3; !sent {
		t.Errorf("last netmap update was deferred but not sent; sent got %v, want %v", sent, true)
	}

	elapsed := time.Since(now)

	if elapsed < 90*time.Millisecond {
		t.Errorf("elapsed time %v is too short", elapsed)
	}
	if elapsed > 110*time.Millisecond {
		t.Errorf("elapsed time %v is too long", elapsed)
	}

	// The rate limiter should be reset at this point and the next netmap
	// should be sent immediately.
	if g := b.sendNetmap(b.netMap); g != nil {
		t.Errorf("netmap should be sent immediately")
	}

	// We're rate limited because we just sent a netmap. Lower the rate limit
	// and make sure we can send again once the new interval has passed.
	b.setNetmapRateLimit(10 * time.Millisecond)
	time.Sleep(12 * time.Millisecond)
	if g := b.sendNetmap(b.netMap); g != nil {
		t.Errorf("netmap should be sent immediately")
	}

	// Check that the cancellation function is properly set and does what
	// it's supposed to do.
	c4 := b.sendNetmap(b.netMap)
	if c4 == nil {
		t.Fatalf("netmap sent inside the rate-limit window should have been deferred")
	}
	b.deferredNetmapCancel()
	if sent := <-c4; sent {
		t.Errorf("deferred netmap should have been cancelled; sent got %v, want %v", sent, false)
	}
}
// TestNetmapDeferral verifies that a netmap deferred by the rate limiter is
// sent after the remainder of the rate-limit window, measured from the
// previous send, rather than after a full window from the deferred call.
func TestNetmapDeferral(t *testing.T) {
	b := new(LocalBackend)
	var cancel context.CancelFunc
	b.ctx, cancel = context.WithCancel(context.Background())
	defer cancel()
	b.logf = t.Logf

	w := 40 * time.Millisecond

	// Ensure that a deferred netmap gets sent with the correct delay.
	b.setNetmapRateLimit(w)

	start := time.Now()
	b.sendNetmap(b.netMap)

	// Snooze for 20ms.
	time.Sleep(20 * time.Millisecond)

	// This one should be deferred and sent after the remaining ~20ms.
	c := b.sendNetmap(b.netMap)
	if c == nil {
		// Receiving from a nil channel would block forever.
		t.Fatalf("netmap sent inside the rate-limit window should have been deferred")
	}
	if sent := <-c; !sent {
		t.Errorf("deferred netmap update was not sent; sent got %v, want %v", sent, true)
	}

	slop := 5 * time.Millisecond
	// time.Since already returns a time.Duration; it must not be rescaled.
	g := time.Since(start)

	// The total elapsed time should never be less than the rate-limit window
	// and should exceed it by no more than the slop.
	if g < w || g-w > slop {
		t.Errorf("elapsed time out of range; got %v, want between %v and %v", g, w, w+slop)
	}
}
// TestNetmapNoRateLimiting checks that a zero or negative rate limit disables
// rate limiting, so every netmap is sent immediately (sendNetmap returns nil).
func TestNetmapNoRateLimiting(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	b := new(LocalBackend)
	b.ctx = ctx
	b.logf = t.Logf
	b.netMap = new(netmap.NetworkMap)

	// Both a zero and a negative rate limit mean send-at-will.
	for _, limit := range []time.Duration{0, -1} {
		b.setNetmapRateLimit(limit)
		for attempt := 0; attempt < 10; attempt++ {
			if deferred := b.sendNetmap(b.netMap); deferred != nil {
				t.Errorf("should be immediately sent immediately")
			}
		}
	}
}
func TestFileTargets(t *testing.T) {
b := new(LocalBackend)
_, err := b.FileTargets()

View File

@ -56,6 +56,29 @@ func NewLimiter(r Limit, b int) *Limiter {
return &Limiter{limit: r, burst: float64(b)}
}
// Limit reports the maximum overall event rate this limiter was configured
// with.
func (lim *Limiter) Limit() Limit {
	configured := lim.limit
	return configured
}
// Delay returns the approximate minimum duration before sufficient tokens
// will be available to permit another event.
//
// It does not consume a token or modify the limiter's state. It returns 0 if
// at least one token is already available. If the limit is zero or negative,
// so that no further tokens will accumulate, it returns the maximum
// representable duration.
func (lim *Limiter) Delay() time.Duration {
	// maxDuration is the largest representable time.Duration.
	const maxDuration = time.Duration(1<<63 - 1)

	lim.mu.Lock()
	defer lim.mu.Unlock()

	// Calculate the number of tokens available due to the passage of time,
	// capped at the burst size.
	elapsed := mono.Now().Sub(lim.last)
	tokens := lim.tokens + float64(lim.limit)*elapsed.Seconds()
	if tokens > lim.burst {
		tokens = lim.burst
	}

	// An event is already permitted; never report a negative wait.
	if tokens >= 1 {
		return 0
	}

	// Avoid dividing by zero (or a negative rate) below.
	if lim.limit <= 0 {
		return maxDuration
	}

	// Time until the missing fraction of a token has accumulated, guarding
	// against overflow when converting to a Duration.
	ns := (1 - tokens) / float64(lim.limit) * float64(time.Second)
	if ns >= float64(maxDuration) {
		return maxDuration
	}
	return time.Duration(ns)
}
// Allow reports whether an event may happen now.
func (lim *Limiter) Allow() bool {
return lim.allow(mono.Now())

View File

@ -145,6 +145,31 @@ func TestSimultaneousRequests(t *testing.T) {
}
}
// TestDelay checks that Delay reports roughly the full interval right after
// a token has been consumed, and that the reported delay shrinks as time
// passes.
func TestDelay(t *testing.T) {
	// Allow for 10 ms of slop to avoid flakiness. The reported delay should
	// always be just slightly below the expected value.
	const slopMillis int64 = 10

	lim := NewLimiter(Every(time.Second), 1)
	lim.Allow()

	got := lim.Delay().Milliseconds()
	want := int64(1000)
	if got > want || want-got > slopMillis {
		t.Errorf("Delay() = %v want 1000", got)
	}

	// After ~50 milliseconds have passed, the delay should drop accordingly.
	time.Sleep(50 * time.Millisecond)
	want = 950
	got = lim.Delay().Milliseconds()
	if got > want || want-got > slopMillis {
		t.Errorf("Delay() = %v want 950", got)
	}
}
func BenchmarkAllowN(b *testing.B) {
lim := NewLimiter(Every(1*time.Second), 1)
now := mono.Now()