attempt 1

Signed-off-by: Percy Wegmann <percy@tailscale.com>
Percy Wegmann 2025-01-17 19:36:01 -06:00
parent c79b736a85
commit 19ded2b977


@@ -170,6 +170,8 @@ type Server struct {
meshUpdateBatchSize *metrics.Histogram
meshUpdateLoopCount *metrics.Histogram
bufferedWriteFrames *metrics.Histogram // how many sendLoop frames (or groups of related frames) get written per flush
+nonDiscoSendQueueDepths *metrics.Histogram
+discoSendQueueDepths *metrics.Histogram
// verifyClientsLocalTailscaled only accepts client connections to the DERP
// server if the clientKey is a known peer in the network, as specified by a
@@ -364,25 +366,27 @@ func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server {
runtime.ReadMemStats(&ms)
s := &Server{
-debug: envknob.Bool("DERP_DEBUG_LOGS"),
-privateKey: privateKey,
-publicKey: privateKey.Public(),
-logf: logf,
-limitedLogf: logger.RateLimitedFn(logf, 30*time.Second, 5, 100),
-packetsRecvByKind: metrics.LabelMap{Label: "kind"},
-clients: map[key.NodePublic]*clientSet{},
-clientsMesh: map[key.NodePublic]PacketForwarder{},
-netConns: map[Conn]chan struct{}{},
-memSys0: ms.Sys,
-watchers: set.Set[*sclient]{},
-peerGoneWatchers: map[key.NodePublic]set.HandleSet[func(key.NodePublic)]{},
-avgQueueDuration: new(uint64),
-tcpRtt: metrics.LabelMap{Label: "le"},
-meshUpdateBatchSize: metrics.NewHistogram([]float64{0, 1, 2, 5, 10, 20, 50, 100, 200, 500, 1000}),
-meshUpdateLoopCount: metrics.NewHistogram([]float64{0, 1, 2, 5, 10, 20, 50, 100}),
-bufferedWriteFrames: metrics.NewHistogram([]float64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 20, 25, 50, 100}),
-keyOfAddr: map[netip.AddrPort]key.NodePublic{},
-clock: tstime.StdClock{},
+debug: envknob.Bool("DERP_DEBUG_LOGS"),
+privateKey: privateKey,
+publicKey: privateKey.Public(),
+logf: logf,
+limitedLogf: logger.RateLimitedFn(logf, 30*time.Second, 5, 100),
+packetsRecvByKind: metrics.LabelMap{Label: "kind"},
+clients: map[key.NodePublic]*clientSet{},
+clientsMesh: map[key.NodePublic]PacketForwarder{},
+netConns: map[Conn]chan struct{}{},
+memSys0: ms.Sys,
+watchers: set.Set[*sclient]{},
+peerGoneWatchers: map[key.NodePublic]set.HandleSet[func(key.NodePublic)]{},
+avgQueueDuration: new(uint64),
+tcpRtt: metrics.LabelMap{Label: "le"},
+meshUpdateBatchSize: metrics.NewHistogram([]float64{0, 1, 2, 5, 10, 20, 50, 100, 200, 500, 1000}),
+meshUpdateLoopCount: metrics.NewHistogram([]float64{0, 1, 2, 5, 10, 20, 50, 100}),
+bufferedWriteFrames: metrics.NewHistogram([]float64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 20, 25, 50, 100}),
+nonDiscoSendQueueDepths: metrics.NewHistogram([]float64{0, 1, 2, 4, 8, 16, 32}),
+discoSendQueueDepths: metrics.NewHistogram([]float64{0, 1, 2, 4, 8, 16, 32}),
+keyOfAddr: map[netip.AddrPort]key.NodePublic{},
+clock: tstime.StdClock{},
}
s.initMetacert()
s.packetsRecvDisco = s.packetsRecvByKind.Get(string(packetKindDisco))
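
The two new histograms use power-of-two bucket boundaries (0, 1, 2, 4, 8, 16, 32), spanning depths from an empty queue up to what is presumably the per-client send queue capacity. As a rough illustration only, the stand-in bucketed counter below (not tailscale.com/metrics's actual Histogram implementation) shows how an observed queue depth would map onto those boundaries:

```go
// Illustrative stand-in for a bucketed depth counter; tailscale.com/metrics
// may implement its Histogram differently.
package main

import (
	"fmt"
	"sort"
)

// bucketFor returns the index of the first boundary >= v, or len(bounds)
// for values beyond the last boundary.
func bucketFor(bounds []float64, v float64) int {
	return sort.SearchFloat64s(bounds, v)
}

func main() {
	bounds := []float64{0, 1, 2, 4, 8, 16, 32}
	counts := make([]int, len(bounds)+1) // +1 for the overflow bucket

	for _, depth := range []float64{0, 3, 9, 40} { // example queue depths
		counts[bucketFor(bounds, depth)]++
	}
	fmt.Println(counts) // [1 0 0 1 0 1 0 1]
}
```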
@@ -391,6 +395,8 @@ func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server {
genPacketsDroppedCounters()
s.perClientSendQueueDepth = getPerClientSendQueueDepth()
+go s.monitorQueueDepths()
return s
}
@@ -2149,6 +2155,8 @@ func (s *Server) ExpVar() expvar.Var {
m.Set("counter_mesh_update_batch_size", s.meshUpdateBatchSize)
m.Set("counter_mesh_update_loop_count", s.meshUpdateLoopCount)
m.Set("counter_buffered_write_frames", s.bufferedWriteFrames)
m.Set("counter_non_disco_sendqueue_depth", s.nonDiscoSendQueueDepths)
m.Set("counter_disco_sendqueue_depth", s.discoSendQueueDepths)
var expvarVersion expvar.String
expvarVersion.Set(version.Long())
m.Set("version", &expvarVersion)
@@ -2341,3 +2349,36 @@ func (w *lazyBufioWriter) Flush() error {
return err
}
+// monitorQueueDepths maintains histograms of send queue depths for disco and
+// non-disco traffic. It observes queue depths for all active clients every 10
+// seconds.
+func (s *Server) monitorQueueDepths() {
+	t := time.NewTicker(10 * time.Second)
+	var nonDiscoDepths []int
+	var discoDepths []int
+	for {
+		select {
+		case <-t.C:
+			// Reset the scratch slices, reusing their backing arrays across ticks.
+			nonDiscoDepths = nonDiscoDepths[:0]
+			discoDepths = discoDepths[:0]
+			// Snapshot depths under s.mu; record them after releasing the lock.
+			s.mu.Lock()
+			for _, cs := range s.clients {
+				c := cs.activeClient.Load()
+				if c != nil {
+					nonDiscoDepths = append(nonDiscoDepths, len(c.sendQueue))
+					discoDepths = append(discoDepths, len(c.discoSendQueue))
+				}
+			}
+			s.mu.Unlock()
+			for _, depth := range nonDiscoDepths {
+				s.nonDiscoSendQueueDepths.Observe(float64(depth))
+			}
+			for _, depth := range discoDepths {
+				s.discoSendQueueDepths.Observe(float64(depth))
+			}
+		}
+	}
+}
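
monitorQueueDepths follows a snapshot-then-record pattern: the depths (the len of each client's buffered channels) are collected while holding s.mu, and Histogram.Observe runs only after the lock is released, keeping the critical section short. Here is a self-contained sketch of that pattern using stand-in types, not the derp package's real client/Server types:

```go
package main

import (
	"fmt"
	"sync"
)

// client and server are illustrative stand-ins for the derp package's types.
type client struct {
	sendQueue      chan struct{} // non-disco send queue
	discoSendQueue chan struct{} // disco send queue
}

type server struct {
	mu      sync.Mutex
	clients map[string]*client
}

// sampleQueueDepths snapshots every client's queue depths under the lock and
// returns them, so that recording (or any slower work) happens after s.mu is
// released.
func (s *server) sampleQueueDepths() (nonDisco, disco []int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, c := range s.clients {
		nonDisco = append(nonDisco, len(c.sendQueue))
		disco = append(disco, len(c.discoSendQueue))
	}
	return nonDisco, disco
}

func main() {
	c := &client{
		sendQueue:      make(chan struct{}, 32),
		discoSendQueue: make(chan struct{}, 32),
	}
	c.sendQueue <- struct{}{} // simulate one queued frame
	s := &server{clients: map[string]*client{"a": c}}

	nonDisco, disco := s.sampleQueueDepths()
	// The real server feeds these into Histogram.Observe; here we just print.
	fmt.Println("non-disco:", nonDisco, "disco:", disco) // non-disco: [1] disco: [0]
}
```

Reading len on a buffered channel is only a momentary, racy snapshot, but that is adequate for a coarse periodic gauge of queue pressure and avoids holding the server lock during metric updates.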