cmd/derper,derp: implement per-client rate limits

Signed-off-by: Tom DNetto <tom@tailscale.com>
This commit is contained in:
Tom DNetto 2022-09-15 11:49:23 -07:00
parent cf61070e26
commit ea6c4d4fe1
5 changed files with 310 additions and 2 deletions

View File

@ -57,6 +57,11 @@
acceptConnLimit = flag.Float64("accept-connection-limit", math.Inf(+1), "rate limit for accepting new connection") acceptConnLimit = flag.Float64("accept-connection-limit", math.Inf(+1), "rate limit for accepting new connection")
acceptConnBurst = flag.Int("accept-connection-burst", math.MaxInt, "burst limit for accepting new connection") acceptConnBurst = flag.Int("accept-connection-burst", math.MaxInt, "burst limit for accepting new connection")
egressInterface = flag.String("egress-interface", "", "the interface to monitor for automatic ratelimit tuning")
egressDataLimit = flag.Int("egress-data-limit", 100*1024*1024/8, "the bandwidth in bytes/s the server will try to stay under, only applies if egress-interface is set")
clientDataMin = flag.Int("client-data-min-limit", 1024*1024/8, "minimum bandwidth in bytes/s for a single client, only applies if egress-interface is set")
clientDataBurst = flag.Int("client-data-burst", 3*1024*1024, "burst limit in bytes for forwarded data from a single client, only applies if egress-interface is set")
) )
var ( var (
@ -154,6 +159,12 @@ func main() {
s := derp.NewServer(cfg.PrivateKey, log.Printf) s := derp.NewServer(cfg.PrivateKey, log.Printf)
s.SetVerifyClient(*verifyClients) s.SetVerifyClient(*verifyClients)
if *egressInterface != "" && *egressDataLimit > 0 {
if err := s.StartEgressRateLimiter(*egressInterface, *egressDataLimit, *clientDataMin, *clientDataBurst); err != nil {
log.Fatalf("failed to start egress rate limiter: %v", err)
}
}
if *meshPSKFile != "" { if *meshPSKFile != "" {
b, err := ioutil.ReadFile(*meshPSKFile) b, err := ioutil.ReadFile(*meshPSKFile)
if err != nil { if err != nil {

View File

@ -107,6 +107,9 @@ type Server struct {
metaCert []byte // the encoded x509 cert to send after LetsEncrypt cert+intermediate metaCert []byte // the encoded x509 cert to send after LetsEncrypt cert+intermediate
dupPolicy dupPolicy dupPolicy dupPolicy
clientDataLimit *uint64 // limit for how many bytes/s of content a client can send; atomic
clientDataBurst int // burst limit for how many bytes/s of content a client can send
// Counters: // Counters:
packetsSent, bytesSent expvar.Int packetsSent, bytesSent expvar.Int
packetsRecv, bytesRecv expvar.Int packetsRecv, bytesRecv expvar.Int
@ -314,7 +317,10 @@ func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server {
sentTo: map[key.NodePublic]map[key.NodePublic]int64{}, sentTo: map[key.NodePublic]map[key.NodePublic]int64{},
avgQueueDuration: new(uint64), avgQueueDuration: new(uint64),
keyOfAddr: map[netip.AddrPort]key.NodePublic{}, keyOfAddr: map[netip.AddrPort]key.NodePublic{},
clientDataLimit: new(uint64),
clientDataBurst: 10 * 1024 * 1024, // 10Mb default burst
} }
atomic.StoreUint64(s.clientDataLimit, 12*1024*1024) // 12Mb/s default ratelimit
s.initMetacert() s.initMetacert()
s.packetsRecvDisco = s.packetsRecvByKind.Get("disco") s.packetsRecvDisco = s.packetsRecvByKind.Get("disco")
s.packetsRecvOther = s.packetsRecvByKind.Get("other") s.packetsRecvOther = s.packetsRecvByKind.Get("other")
@ -325,12 +331,48 @@ func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server {
s.packetsDroppedReason.Get("queue_head"), s.packetsDroppedReason.Get("queue_head"),
s.packetsDroppedReason.Get("queue_tail"), s.packetsDroppedReason.Get("queue_tail"),
s.packetsDroppedReason.Get("write_error"), s.packetsDroppedReason.Get("write_error"),
s.packetsDroppedReason.Get("rate_limited"),
} }
s.packetsDroppedTypeDisco = s.packetsDroppedType.Get("disco") s.packetsDroppedTypeDisco = s.packetsDroppedType.Get("disco")
s.packetsDroppedTypeOther = s.packetsDroppedType.Get("other") s.packetsDroppedTypeOther = s.packetsDroppedType.Get("other")
return s return s
} }
// StartEgressRateLimiter starts dynamically adjusting the rate limit
// based on the desired limit and the utilization of the specified interface.
//
// It must be called before serving begins. All limits are in bytes/s.
func (s *Server) StartEgressRateLimiter(interfaceName string, egressDataLimit, clientDataMin, clientDataBurst int) error {
limiter, err := newEgressLimiter(interfaceName, uint64(egressDataLimit), uint64(clientDataMin))
if err != nil {
return fmt.Errorf("starting limiter: %v", err)
}
atomic.StoreUint64(s.clientDataLimit, uint64(egressDataLimit))
s.clientDataBurst = clientDataBurst
go func() {
t := time.NewTicker(time.Second)
defer t.Stop()
for {
limit, err := limiter.Limit()
if err != nil {
s.logf("derp: failed to update egress limiter: %v", err)
return
}
atomic.StoreUint64(s.clientDataLimit, uint64(limit))
<-t.C
if s.closed {
return
}
}
}()
return nil
}
// SetMesh sets the pre-shared key that regional DERP servers used to mesh // SetMesh sets the pre-shared key that regional DERP servers used to mesh
// amongst themselves. // amongst themselves.
// //
@ -664,6 +706,7 @@ func (s *Server) accept(ctx context.Context, nc Conn, brw *bufio.ReadWriter, rem
remoteIPPort, _ := netip.ParseAddrPort(remoteAddr) remoteIPPort, _ := netip.ParseAddrPort(remoteAddr)
rateLimiter := rate.NewLimiter(rate.Limit(atomic.LoadUint64(s.clientDataLimit)), s.clientDataBurst)
c := &sclient{ c := &sclient{
connNum: connNum, connNum: connNum,
s: s, s: s,
@ -681,6 +724,7 @@ func (s *Server) accept(ctx context.Context, nc Conn, brw *bufio.ReadWriter, rem
sendPongCh: make(chan [8]byte, 1), sendPongCh: make(chan [8]byte, 1),
peerGone: make(chan key.NodePublic), peerGone: make(chan key.NodePublic),
canMesh: clientInfo.MeshKey != "" && clientInfo.MeshKey == s.meshKey, canMesh: clientInfo.MeshKey != "" && clientInfo.MeshKey == s.meshKey,
rateLimiter: rateLimiter,
} }
if c.canMesh { if c.canMesh {
@ -757,6 +801,18 @@ func (c *sclient) run(ctx context.Context) error {
} }
} }
func (c *sclient) shouldRatelimitData(dataLen int) bool {
if c.canMesh {
return false // Mesh connections arent regular clients.
}
now := time.Now()
if rateLimit := atomic.LoadUint64(c.s.clientDataLimit); rateLimit != uint64(c.rateLimiter.Limit()) {
c.rateLimiter.SetLimitAt(now, rate.Limit(rateLimit))
}
return !c.rateLimiter.AllowN(now, dataLen)
}
func (c *sclient) handleUnknownFrame(ft frameType, fl uint32) error { func (c *sclient) handleUnknownFrame(ft frameType, fl uint32) error {
_, err := io.CopyN(ioutil.Discard, c.br, int64(fl)) _, err := io.CopyN(ioutil.Discard, c.br, int64(fl))
return err return err
@ -858,6 +914,11 @@ func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error {
} }
s.packetsForwardedIn.Add(1) s.packetsForwardedIn.Add(1)
if c.shouldRatelimitData(len(contents)) {
s.recordDrop(contents, c.key, dstKey, dropReasonRateLimited)
return nil
}
var dstLen int var dstLen int
var dst *sclient var dst *sclient
@ -908,6 +969,11 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
return fmt.Errorf("client %x: recvPacket: %v", c.key, err) return fmt.Errorf("client %x: recvPacket: %v", c.key, err)
} }
if c.shouldRatelimitData(len(contents)) {
s.recordDrop(contents, c.key, dstKey, dropReasonRateLimited)
return nil
}
var fwd PacketForwarder var fwd PacketForwarder
var dstLen int var dstLen int
var dst *sclient var dst *sclient
@ -962,6 +1028,7 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
dropReasonQueueTail // destination queue is full, dropped packet at queue tail dropReasonQueueTail // destination queue is full, dropped packet at queue tail
dropReasonWriteError // OS write() failed dropReasonWriteError // OS write() failed
dropReasonDupClient // the public key is connected 2+ times (active/active, fighting) dropReasonDupClient // the public key is connected 2+ times (active/active, fighting)
dropReasonRateLimited // send/forward packet content exceeds rate limit
) )
func (s *Server) recordDrop(packetBytes []byte, srcKey, dstKey key.NodePublic, reason dropReason) { func (s *Server) recordDrop(packetBytes []byte, srcKey, dstKey key.NodePublic, reason dropReason) {
@ -1254,6 +1321,7 @@ type sclient struct {
canMesh bool // clientInfo had correct mesh token for inter-region routing canMesh bool // clientInfo had correct mesh token for inter-region routing
isDup atomic.Bool // whether more than 1 sclient for key is connected isDup atomic.Bool // whether more than 1 sclient for key is connected
isDisabled atomic.Bool // whether sends to this peer are disabled due to active/active dups isDisabled atomic.Bool // whether sends to this peer are disabled due to active/active dups
rateLimiter *rate.Limiter
// replaceLimiter controls how quickly two connections with // replaceLimiter controls how quickly two connections with
// the same client key can kick each other off the server by // the same client key can kick each other off the server by
@ -1700,6 +1768,7 @@ func (s *Server) ExpVar() expvar.Var {
m.Set("average_queue_duration_ms", expvar.Func(func() any { m.Set("average_queue_duration_ms", expvar.Func(func() any {
return math.Float64frombits(atomic.LoadUint64(s.avgQueueDuration)) return math.Float64frombits(atomic.LoadUint64(s.avgQueueDuration))
})) }))
m.Set("client_ratelimit_bytes_per_second", expvar.Func(func() any { return atomic.LoadUint64(s.clientDataLimit) }))
var expvarVersion expvar.String var expvarVersion expvar.String
expvarVersion.Set(version.Long) expvarVersion.Set(version.Long)
m.Set("version", &expvarVersion) m.Set("version", &expvarVersion)

View File

@ -19,11 +19,12 @@ func _() {
_ = x[dropReasonQueueTail-4] _ = x[dropReasonQueueTail-4]
_ = x[dropReasonWriteError-5] _ = x[dropReasonWriteError-5]
_ = x[dropReasonDupClient-6] _ = x[dropReasonDupClient-6]
_ = x[dropReasonRateLimited-7]
} }
const _dropReason_name = "UnknownDestUnknownDestOnFwdGoneQueueHeadQueueTailWriteErrorDupClient" const _dropReason_name = "UnknownDestUnknownDestOnFwdGoneQueueHeadQueueTailWriteErrorDupClientRateLimited"
var _dropReason_index = [...]uint8{0, 11, 27, 31, 40, 49, 59, 68} var _dropReason_index = [...]uint8{0, 11, 27, 31, 40, 49, 59, 68, 79}
func (i dropReason) String() string { func (i dropReason) String() string {
if i < 0 || i >= dropReason(len(_dropReason_index)-1) { if i < 0 || i >= dropReason(len(_dropReason_index)-1) {

171
derp/limiter.go Normal file
View File

@ -0,0 +1,171 @@
// Copyright (c) 2022 Tailscale Inc & AUTHORS All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package derp
import (
"io/ioutil"
"strconv"
"strings"
"time"
)
func readTxBytes(interfaceName string) (uint64, error) {
v, err := ioutil.ReadFile("/sys/class/net/" + interfaceName + "/statistics/tx_bytes")
if err != nil {
return 0, err
}
tx, err := strconv.Atoi(strings.TrimSpace(string(v)))
if err != nil {
return 0, err
}
return uint64(tx), nil
}
type egressLimiter struct {
interfaceName string
limitBytesSec uint64 // the egress bytes/s we want to stay under.
minBytesSec uint64 // the minimum bytes/s rate limit.
lastTxBytes uint64
controlLoop limiterLoop
}
func newEgressLimiter(interfaceName string, limitBytesSec, minBytesSec uint64) (*egressLimiter, error) {
initial, err := readTxBytes(interfaceName)
if err != nil {
return nil, err
}
return &egressLimiter{
interfaceName: interfaceName,
limitBytesSec: limitBytesSec,
minBytesSec: minBytesSec,
lastTxBytes: initial,
controlLoop: newLimiterLoop(limitBytesSec, time.Now()),
}, err
}
// Limit returns the current rate limit value based on interface utilization.
func (e *egressLimiter) Limit() (uint64, error) {
rx, err := readTxBytes(e.interfaceName)
if err != nil {
return 0, err
}
last := e.lastTxBytes
e.lastTxBytes = rx
limit := e.controlLoop.tick(uint64(rx)-last, time.Now())
if limit < 0 || uint64(limit) < e.minBytesSec {
limit = float64(e.minBytesSec)
}
if uint64(limit) > e.limitBytesSec {
limit = float64(e.limitBytesSec)
}
return uint64(limit), nil
}
// PID loop values for the dynamic ratelimit.
// The wikipedia page on PID is recommended reading if you are not familiar
// with PID loops or open-loop control theory.
//
// Gain values are unitless, but operate on a feedback value in bytes
// and a setpoint value in bytes/s, and a time delta (dt) of seconds.
//
// These values are initial and should be tuned: These are just initial
// values based on first principles and vibin with pretty graphs.
const (
// Proportional gain.
// Given this represents a global ratelimit, the P term doesnt make a lot of
// sense, as each clients contribution to link utilization is entirely
// dependent on the client workload.
//
// For this reason, its set super low: Its expected the I term will do
// most of the heavy lifting.
limiterP float64 = 1.0 / 1024
// Derivative gain.
// This term reacts against 'trends', that is, the first derivative of
// the feedback value. Think of it like a rapid-change damper.
//
// This isnt super important, so again we've set it fairly low.
limiterD float64 = 0.003
// Integral gain.
//
// This is where all the heavy lifting happens. Basically, we increase
// the ratelimit (by limiterIP) when we have room to spare, and
// decrease it once we exceed 4/5ths of the limit (by limiterIN).
// The increase is linear to the error between feedback and the setpoint,
// but clamped proportionate to the limit.
//
// The decrease term is stronger than the increase term, so we 'backoff
// quickly' when we are approaching limits, but test the waters on
// the other end cautiously.
limiterIP float64 = 0.008
limiterIN float64 = 0.3
)
// limiterLoop exposes a dynamic ratelimit, based on the egress rate
// of some interface. The PID loop tries to keep egress at 4/5 of the limit.
type limiterLoop struct {
limitBytesSec uint64 // the egress bytes/s we want to stay under.
integral float64 // the integral sum at lastUpdate instant
lastEgress uint64 // feedback value of previous iteration, bytes/s
lastUpdate time.Time // instant at which last iteration occurred.
}
func newLimiterLoop(limitBytesSec uint64, now time.Time) limiterLoop {
return limiterLoop{
limitBytesSec: limitBytesSec * 4 / 5,
lastUpdate: now,
lastEgress: 0,
integral: float64(limitBytesSec),
}
}
// tick computes & returns the ratelimit value in bytes/s, computing
// the next iteration of the PID loop in the process.
func (l *limiterLoop) tick(egressBytesPerSec uint64, now time.Time) float64 {
var (
dt = now.Sub(l.lastUpdate).Seconds()
err = float64(l.limitBytesSec) - float64(egressBytesPerSec)
)
// Integral term.
var iDelta float64
if err > 0 {
iDelta = err * dt * limiterIP
} else {
iDelta = err * dt * limiterIN
}
// Constrain integral sum change to a 20th of the setpoint per second.
maxDelta := dt * float64(l.limitBytesSec) / 20
if iDelta > maxDelta {
iDelta = maxDelta
} else if iDelta < -maxDelta {
iDelta = -maxDelta
}
l.integral += iDelta
// Constrain integral sum to prevent windup.
if max := float64(l.limitBytesSec); l.integral > max {
l.integral = max
} else if l.integral < -max {
l.integral = -max
}
// Derivative term.
var d float64
if dt > 0 {
d = -(float64(egressBytesPerSec-l.lastEgress) / dt) * limiterD
}
// Proportional term.
p := limiterP * err
l.lastEgress = egressBytesPerSec
l.lastUpdate = now
output := p + l.integral + d
// fmt.Printf("in=%d, out=%0.3f: p=%0.2f d=%0.2f i=%0.2f\n", egressBytesPerSec, output, p, d, l.integral)
return output
}

56
derp/limiter_test.go Normal file
View File

@ -0,0 +1,56 @@
// Copyright (c) 2022 Tailscale Inc & AUTHORS All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package derp
import (
"testing"
"time"
)
func mb(mb uint64) uint64 {
return mb * 1024 * 1024
}
func TestLimiterLoopGradual(t *testing.T) {
// Make a limiter that tries to keep under 200Mb/s.
limit := mb(200)
start := time.Now()
l := newLimiterLoop(limit, start)
// Make sure the initial value is sane.
// Lets imagine the egress is only like 1Mb/s.
now := start.Add(time.Second)
if v := uint64(l.tick(1024*1024, now)); v < mb(150) || v > limit {
t.Errorf("initial value = %dMb/s, want 150 < value < limit", v/1024/1024)
}
// Tick through 10 minutes of low usage. Lets make sure the limit stays high.
lowUsage := limit / 10
for i := 0; i < 600; i++ {
now = now.Add(time.Second)
v := uint64(l.tick(lowUsage, now))
if v < mb(150) {
t.Errorf("[t=%0.f] limit too low for low usage: %d (expected >150)", now.Sub(start).Seconds(), v/1024/1024)
}
}
// Lets tick through 60 seconds of steadily-increasing usage.
for i := 0; i < 60; i++ {
now = now.Add(time.Second)
l.tick(uint64(i)*limit/60, now)
}
if v := uint64(l.tick(limit, now)); v > mb(100) || v < mb(1) {
t.Errorf("[t=%0.f] limit = %dMb/s, want 1-100Mb/s", now.Sub(start).Seconds(), v/1024/1024)
}
// Lets imagine we are at limits for 10s. Does the limit drop pretty hard?
for i := 0; i < 10; i++ {
now = now.Add(time.Second)
l.tick(limit, now)
}
if v := uint64(l.tick(limit, now)); v > mb(20) || v < mb(1) {
t.Errorf("[t=%0.f] limit = %dMb/s, want 1-20Mb/s", now.Sub(start).Seconds(), v/1024/1024)
}
}