diff --git a/derp/derp_server.go b/derp/derp_server.go
index 08fd280a9..df5a1b2e3 100644
--- a/derp/derp_server.go
+++ b/derp/derp_server.go
@@ -52,6 +52,8 @@ import (
 	"tailscale.com/util/set"
 	"tailscale.com/util/slicesx"
 	"tailscale.com/version"
+
+	"github.com/prometheus/client_golang/prometheus"
 )

 // verboseDropKeys is the set of destination public keys that should
@@ -139,37 +141,39 @@ type Server struct {
 	debug bool

 	// Counters:
-	packetsSent, bytesSent     expvar.Int
-	packetsRecv, bytesRecv     expvar.Int
-	packetsRecvByKind          metrics.LabelMap
-	packetsRecvDisco           *expvar.Int
-	packetsRecvOther           *expvar.Int
-	_                          align64
-	packetsForwardedOut        expvar.Int
-	packetsForwardedIn         expvar.Int
-	peerGoneDisconnectedFrames expvar.Int // number of peer disconnected frames sent
-	peerGoneNotHereFrames      expvar.Int // number of peer not here frames sent
-	gotPing                    expvar.Int // number of ping frames from client
-	sentPong                   expvar.Int // number of pong frames enqueued to client
-	accepts                    expvar.Int
-	curClients                 expvar.Int
-	curClientsNotIdeal         expvar.Int
-	curHomeClients             expvar.Int // ones with preferred
-	dupClientKeys              expvar.Int // current number of public keys we have 2+ connections for
-	dupClientConns             expvar.Int // current number of connections sharing a public key
-	dupClientConnTotal         expvar.Int // total number of accepted connections when a dup key existed
-	unknownFrames              expvar.Int
-	homeMovesIn                expvar.Int // established clients announce home server moves in
-	homeMovesOut               expvar.Int // established clients announce home server moves out
-	multiForwarderCreated      expvar.Int
-	multiForwarderDeleted      expvar.Int
-	removePktForwardOther      expvar.Int
-	sclientWriteTimeouts       expvar.Int
-	avgQueueDuration           *uint64            // In milliseconds; accessed atomically
-	tcpRtt                     metrics.LabelMap   // histogram
-	meshUpdateBatchSize        *metrics.Histogram
-	meshUpdateLoopCount        *metrics.Histogram
-	bufferedWriteFrames        *metrics.Histogram // how many sendLoop frames (or groups of related frames) get written per flush
+	packetsSent, bytesSent      expvar.Int
+	packetsRecv, bytesRecv      expvar.Int
+	packetsRecvByKind           metrics.LabelMap
+	packetsRecvDisco            *expvar.Int
+	packetsRecvOther            *expvar.Int
+	_                           align64
+	packetsForwardedOut         expvar.Int
+	packetsForwardedIn          expvar.Int
+	peerGoneDisconnectedFrames  expvar.Int // number of peer disconnected frames sent
+	peerGoneNotHereFrames       expvar.Int // number of peer not here frames sent
+	gotPing                     expvar.Int // number of ping frames from client
+	sentPong                    expvar.Int // number of pong frames enqueued to client
+	accepts                     expvar.Int
+	curClients                  expvar.Int
+	curClientsNotIdeal          expvar.Int
+	curHomeClients              expvar.Int // ones with preferred
+	dupClientKeys               expvar.Int // current number of public keys we have 2+ connections for
+	dupClientConns              expvar.Int // current number of connections sharing a public key
+	dupClientConnTotal          expvar.Int // total number of accepted connections when a dup key existed
+	unknownFrames               expvar.Int
+	homeMovesIn                 expvar.Int // established clients announce home server moves in
+	homeMovesOut                expvar.Int // established clients announce home server moves out
+	multiForwarderCreated       expvar.Int
+	multiForwarderDeleted       expvar.Int
+	removePktForwardOther       expvar.Int
+	sclientWriteTimeouts        expvar.Int
+	avgQueueDuration            *uint64            // In milliseconds; accessed atomically
+	tcpRtt                      metrics.LabelMap   // histogram
+	meshUpdateBatchSize         *metrics.Histogram
+	meshUpdateLoopCount         *metrics.Histogram
+	bufferedWriteFrames         *metrics.Histogram // how many sendLoop frames (or groups of related frames) get written per flush
+	packetsRecvDropRateByClient *prometheus.HistogramVec
+	packetsSentDropRateByClient *prometheus.HistogramVec

 	// verifyClientsLocalTailscaled only accepts client connections to the DERP
 	// server if the clientKey is a known peer in the network, as specified by a
@@ -381,8 +385,18 @@ func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server {
 		meshUpdateBatchSize: metrics.NewHistogram([]float64{0, 1, 2, 5, 10, 20, 50, 100, 200, 500, 1000}),
 		meshUpdateLoopCount: metrics.NewHistogram([]float64{0, 1, 2, 5, 10, 20, 50, 100}),
 		bufferedWriteFrames: metrics.NewHistogram([]float64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 20, 25, 50, 100}),
-		keyOfAddr:           map[netip.AddrPort]key.NodePublic{},
-		clock:               tstime.StdClock{},
+		packetsRecvDropRateByClient: prometheus.NewHistogramVec(prometheus.HistogramOpts{
+			Name:    "clients_by_packet_loss_rate_recv",
+			Help:    "Histogram counting clients by the packet loss rate of packets received from those clients, by reason and kind",
+			Buckets: []float64{0.001, .005, .01, .025, .05, .1, .25, .5, 1},
+		}, []string{"kind", "reason"}),
+		packetsSentDropRateByClient: prometheus.NewHistogramVec(prometheus.HistogramOpts{
+			Name:    "clients_by_packet_loss_rate_send",
+			Help:    "Histogram counting clients by the packet loss rate of packets sent to those clients, by reason and kind",
+			Buckets: []float64{0.001, .005, .01, .025, .05, .1, .25, .5, 1},
+		}, []string{"kind", "reason"}),
+		keyOfAddr:                   map[netip.AddrPort]key.NodePublic{},
+		clock:                       tstime.StdClock{},
 	}
 	s.initMetacert()
 	s.packetsRecvDisco = s.packetsRecvByKind.Get(string(packetKindDisco))
@@ -391,6 +405,8 @@ func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server {

 	genPacketsDroppedCounters()
 	s.perClientSendQueueDepth = getPerClientSendQueueDepth()
+
+	go s.collectDropsByClient()

 	return s
 }
@@ -1105,7 +1121,7 @@ func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error {
 		} else {
 			c.requestPeerGoneWriteLimited(dstKey, contents, PeerGoneReasonNotHere)
 		}
-		s.recordDrop(contents, srcKey, dstKey, reason)
+		s.recordDrop(c, dst, contents, srcKey, dstKey, reason)
 		return nil
 	}

@@ -1122,7 +1138,7 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
 	s := c.s

-	dstKey, contents, err := s.recvPacket(c.br, fl)
+	dstKey, contents, err := c.recvPacket(fl)
 	if err != nil {
 		return fmt.Errorf("client %v: recvPacket: %v", c.key, err)
 	}

@@ -1158,7 +1174,7 @@
 		} else {
 			c.requestPeerGoneWriteLimited(dstKey, contents, PeerGoneReasonNotHere)
 		}
-		s.recordDrop(contents, c.key, dstKey, reason)
+		s.recordDrop(c, dst, contents, c.key, dstKey, reason)
 		c.debugLogf("SendPacket for %s, dropping with reason=%s", dstKey.ShortString(), reason)
 		return nil
 	}
@@ -1196,16 +1212,108 @@ const (
 	dropReasonDupClient        dropReason = "dup_client"          // the public key is connected 2+ times (active/active, fighting)
 )

-func (s *Server) recordDrop(packetBytes []byte, srcKey, dstKey key.NodePublic, reason dropReason) {
+type dropsByReason struct {
+	total            float64
+	unknownDest      float64
+	unknownDestOnFwd float64
+	goneDisconnected float64
+	queueHead        float64
+	queueTail        float64
+	writeError       float64
+	dupClient        float64
+}
+
+func (d *dropsByReason) recordDrop(reason dropReason) {
+	d.total += 1
+	switch reason {
+	case dropReasonUnknownDest:
+		d.unknownDest += 1
+	case dropReasonUnknownDestOnFwd:
+		d.unknownDestOnFwd += 1
+	case dropReasonGoneDisconnected:
+		d.goneDisconnected += 1
+	case dropReasonQueueHead:
+		d.queueHead += 1
+	case dropReasonQueueTail:
+		d.queueTail += 1
+	case dropReasonWriteError:
+		d.writeError += 1
+	case dropReasonDupClient:
+		d.dupClient += 1
+	}
+}
+
+func (d *dropsByReason) reset() {
+	d.total = 0
+	d.unknownDest = 0
+	d.unknownDestOnFwd = 0
+	d.goneDisconnected = 0
+	d.queueHead = 0
+	d.queueTail = 0
+	d.writeError = 0
+	d.dupClient = 0
+}
+
+// collect collects packet drop rates into the given HistogramVec. The rates
+// are labeled with the given kind and the relevant drop reasons. The rates are
+// calculated as the number of drops divided by the total number of packets.
+// If includeDrops is true, the total drops are added to the given total. This
+// is used for send statistics since the packets sent counter only includes
+// packets that weren't dropped.
+func (d *dropsByReason) collect(hv *prometheus.HistogramVec, kind packetKind, total float64, includeDrops bool) {
+	if includeDrops {
+		total += d.total
+	}
+
+	if total == 0 {
+		return
+	}
+
+	kindString := string(kind)
+	hv.WithLabelValues(kindString, string(dropReasonUnknownDest)).Observe(d.unknownDest / total)
+	hv.WithLabelValues(kindString, string(dropReasonUnknownDestOnFwd)).Observe(d.unknownDestOnFwd / total)
+	hv.WithLabelValues(kindString, string(dropReasonGoneDisconnected)).Observe(d.goneDisconnected / total)
+	hv.WithLabelValues(kindString, string(dropReasonQueueHead)).Observe(d.queueHead / total)
+	hv.WithLabelValues(kindString, string(dropReasonQueueTail)).Observe(d.queueTail / total)
+	hv.WithLabelValues(kindString, string(dropReasonWriteError)).Observe(d.writeError / total)
+	hv.WithLabelValues(kindString, string(dropReasonDupClient)).Observe(d.dupClient / total)
+}
+
+func (s *Server) recordDrop(src, dst *sclient, packetBytes []byte, srcKey, dstKey key.NodePublic, reason dropReason) {
 	labels := dropReasonKindLabels{
 		Reason: string(reason),
 	}
 	looksDisco := disco.LooksLikeDiscoWrapper(packetBytes)
+	if src != nil {
+		src.dropsMu.Lock()
+	}
+	if dst != nil {
+		dst.dropsMu.Lock()
+	}
 	if looksDisco {
 		labels.Kind = string(packetKindDisco)
+		if src != nil {
+			src.discoRecvDropsByReason.recordDrop(reason)
+		}
+		if dst != nil {
+			dst.discoSendDropsByReason.recordDrop(reason)
+		}
 	} else {
 		labels.Kind = string(packetKindOther)
+		if src != nil {
+			src.otherRecvDropsByReason.recordDrop(reason)
+		}
+		if dst != nil {
+			dst.otherSendDropsByReason.recordDrop(reason)
+		}
 	}
+	if src != nil {
+		src.dropsMu.Unlock()
+	}
+	if dst != nil {
+		dst.dropsMu.Unlock()
+	}
+
 	packetsDropped.Add(labels, 1)

 	if verboseDropKeys[dstKey] {
@@ -1220,7 +1328,6 @@ func (s *Server) recordDrop(packetBytes []byte, srcKey, dstKey key.NodePublic, r
 }

 func (c *sclient) sendPkt(dst *sclient, p pkt) error {
-	s := c.s
 	dstKey := dst.key

 	// Attempt to queue for sending up to 3 times. On each attempt, if
@@ -1233,7 +1340,7 @@ func (c *sclient) sendPkt(dst *sclient, p pkt) error {
 	for attempt := 0; attempt < 3; attempt++ {
 		select {
 		case <-dst.done:
-			s.recordDrop(p.bs, c.key, dstKey, dropReasonGoneDisconnected)
+			c.s.recordDrop(c, dst, p.bs, c.key, dstKey, dropReasonGoneDisconnected)
 			dst.debugLogf("sendPkt attempt %d dropped, dst gone", attempt)
 			return nil
 		default:
@@ -1247,7 +1354,7 @@ func (c *sclient) sendPkt(dst *sclient, p pkt) error {

 		select {
 		case pkt := <-sendQueue:
-			s.recordDrop(pkt.bs, c.key, dstKey, dropReasonQueueHead)
+			c.s.recordDrop(c, dst, pkt.bs, c.key, dstKey, dropReasonQueueHead)
 			c.recordQueueTime(pkt.enqueuedAt)
 		default:
 		}
@@ -1255,7 +1362,7 @@ func (c *sclient) sendPkt(dst *sclient, p pkt) error {
 		// Failed to make room for packet. This can happen in a heavily
 		// contended queue with racing writers. Give up and tail-drop in
 		// this case to keep reader unblocked.
-		s.recordDrop(p.bs, c.key, dstKey, dropReasonQueueTail)
+		c.s.recordDrop(c, dst, p.bs, c.key, dstKey, dropReasonQueueTail)
 		dst.debugLogf("sendPkt attempt %d dropped, queue full")
 		return nil

@@ -1497,7 +1604,10 @@ func (s *Server) recvClientKey(br *bufio.Reader) (clientKey key.NodePublic, info
 	return clientKey, info, nil
 }

-func (s *Server) recvPacket(br *bufio.Reader, frameLen uint32) (dstKey key.NodePublic, contents []byte, err error) {
+func (c *sclient) recvPacket(frameLen uint32) (dstKey key.NodePublic, contents []byte, err error) {
+	s := c.s
+	br := c.br
+
 	if frameLen < keyLen {
 		return zpub, nil, errors.New("short send packet frame")
 	}
@@ -1512,12 +1622,15 @@
 	if _, err := io.ReadFull(br, contents); err != nil {
 		return zpub, nil, err
 	}
+
 	s.packetsRecv.Add(1)
 	s.bytesRecv.Add(int64(len(contents)))
 	if disco.LooksLikeDiscoWrapper(contents) {
 		s.packetsRecvDisco.Add(1)
+		c.packetsRecvDisco.Add(1)
 	} else {
 		s.packetsRecvOther.Add(1)
+		c.packetsRecvOther.Add(1)
 	}
 	return dstKey, contents, nil
 }
@@ -1598,6 +1711,15 @@ type sclient struct {
 	// client that it's trying to establish a direct connection
 	// through us with a peer we have no record of.
 	peerGoneLim *rate.Limiter
+
+	packetsRecvDisco, packetsRecvOther, packetsSentDisco, packetsSentOther atomic.Uint64
+
+	// dropsMu guards the below packet drop metrics
+	dropsMu                 sync.Mutex
+	discoRecvDropsByReason  dropsByReason
+	discoSendDropsByReason  dropsByReason
+	otherRecvDropsByReason  dropsByReason
+	otherSendDropsByReason  dropsByReason
 }

 func (c *sclient) presentFlags() PeerPresentFlags {
@@ -1708,9 +1830,9 @@ func (c *sclient) onSendLoopDone() {
 	for {
 		select {
 		case pkt := <-c.sendQueue:
-			c.s.recordDrop(pkt.bs, pkt.src, c.key, dropReasonGoneDisconnected)
+			c.s.recordDrop(nil, c, pkt.bs, pkt.src, c.key, dropReasonGoneDisconnected)
 		case pkt := <-c.discoSendQueue:
-			c.s.recordDrop(pkt.bs, pkt.src, c.key, dropReasonGoneDisconnected)
+			c.s.recordDrop(nil, c, pkt.bs, pkt.src, c.key, dropReasonGoneDisconnected)
 		default:
 			return
 		}
@@ -1917,7 +2039,7 @@ func (c *sclient) sendPacket(srcKey key.NodePublic, contents []byte) (err error)
 	defer func() {
 		// Stats update.
 		if err != nil {
-			c.s.recordDrop(contents, srcKey, c.key, dropReasonWriteError)
+			c.s.recordDrop(nil, c, contents, srcKey, c.key, dropReasonWriteError)
 		} else {
 			c.s.packetsSent.Add(1)
 			c.s.bytesSent.Add(int64(len(contents)))
@@ -2149,6 +2271,8 @@ func (s *Server) ExpVar() expvar.Var {
 	m.Set("counter_mesh_update_batch_size", s.meshUpdateBatchSize)
 	m.Set("counter_mesh_update_loop_count", s.meshUpdateLoopCount)
 	m.Set("counter_buffered_write_frames", s.bufferedWriteFrames)
+	m.Set("packets_recv_drop_rate_by_client", collectorVar{s.packetsRecvDropRateByClient})
+	m.Set("packets_send_drop_rate_by_client", collectorVar{s.packetsSentDropRateByClient})
 	var expvarVersion expvar.String
 	expvarVersion.Set(version.Long())
 	m.Set("version", &expvarVersion)
@@ -2341,3 +2465,43 @@ func (w *lazyBufioWriter) Flush() error {

 	return err
 }
+
+// collectDropsByClient periodically turns each active client's packet and
+// drop counters into per-client drop rates, observes them into the Prometheus
+// drop-rate histograms, and resets the counters. It runs every 10 seconds.
+func (s *Server) collectDropsByClient() {
+	t := time.NewTicker(10 * time.Second)
+
+	var clients []*clientSet
+
+	for {
+		select {
+		case <-t.C:
+			clients = clients[:0]
+			s.mu.Lock()
+			for _, cs := range s.clients {
+				clients = append(clients, cs)
+			}
+			s.mu.Unlock()
+
+			for _, cs := range clients {
+				if c := cs.activeClient.Load(); c != nil {
+					discoRecv, otherRecv, discoSent, otherSent := c.packetsRecvDisco.Swap(0), c.packetsRecvOther.Swap(0), c.packetsSentDisco.Swap(0), c.packetsSentOther.Swap(0)
+					c.dropsMu.Lock()
+					c.discoRecvDropsByReason.collect(s.packetsRecvDropRateByClient, packetKindDisco, float64(discoRecv), false)
+					c.otherRecvDropsByReason.collect(s.packetsRecvDropRateByClient, packetKindOther, float64(otherRecv), false)
+					c.discoSendDropsByReason.collect(s.packetsSentDropRateByClient, packetKindDisco, float64(discoSent), true)
+					c.otherSendDropsByReason.collect(s.packetsSentDropRateByClient, packetKindOther, float64(otherSent), true)
+					c.discoRecvDropsByReason.reset()
+					c.otherRecvDropsByReason.reset()
+					c.discoSendDropsByReason.reset()
+					c.otherSendDropsByReason.reset()
+					c.dropsMu.Unlock()
+				}
+			}
+
+			// Empty the slice for the next tick; the retained backing array only holds lightweight clientSet pointers.
+			clients = clients[:0]
+		}
+	}
+}
diff --git a/derp/prometheus_adapter.go b/derp/prometheus_adapter.go
new file mode 100644
index 000000000..0ab4537ce
--- /dev/null
+++ b/derp/prometheus_adapter.go
@@ -0,0 +1,31 @@
+package derp
+
+import (
+	"io"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/common/expfmt"
+)
+
+var (
+	expFormat = expfmt.NewFormat(expfmt.TypeTextPlain)
+)
+
+// collectorVar implements expvar.Var and metrics.PrometheusWriter
+type collectorVar struct {
+	prometheus.Collector
+}
+
+func (cw collectorVar) String() string {
+	return `"CollectorVar"`
+}
+
+func (cw collectorVar) WritePrometheus(w io.Writer, name string) {
+	reg := prometheus.NewRegistry()
+	_ = reg.Register(cw)
+	mfs, _ := reg.Gather()
+	enc := expfmt.NewEncoder(w, expFormat)
+	for _, mf := range mfs {
+		_ = enc.Encode(mf)
+	}
+}
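For context on the Prometheus plumbing introduced by this patch, here is a small standalone sketch (not part of the patch itself) of how a HistogramVec shaped like clients_by_packet_loss_rate_recv is observed and then rendered in text exposition format, which is essentially what collectorVar.WritePrometheus does for the server's ExpVar output. Only the metric name, buckets, and label names come from the diff; the label values and observations below are made-up illustrations.

package main

import (
	"os"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/expfmt"
)

func main() {
	// A drop-rate histogram shaped like the server's new
	// clients_by_packet_loss_rate_recv metric.
	hv := prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "clients_by_packet_loss_rate_recv",
		Help:    "Clients by packet loss rate of received packets, by reason and kind",
		Buckets: []float64{0.001, .005, .01, .025, .05, .1, .25, .5, 1},
	}, []string{"kind", "reason"})

	// One observation per client per collection interval: the fraction of that
	// client's packets dropped for a given kind and reason (illustrative values).
	hv.WithLabelValues("disco", "queue_tail").Observe(0.02)
	hv.WithLabelValues("other", "gone_disconnected").Observe(0.5)

	// Render the collector in Prometheus text exposition format, mirroring
	// collectorVar.WritePrometheus: register into a throwaway registry,
	// gather, and encode each metric family.
	reg := prometheus.NewRegistry()
	reg.MustRegister(hv)
	mfs, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	enc := expfmt.NewEncoder(os.Stdout, expfmt.NewFormat(expfmt.TypeTextPlain))
	for _, mf := range mfs {
		if err := enc.Encode(mf); err != nil {
			panic(err)
		}
	}
}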