mirror of
https://github.com/tailscale/tailscale.git
synced 2025-05-29 10:48:31 +00:00
derp: wait for send goroutine to finish before returning from run
I saw a test flake due to the sender goroutine logging (ultimately to t.Logf) after the server was closed. This makes sure the all goroutines are cleaned up before Server.Close returns.
This commit is contained in:
parent
521ad7b0fc
commit
8c4cef60f8
@ -24,6 +24,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"golang.org/x/crypto/nacl/box"
|
"golang.org/x/crypto/nacl/box"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
"tailscale.com/metrics"
|
"tailscale.com/metrics"
|
||||||
"tailscale.com/types/key"
|
"tailscale.com/types/key"
|
||||||
"tailscale.com/types/logger"
|
"tailscale.com/types/logger"
|
||||||
@ -247,16 +248,35 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string) error
|
|||||||
}
|
}
|
||||||
defer s.unregisterClient(c)
|
defer s.unregisterClient(c)
|
||||||
|
|
||||||
return c.run()
|
return c.run(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *sclient) run() error {
|
// run serves the client until there's an error.
|
||||||
go c.sender()
|
// If the client hangs up or the server is closed, run returns nil, otherwise run returns an error.
|
||||||
|
func (c *sclient) run(ctx context.Context) error {
|
||||||
|
// Launch sender, but don't return from run until sender goroutine is done.
|
||||||
|
var grp errgroup.Group
|
||||||
|
sendCtx, cancelSender := context.WithCancel(ctx)
|
||||||
|
grp.Go(func() error { return c.sendLoop(sendCtx) })
|
||||||
|
defer func() {
|
||||||
|
cancelSender()
|
||||||
|
if err := grp.Wait(); err != nil && !c.s.isClosed() {
|
||||||
|
c.logf("sender failed: %v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
ft, fl, err := readFrameHeader(c.br)
|
ft, fl, err := readFrameHeader(c.br)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("client %x: readFrameHeader: %v", c.key, err)
|
if errors.Is(err, io.EOF) {
|
||||||
|
c.logf("read EOF")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if c.s.isClosed() {
|
||||||
|
c.logf("closing; server closed")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return fmt.Errorf("client %x: readFrameHeader: %w", c.key, err)
|
||||||
}
|
}
|
||||||
switch ft {
|
switch ft {
|
||||||
case frameNotePreferred:
|
case frameNotePreferred:
|
||||||
@ -518,17 +538,12 @@ func (c *sclient) setPreferred(v bool) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *sclient) sender() {
|
func (c *sclient) sendLoop(ctx context.Context) error {
|
||||||
// If the sender shuts down unilaterally due to an error, close so
|
|
||||||
// that the receive loop unblocks and cleans up the rest.
|
|
||||||
defer c.nc.Close()
|
|
||||||
if err := c.sendLoop(); err != nil {
|
|
||||||
c.logf("sender failed: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *sclient) sendLoop() error {
|
|
||||||
defer func() {
|
defer func() {
|
||||||
|
// If the sender shuts down unilaterally due to an error, close so
|
||||||
|
// that the receive loop unblocks and cleans up the rest.
|
||||||
|
c.nc.Close()
|
||||||
|
|
||||||
// Drain the send queue to count dropped packets
|
// Drain the send queue to count dropped packets
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
@ -560,7 +575,7 @@ func (c *sclient) sendLoop() error {
|
|||||||
// First, a non-blocking select (with a default) that
|
// First, a non-blocking select (with a default) that
|
||||||
// does as many non-flushing writes as possible.
|
// does as many non-flushing writes as possible.
|
||||||
select {
|
select {
|
||||||
case <-c.done:
|
case <-ctx.Done():
|
||||||
return nil
|
return nil
|
||||||
case msg := <-c.sendQueue:
|
case msg := <-c.sendQueue:
|
||||||
werr = c.sendPacket(msg.src, msg.bs)
|
werr = c.sendPacket(msg.src, msg.bs)
|
||||||
@ -578,7 +593,7 @@ func (c *sclient) sendLoop() error {
|
|||||||
|
|
||||||
// Then a blocking select with same:
|
// Then a blocking select with same:
|
||||||
select {
|
select {
|
||||||
case <-c.done:
|
case <-ctx.Done():
|
||||||
return nil
|
return nil
|
||||||
case msg := <-c.sendQueue:
|
case msg := <-c.sendQueue:
|
||||||
werr = c.sendPacket(msg.src, msg.bs)
|
werr = c.sendPacket(msg.src, msg.bs)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user