diff --git a/derp/derp_server.go b/derp/derp_server.go index 08fd280a9..ad75dd6d0 100644 --- a/derp/derp_server.go +++ b/derp/derp_server.go @@ -170,6 +170,8 @@ type Server struct { meshUpdateBatchSize *metrics.Histogram meshUpdateLoopCount *metrics.Histogram bufferedWriteFrames *metrics.Histogram // how many sendLoop frames (or groups of related frames) get written per flush + nonDiscoSendQueueDepths *metrics.Histogram + discoSendQueueDepths *metrics.Histogram // verifyClientsLocalTailscaled only accepts client connections to the DERP // server if the clientKey is a known peer in the network, as specified by a @@ -364,25 +366,27 @@ func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server { runtime.ReadMemStats(&ms) s := &Server{ - debug: envknob.Bool("DERP_DEBUG_LOGS"), - privateKey: privateKey, - publicKey: privateKey.Public(), - logf: logf, - limitedLogf: logger.RateLimitedFn(logf, 30*time.Second, 5, 100), - packetsRecvByKind: metrics.LabelMap{Label: "kind"}, - clients: map[key.NodePublic]*clientSet{}, - clientsMesh: map[key.NodePublic]PacketForwarder{}, - netConns: map[Conn]chan struct{}{}, - memSys0: ms.Sys, - watchers: set.Set[*sclient]{}, - peerGoneWatchers: map[key.NodePublic]set.HandleSet[func(key.NodePublic)]{}, - avgQueueDuration: new(uint64), - tcpRtt: metrics.LabelMap{Label: "le"}, - meshUpdateBatchSize: metrics.NewHistogram([]float64{0, 1, 2, 5, 10, 20, 50, 100, 200, 500, 1000}), - meshUpdateLoopCount: metrics.NewHistogram([]float64{0, 1, 2, 5, 10, 20, 50, 100}), - bufferedWriteFrames: metrics.NewHistogram([]float64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 20, 25, 50, 100}), - keyOfAddr: map[netip.AddrPort]key.NodePublic{}, - clock: tstime.StdClock{}, + debug: envknob.Bool("DERP_DEBUG_LOGS"), + privateKey: privateKey, + publicKey: privateKey.Public(), + logf: logf, + limitedLogf: logger.RateLimitedFn(logf, 30*time.Second, 5, 100), + packetsRecvByKind: metrics.LabelMap{Label: "kind"}, + clients: map[key.NodePublic]*clientSet{}, + clientsMesh: map[key.NodePublic]PacketForwarder{}, + netConns: map[Conn]chan struct{}{}, + memSys0: ms.Sys, + watchers: set.Set[*sclient]{}, + peerGoneWatchers: map[key.NodePublic]set.HandleSet[func(key.NodePublic)]{}, + avgQueueDuration: new(uint64), + tcpRtt: metrics.LabelMap{Label: "le"}, + meshUpdateBatchSize: metrics.NewHistogram([]float64{0, 1, 2, 5, 10, 20, 50, 100, 200, 500, 1000}), + meshUpdateLoopCount: metrics.NewHistogram([]float64{0, 1, 2, 5, 10, 20, 50, 100}), + bufferedWriteFrames: metrics.NewHistogram([]float64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 20, 25, 50, 100}), + nonDiscoSendQueueDepths: metrics.NewHistogram([]float64{0, 1, 2, 4, 8, 16, 32}), + discoSendQueueDepths: metrics.NewHistogram([]float64{0, 1, 2, 4, 8, 16, 32}), + keyOfAddr: map[netip.AddrPort]key.NodePublic{}, + clock: tstime.StdClock{}, } s.initMetacert() s.packetsRecvDisco = s.packetsRecvByKind.Get(string(packetKindDisco)) @@ -391,6 +395,8 @@ func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server { genPacketsDroppedCounters() s.perClientSendQueueDepth = getPerClientSendQueueDepth() + + go s.monitorQueueDepths() return s } @@ -2149,6 +2155,8 @@ func (s *Server) ExpVar() expvar.Var { m.Set("counter_mesh_update_batch_size", s.meshUpdateBatchSize) m.Set("counter_mesh_update_loop_count", s.meshUpdateLoopCount) m.Set("counter_buffered_write_frames", s.bufferedWriteFrames) + m.Set("counter_non_disco_sendqueue_depth", s.nonDiscoSendQueueDepths) + m.Set("counter_disco_sendqueue_depth", s.discoSendQueueDepths) var expvarVersion expvar.String expvarVersion.Set(version.Long()) m.Set("version", &expvarVersion) @@ -2341,3 +2349,36 @@ func (w *lazyBufioWriter) Flush() error { return err } + +// monitorQueueDepths maintains histograms of send queue depths for disco and +// non-disco traffic. It observes queue depths for all active clients every 10 +// seconds. +func (s *Server) monitorQueueDepths() { + t := time.NewTicker(10 * time.Second) + var nonDiscoDepths []int + var discoDepths []int + + for { + select { + case <-t.C: + nonDiscoDepths = nonDiscoDepths[:0] + discoDepths = nonDiscoDepths[:0] + s.mu.Lock() + for _, cs := range s.clients { + c := cs.activeClient.Load() + if c != nil { + nonDiscoDepths = append(nonDiscoDepths, len(c.sendQueue)) + discoDepths = append(discoDepths, len(c.discoSendQueue)) + } + } + s.mu.Unlock() + + for _, depth := range nonDiscoDepths { + s.nonDiscoSendQueueDepths.Observe(float64(depth)) + } + for _, depth := range discoDepths { + s.discoSendQueueDepths.Observe(float64(depth)) + } + } + } +}