From c34b350efa2d14ca43565d47695430205a3af692 Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Sat, 21 Mar 2020 20:43:50 -0700 Subject: [PATCH] derp: remove the mutex around and closing of send channel Makes it less complicated. Signed-off-by: Brad Fitzpatrick --- derp/derp_server.go | 62 +++++++++++++++------------------------------ 1 file changed, 21 insertions(+), 41 deletions(-) diff --git a/derp/derp_server.go b/derp/derp_server.go index c74ed3843..08df9b724 100644 --- a/derp/derp_server.go +++ b/derp/derp_server.go @@ -252,15 +252,6 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string) error } func (c *sclient) run() error { - defer func() { - // Atomically close+remove send queue, so racing writers don't - // send to closed channel. - c.mu.Lock() - close(c.sendQueue) - c.sendQueue = nil - c.mu.Unlock() - }() - go c.sender() for { @@ -320,16 +311,6 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error { return nil } - dst.mu.RLock() - defer dst.mu.RUnlock() - if dst.sendQueue == nil { - s.packetsDropped.Add(1) - s.packetsDroppedGone.Add(1) - if debug { - c.logf("dropping packet for shutdown client %x", dstKey) - } - } - msg := sendMsg{ bs: contents, } @@ -340,6 +321,16 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error { // the queue is full, try to drop from queue head to prioritize // fresher packets. for attempt := 0; attempt < 3; attempt++ { + select { + case <-dst.done: + s.packetsDropped.Add(1) + s.packetsDroppedGone.Add(1) + if debug { + c.logf("dropping packet for shutdown client %x", dstKey) + } + return nil + default: + } select { case dst.sendQueue <- msg: return nil @@ -480,6 +471,7 @@ type sclient struct { logf logger.Logf done <-chan struct{} // closed when connection closes remoteAddr string // usually ip:port from net.Conn.RemoteAddr().String() + sendQueue chan sendMsg // messages (packets) queued to this client; never closed // Owned by run, not thread-safe. br *bufio.Reader @@ -488,9 +480,6 @@ type sclient struct { // Owned by sender, not thread-safe. bw *bufio.Writer - - mu sync.RWMutex // guards access to sendQueue for shutdown. - sendQueue chan sendMsg // messages (packets) queued to this client } // sendMsg is a request to write a frame to an sclient (usually a data packet). @@ -540,27 +529,18 @@ func (c *sclient) sender() { } func (c *sclient) sendLoop() error { - c.mu.RLock() - queue := c.sendQueue - c.mu.RUnlock() - - if queue == nil { - // Uncommon race, sclient shut down before this loop ever - // started. Nothing to do here, move along. - return nil - } - defer func() { // Drain the send queue to count dropped packets for { - _, ok := <-queue - if !ok { - break - } - c.s.packetsDropped.Add(1) - c.s.packetsDroppedGone.Add(1) - if debug { - c.logf("dropping packet for shutdown %x", c.key) + select { + case <-c.sendQueue: + c.s.packetsDropped.Add(1) + c.s.packetsDroppedGone.Add(1) + if debug { + c.logf("dropping packet for shutdown %x", c.key) + } + default: + return } } }() @@ -578,7 +558,7 @@ func (c *sclient) sendLoop() error { case <-c.done: return nil - case msg, ok := <-queue: + case msg, ok := <-c.sendQueue: if !ok { return nil }