derp: track histograms of packet drops by client

When packets are dropped, track this for both the client from which the packet
was received and the client to which it was sent. Distinguish drops by kind
(disco vs non-disco) and drop reason.

Every 10 seconds, compute each client's packet loss rate (drops as a fraction
of the packets handled for that client), and collect a histogram showing the
distribution of clients by loss rate.

Signed-off-by: Percy Wegmann <percy@tailscale.com>
Author: Percy Wegmann
Date:   2025-01-17 19:36:01 -06:00
Parent: c79b736a85
Commit: b341f3ffcf
2 changed files with 240 additions and 45 deletions
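
Before the diff, a minimal standalone sketch (not part of this commit) of the Prometheus pattern the change builds on: a HistogramVec labeled by packet kind and drop reason, fed one loss-rate observation per client per collection interval. The metric name and buckets mirror the ones added below; the packet counts and the "other"/"queue_tail" label values are illustrative assumptions.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Same shape as the Server.packetsRecvDropRateByClient histogram added below.
	hv := prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "clients_by_packet_loss_rate_recv",
		Help:    "Clients bucketed by received-packet loss rate, by kind and reason",
		Buckets: []float64{0.001, .005, .01, .025, .05, .1, .25, .5, 1},
	}, []string{"kind", "reason"})

	// Hypothetical 10-second window for one client: 1000 non-disco packets
	// received, 25 dropped at the tail of the destination's send queue.
	drops, total := 25.0, 1000.0
	hv.WithLabelValues("other", "queue_tail").Observe(drops / total) // lands in the 0.025 bucket

	fmt.Println("observed loss rate:", drops/total)
}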


@@ -52,6 +52,8 @@ import (
 	"tailscale.com/util/set"
 	"tailscale.com/util/slicesx"
 	"tailscale.com/version"
+
+	"github.com/prometheus/client_golang/prometheus"
 )
 
 // verboseDropKeys is the set of destination public keys that should
@@ -170,6 +172,8 @@ type Server struct {
 	meshUpdateBatchSize *metrics.Histogram
 	meshUpdateLoopCount *metrics.Histogram
 	bufferedWriteFrames *metrics.Histogram // how many sendLoop frames (or groups of related frames) get written per flush
+	packetsRecvDropRateByClient *prometheus.HistogramVec
+	packetsSentDropRateByClient *prometheus.HistogramVec
 
 	// verifyClientsLocalTailscaled only accepts client connections to the DERP
 	// server if the clientKey is a known peer in the network, as specified by a
@@ -381,6 +385,16 @@ func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server {
 		meshUpdateBatchSize: metrics.NewHistogram([]float64{0, 1, 2, 5, 10, 20, 50, 100, 200, 500, 1000}),
 		meshUpdateLoopCount: metrics.NewHistogram([]float64{0, 1, 2, 5, 10, 20, 50, 100}),
 		bufferedWriteFrames: metrics.NewHistogram([]float64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 20, 25, 50, 100}),
+		packetsRecvDropRateByClient: prometheus.NewHistogramVec(prometheus.HistogramOpts{
+			Name:    "clients_by_packet_loss_rate_recv",
+			Help:    "Histogram counting clients by the packet loss rate of packets received from those clients, by reason and kind",
+			Buckets: []float64{0.001, .005, .01, .025, .05, .1, .25, .5, 1},
+		}, []string{"kind", "reason"}),
+		packetsSentDropRateByClient: prometheus.NewHistogramVec(prometheus.HistogramOpts{
+			Name:    "clients_by_packet_loss_rate_send",
+			Help:    "Histogram counting clients by the packet loss rate of packets sent to those clients, by reason and kind",
+			Buckets: []float64{0.001, .005, .01, .025, .05, .1, .25, .5, 1},
+		}, []string{"kind", "reason"}),
 		keyOfAddr: map[netip.AddrPort]key.NodePublic{},
 		clock:     tstime.StdClock{},
 	}
@@ -391,6 +405,8 @@ func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server {
 	genPacketsDroppedCounters()
 	s.perClientSendQueueDepth = getPerClientSendQueueDepth()
+
+	go s.collectDropsByClient()
 	return s
 }
@@ -1105,7 +1121,7 @@ func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error {
 		} else {
 			c.requestPeerGoneWriteLimited(dstKey, contents, PeerGoneReasonNotHere)
 		}
-		s.recordDrop(contents, srcKey, dstKey, reason)
+		s.recordDrop(c, dst, contents, srcKey, dstKey, reason)
 		return nil
 	}
 
@@ -1122,7 +1138,7 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
 func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
 	s := c.s
 
-	dstKey, contents, err := s.recvPacket(c.br, fl)
+	dstKey, contents, err := c.recvPacket(fl)
 	if err != nil {
 		return fmt.Errorf("client %v: recvPacket: %v", c.key, err)
 	}
@@ -1158,7 +1174,7 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
 		} else {
 			c.requestPeerGoneWriteLimited(dstKey, contents, PeerGoneReasonNotHere)
 		}
-		s.recordDrop(contents, c.key, dstKey, reason)
+		s.recordDrop(c, dst, contents, c.key, dstKey, reason)
 		c.debugLogf("SendPacket for %s, dropping with reason=%s", dstKey.ShortString(), reason)
 		return nil
 	}
@@ -1196,16 +1212,108 @@ const (
 	dropReasonDupClient dropReason = "dup_client" // the public key is connected 2+ times (active/active, fighting)
 )
 
-func (s *Server) recordDrop(packetBytes []byte, srcKey, dstKey key.NodePublic, reason dropReason) {
+type dropsByReason struct {
+	total            float64
+	unknownDest      float64
+	unknownDestOnFwd float64
+	goneDisconnected float64
+	queueHead        float64
+	queueTail        float64
+	writeError       float64
+	dupClient        float64
+}
+
+func (d *dropsByReason) recordDrop(reason dropReason) {
+	d.total += 1
+	switch reason {
+	case dropReasonUnknownDest:
+		d.unknownDest += 1
+	case dropReasonUnknownDestOnFwd:
+		d.unknownDestOnFwd += 1
+	case dropReasonGoneDisconnected:
+		d.goneDisconnected += 1
+	case dropReasonQueueHead:
+		d.queueHead += 1
+	case dropReasonQueueTail:
+		d.queueTail += 1
+	case dropReasonWriteError:
+		d.writeError += 1
+	case dropReasonDupClient:
+		d.dupClient += 1
+	}
+}
+
+func (d *dropsByReason) reset() {
+	d.total = 0
+	d.unknownDest = 0
+	d.unknownDestOnFwd = 0
+	d.goneDisconnected = 0
+	d.queueHead = 0
+	d.queueTail = 0
+	d.writeError = 0
+	d.dupClient = 0
+}
+
+// collect collects packet drop rates into the given HistogramVec. The rates
+// are labeled with the given kind and the relevant drop reasons. The rates are
+// calculated as the number of drops divided by the total number of packets.
+// If includeDrops is true, the total drops are added to the given total. This
+// is used for send statistics since the packets sent counter only includes
+// packets that weren't dropped.
+func (d *dropsByReason) collect(hv *prometheus.HistogramVec, kind packetKind, total float64, includeDrops bool) {
+	if includeDrops {
+		total += d.total
+	}
+	if total == 0 {
+		return
+	}
+	kindString := string(kind)
+	hv.WithLabelValues(kindString, string(dropReasonUnknownDest)).Observe(d.unknownDest / total)
+	hv.WithLabelValues(kindString, string(dropReasonUnknownDestOnFwd)).Observe(d.unknownDestOnFwd / total)
+	hv.WithLabelValues(kindString, string(dropReasonGoneDisconnected)).Observe(d.goneDisconnected / total)
+	hv.WithLabelValues(kindString, string(dropReasonQueueHead)).Observe(d.queueHead / total)
+	hv.WithLabelValues(kindString, string(dropReasonQueueTail)).Observe(d.queueTail / total)
+	hv.WithLabelValues(kindString, string(dropReasonWriteError)).Observe(d.writeError / total)
+	hv.WithLabelValues(kindString, string(dropReasonDupClient)).Observe(d.dupClient / total)
+}
+
+func (s *Server) recordDrop(src, dst *sclient, packetBytes []byte, srcKey, dstKey key.NodePublic, reason dropReason) {
 	labels := dropReasonKindLabels{
 		Reason: string(reason),
 	}
 	looksDisco := disco.LooksLikeDiscoWrapper(packetBytes)
+	if src != nil {
+		src.dropsMu.Lock()
+	}
+	if dst != nil {
+		dst.dropsMu.Lock()
+	}
 	if looksDisco {
 		labels.Kind = string(packetKindDisco)
+		if src != nil {
+			src.discoRecvDropsByReason.recordDrop(reason)
+		}
+		if dst != nil {
+			dst.discoSendDropsByReason.recordDrop(reason)
+		}
 	} else {
 		labels.Kind = string(packetKindOther)
+		if src != nil {
+			src.otherRecvDropsByReason.recordDrop(reason)
+		}
+		if dst != nil {
+			dst.otherSendDropsByReason.recordDrop(reason)
+		}
 	}
+	if src != nil {
+		src.dropsMu.Unlock()
+	}
+	if dst != nil {
+		dst.dropsMu.Unlock()
+	}
 	packetsDropped.Add(labels, 1)
 	if verboseDropKeys[dstKey] {
@@ -1220,7 +1328,6 @@ func (s *Server) recordDrop(packetBytes []byte, srcKey, dstKey key.NodePublic, reason dropReason) {
 }
 
 func (c *sclient) sendPkt(dst *sclient, p pkt) error {
-	s := c.s
 	dstKey := dst.key
 
 	// Attempt to queue for sending up to 3 times. On each attempt, if
@@ -1233,7 +1340,7 @@ func (c *sclient) sendPkt(dst *sclient, p pkt) error {
 	for attempt := 0; attempt < 3; attempt++ {
 		select {
 		case <-dst.done:
-			s.recordDrop(p.bs, c.key, dstKey, dropReasonGoneDisconnected)
+			c.s.recordDrop(c, dst, p.bs, c.key, dstKey, dropReasonGoneDisconnected)
 			dst.debugLogf("sendPkt attempt %d dropped, dst gone", attempt)
 			return nil
 		default:
@@ -1247,7 +1354,7 @@ func (c *sclient) sendPkt(dst *sclient, p pkt) error {
 		select {
 		case pkt := <-sendQueue:
-			s.recordDrop(pkt.bs, c.key, dstKey, dropReasonQueueHead)
+			c.s.recordDrop(c, dst, pkt.bs, c.key, dstKey, dropReasonQueueHead)
 			c.recordQueueTime(pkt.enqueuedAt)
 		default:
 		}
@@ -1255,7 +1362,7 @@ func (c *sclient) sendPkt(dst *sclient, p pkt) error {
 		// Failed to make room for packet. This can happen in a heavily
 		// contended queue with racing writers. Give up and tail-drop in
 		// this case to keep reader unblocked.
-		s.recordDrop(p.bs, c.key, dstKey, dropReasonQueueTail)
+		c.s.recordDrop(c, dst, p.bs, c.key, dstKey, dropReasonQueueTail)
 		dst.debugLogf("sendPkt attempt %d dropped, queue full")
 		return nil
@@ -1497,7 +1604,10 @@ func (s *Server) recvClientKey(br *bufio.Reader) (clientKey key.NodePublic, info
 	return clientKey, info, nil
 }
 
-func (s *Server) recvPacket(br *bufio.Reader, frameLen uint32) (dstKey key.NodePublic, contents []byte, err error) {
+func (c *sclient) recvPacket(frameLen uint32) (dstKey key.NodePublic, contents []byte, err error) {
+	s := c.s
+	br := c.br
+
 	if frameLen < keyLen {
 		return zpub, nil, errors.New("short send packet frame")
 	}
@@ -1512,12 +1622,15 @@ func (s *Server) recvPacket(br *bufio.Reader, frameLen uint32) (dstKey key.NodePublic, contents []byte, err error) {
 	if _, err := io.ReadFull(br, contents); err != nil {
 		return zpub, nil, err
 	}
+
 	s.packetsRecv.Add(1)
 	s.bytesRecv.Add(int64(len(contents)))
 	if disco.LooksLikeDiscoWrapper(contents) {
 		s.packetsRecvDisco.Add(1)
+		c.packetsRecvDisco.Add(1)
 	} else {
 		s.packetsRecvOther.Add(1)
+		c.packetsRecvOther.Add(1)
 	}
 	return dstKey, contents, nil
 }
@@ -1598,6 +1711,15 @@ type sclient struct {
 	// client that it's trying to establish a direct connection
 	// through us with a peer we have no record of.
 	peerGoneLim *rate.Limiter
+
+	packetsRecvDisco, packetsRecvOther, packetsSentDisco, packetsSentOther atomic.Uint64
+
+	// dropsMu guards the below packet drop metrics
+	dropsMu                sync.Mutex
+	discoRecvDropsByReason dropsByReason
+	discoSendDropsByReason dropsByReason
+	otherRecvDropsByReason dropsByReason
+	otherSendDropsByReason dropsByReason
 }
 
 func (c *sclient) presentFlags() PeerPresentFlags {
@@ -1708,9 +1830,9 @@ func (c *sclient) onSendLoopDone() {
 	for {
 		select {
 		case pkt := <-c.sendQueue:
-			c.s.recordDrop(pkt.bs, pkt.src, c.key, dropReasonGoneDisconnected)
+			c.s.recordDrop(nil, c, pkt.bs, pkt.src, c.key, dropReasonGoneDisconnected)
 		case pkt := <-c.discoSendQueue:
-			c.s.recordDrop(pkt.bs, pkt.src, c.key, dropReasonGoneDisconnected)
+			c.s.recordDrop(nil, c, pkt.bs, pkt.src, c.key, dropReasonGoneDisconnected)
 		default:
 			return
 		}
@@ -1917,7 +2039,7 @@ func (c *sclient) sendPacket(srcKey key.NodePublic, contents []byte) (err error)
 	defer func() {
 		// Stats update.
 		if err != nil {
-			c.s.recordDrop(contents, srcKey, c.key, dropReasonWriteError)
+			c.s.recordDrop(nil, c, contents, srcKey, c.key, dropReasonWriteError)
 		} else {
 			c.s.packetsSent.Add(1)
 			c.s.bytesSent.Add(int64(len(contents)))
@@ -2149,6 +2271,8 @@ func (s *Server) ExpVar() expvar.Var {
 	m.Set("counter_mesh_update_batch_size", s.meshUpdateBatchSize)
 	m.Set("counter_mesh_update_loop_count", s.meshUpdateLoopCount)
 	m.Set("counter_buffered_write_frames", s.bufferedWriteFrames)
+	m.Set("packets_recv_drop_rate_by_client", collectorVar{s.packetsRecvDropRateByClient})
+	m.Set("packets_send_drop_rate_by_client", collectorVar{s.packetsSentDropRateByClient})
 	var expvarVersion expvar.String
 	expvarVersion.Set(version.Long())
 	m.Set("version", &expvarVersion)
@@ -2341,3 +2465,43 @@ func (w *lazyBufioWriter) Flush() error {
 	return err
 }
+
+// collectDropsByClient maintains histograms that characterize clients by their
+// packet drop rates for disco and non-disco traffic, by drop reason. It
+// collects drop rates for all active clients every 10 seconds.
+func (s *Server) collectDropsByClient() {
+	t := time.NewTicker(10 * time.Second)
+	var clients []*clientSet
+	for {
+		select {
+		case <-t.C:
+			clients = clients[:0]
+			s.mu.Lock()
+			for _, cs := range s.clients {
+				clients = append(clients, cs)
+			}
+			s.mu.Unlock()
+			for _, cs := range clients {
+				if c := cs.activeClient.Load(); c != nil {
+					discoRecv, otherRecv, discoSent, otherSent := c.packetsRecvDisco.Swap(0), c.packetsRecvOther.Swap(0), c.packetsSentDisco.Swap(0), c.packetsSentOther.Swap(0)
+					c.dropsMu.Lock()
+					c.discoRecvDropsByReason.collect(s.packetsRecvDropRateByClient, packetKindDisco, float64(discoRecv), false)
+					c.otherRecvDropsByReason.collect(s.packetsRecvDropRateByClient, packetKindOther, float64(otherRecv), false)
+					c.discoSendDropsByReason.collect(s.packetsSentDropRateByClient, packetKindDisco, float64(discoSent), true)
+					c.otherSendDropsByReason.collect(s.packetsSentDropRateByClient, packetKindOther, float64(otherSent), true)
+					c.discoRecvDropsByReason.reset()
+					c.otherRecvDropsByReason.reset()
+					c.discoSendDropsByReason.reset()
+					c.otherSendDropsByReason.reset()
+					c.dropsMu.Unlock()
+				}
+			}
+			// clients retains the clientSet pointers between ticks, but they
+			// are lightweight; they only point at the actual clients.
+			clients = clients[:0]
+		}
+	}
+}
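
One subtlety in dropsByReason.collect above: the receive and send directions use different denominators. The per-client receive counters are bumped in recvPacket before any drop decision, so they already include packets that are later dropped; the packets-sent counters only count frames that were actually written, so collect adds the drops back in (includeDrops=true). A hedged, free-standing sketch of that arithmetic, with made-up numbers:

// Illustrative only; mirrors the denominators chosen by dropsByReason.collect.
func exampleLossRates() (recvRate, sendRate float64) {
	// Hypothetical 10-second window for one client.
	recvPkts, recvDrops := 1000.0, 10.0 // recv counters already include dropped packets
	sentPkts, sentDrops := 490.0, 10.0  // sent counters exclude drops, so add them back

	recvRate = recvDrops / recvPkts               // 0.01
	sendRate = sentDrops / (sentPkts + sentDrops) // 0.02
	return recvRate, sendRate
}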


@@ -0,0 +1,31 @@
+package derp
+
+import (
+	"io"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/common/expfmt"
+)
+
+var (
+	expFormat = expfmt.NewFormat(expfmt.TypeTextPlain)
+)
+
+// collectorVar implements expvar.Var and metrics.PrometheusWriter
+type collectorVar struct {
+	prometheus.Collector
+}
+
+func (cw collectorVar) String() string {
+	return `"CollectorVar"`
+}
+
+func (cw collectorVar) WritePrometheus(w io.Writer, name string) {
+	reg := prometheus.NewRegistry()
+	_ = reg.Register(cw)
+	mfs, _ := reg.Gather()
+	enc := expfmt.NewEncoder(w, expFormat)
+	for _, mf := range mfs {
+		_ = enc.Encode(mf)
+	}
+}
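
collectorVar is the glue that lets a plain prometheus Collector sit inside the server's expvar tree: String() satisfies expvar.Var with a constant JSON string, while WritePrometheus re-registers the collector in a throwaway registry and encodes the gathered metric families in text exposition format. A rough usage sketch; it would live in package derp, the helper name and label values are hypothetical, and the name argument is passed through unused (the encoded output carries the collector's own metric name):

package derp

import (
	"bytes"

	"github.com/prometheus/client_golang/prometheus"
)

func exampleCollectorVar() string {
	// Any prometheus Collector works; a HistogramVec shaped like the new
	// per-client drop-rate metrics is used here for illustration.
	hv := prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "clients_by_packet_loss_rate_recv",
		Buckets: []float64{0.01, 0.1, 1},
	}, []string{"kind", "reason"})
	hv.WithLabelValues("disco", "queue_tail").Observe(0.05)

	var buf bytes.Buffer
	collectorVar{hv}.WritePrometheus(&buf, "packets_recv_drop_rate_by_client")
	return buf.String() // text exposition format: *_bucket, *_sum and *_count series
}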