diff --git a/derp/derp_server.go b/derp/derp_server.go index 702024b69..005374e0c 100644 --- a/derp/derp_server.go +++ b/derp/derp_server.go @@ -141,6 +141,8 @@ type Server struct { removePktForwardOther expvar.Int avgQueueDuration *uint64 // In milliseconds; accessed atomically tcpRtt metrics.LabelMap // histogram + meshUpdateBatchSize *metrics.Histogram + meshUpdateLoopCount *metrics.Histogram // verifyClientsLocalTailscaled only accepts client connections to the DERP // server if the clientKey is a known peer in the network, as specified by a @@ -323,6 +325,8 @@ func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server { sentTo: map[key.NodePublic]map[key.NodePublic]int64{}, avgQueueDuration: new(uint64), tcpRtt: metrics.LabelMap{Label: "le"}, + meshUpdateBatchSize: metrics.NewHistogram([]float64{0, 1, 2, 5, 10, 20, 50, 100, 200, 500, 1000}), + meshUpdateLoopCount: metrics.NewHistogram([]float64{0, 1, 2, 5, 10, 20, 50, 100}), keyOfAddr: map[netip.AddrPort]key.NodePublic{}, clock: tstime.StdClock{}, } @@ -758,7 +762,7 @@ func (s *Server) accept(ctx context.Context, nc Conn, brw *bufio.ReadWriter, rem } if c.canMesh { - c.meshUpdate = make(chan struct{}) + c.meshUpdate = make(chan struct{}, 1) // must be buffered; >1 is fine but wasteful } if clientInfo != nil { c.info = *clientInfo @@ -1143,13 +1147,18 @@ func (c *sclient) requestPeerGoneWrite(peer key.NodePublic, reason PeerGoneReaso } } +// requestMeshUpdate notes that a c's peerStateChange has been appended to and +// should now be written. +// +// It does not block. If a meshUpdate is already pending for this client, it +// does nothing. func (c *sclient) requestMeshUpdate() { if !c.canMesh { panic("unexpected requestMeshUpdate") } select { case c.meshUpdate <- struct{}{}: - case <-c.done: + default: } } @@ -1671,62 +1680,47 @@ func (c *sclient) sendPeerPresent(peer key.NodePublic, ipPort netip.AddrPort, fl return err } -// sendMeshUpdates drains as many mesh peerStateChange entries as -// possible into the write buffer WITHOUT flushing or otherwise -// blocking (as it holds c.s.mu while working). If it can't drain them -// all, it schedules itself to be called again in the future. +// sendMeshUpdates drains all mesh peerStateChange entries into the write buffer +// without flushing. func (c *sclient) sendMeshUpdates() error { - c.s.mu.Lock() - defer c.s.mu.Unlock() + var lastBatch []peerConnState // memory to best effort reuse - // allow all happened-before mesh update request goroutines to complete, if - // we don't finish the task we'll queue another below. -drainUpdates: - for { - select { - case <-c.meshUpdate: - default: - break drainUpdates + // takeAll returns c.peerStateChange and empties it. + takeAll := func() []peerConnState { + c.s.mu.Lock() + defer c.s.mu.Unlock() + if len(c.peerStateChange) == 0 { + return nil } + batch := c.peerStateChange + if cap(lastBatch) > 16 { + lastBatch = nil + } + c.peerStateChange = lastBatch[:0] + return batch } - writes := 0 - for _, pcs := range c.peerStateChange { - avail := c.bw.Available() - var err error - if pcs.present { - if avail <= frameHeaderLen+peerPresentFrameLen { - break + for loops := 0; ; loops++ { + batch := takeAll() + if len(batch) == 0 { + c.s.meshUpdateLoopCount.Observe(float64(loops)) + return nil + } + c.s.meshUpdateBatchSize.Observe(float64(len(batch))) + + for _, pcs := range batch { + var err error + if pcs.present { + err = c.sendPeerPresent(pcs.peer, pcs.ipPort, pcs.flags) + } else { + err = c.sendPeerGone(pcs.peer, PeerGoneReasonDisconnected) } - err = c.sendPeerPresent(pcs.peer, pcs.ipPort, pcs.flags) - } else { - if avail <= frameHeaderLen+peerGoneFrameLen { - break + if err != nil { + return err } - err = c.sendPeerGone(pcs.peer, PeerGoneReasonDisconnected) } - if err != nil { - // Shouldn't happen, though, as we're writing - // into available buffer space, not the - // network. - return err - } - writes++ + lastBatch = batch } - - remain := copy(c.peerStateChange, c.peerStateChange[writes:]) - c.peerStateChange = c.peerStateChange[:remain] - - // Did we manage to write them all into the bufio buffer without flushing? - if len(c.peerStateChange) == 0 { - if cap(c.peerStateChange) > 16 { - c.peerStateChange = nil - } - } else { - // Didn't finish in the buffer space provided; schedule a future run. - go c.requestMeshUpdate() - } - return nil } // sendPacket writes contents to the client in a RecvPacket frame. If @@ -1955,6 +1949,8 @@ func (s *Server) ExpVar() expvar.Var { return math.Float64frombits(atomic.LoadUint64(s.avgQueueDuration)) })) m.Set("counter_tcp_rtt", &s.tcpRtt) + m.Set("counter_mesh_update_batch_size", s.meshUpdateBatchSize) + m.Set("counter_mesh_update_loop_count", s.meshUpdateLoopCount) var expvarVersion expvar.String expvarVersion.Set(version.Long()) m.Set("version", &expvarVersion)