From 7dfa26778e7ca36a34e7d50c0f80fb60f6f54540 Mon Sep 17 00:00:00 2001
From: Alex Chan
Date: Tue, 30 Sep 2025 09:02:56 +0100
Subject: [PATCH] derp/derphttp: de-flake DERP HTTP client tests with memnet
 and synctest

Using memnet and synctest removes flakiness caused by real networking
and subtle timing differences.

Additionally, remove the `t.Logf` call inside the server's shutdown
goroutine that was triggering a false positive from the race detector.
The race detector flags a double write during this `t.Logf` call. This
is a common pattern, noted in golang/go#40343 and elsewhere in this
file, where using `t.Logf` after a test has finished can interact
poorly with the test runner. This is a long-standing issue that became
more common after rewriting this test to use memnet and synctest.

Fixes #17355

Signed-off-by: Alex Chan
---
 derp/derphttp/derphttp_test.go | 304 +++++++++++++++++----------------
 1 file changed, 154 insertions(+), 150 deletions(-)

diff --git a/derp/derphttp/derphttp_test.go b/derp/derphttp/derphttp_test.go
index 36c11f4fc..76681d498 100644
--- a/derp/derphttp/derphttp_test.go
+++ b/derp/derphttp/derphttp_test.go
@@ -8,6 +8,7 @@ import (
 	"context"
 	"crypto/tls"
 	"encoding/json"
+	"errors"
 	"flag"
 	"fmt"
 	"maps"
@@ -18,11 +19,13 @@ import (
 	"strings"
 	"sync"
 	"testing"
+	"testing/synctest"
 	"time"

 	"tailscale.com/derp"
 	"tailscale.com/derp/derphttp"
 	"tailscale.com/derp/derpserver"
+	"tailscale.com/net/memnet"
 	"tailscale.com/net/netmon"
 	"tailscale.com/net/netx"
 	"tailscale.com/tailcfg"
@@ -224,24 +227,21 @@ func TestPing(t *testing.T) {

 const testMeshKey = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"

-func newTestServer(t *testing.T, k key.NodePrivate) (serverURL string, s *derpserver.Server) {
+func newTestServer(t *testing.T, k key.NodePrivate) (serverURL string, s *derpserver.Server, ln *memnet.Listener) {
 	s = derpserver.New(k, t.Logf)
 	httpsrv := &http.Server{
 		TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)),
 		Handler:      derpserver.Handler(s),
 	}

-	ln, err := net.Listen("tcp4", "localhost:0")
-	if err != nil {
-		t.Fatal(err)
-	}
+	ln = memnet.Listen("localhost:0")
+
 	serverURL = "http://" + ln.Addr().String()
 	s.SetMeshKey(testMeshKey)

 	go func() {
 		if err := httpsrv.Serve(ln); err != nil {
-			if err == http.ErrServerClosed {
-				t.Logf("server closed")
+			if errors.Is(err, net.ErrClosed) {
 				return
 			}
 			panic(err)
@@ -250,7 +250,7 @@ func newTestServer(t *testing.T, k key.NodePrivate) (serverURL string, s *derpse
 	return
 }

-func newWatcherClient(t *testing.T, watcherPrivateKey key.NodePrivate, serverToWatchURL string) (c *derphttp.Client) {
+func newWatcherClient(t *testing.T, watcherPrivateKey key.NodePrivate, serverToWatchURL string, ln *memnet.Listener) (c *derphttp.Client) {
 	c, err := derphttp.NewClient(watcherPrivateKey, serverToWatchURL, t.Logf, netmon.NewStatic())
 	if err != nil {
 		t.Fatal(err)
@@ -260,6 +260,7 @@ func newWatcherClient(t *testing.T, watcherPrivateKey key.NodePrivate, serverToW
 		t.Fatal(err)
 	}
 	c.MeshKey = k
+	c.SetURLDialer(ln.Dial)
 	return
 }

@@ -267,170 +268,171 @@ func newWatcherClient(t *testing.T, watcherPrivateKey key.NodePrivate, serverToW
 // updates after a different thread breaks and reconnects the connection, while
 // the watcher is waiting on recv().
 func TestBreakWatcherConnRecv(t *testing.T) {
-	// TODO(bradfitz): use synctest + memnet instead
+	synctest.Test(t, func(t *testing.T) {
+		// Set the wait time before a retry after connection failure to be much lower.
+		// This needs to be early in the test, for defer to run right at the end after
+		// the DERP client has finished.
+		tstest.Replace(t, derphttp.RetryInterval, 50*time.Millisecond)

-	// Set the wait time before a retry after connection failure to be much lower.
-	// This needs to be early in the test, for defer to run right at the end after
-	// the DERP client has finished.
-	tstest.Replace(t, derphttp.RetryInterval, 50*time.Millisecond)
+		var wg sync.WaitGroup
+		// Make the watcher server
+		serverPrivateKey1 := key.NewNode()
+		_, s1, ln1 := newTestServer(t, serverPrivateKey1)
+		defer s1.Close()
+		defer ln1.Close()

-	var wg sync.WaitGroup
-	// Make the watcher server
-	serverPrivateKey1 := key.NewNode()
-	_, s1 := newTestServer(t, serverPrivateKey1)
-	defer s1.Close()
+		// Make the watched server
+		serverPrivateKey2 := key.NewNode()
+		serverURL2, s2, ln2 := newTestServer(t, serverPrivateKey2)
+		defer s2.Close()
+		defer ln2.Close()

-	// Make the watched server
-	serverPrivateKey2 := key.NewNode()
-	serverURL2, s2 := newTestServer(t, serverPrivateKey2)
-	defer s2.Close()
+		// Make the watcher (but it is not connected yet)
+		watcher := newWatcherClient(t, serverPrivateKey1, serverURL2, ln2)
+		defer watcher.Close()

-	// Make the watcher (but it is not connected yet)
-	watcher := newWatcherClient(t, serverPrivateKey1, serverURL2)
-	defer watcher.Close()
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()

-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
+		watcherChan := make(chan int, 1)
+		defer close(watcherChan)
+		errChan := make(chan error, 1)

-	watcherChan := make(chan int, 1)
-	defer close(watcherChan)
-	errChan := make(chan error, 1)
+		// Start the watcher thread (which connects to the watched server)
+		wg.Add(1) // To avoid using t.Logf after the test ends. See https://golang.org/issue/40343
+		go func() {
+			defer wg.Done()
+			var peers int
+			add := func(m derp.PeerPresentMessage) {
+				t.Logf("add: %v", m.Key.ShortString())
+				peers++
+				// Signal that the watcher has run
+				watcherChan <- peers
+			}
+			remove := func(m derp.PeerGoneMessage) { t.Logf("remove: %v", m.Peer.ShortString()); peers-- }
+			notifyErr := func(err error) {
+				select {
+				case errChan <- err:
+				case <-ctx.Done():
+				}
+			}

-	// Start the watcher thread (which connects to the watched server)
-	wg.Add(1) // To avoid using t.Logf after the test ends. See https://golang.org/issue/40343
-	go func() {
-		defer wg.Done()
-		var peers int
-		add := func(m derp.PeerPresentMessage) {
-			t.Logf("add: %v", m.Key.ShortString())
-			peers++
-			// Signal that the watcher has run
-			watcherChan <- peers
-		}
-		remove := func(m derp.PeerGoneMessage) { t.Logf("remove: %v", m.Peer.ShortString()); peers-- }
-		notifyErr := func(err error) {
+			watcher.RunWatchConnectionLoop(ctx, serverPrivateKey1.Public(), t.Logf, add, remove, notifyErr)
+		}()
+
+		synctest.Wait()
+
+		// Wait for the watcher to run, then break the connection and check if it
+		// reconnected and received peer updates.
+		for range 10 {
 			select {
-			case errChan <- err:
-			case <-ctx.Done():
+			case peers := <-watcherChan:
+				if peers != 1 {
+					t.Fatalf("wrong number of peers added during watcher connection: have %d, want 1", peers)
+				}
+			case err := <-errChan:
+				if err.Error() != "derp.Recv: EOF" {
+					t.Fatalf("expected notifyError connection error to be EOF, got %v", err)
+				}
 			}
+
+			synctest.Wait()
+
+			watcher.BreakConnection(watcher)
+			// re-establish connection by sending a packet
+			watcher.ForwardPacket(key.NodePublic{}, key.NodePublic{}, []byte("bogus"))
 		}
-
-		watcher.RunWatchConnectionLoop(ctx, serverPrivateKey1.Public(), t.Logf, add, remove, notifyErr)
-	}()
-
-	timer := time.NewTimer(5 * time.Second)
-	defer timer.Stop()
-
-	// Wait for the watcher to run, then break the connection and check if it
-	// reconnected and received peer updates.
-	for range 10 {
-		select {
-		case peers := <-watcherChan:
-			if peers != 1 {
-				t.Fatalf("wrong number of peers added during watcher connection: have %d, want 1", peers)
-			}
-		case err := <-errChan:
-			if !strings.Contains(err.Error(), "use of closed network connection") {
-				t.Fatalf("expected notifyError connection error to contain 'use of closed network connection', got %v", err)
-			}
-		case <-timer.C:
-			t.Fatalf("watcher did not process the peer update")
-		}
-		timer.Reset(5 * time.Second)
-		watcher.BreakConnection(watcher)
-		// re-establish connection by sending a packet
-		watcher.ForwardPacket(key.NodePublic{}, key.NodePublic{}, []byte("bogus"))
-	}
-	cancel() // Cancel the context to stop the watcher loop.
-	wg.Wait()
+		cancel() // Cancel the context to stop the watcher loop.
+		wg.Wait()
+	})
 }

 // Test that a watcher connection successfully reconnects and processes peer
 // updates after a different thread breaks and reconnects the connection, while
 // the watcher is not waiting on recv().
 func TestBreakWatcherConn(t *testing.T) {
-	// TODO(bradfitz): use synctest + memnet instead
+	synctest.Test(t, func(t *testing.T) {
+		// Set the wait time before a retry after connection failure to be much lower.
+		// This needs to be early in the test, for defer to run right at the end after
+		// the DERP client has finished.
+		tstest.Replace(t, derphttp.RetryInterval, 50*time.Millisecond)

-	// Set the wait time before a retry after connection failure to be much lower.
-	// This needs to be early in the test, for defer to run right at the end after
-	// the DERP client has finished.
-	tstest.Replace(t, derphttp.RetryInterval, 50*time.Millisecond)
+		var wg sync.WaitGroup
+		// Make the watcher server
+		serverPrivateKey1 := key.NewNode()
+		_, s1, ln1 := newTestServer(t, serverPrivateKey1)
+		defer s1.Close()
+		defer ln1.Close()

-	var wg sync.WaitGroup
-	// Make the watcher server
-	serverPrivateKey1 := key.NewNode()
-	_, s1 := newTestServer(t, serverPrivateKey1)
-	defer s1.Close()
+		// Make the watched server
+		serverPrivateKey2 := key.NewNode()
+		serverURL2, s2, ln2 := newTestServer(t, serverPrivateKey2)
+		defer s2.Close()
+		defer ln2.Close()

-	// Make the watched server
-	serverPrivateKey2 := key.NewNode()
-	serverURL2, s2 := newTestServer(t, serverPrivateKey2)
-	defer s2.Close()
+		// Make the watcher (but it is not connected yet)
+		watcher1 := newWatcherClient(t, serverPrivateKey1, serverURL2, ln2)
+		defer watcher1.Close()

-	// Make the watcher (but it is not connected yet)
-	watcher1 := newWatcherClient(t, serverPrivateKey1, serverURL2)
-	defer watcher1.Close()
+		ctx, cancel := context.WithCancel(context.Background())

-	ctx, cancel := context.WithCancel(context.Background())
+		watcherChan := make(chan int, 1)
+		breakerChan := make(chan bool, 1)
+		errorChan := make(chan error, 1)

-	watcherChan := make(chan int, 1)
-	breakerChan := make(chan bool, 1)
-	errorChan := make(chan error, 1)
+		// Start the watcher thread (which connects to the watched server)
+		wg.Add(1) // To avoid using t.Logf after the test ends. See https://golang.org/issue/40343
+		go func() {
+			defer wg.Done()
+			var peers int
+			add := func(m derp.PeerPresentMessage) {
+				t.Logf("add: %v", m.Key.ShortString())
+				peers++
+				// Signal that the watcher has run
+				watcherChan <- peers
+				select {
+				case <-ctx.Done():
+					return
+				// Wait for breaker to run
+				case <-breakerChan:
+				}
+			}
+			remove := func(m derp.PeerGoneMessage) { t.Logf("remove: %v", m.Peer.ShortString()); peers-- }
+			notifyError := func(err error) {
+				errorChan <- err
+			}

-	// Start the watcher thread (which connects to the watched server)
-	wg.Add(1) // To avoid using t.Logf after the test ends. See https://golang.org/issue/40343
-	go func() {
-		defer wg.Done()
-		var peers int
-		add := func(m derp.PeerPresentMessage) {
-			t.Logf("add: %v", m.Key.ShortString())
-			peers++
-			// Signal that the watcher has run
-			watcherChan <- peers
+			watcher1.RunWatchConnectionLoop(ctx, serverPrivateKey1.Public(), t.Logf, add, remove, notifyError)
+		}()
+
+		synctest.Wait()
+
+		// Wait for the watcher to run, then break the connection and check if it
+		// reconnected and received peer updates.
+ for range 10 { select { - case <-ctx.Done(): - return - // Wait for breaker to run - case <-breakerChan: + case peers := <-watcherChan: + if peers != 1 { + t.Fatalf("wrong number of peers added during watcher connection have %d, want 1", peers) + } + case err := <-errorChan: + if !errors.Is(err, net.ErrClosed) { + t.Fatalf("expected notifyError connection error to fail with ErrClosed, got %v", err) + } } + + synctest.Wait() + + watcher1.BreakConnection(watcher1) + // re-establish connection by sending a packet + watcher1.ForwardPacket(key.NodePublic{}, key.NodePublic{}, []byte("bogus")) + // signal that the breaker is done + breakerChan <- true } - remove := func(m derp.PeerGoneMessage) { t.Logf("remove: %v", m.Peer.ShortString()); peers-- } - notifyError := func(err error) { - errorChan <- err - } - - watcher1.RunWatchConnectionLoop(ctx, serverPrivateKey1.Public(), t.Logf, add, remove, notifyError) - }() - - timer := time.NewTimer(5 * time.Second) - defer timer.Stop() - - // Wait for the watcher to run, then break the connection and check if it - // reconnected and received peer updates. - for range 10 { - select { - case peers := <-watcherChan: - if peers != 1 { - t.Fatalf("wrong number of peers added during watcher connection have %d, want 1", peers) - } - case err := <-errorChan: - if !strings.Contains(err.Error(), "use of closed network connection") { - t.Fatalf("expected notifyError connection error to contain 'use of closed network connection', got %v", err) - } - case <-timer.C: - t.Fatalf("watcher did not process the peer update") - } - watcher1.BreakConnection(watcher1) - // re-establish connection by sending a packet - watcher1.ForwardPacket(key.NodePublic{}, key.NodePublic{}, []byte("bogus")) - // signal that the breaker is done - breakerChan <- true - - timer.Reset(5 * time.Second) - } - watcher1.Close() - cancel() - wg.Wait() + watcher1.Close() + cancel() + wg.Wait() + }) } func noopAdd(derp.PeerPresentMessage) {} @@ -444,12 +446,13 @@ func TestRunWatchConnectionLoopServeConnect(t *testing.T) { defer cancel() priv := key.NewNode() - serverURL, s := newTestServer(t, priv) + serverURL, s, ln := newTestServer(t, priv) defer s.Close() + defer ln.Close() pub := priv.Public() - watcher := newWatcherClient(t, priv, serverURL) + watcher := newWatcherClient(t, priv, serverURL, ln) defer watcher.Close() // Test connecting to ourselves, and that we get hung up on. @@ -518,13 +521,14 @@ func TestNotifyError(t *testing.T) { defer cancel() priv := key.NewNode() - serverURL, s := newTestServer(t, priv) + serverURL, s, ln := newTestServer(t, priv) defer s.Close() + defer ln.Close() pub := priv.Public() // Test early error notification when c.connect fails. - watcher := newWatcherClient(t, priv, serverURL) + watcher := newWatcherClient(t, priv, serverURL, ln) watcher.SetURLDialer(netx.DialFunc(func(ctx context.Context, network, addr string) (net.Conn, error) { t.Helper() return nil, fmt.Errorf("test error: %s", addr)