From ea6c4d4fe17320c393e0f6a9c71ccae3b83ea28f Mon Sep 17 00:00:00 2001 From: Tom DNetto Date: Thu, 15 Sep 2022 11:49:23 -0700 Subject: [PATCH] cmd/derper,derp: implement per-client rate limits Signed-off-by: Tom DNetto --- cmd/derper/derper.go | 11 +++ derp/derp_server.go | 69 +++++++++++++++ derp/dropreason_string.go | 5 +- derp/limiter.go | 171 ++++++++++++++++++++++++++++++++++++++ derp/limiter_test.go | 56 +++++++++++++ 5 files changed, 310 insertions(+), 2 deletions(-) create mode 100644 derp/limiter.go create mode 100644 derp/limiter_test.go diff --git a/cmd/derper/derper.go b/cmd/derper/derper.go index f98b45375..07a38af9f 100644 --- a/cmd/derper/derper.go +++ b/cmd/derper/derper.go @@ -57,6 +57,11 @@ acceptConnLimit = flag.Float64("accept-connection-limit", math.Inf(+1), "rate limit for accepting new connection") acceptConnBurst = flag.Int("accept-connection-burst", math.MaxInt, "burst limit for accepting new connection") + + egressInterface = flag.String("egress-interface", "", "the interface to monitor for automatic ratelimit tuning") + egressDataLimit = flag.Int("egress-data-limit", 100*1024*1024/8, "the bandwidth in bytes/s the server will try to stay under, only applies if egress-interface is set") + clientDataMin = flag.Int("client-data-min-limit", 1024*1024/8, "minimum bandwidth in bytes/s for a single client, only applies if egress-interface is set") + clientDataBurst = flag.Int("client-data-burst", 3*1024*1024, "burst limit in bytes for forwarded data from a single client, only applies if egress-interface is set") ) var ( @@ -154,6 +159,12 @@ func main() { s := derp.NewServer(cfg.PrivateKey, log.Printf) s.SetVerifyClient(*verifyClients) + if *egressInterface != "" && *egressDataLimit > 0 { + if err := s.StartEgressRateLimiter(*egressInterface, *egressDataLimit, *clientDataMin, *clientDataBurst); err != nil { + log.Fatalf("failed to start egress rate limiter: %v", err) + } + } + if *meshPSKFile != "" { b, err := ioutil.ReadFile(*meshPSKFile) if err 
!= nil { diff --git a/derp/derp_server.go b/derp/derp_server.go index 52680cace..afb0e6b23 100644 --- a/derp/derp_server.go +++ b/derp/derp_server.go @@ -107,6 +107,9 @@ type Server struct { metaCert []byte // the encoded x509 cert to send after LetsEncrypt cert+intermediate dupPolicy dupPolicy + clientDataLimit *uint64 // limit for how many bytes/s of content a client can send; atomic + clientDataBurst int // burst limit for how many bytes/s of content a client can send + // Counters: packetsSent, bytesSent expvar.Int packetsRecv, bytesRecv expvar.Int @@ -314,7 +317,10 @@ func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server { sentTo: map[key.NodePublic]map[key.NodePublic]int64{}, avgQueueDuration: new(uint64), keyOfAddr: map[netip.AddrPort]key.NodePublic{}, + clientDataLimit: new(uint64), + clientDataBurst: 10 * 1024 * 1024, // 10Mb default burst } + atomic.StoreUint64(s.clientDataLimit, 12*1024*1024) // 12Mb/s default ratelimit s.initMetacert() s.packetsRecvDisco = s.packetsRecvByKind.Get("disco") s.packetsRecvOther = s.packetsRecvByKind.Get("other") @@ -325,12 +331,48 @@ func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server { s.packetsDroppedReason.Get("queue_head"), s.packetsDroppedReason.Get("queue_tail"), s.packetsDroppedReason.Get("write_error"), + s.packetsDroppedReason.Get("rate_limited"), } s.packetsDroppedTypeDisco = s.packetsDroppedType.Get("disco") s.packetsDroppedTypeOther = s.packetsDroppedType.Get("other") return s } +// StartEgressRateLimiter starts dynamically adjusting the rate limit +// based on the desired limit and the utilization of the specified interface. +// +// It must be called before serving begins. All limits are in bytes/s. 
+func (s *Server) StartEgressRateLimiter(interfaceName string, egressDataLimit, clientDataMin, clientDataBurst int) error { + limiter, err := newEgressLimiter(interfaceName, uint64(egressDataLimit), uint64(clientDataMin)) + if err != nil { + return fmt.Errorf("starting limiter: %v", err) + } + + atomic.StoreUint64(s.clientDataLimit, uint64(egressDataLimit)) + s.clientDataBurst = clientDataBurst + + go func() { + t := time.NewTicker(time.Second) + defer t.Stop() + + for { + limit, err := limiter.Limit() + if err != nil { + s.logf("derp: failed to update egress limiter: %v", err) + return + } + atomic.StoreUint64(s.clientDataLimit, uint64(limit)) + + <-t.C + if s.closed { + return + } + } + }() + + return nil +} + // SetMesh sets the pre-shared key that regional DERP servers used to mesh // amongst themselves. // @@ -664,6 +706,7 @@ func (s *Server) accept(ctx context.Context, nc Conn, brw *bufio.ReadWriter, rem remoteIPPort, _ := netip.ParseAddrPort(remoteAddr) + rateLimiter := rate.NewLimiter(rate.Limit(atomic.LoadUint64(s.clientDataLimit)), s.clientDataBurst) c := &sclient{ connNum: connNum, s: s, @@ -681,6 +724,7 @@ func (s *Server) accept(ctx context.Context, nc Conn, brw *bufio.ReadWriter, rem sendPongCh: make(chan [8]byte, 1), peerGone: make(chan key.NodePublic), canMesh: clientInfo.MeshKey != "" && clientInfo.MeshKey == s.meshKey, + rateLimiter: rateLimiter, } if c.canMesh { @@ -757,6 +801,18 @@ func (c *sclient) run(ctx context.Context) error { } } +func (c *sclient) shouldRatelimitData(dataLen int) bool { + if c.canMesh { + return false // Mesh connections arent regular clients. 
+ } + + now := time.Now() + if rateLimit := atomic.LoadUint64(c.s.clientDataLimit); rateLimit != uint64(c.rateLimiter.Limit()) { + c.rateLimiter.SetLimitAt(now, rate.Limit(rateLimit)) + } + return !c.rateLimiter.AllowN(now, dataLen) +} + func (c *sclient) handleUnknownFrame(ft frameType, fl uint32) error { _, err := io.CopyN(ioutil.Discard, c.br, int64(fl)) return err @@ -858,6 +914,11 @@ func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error { } s.packetsForwardedIn.Add(1) + if c.shouldRatelimitData(len(contents)) { + s.recordDrop(contents, c.key, dstKey, dropReasonRateLimited) + return nil + } + var dstLen int var dst *sclient @@ -908,6 +969,11 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error { return fmt.Errorf("client %x: recvPacket: %v", c.key, err) } + if c.shouldRatelimitData(len(contents)) { + s.recordDrop(contents, c.key, dstKey, dropReasonRateLimited) + return nil + } + var fwd PacketForwarder var dstLen int var dst *sclient @@ -962,6 +1028,7 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error { dropReasonQueueTail // destination queue is full, dropped packet at queue tail dropReasonWriteError // OS write() failed dropReasonDupClient // the public key is connected 2+ times (active/active, fighting) + dropReasonRateLimited // send/forward packet content exceeds rate limit ) func (s *Server) recordDrop(packetBytes []byte, srcKey, dstKey key.NodePublic, reason dropReason) { @@ -1254,6 +1321,7 @@ type sclient struct { canMesh bool // clientInfo had correct mesh token for inter-region routing isDup atomic.Bool // whether more than 1 sclient for key is connected isDisabled atomic.Bool // whether sends to this peer are disabled due to active/active dups + rateLimiter *rate.Limiter // replaceLimiter controls how quickly two connections with // the same client key can kick each other off the server by @@ -1700,6 +1768,7 @@ func (s *Server) ExpVar() expvar.Var { m.Set("average_queue_duration_ms", 
// readTxBytes returns the value of the transmitted-bytes counter for the
// named network interface, read from
// /sys/class/net/<interfaceName>/statistics/tx_bytes.
//
// It returns an error if the file cannot be read or if its contents are not
// an unsigned decimal integer that fits in 64 bits.
func readTxBytes(interfaceName string) (uint64, error) {
	v, err := ioutil.ReadFile("/sys/class/net/" + interfaceName + "/statistics/tx_bytes")
	if err != nil {
		return 0, err
	}
	// Parse as an unsigned 64-bit value. strconv.Atoi yields a
	// platform-sized signed int, which rejects counters above 2 GiB on
	// 32-bit builds and anything above MaxInt64 everywhere.
	tx, err := strconv.ParseUint(strings.TrimSpace(string(v)), 10, 64)
	if err != nil {
		return 0, err
	}
	return tx, nil
}
+ + lastTxBytes uint64 + controlLoop limiterLoop +} + +func newEgressLimiter(interfaceName string, limitBytesSec, minBytesSec uint64) (*egressLimiter, error) { + initial, err := readTxBytes(interfaceName) + if err != nil { + return nil, err + } + + return &egressLimiter{ + interfaceName: interfaceName, + limitBytesSec: limitBytesSec, + minBytesSec: minBytesSec, + lastTxBytes: initial, + controlLoop: newLimiterLoop(limitBytesSec, time.Now()), + }, err +} + +// Limit returns the current rate limit value based on interface utilization. +func (e *egressLimiter) Limit() (uint64, error) { + rx, err := readTxBytes(e.interfaceName) + if err != nil { + return 0, err + } + + last := e.lastTxBytes + e.lastTxBytes = rx + + limit := e.controlLoop.tick(uint64(rx)-last, time.Now()) + if limit < 0 || uint64(limit) < e.minBytesSec { + limit = float64(e.minBytesSec) + } + if uint64(limit) > e.limitBytesSec { + limit = float64(e.limitBytesSec) + } + return uint64(limit), nil +} + +// PID loop values for the dynamic ratelimit. +// The wikipedia page on PID is recommended reading if you are not familiar +// with PID loops or open-loop control theory. +// +// Gain values are unitless, but operate on a feedback value in bytes +// and a setpoint value in bytes/s, and a time delta (dt) of seconds. +// +// These values are initial and should be tuned: These are just initial +// values based on first principles and vibin with pretty graphs. +const ( + // Proportional gain. + // Given this represents a global ratelimit, the P term doesnt make a lot of + // sense, as each clients contribution to link utilization is entirely + // dependent on the client workload. + // + // For this reason, its set super low: Its expected the I term will do + // most of the heavy lifting. + limiterP float64 = 1.0 / 1024 + // Derivative gain. + // This term reacts against 'trends', that is, the first derivative of + // the feedback value. Think of it like a rapid-change damper. 
// PID loop values for the dynamic ratelimit.
// The Wikipedia page on PID controllers is recommended reading if you are
// not familiar with PID loops or closed-loop (feedback) control theory.
//
// Gain values are unitless, but operate on a feedback value and a setpoint
// value in bytes/s, and a time delta (dt) in seconds.
//
// These values are initial and should be tuned: they are based on first
// principles and on eyeballing graphs.
const (
	// Proportional gain.
	// Given this represents a global ratelimit, the P term doesn't make a
	// lot of sense, as each client's contribution to link utilization is
	// entirely dependent on the client workload.
	//
	// For this reason it's set very low: the I term is expected to do most
	// of the heavy lifting.
	limiterP float64 = 1.0 / 1024
	// Derivative gain.
	// This term reacts against 'trends', that is, the first derivative of
	// the feedback value. Think of it like a rapid-change damper.
	//
	// This isn't very important, so again it is set fairly low.
	limiterD float64 = 0.003
	// Integral gain.
	//
	// This is where all the heavy lifting happens. Basically, we increase
	// the ratelimit (by limiterIP) when we have room to spare, and
	// decrease it once we exceed 4/5ths of the limit (by limiterIN).
	// The change is linear in the error between feedback and the setpoint,
	// but clamped in proportion to the setpoint.
	//
	// The decrease term is stronger than the increase term, so we 'back off
	// quickly' when we are approaching limits, but test the waters on
	// the other end cautiously.
	limiterIP float64 = 0.008
	limiterIN float64 = 0.3
)

// limiterLoop exposes a dynamic ratelimit, based on the egress rate
// of some interface. The PID loop tries to keep egress at 4/5 of the limit.
type limiterLoop struct {
	limitBytesSec uint64 // the setpoint: 4/5 of the configured egress limit, in bytes/s.

	integral   float64   // the integral sum at lastUpdate instant
	lastEgress uint64    // feedback value of previous iteration, bytes/s
	lastUpdate time.Time // instant at which last iteration occurred.
}

// newLimiterLoop returns a loop whose setpoint is 4/5 of limitBytesSec and
// whose previous-iteration instant is now.
//
// The integral sum starts at the full limit; the first tick clamps it down
// to the setpoint.
func newLimiterLoop(limitBytesSec uint64, now time.Time) limiterLoop {
	return limiterLoop{
		limitBytesSec: limitBytesSec * 4 / 5,
		lastUpdate:    now,
		lastEgress:    0,
		integral:      float64(limitBytesSec),
	}
}

// tick computes & returns the ratelimit value in bytes/s, computing
// the next iteration of the PID loop in the process.
//
// egressBytesPerSec is the observed egress rate and now is the instant of
// the observation. The result is the raw controller output and may be
// negative; callers are expected to clamp it.
func (l *limiterLoop) tick(egressBytesPerSec uint64, now time.Time) float64 {
	dt := now.Sub(l.lastUpdate).Seconds()
	if dt < 0 {
		// An instant earlier than the previous one would flip the sign of
		// the clamps below; treat it as "no time has passed".
		dt = 0
	}
	var (
		setpoint = float64(l.limitBytesSec)
		egress   = float64(egressBytesPerSec)
		// Positive while there is headroom, negative above the setpoint.
		headroom = setpoint - egress
	)

	// Integral term: grow slowly with headroom, shrink quickly without.
	gain := limiterIN
	if headroom > 0 {
		gain = limiterIP
	}
	iDelta := headroom * dt * gain
	// Constrain integral sum change to a 20th of the setpoint per second.
	maxDelta := dt * setpoint / 20
	if iDelta > maxDelta {
		iDelta = maxDelta
	} else if iDelta < -maxDelta {
		iDelta = -maxDelta
	}
	l.integral += iDelta
	// Constrain integral sum to prevent windup.
	if l.integral > setpoint {
		l.integral = setpoint
	} else if l.integral < -setpoint {
		l.integral = -setpoint
	}

	// Derivative term. The difference is taken in floating point: egress
	// may well be lower than on the previous iteration, and an unsigned
	// subtraction would underflow to a huge positive value.
	var d float64
	if dt > 0 {
		d = -((egress - float64(l.lastEgress)) / dt) * limiterD
	}
	// Proportional term.
	p := limiterP * headroom

	l.lastEgress = egressBytesPerSec
	l.lastUpdate = now
	return p + l.integral + d
}
+ lowUsage := limit / 10 + for i := 0; i < 600; i++ { + now = now.Add(time.Second) + v := uint64(l.tick(lowUsage, now)) + + if v < mb(150) { + t.Errorf("[t=%0.f] limit too low for low usage: %d (expected >150)", now.Sub(start).Seconds(), v/1024/1024) + } + } + + // Lets tick through 60 seconds of steadily-increasing usage. + for i := 0; i < 60; i++ { + now = now.Add(time.Second) + l.tick(uint64(i)*limit/60, now) + } + if v := uint64(l.tick(limit, now)); v > mb(100) || v < mb(1) { + t.Errorf("[t=%0.f] limit = %dMb/s, want 1-100Mb/s", now.Sub(start).Seconds(), v/1024/1024) + } + // Lets imagine we are at limits for 10s. Does the limit drop pretty hard? + for i := 0; i < 10; i++ { + now = now.Add(time.Second) + l.tick(limit, now) + } + if v := uint64(l.tick(limit, now)); v > mb(20) || v < mb(1) { + t.Errorf("[t=%0.f] limit = %dMb/s, want 1-20Mb/s", now.Sub(start).Seconds(), v/1024/1024) + } +}