From b341f3ffcf3f15be5a80d9fd4d2e877cb73dd3f9 Mon Sep 17 00:00:00 2001
From: Percy Wegmann
Date: Fri, 17 Jan 2025 19:36:01 -0600
Subject: [PATCH] derp: track histograms of packet drops by client

When packets are dropped, track this for both the client from which the
packet was received and the client to which it was sent. Distinguish
drops by kind (disco vs non-disco) and drop reason.

Every 10 seconds, calculate each client's packet drop rate (drops as a
fraction of that client's packets during the interval) and record it in
histograms showing the distribution of clients by drop rate, labeled by
kind and drop reason.

Signed-off-by: Percy Wegmann
---
 derp/derp_server.go        | 254 ++++++++++++++++++++++++++++++-------
 derp/prometheus_adapter.go |  31 +++++
 2 files changed, 240 insertions(+), 45 deletions(-)
 create mode 100644 derp/prometheus_adapter.go

diff --git a/derp/derp_server.go b/derp/derp_server.go
index 08fd280a9..df5a1b2e3 100644
--- a/derp/derp_server.go
+++ b/derp/derp_server.go
@@ -52,6 +52,8 @@ import (
 	"tailscale.com/util/set"
 	"tailscale.com/util/slicesx"
 	"tailscale.com/version"
+
+	"github.com/prometheus/client_golang/prometheus"
 )
 
 // verboseDropKeys is the set of destination public keys that should
@@ -139,37 +141,39 @@ type Server struct {
 	debug bool
 
 	// Counters:
-	packetsSent, bytesSent     expvar.Int
-	packetsRecv, bytesRecv     expvar.Int
-	packetsRecvByKind          metrics.LabelMap
-	packetsRecvDisco           *expvar.Int
-	packetsRecvOther           *expvar.Int
-	_                          align64
-	packetsForwardedOut        expvar.Int
-	packetsForwardedIn         expvar.Int
-	peerGoneDisconnectedFrames expvar.Int // number of peer disconnected frames sent
-	peerGoneNotHereFrames      expvar.Int // number of peer not here frames sent
-	gotPing                    expvar.Int // number of ping frames from client
-	sentPong                   expvar.Int // number of pong frames enqueued to client
-	accepts                    expvar.Int
-	curClients                 expvar.Int
-	curClientsNotIdeal         expvar.Int
-	curHomeClients             expvar.Int // ones with preferred
-	dupClientKeys              expvar.Int // current number of public keys we have 2+ connections for
-	dupClientConns             expvar.Int // current number of connections sharing a public key
-	dupClientConnTotal         expvar.Int // total number of accepted connections when a dup key existed
-	unknownFrames              expvar.Int
-	homeMovesIn                expvar.Int // established clients announce home server moves in
-	homeMovesOut               expvar.Int // established clients announce home server moves out
-	multiForwarderCreated      expvar.Int
-	multiForwarderDeleted      expvar.Int
-	removePktForwardOther      expvar.Int
-	sclientWriteTimeouts       expvar.Int
-	avgQueueDuration           *uint64 // In milliseconds; accessed atomically
-	tcpRtt                     metrics.LabelMap // histogram
-	meshUpdateBatchSize        *metrics.Histogram
-	meshUpdateLoopCount        *metrics.Histogram
-	bufferedWriteFrames        *metrics.Histogram // how many sendLoop frames (or groups of related frames) get written per flush
+	packetsSent, bytesSent      expvar.Int
+	packetsRecv, bytesRecv      expvar.Int
+	packetsRecvByKind           metrics.LabelMap
+	packetsRecvDisco            *expvar.Int
+	packetsRecvOther            *expvar.Int
+	_                           align64
+	packetsForwardedOut         expvar.Int
+	packetsForwardedIn          expvar.Int
+	peerGoneDisconnectedFrames  expvar.Int // number of peer disconnected frames sent
+	peerGoneNotHereFrames       expvar.Int // number of peer not here frames sent
+	gotPing                     expvar.Int // number of ping frames from client
+	sentPong                    expvar.Int // number of pong frames enqueued to client
+	accepts                     expvar.Int
+	curClients                  expvar.Int
+	curClientsNotIdeal          expvar.Int
+	curHomeClients              expvar.Int // ones with preferred
+	dupClientKeys               expvar.Int // current number of public keys we have 2+ connections for
+	dupClientConns              expvar.Int // current number of
connections sharing a public key + dupClientConnTotal expvar.Int // total number of accepted connections when a dup key existed + unknownFrames expvar.Int + homeMovesIn expvar.Int // established clients announce home server moves in + homeMovesOut expvar.Int // established clients announce home server moves out + multiForwarderCreated expvar.Int + multiForwarderDeleted expvar.Int + removePktForwardOther expvar.Int + sclientWriteTimeouts expvar.Int + avgQueueDuration *uint64 // In milliseconds; accessed atomically + tcpRtt metrics.LabelMap // histogram + meshUpdateBatchSize *metrics.Histogram + meshUpdateLoopCount *metrics.Histogram + bufferedWriteFrames *metrics.Histogram // how many sendLoop frames (or groups of related frames) get written per flush + packetsRecvDropRateByClient *prometheus.HistogramVec + packetsSentDropRateByClient *prometheus.HistogramVec // verifyClientsLocalTailscaled only accepts client connections to the DERP // server if the clientKey is a known peer in the network, as specified by a @@ -381,8 +385,18 @@ func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server { meshUpdateBatchSize: metrics.NewHistogram([]float64{0, 1, 2, 5, 10, 20, 50, 100, 200, 500, 1000}), meshUpdateLoopCount: metrics.NewHistogram([]float64{0, 1, 2, 5, 10, 20, 50, 100}), bufferedWriteFrames: metrics.NewHistogram([]float64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 20, 25, 50, 100}), - keyOfAddr: map[netip.AddrPort]key.NodePublic{}, - clock: tstime.StdClock{}, + packetsRecvDropRateByClient: prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: "clients_by_packet_loss_rate_recv", + Help: "Histogram counting clients by the packet loss rate of packets received from those clients, by reason and kind", + Buckets: []float64{0.001, .005, .01, .025, .05, .1, .25, .5, 1}, + }, []string{"kind", "reason"}), + packetsSentDropRateByClient: prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: "clients_by_packet_loss_rate_send", + Help: "Histogram counting clients by the packet loss rate of packets sent to those clients, by reason and kind", + Buckets: []float64{0.001, .005, .01, .025, .05, .1, .25, .5, 1}, + }, []string{"kind", "reason"}), + keyOfAddr: map[netip.AddrPort]key.NodePublic{}, + clock: tstime.StdClock{}, } s.initMetacert() s.packetsRecvDisco = s.packetsRecvByKind.Get(string(packetKindDisco)) @@ -391,6 +405,8 @@ func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server { genPacketsDroppedCounters() s.perClientSendQueueDepth = getPerClientSendQueueDepth() + + go s.collectDropsByClient() return s } @@ -1105,7 +1121,7 @@ func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error { } else { c.requestPeerGoneWriteLimited(dstKey, contents, PeerGoneReasonNotHere) } - s.recordDrop(contents, srcKey, dstKey, reason) + s.recordDrop(c, dst, contents, srcKey, dstKey, reason) return nil } @@ -1122,7 +1138,7 @@ func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error { func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error { s := c.s - dstKey, contents, err := s.recvPacket(c.br, fl) + dstKey, contents, err := c.recvPacket(fl) if err != nil { return fmt.Errorf("client %v: recvPacket: %v", c.key, err) } @@ -1158,7 +1174,7 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error { } else { c.requestPeerGoneWriteLimited(dstKey, contents, PeerGoneReasonNotHere) } - s.recordDrop(contents, c.key, dstKey, reason) + s.recordDrop(c, dst, contents, c.key, dstKey, reason) c.debugLogf("SendPacket for %s, dropping with reason=%s", 
dstKey.ShortString(), reason) return nil } @@ -1196,16 +1212,108 @@ const ( dropReasonDupClient dropReason = "dup_client" // the public key is connected 2+ times (active/active, fighting) ) -func (s *Server) recordDrop(packetBytes []byte, srcKey, dstKey key.NodePublic, reason dropReason) { +type dropsByReason struct { + total float64 + unknownDest float64 + unknownDestOnFwd float64 + goneDisconnected float64 + queueHead float64 + queueTail float64 + writeError float64 + dupClient float64 +} + +func (d *dropsByReason) recordDrop(reason dropReason) { + d.total += 1 + switch reason { + case dropReasonUnknownDest: + d.unknownDest += 1 + case dropReasonUnknownDestOnFwd: + d.unknownDestOnFwd += 1 + case dropReasonGoneDisconnected: + d.goneDisconnected += 1 + case dropReasonQueueHead: + d.queueHead += 1 + case dropReasonQueueTail: + d.queueTail += 1 + case dropReasonWriteError: + d.writeError += 1 + case dropReasonDupClient: + d.dupClient += 1 + } +} + +func (d *dropsByReason) reset() { + d.total = 0 + d.unknownDest = 0 + d.unknownDestOnFwd = 0 + d.goneDisconnected = 0 + d.queueHead = 0 + d.queueTail = 0 + d.writeError = 0 + d.dupClient = 0 +} + +// collect collects packet drop rates into the given HistogramVec. The rates +// are labeled with the given kind and the relevant drop reasons. The rates are +// calculated as the number of drops divided by the total number of packets. +// If includeDrops is true, the total drops are added to the given total. This +// is used for send statistics since the packets sent counter only includes +// packets that weren't dropped. +func (d *dropsByReason) collect(hv *prometheus.HistogramVec, kind packetKind, total float64, includeDrops bool) { + if includeDrops { + total += d.total + } + + if total == 0 { + return + } + + kindString := string(kind) + hv.WithLabelValues(kindString, string(dropReasonUnknownDest)).Observe(d.unknownDest / total) + hv.WithLabelValues(kindString, string(dropReasonUnknownDestOnFwd)).Observe(d.unknownDestOnFwd / total) + hv.WithLabelValues(kindString, string(dropReasonGoneDisconnected)).Observe(d.goneDisconnected / total) + hv.WithLabelValues(kindString, string(dropReasonQueueHead)).Observe(d.queueHead / total) + hv.WithLabelValues(kindString, string(dropReasonQueueTail)).Observe(d.queueTail / total) + hv.WithLabelValues(kindString, string(dropReasonWriteError)).Observe(d.writeError / total) + hv.WithLabelValues(kindString, string(dropReasonDupClient)).Observe(d.dupClient / total) +} + +func (s *Server) recordDrop(src, dst *sclient, packetBytes []byte, srcKey, dstKey key.NodePublic, reason dropReason) { labels := dropReasonKindLabels{ Reason: string(reason), } looksDisco := disco.LooksLikeDiscoWrapper(packetBytes) + if src != nil { + src.dropsMu.Lock() + } + if dst != nil { + dst.dropsMu.Lock() + } if looksDisco { labels.Kind = string(packetKindDisco) + if src != nil { + src.discoRecvDropsByReason.recordDrop(reason) + } + if dst != nil { + dst.discoSendDropsByReason.recordDrop(reason) + } } else { labels.Kind = string(packetKindOther) + if src != nil { + src.otherRecvDropsByReason.recordDrop(reason) + } + if dst != nil { + dst.otherSendDropsByReason.recordDrop(reason) + } } + if src != nil { + src.dropsMu.Unlock() + } + if dst != nil { + dst.dropsMu.Unlock() + } + packetsDropped.Add(labels, 1) if verboseDropKeys[dstKey] { @@ -1220,7 +1328,6 @@ func (s *Server) recordDrop(packetBytes []byte, srcKey, dstKey key.NodePublic, r } func (c *sclient) sendPkt(dst *sclient, p pkt) error { - s := c.s dstKey := dst.key // Attempt to queue for 
sending up to 3 times. On each attempt, if @@ -1233,7 +1340,7 @@ func (c *sclient) sendPkt(dst *sclient, p pkt) error { for attempt := 0; attempt < 3; attempt++ { select { case <-dst.done: - s.recordDrop(p.bs, c.key, dstKey, dropReasonGoneDisconnected) + c.s.recordDrop(c, dst, p.bs, c.key, dstKey, dropReasonGoneDisconnected) dst.debugLogf("sendPkt attempt %d dropped, dst gone", attempt) return nil default: @@ -1247,7 +1354,7 @@ func (c *sclient) sendPkt(dst *sclient, p pkt) error { select { case pkt := <-sendQueue: - s.recordDrop(pkt.bs, c.key, dstKey, dropReasonQueueHead) + c.s.recordDrop(c, dst, pkt.bs, c.key, dstKey, dropReasonQueueHead) c.recordQueueTime(pkt.enqueuedAt) default: } @@ -1255,7 +1362,7 @@ func (c *sclient) sendPkt(dst *sclient, p pkt) error { // Failed to make room for packet. This can happen in a heavily // contended queue with racing writers. Give up and tail-drop in // this case to keep reader unblocked. - s.recordDrop(p.bs, c.key, dstKey, dropReasonQueueTail) + c.s.recordDrop(c, dst, p.bs, c.key, dstKey, dropReasonQueueTail) dst.debugLogf("sendPkt attempt %d dropped, queue full") return nil @@ -1497,7 +1604,10 @@ func (s *Server) recvClientKey(br *bufio.Reader) (clientKey key.NodePublic, info return clientKey, info, nil } -func (s *Server) recvPacket(br *bufio.Reader, frameLen uint32) (dstKey key.NodePublic, contents []byte, err error) { +func (c *sclient) recvPacket(frameLen uint32) (dstKey key.NodePublic, contents []byte, err error) { + s := c.s + br := c.br + if frameLen < keyLen { return zpub, nil, errors.New("short send packet frame") } @@ -1512,12 +1622,15 @@ func (s *Server) recvPacket(br *bufio.Reader, frameLen uint32) (dstKey key.NodeP if _, err := io.ReadFull(br, contents); err != nil { return zpub, nil, err } + s.packetsRecv.Add(1) s.bytesRecv.Add(int64(len(contents))) if disco.LooksLikeDiscoWrapper(contents) { s.packetsRecvDisco.Add(1) + c.packetsRecvDisco.Add(1) } else { s.packetsRecvOther.Add(1) + c.packetsRecvOther.Add(1) } return dstKey, contents, nil } @@ -1598,6 +1711,15 @@ type sclient struct { // client that it's trying to establish a direct connection // through us with a peer we have no record of. peerGoneLim *rate.Limiter + + packetsRecvDisco, packetsRecvOther, packetsSentDisco, packetsSentOther atomic.Uint64 + + // dropsMu guards the below packet drop metrics + dropsMu sync.Mutex + discoRecvDropsByReason dropsByReason + discoSendDropsByReason dropsByReason + otherRecvDropsByReason dropsByReason + otherSendDropsByReason dropsByReason } func (c *sclient) presentFlags() PeerPresentFlags { @@ -1708,9 +1830,9 @@ func (c *sclient) onSendLoopDone() { for { select { case pkt := <-c.sendQueue: - c.s.recordDrop(pkt.bs, pkt.src, c.key, dropReasonGoneDisconnected) + c.s.recordDrop(nil, c, pkt.bs, pkt.src, c.key, dropReasonGoneDisconnected) case pkt := <-c.discoSendQueue: - c.s.recordDrop(pkt.bs, pkt.src, c.key, dropReasonGoneDisconnected) + c.s.recordDrop(nil, c, pkt.bs, pkt.src, c.key, dropReasonGoneDisconnected) default: return } @@ -1917,7 +2039,7 @@ func (c *sclient) sendPacket(srcKey key.NodePublic, contents []byte) (err error) defer func() { // Stats update. 
 		if err != nil {
-			c.s.recordDrop(contents, srcKey, c.key, dropReasonWriteError)
+			c.s.recordDrop(nil, c, contents, srcKey, c.key, dropReasonWriteError)
 		} else {
 			c.s.packetsSent.Add(1)
 			c.s.bytesSent.Add(int64(len(contents)))
@@ -2149,6 +2271,8 @@ func (s *Server) ExpVar() expvar.Var {
 	m.Set("counter_mesh_update_batch_size", s.meshUpdateBatchSize)
 	m.Set("counter_mesh_update_loop_count", s.meshUpdateLoopCount)
 	m.Set("counter_buffered_write_frames", s.bufferedWriteFrames)
+	m.Set("packets_recv_drop_rate_by_client", collectorVar{s.packetsRecvDropRateByClient})
+	m.Set("packets_send_drop_rate_by_client", collectorVar{s.packetsSentDropRateByClient})
 	var expvarVersion expvar.String
 	expvarVersion.Set(version.Long())
 	m.Set("version", &expvarVersion)
@@ -2341,3 +2465,43 @@ func (w *lazyBufioWriter) Flush() error {
 
 	return err
 }
+
+// collectDropsByClient maintains histograms of per-client packet drop rates
+// for disco and non-disco traffic. Every 10 seconds, it observes each active
+// client's drop rates and then resets that client's counters.
+func (s *Server) collectDropsByClient() {
+	t := time.NewTicker(10 * time.Second)
+
+	var clients []*clientSet
+
+	for {
+		select {
+		case <-t.C:
+			clients = clients[:0]
+			s.mu.Lock()
+			for _, cs := range s.clients {
+				clients = append(clients, cs)
+			}
+			s.mu.Unlock()
+
+			for _, cs := range clients {
+				if c := cs.activeClient.Load(); c != nil {
+					discoRecv, otherRecv, discoSent, otherSent := c.packetsRecvDisco.Swap(0), c.packetsRecvOther.Swap(0), c.packetsSentDisco.Swap(0), c.packetsSentOther.Swap(0)
+					c.dropsMu.Lock()
+					c.discoRecvDropsByReason.collect(s.packetsRecvDropRateByClient, packetKindDisco, float64(discoRecv), false)
+					c.otherRecvDropsByReason.collect(s.packetsRecvDropRateByClient, packetKindOther, float64(otherRecv), false)
+					c.discoSendDropsByReason.collect(s.packetsSentDropRateByClient, packetKindDisco, float64(discoSent), true)
+					c.otherSendDropsByReason.collect(s.packetsSentDropRateByClient, packetKindOther, float64(otherSent), true)
+					c.discoRecvDropsByReason.reset()
+					c.otherRecvDropsByReason.reset()
+					c.discoSendDropsByReason.reset()
+					c.otherSendDropsByReason.reset()
+					c.dropsMu.Unlock()
+				}
+			}
+
+			// clients retains the clientSet pointers between ticks, but these are lightweight since they only point at the actual clients.
+			clients = clients[:0]
+		}
+	}
+}
diff --git a/derp/prometheus_adapter.go b/derp/prometheus_adapter.go
new file mode 100644
index 000000000..0ab4537ce
--- /dev/null
+++ b/derp/prometheus_adapter.go
@@ -0,0 +1,31 @@
+package derp
+
+import (
+	"io"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/common/expfmt"
+)
+
+var (
+	expFormat = expfmt.NewFormat(expfmt.TypeTextPlain)
+)
+
+// collectorVar implements expvar.Var and metrics.PrometheusWriter
+type collectorVar struct {
+	prometheus.Collector
+}
+
+func (cw collectorVar) String() string {
+	return `"CollectorVar"`
+}
+
+func (cw collectorVar) WritePrometheus(w io.Writer, name string) {
+	reg := prometheus.NewRegistry()
+	_ = reg.Register(cw)
+	mfs, _ := reg.Gather()
+	enc := expfmt.NewEncoder(w, expFormat)
+	for _, mf := range mfs {
+		_ = enc.Encode(mf)
+	}
+}
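Illustrative sketch (not part of the patch): the arithmetic that collectDropsByClient applies once per 10-second window, shown for a single hypothetical client. Receive-side rates divide drops by the packets received from the client, a count that already includes the dropped packets, so collect is called with includeDrops=false; send-side rates add the drops back into the denominator (includeDrops=true) because packetsSent only counts frames that were actually written. The counter values and the literal label strings below are made up for the example and assumed to match the patch's packetKind/dropReason constants.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Same shape as the patch's send-side histogram: one observation per
	// client per collection window, labeled by packet kind and drop reason.
	sendRate := prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "clients_by_packet_loss_rate_send",
		Help:    "Example: clients by send-side packet loss rate",
		Buckets: []float64{0.001, .005, .01, .025, .05, .1, .25, .5, 1},
	}, []string{"kind", "reason"})

	// Hypothetical counters for one client over one 10-second window.
	var (
		packetsSentOther float64 = 990 // successful non-disco sends only
		queueTailDrops   float64 = 10  // drops recorded with reason "queue_tail"
	)

	// Send side: dropped packets never reach the packetsSent counter, so
	// they are added back into the denominator before computing the rate.
	total := packetsSentOther + queueTailDrops
	rate := queueTailDrops / total
	sendRate.WithLabelValues("other", "queue_tail").Observe(rate)

	fmt.Printf("send drop rate for this client/window: %.3f\n", rate) // 0.010
}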