derp/derphttp: add watch reconnection tests from #9719

Co-authored-by: Val <valerie@tailscale.com>
Signed-off-by: Anton Tolchanov <anton@tailscale.com>
This commit is contained in:
Anton Tolchanov 2023-10-25 21:41:24 +01:00 committed by Anton Tolchanov
parent 3d7fb6c21d
commit 3114a1c88d
2 changed files with 193 additions and 1 deletions

View File

@ -9,6 +9,7 @@
"crypto/tls" "crypto/tls"
"net" "net"
"net/http" "net/http"
"net/netip"
"sync" "sync"
"testing" "testing"
"time" "time"
@ -206,3 +207,193 @@ func TestPing(t *testing.T) {
t.Fatalf("Ping: %v", err) t.Fatalf("Ping: %v", err)
} }
} }
func newTestServer(t *testing.T, k key.NodePrivate) (serverURL string, s *derp.Server) {
s = derp.NewServer(k, t.Logf)
httpsrv := &http.Server{
TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)),
Handler: Handler(s),
}
ln, err := net.Listen("tcp4", "localhost:0")
if err != nil {
t.Fatal(err)
}
serverURL = "http://" + ln.Addr().String()
s.SetMeshKey("1234")
go func() {
if err := httpsrv.Serve(ln); err != nil {
if err == http.ErrServerClosed {
t.Logf("server closed")
return
}
panic(err)
}
}()
return
}
func newWatcherClient(t *testing.T, watcherPrivateKey key.NodePrivate, serverToWatchURL string) (c *Client) {
c, err := NewClient(watcherPrivateKey, serverToWatchURL, t.Logf)
if err != nil {
t.Fatal(err)
}
c.MeshKey = "1234"
return
}
// breakConnection breaks the connection, which should trigger a reconnect.
func (c *Client) breakConnection(brokenClient *derp.Client) {
c.mu.Lock()
defer c.mu.Unlock()
if c.client != brokenClient {
return
}
if c.netConn != nil {
c.netConn.Close()
c.netConn = nil
}
c.client = nil
}
// Test that a watcher connection successfully reconnects and processes peer
// updates after a different thread breaks and reconnects the connection, while
// the watcher is waiting on recv().
func TestBreakWatcherConnRecv(t *testing.T) {
var wg sync.WaitGroup
defer wg.Wait()
// Make the watcher server
serverPrivateKey1 := key.NewNode()
_, s1 := newTestServer(t, serverPrivateKey1)
defer s1.Close()
// Make the watched server
serverPrivateKey2 := key.NewNode()
serverURL2, s2 := newTestServer(t, serverPrivateKey2)
defer s2.Close()
// Make the watcher (but it is not connected yet)
watcher1 := newWatcherClient(t, serverPrivateKey1, serverURL2)
defer watcher1.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
watcherChan := make(chan int, 1)
// Set the wait time after a connection fails to much lower
origRetryInterval := retryInterval
retryInterval = 50 * time.Millisecond
defer func() { retryInterval = origRetryInterval }()
// Start the watcher thread (which connects to the watched server)
wg.Add(1) // To avoid using t.Logf after the test ends. See https://golang.org/issue/40343
go func() {
defer wg.Done()
var peers int
add := func(k key.NodePublic, _ netip.AddrPort) {
t.Logf("add: %v", k.ShortString())
peers++
// Signal that the watcher has run
watcherChan <- peers
}
remove := func(k key.NodePublic) { t.Logf("remove: %v", k.ShortString()); peers-- }
watcher1.RunWatchConnectionLoop(ctx, serverPrivateKey1.Public(), t.Logf, add, remove)
}()
timer := time.NewTimer(5 * time.Second)
defer timer.Stop()
// Wait for the watcher to run, then break the connection and check if it
// reconnected and received peer updates.
for i := 0; i < 10; i++ {
select {
case peers := <-watcherChan:
if peers != 1 {
t.Fatal("wrong number of peers added during watcher connection")
}
case <-timer.C:
t.Fatalf("watcher did not process the peer update")
}
watcher1.breakConnection(watcher1.client)
// re-establish connection by sending a packet
watcher1.ForwardPacket(key.NodePublic{}, key.NodePublic{}, []byte("bogus"))
timer.Reset(5 * time.Second)
}
}
// Test that a watcher connection successfully reconnects and processes peer
// updates after a different thread breaks and reconnects the connection, while
// the watcher is not waiting on recv().
func TestBreakWatcherConn(t *testing.T) {
var wg sync.WaitGroup
defer wg.Wait()
// Make the watcher server
serverPrivateKey1 := key.NewNode()
_, s1 := newTestServer(t, serverPrivateKey1)
defer s1.Close()
// Make the watched server
serverPrivateKey2 := key.NewNode()
serverURL2, s2 := newTestServer(t, serverPrivateKey2)
defer s2.Close()
// Make the watcher (but it is not connected yet)
watcher1 := newWatcherClient(t, serverPrivateKey1, serverURL2)
defer watcher1.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
watcherChan := make(chan int, 1)
breakerChan := make(chan bool, 1)
// Set the wait time after a connection fails to much lower
origRetryInterval := retryInterval
retryInterval = 50 * time.Millisecond
defer func() { retryInterval = origRetryInterval }()
// Start the watcher thread (which connects to the watched server)
wg.Add(1) // To avoid using t.Logf after the test ends. See https://golang.org/issue/40343
go func() {
defer wg.Done()
var peers int
add := func(k key.NodePublic, _ netip.AddrPort) {
t.Logf("add: %v", k.ShortString())
peers++
// Signal that the watcher has run
watcherChan <- peers
// Wait for breaker to run
<-breakerChan
}
remove := func(k key.NodePublic) { t.Logf("remove: %v", k.ShortString()); peers-- }
watcher1.RunWatchConnectionLoop(ctx, serverPrivateKey1.Public(), t.Logf, add, remove)
}()
timer := time.NewTimer(5 * time.Second)
defer timer.Stop()
// Wait for the watcher to run, then break the connection and check if it
// reconnected and received peer updates.
for i := 0; i < 10; i++ {
select {
case peers := <-watcherChan:
if peers != 1 {
t.Fatal("wrong number of peers added during watcher connection")
}
case <-timer.C:
t.Fatalf("watcher did not process the peer update")
}
watcher1.breakConnection(watcher1.client)
// re-establish connection by sending a packet
watcher1.ForwardPacket(key.NodePublic{}, key.NodePublic{}, []byte("bogus"))
// signal that the breaker is done
breakerChan <- true
timer.Reset(5 * time.Second)
}
}

View File

@ -14,6 +14,8 @@
"tailscale.com/types/logger" "tailscale.com/types/logger"
) )
var retryInterval = 5 * time.Second
// RunWatchConnectionLoop loops until ctx is done, sending // RunWatchConnectionLoop loops until ctx is done, sending
// WatchConnectionChanges and subscribing to connection changes. // WatchConnectionChanges and subscribing to connection changes.
// //
@ -42,7 +44,6 @@ func (c *Client) RunWatchConnectionLoop(ctx context.Context, ignoreServerKey key
infoLogf = logger.Discard infoLogf = logger.Discard
} }
logf := c.logf logf := c.logf
const retryInterval = 5 * time.Second
const statusInterval = 10 * time.Second const statusInterval = 10 * time.Second
var ( var (
mu sync.Mutex mu sync.Mutex