derp/derphttp: add a context and infoLogger option to RunWatchConnectionLoop

Brad Fitzpatrick 2021-02-12 10:58:43 -08:00
parent 1632f9fd6b
commit 741d654aa3
2 changed files with 30 additions and 9 deletions

cmd/derper/mesh.go

@@ -5,6 +5,7 @@
 package main

 import (
+	"context"
 	"errors"
 	"fmt"
 	"log"
@@ -40,6 +41,6 @@ func startMeshWithHost(s *derp.Server, host string) error {
 	c.MeshKey = s.MeshKey()
 	add := func(k key.Public) { s.AddPacketForwarder(k, c) }
 	remove := func(k key.Public) { s.RemovePacketForwarder(k, c) }
-	go c.RunWatchConnectionLoop(s.PublicKey(), add, remove)
+	go c.RunWatchConnectionLoop(context.Background(), s.PublicKey(), logf, add, remove)
 	return nil
 }
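
With the context parameter, the mesh watcher's lifetime can be tied to a caller-controlled lifecycle instead of running forever. A minimal sketch of such a caller, assuming only the APIs shown in this diff (the watchMesh helper and its stop func are illustrative, not part of this commit):

package meshwatch

import (
	"context"

	"tailscale.com/derp"
	"tailscale.com/derp/derphttp"
	"tailscale.com/types/key"
	"tailscale.com/types/logger"
)

// watchMesh starts the watch loop with a cancelable context and
// returns a function that shuts the loop down again.
func watchMesh(c *derphttp.Client, s *derp.Server, logf logger.Logf) (stop func()) {
	ctx, cancel := context.WithCancel(context.Background())
	add := func(k key.Public) { s.AddPacketForwarder(k, c) }
	remove := func(k key.Public) { s.RemovePacketForwarder(k, c) }
	go c.RunWatchConnectionLoop(ctx, s.PublicKey(), logf, add, remove)
	return func() {
		cancel()  // ends the loop's retry sleeps and its for condition
		c.Close() // unblocks a Recv in progress, per the new doc comment
	}
}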

derp/derphttp/mesh_client.go

@@ -5,20 +5,32 @@
 package derphttp

 import (
+	"context"
 	"sync"
 	"time"

 	"tailscale.com/derp"
 	"tailscale.com/types/key"
+	"tailscale.com/types/logger"
 )

-// RunWatchConnectionLoop loops forever, sending WatchConnectionChanges and subscribing to
+// RunWatchConnectionLoop loops until ctx is done, sending WatchConnectionChanges and subscribing to
 // connection changes.
 //
 // If the server's public key is ignoreServerKey, RunWatchConnectionLoop returns.
 //
 // Otherwise, the add and remove funcs are called as clients come & go.
-func (c *Client) RunWatchConnectionLoop(ignoreServerKey key.Public, add, remove func(key.Public)) {
+//
+// infoLogf, if non-nil, is the logger to write periodic status
+// updates about how many peers are on the server. Error log output is
+// sent to c's logger, regardless of infoLogf's value.
+//
+// To force RunWatchConnectionLoop to return quickly, its ctx needs to
+// be closed, and c itself needs to be closed.
+func (c *Client) RunWatchConnectionLoop(ctx context.Context, ignoreServerKey key.Public, infoLogf logger.Logf, add, remove func(key.Public)) {
+	if infoLogf == nil {
+		infoLogf = logger.Discard
+	}
 	logf := c.logf
 	const retryInterval = 5 * time.Second
 	const statusInterval = 10 * time.Second
@@ -45,7 +57,7 @@ func (c *Client) RunWatchConnectionLoop(ignoreServerKey key.Public, add, remove
 		if loggedConnected {
 			return
 		}
-		logf("connected; %d peers", len(present))
+		infoLogf("connected; %d peers", len(present))
 		loggedConnected = true
 	}
@@ -79,12 +91,21 @@ func (c *Client) RunWatchConnectionLoop(ignoreServerKey key.Public, add, remove
 		}
 	}

-	for {
+	sleep := func(d time.Duration) {
+		t := time.NewTimer(d)
+		select {
+		case <-ctx.Done():
+			t.Stop()
+		case <-t.C:
+		}
+	}
+
+	for ctx.Err() == nil {
 		err := c.WatchConnectionChanges()
 		if err != nil {
 			clear()
 			logf("WatchConnectionChanges: %v", err)
-			time.Sleep(retryInterval)
+			sleep(retryInterval)
 			continue
 		}
@@ -97,7 +118,7 @@ func (c *Client) RunWatchConnectionLoop(ignoreServerKey key.Public, add, remove
 			if err != nil {
 				clear()
 				logf("Recv: %v", err)
-				time.Sleep(retryInterval)
+				sleep(retryInterval)
 				break
 			}
 			if connGen != lastConnGen {
@@ -114,9 +135,8 @@ func (c *Client) RunWatchConnectionLoop(ignoreServerKey key.Public, add, remove
 			}
 			if now := time.Now(); now.Sub(lastStatus) > statusInterval {
 				lastStatus = now
-				logf("%d peers", len(present))
+				infoLogf("%d peers", len(present))
 			}
 		}
 	}
 }
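
The sleep closure added above is the usual context-aware replacement for time.Sleep: a canceled ctx cuts the retry wait short instead of blocking for the full interval. The same pattern as a standalone helper, for illustration (sleepCtx is an invented name, not from this commit):

package retry

import (
	"context"
	"time"
)

// sleepCtx pauses for d, returning early if ctx is canceled.
func sleepCtx(ctx context.Context, d time.Duration) {
	t := time.NewTimer(d)
	defer t.Stop() // release the timer if ctx wins the race
	select {
	case <-ctx.Done():
	case <-t.C:
	}
}

Pairing this with the for ctx.Err() == nil loop condition means a canceled context exits the loop between iterations as well as during sleeps; closing c is still needed to unblock a Recv that is already waiting on the network.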