Mirror of https://github.com/tailscale/tailscale.git, synced 2025-02-18 10:58:39 +00:00
The derphttp client automatically reconnects upon failure. RunWatchConnectionLoop called derphttp.Client.WatchConnectionChanges once, but that wrapper method called the underlying derp.Client.WatchConnectionChanges exactly once, on derphttp.Client's currently active connection. If that connection fails, we need to re-subscribe on every reconnection.

This removes the derphttp.Client.WatchConnectionChanges method, which was basically impossible to use correctly, and replaces it with a boolean field on derphttp.Client alongside MeshKey and IsProber. The call to the underlying derp.Client.WatchConnectionChanges moves into derphttp's client connection code, so the subscription is re-established on any reconnect.

Some paranoia is added to make sure callers hold the API right: calling derphttp.Client.RunWatchConnectionLoop on an already-started Client without having set the bool to true is a fatal error. (The bool is still auto-set to true if RunWatchConnectionLoop is the first method called on the derphttp.Client, as is commonly the case, so existing code keeps working.)

Fixes tailscale/corp#9916
Supersedes tailscale/tailscale#9719

Co-authored-by: Val <valerie@tailscale.com>
Co-authored-by: Irbe Krumina <irbe@tailscale.com>
Co-authored-by: Anton Tolchanov <anton@tailscale.com>
Signed-off-by: Brad Fitzpatrick <brad@danga.com>
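For reference, a minimal sketch of the call pattern this change expects. The package name, the watchDERP helper, and its logging callbacks are hypothetical illustrations, not code from the repository; only the derphttp.Client field and the RunWatchConnectionLoop signature come from the file below.

```go
package meshwatch // hypothetical example package, not part of the repo

import (
	"context"
	"net/netip"

	"tailscale.com/derp/derphttp"
	"tailscale.com/types/key"
	"tailscale.com/types/logger"
)

// watchDERP is a hypothetical caller of the new API. c must either not have
// been started yet, or have had WatchConnectionChanges set to true before it
// was started; otherwise RunWatchConnectionLoop panics.
func watchDERP(ctx context.Context, c *derphttp.Client, ignoreServerKey key.NodePublic, logf logger.Logf) {
	// Setting the field (rather than calling the removed wrapper method)
	// means the subscription is re-established on every reconnect.
	c.WatchConnectionChanges = true

	c.RunWatchConnectionLoop(ctx, ignoreServerKey, logf,
		func(peer key.NodePublic, ipPort netip.AddrPort) {
			logf("peer present: %s at %v", peer.ShortString(), ipPort)
		},
		func(peer key.NodePublic) {
			logf("peer gone: %s", peer.ShortString())
		},
	)
}
```

Because the field is consulted by derphttp's connection code, the subscription survives reconnects without the caller doing anything extra.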
155 lines · 3.9 KiB · Go
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause

package derphttp

import (
	"context"
	"net/netip"
	"sync"
	"time"

	"tailscale.com/derp"
	"tailscale.com/types/key"
	"tailscale.com/types/logger"
)

// RunWatchConnectionLoop loops until ctx is done, sending
// WatchConnectionChanges and subscribing to connection changes.
//
// If the server's public key is ignoreServerKey, RunWatchConnectionLoop
// returns.
//
// Otherwise, the add and remove funcs are called as clients come & go.
//
// infoLogf, if non-nil, is the logger to write periodic status updates about
// how many peers are on the server. Error log output is set to c's logger,
// regardless of infoLogf's value.
//
// To force RunWatchConnectionLoop to return quickly, its ctx needs to be
// closed, and c itself needs to be closed.
//
// It is a fatal error to call this on an already-started Client without having
// initialized Client.WatchConnectionChanges to true.
func (c *Client) RunWatchConnectionLoop(ctx context.Context, ignoreServerKey key.NodePublic, infoLogf logger.Logf, add func(key.NodePublic, netip.AddrPort), remove func(key.NodePublic)) {
	if !c.WatchConnectionChanges {
		if c.isStarted() {
			panic("invalid use of RunWatchConnectionLoop on already-started Client without setting Client.WatchConnectionChanges")
		}
		c.WatchConnectionChanges = true
	}
	if infoLogf == nil {
		infoLogf = logger.Discard
	}
	logf := c.logf
	const retryInterval = 5 * time.Second
	const statusInterval = 10 * time.Second
	var (
		mu              sync.Mutex
		present         = map[key.NodePublic]bool{}
		loggedConnected = false
	)
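	// clear calls remove for every peer announced so far and resets the
	// present map; it runs after a reconnect, since the new connection
	// replays its own peerPresent messages.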
	clear := func() {
		mu.Lock()
		defer mu.Unlock()
		if len(present) == 0 {
			return
		}
		logf("reconnected; clearing %d forwarding mappings", len(present))
		for k := range present {
			remove(k)
		}
		present = map[key.NodePublic]bool{}
	}
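	// lastConnGen tracks the connection generation reported by RecvDetail;
	// a change means the underlying derp connection was re-established.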
	lastConnGen := 0
	lastStatus := c.clock.Now()
	logConnectedLocked := func() {
		if loggedConnected {
			return
		}
		infoLogf("connected; %d peers", len(present))
		loggedConnected = true
	}
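
	// Log "connected" either shortly after the initial flood of peerPresent
	// messages quiets down (via the timer below) or when the first peerGone
	// arrives, whichever comes first.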
	const logConnectedDelay = 200 * time.Millisecond
	timer := c.clock.AfterFunc(2*time.Second, func() {
		mu.Lock()
		defer mu.Unlock()
		logConnectedLocked()
	})
	defer timer.Stop()
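
	// updatePeer invokes the caller's add/remove callbacks and keeps the
	// present map in sync so peer counts can be reported and cleaned up
	// on reconnect.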
	updatePeer := func(k key.NodePublic, ipPort netip.AddrPort, isPresent bool) {
		if isPresent {
			add(k, ipPort)
		} else {
			remove(k)
		}

		mu.Lock()
		defer mu.Unlock()
		if isPresent {
			present[k] = true
			if !loggedConnected {
				timer.Reset(logConnectedDelay)
			}
		} else {
			// If we got a peerGone message, that means the initial connection's
			// flood of peerPresent messages is done, so we can log already:
			logConnectedLocked()
			delete(present, k)
		}
	}
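
	// sleep waits for d or for ctx to be canceled, whichever comes first.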
	sleep := func(d time.Duration) {
		t, tChannel := c.clock.NewTimer(d)
		select {
		case <-ctx.Done():
			t.Stop()
		case <-tChannel:
		}
	}
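
	// Outer loop: receive messages until an error, then clear state, wait
	// retryInterval, and let the derphttp client reconnect on the next Recv.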
	for ctx.Err() == nil {
		if c.ServerPublicKey() == ignoreServerKey {
			logf("detected self-connect; ignoring host")
			return
		}
		for {
			m, connGen, err := c.RecvDetail()
			if err != nil {
				clear()
				logf("Recv: %v", err)
				sleep(retryInterval)
				break
			}
			if connGen != lastConnGen {
				lastConnGen = connGen
				clear()
			}
			switch m := m.(type) {
			case derp.PeerPresentMessage:
				updatePeer(m.Key, m.IPPort, true)
			case derp.PeerGoneMessage:
				switch m.Reason {
				case derp.PeerGoneReasonDisconnected:
					// Normal case, log nothing
				case derp.PeerGoneReasonNotHere:
					logf("Recv: peer %s not connected to %s",
						key.NodePublic(m.Peer).ShortString(), c.ServerPublicKey().ShortString())
				default:
					logf("Recv: peer %s not at server %s for unknown reason %v",
						key.NodePublic(m.Peer).ShortString(), c.ServerPublicKey().ShortString(), m.Reason)
				}
				updatePeer(key.NodePublic(m.Peer), netip.AddrPort{}, false)
			default:
				continue
			}
			if now := c.clock.Now(); now.Sub(lastStatus) > statusInterval {
				lastStatus = now
				infoLogf("%d peers", len(present))
			}
		}
	}
}