Merge pull request #2092 from tailscale/queue_latency

derp: add pkt queue latency timer
This commit is contained in:
Julian Knodt 2021-06-11 09:48:38 -07:00 committed by GitHub
commit 525eb5ce41
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -20,6 +20,7 @@
"io"
"io/ioutil"
"log"
"math"
"math/big"
"math/rand"
"os"
@ -27,6 +28,7 @@
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"go4.org/mem"
@ -120,6 +122,7 @@ type Server struct {
multiForwarderCreated expvar.Int
multiForwarderDeleted expvar.Int
removePktForwardOther expvar.Int
avgQueueDuration *uint64 // In milliseconds; accessed atomically
mu sync.Mutex
closed bool
@ -182,6 +185,7 @@ func NewServer(privateKey key.Private, logf logger.Logf) *Server {
memSys0: ms.Sys,
watchers: map[*sclient]bool{},
sentTo: map[key.Public]map[key.Public]int64{},
avgQueueDuration: new(uint64),
}
s.initMetacert()
s.packetsRecvDisco = s.packetsRecvByKind.Get("disco")
@ -612,6 +616,7 @@ func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error {
return c.sendPkt(dst, pkt{
bs: contents,
enqueuedAt: time.Now(),
src: srcKey,
})
}
@ -666,6 +671,7 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
p := pkt{
bs: contents,
enqueuedAt: time.Now(),
src: c.key,
}
return c.sendPkt(dst, p)
@ -696,7 +702,7 @@ func (c *sclient) sendPkt(dst *sclient, p pkt) error {
}
select {
case <-dst.sendQueue:
case pkt := <-dst.sendQueue:
s.packetsDropped.Add(1)
s.packetsDroppedQueueHead.Add(1)
if verboseDropKeys[dstKey] {
@ -705,6 +711,7 @@ func (c *sclient) sendPkt(dst *sclient, p pkt) error {
msg := fmt.Sprintf("tail drop %s -> %s", p.src.ShortString(), dstKey.ShortString())
c.s.limitedLogf(msg)
}
c.recordQueueTime(pkt.enqueuedAt)
if debug {
c.logf("dropping packet from client %x queue head", dstKey)
}
@ -927,11 +934,13 @@ type pkt struct {
// src is the who's the sender of the packet.
src key.Public
// enqueuedAt is when a packet was put onto a queue before it was sent,
// and is used for reporting metrics on the duration of packets in the queue.
enqueuedAt time.Time
// bs is the data packet bytes.
// The memory is owned by pkt.
bs []byte
// TODO(danderson): enqueue time, to measure queue latency?
}
func (c *sclient) setPreferred(v bool) {
@ -959,6 +968,25 @@ func (c *sclient) setPreferred(v bool) {
}
}
// expMovingAverage returns the new moving average given the previous average,
// a new value, and an alpha decay factor.
// https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
func expMovingAverage(prev, newValue, alpha float64) float64 {
return alpha*newValue + (1-alpha)*prev
}
// recordQueueTime updates the average queue duration metric after a packet has been sent.
func (c *sclient) recordQueueTime(enqueuedAt time.Time) {
elapsed := float64(time.Since(enqueuedAt).Milliseconds())
for {
old := atomic.LoadUint64(c.s.avgQueueDuration)
newAvg := expMovingAverage(math.Float64frombits(old), elapsed, 0.1)
if atomic.CompareAndSwapUint64(c.s.avgQueueDuration, old, math.Float64bits(newAvg)) {
break
}
}
}
func (c *sclient) sendLoop(ctx context.Context) error {
defer func() {
// If the sender shuts down unilaterally due to an error, close so
@ -1002,6 +1030,7 @@ func (c *sclient) sendLoop(ctx context.Context) error {
continue
case msg := <-c.sendQueue:
werr = c.sendPacket(msg.src, msg.bs)
c.recordQueueTime(msg.enqueuedAt)
continue
case <-keepAliveTick.C:
werr = c.sendKeepAlive()
@ -1025,6 +1054,7 @@ func (c *sclient) sendLoop(ctx context.Context) error {
continue
case msg := <-c.sendQueue:
werr = c.sendPacket(msg.src, msg.bs)
c.recordQueueTime(msg.enqueuedAt)
case <-keepAliveTick.C:
werr = c.sendKeepAlive()
}
@ -1290,6 +1320,9 @@ func (s *Server) ExpVar() expvar.Var {
m.Set("multiforwarder_created", &s.multiForwarderCreated)
m.Set("multiforwarder_deleted", &s.multiForwarderDeleted)
m.Set("packet_forwarder_delete_other_value", &s.removePktForwardOther)
m.Set("average_queue_duration_ms", expvar.Func(func() interface{} {
return math.Float64frombits(atomic.LoadUint64(s.avgQueueDuration))
}))
var expvarVersion expvar.String
expvarVersion.Set(version.Long)
m.Set("version", &expvarVersion)