mirror of
https://github.com/tailscale/tailscale.git
synced 2024-12-12 03:04:40 +00:00
8b47322acc
This commit implements probing of UDP path lifetime on the tail end of an active direct connection. Probing configuration has two parts - Cliffs, which are various timeout cliffs of interest, and CycleCanStartEvery, which limits how often a probing cycle can start, per-endpoint. Initially a statically defined default configuration will be used. The default configuration has cliffs of 10s, 30s, and 60s, with a CycleCanStartEvery of 24h. Probing results are communicated via clientmetric counters. Probing is off by default, and can be enabled via control knob. Probing is purely informational and does not yet drive any magicsock behaviors. Updates #540 Signed-off-by: Jordan Whited <jordan@tailscale.com>
945 lines
26 KiB
Go
945 lines
26 KiB
Go
// Copyright (c) Tailscale Inc & AUTHORS
|
|
// SPDX-License-Identifier: BSD-3-Clause
|
|
|
|
package magicsock
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"fmt"
|
|
"hash/fnv"
|
|
"math/rand"
|
|
"net"
|
|
"net/netip"
|
|
"reflect"
|
|
"runtime"
|
|
"sort"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/tailscale/wireguard-go/conn"
|
|
"tailscale.com/derp"
|
|
"tailscale.com/derp/derphttp"
|
|
"tailscale.com/health"
|
|
"tailscale.com/logtail/backoff"
|
|
"tailscale.com/net/dnscache"
|
|
"tailscale.com/net/tsaddr"
|
|
"tailscale.com/syncs"
|
|
"tailscale.com/tailcfg"
|
|
"tailscale.com/tstime/mono"
|
|
"tailscale.com/types/key"
|
|
"tailscale.com/types/logger"
|
|
"tailscale.com/util/mak"
|
|
"tailscale.com/util/sysresources"
|
|
)
|
|
|
|
// useDerpRoute reports whether magicsock should enable the DERP
|
|
// return path optimization (Issue 150).
|
|
//
|
|
// By default it's enabled, unless an environment variable
|
|
// or control says to disable it.
|
|
func (c *Conn) useDerpRoute() bool {
|
|
if b, ok := debugUseDerpRoute().Get(); ok {
|
|
return b
|
|
}
|
|
return c.controlKnobs == nil || !c.controlKnobs.DisableDRPO.Load()
|
|
}
|
|
|
|
// derpRoute is a route entry for a public key, saying that a certain
// peer should be available at DERP node derpID, as long as the
// current connection for that derpID is dc. (but dc should not be
// used to write directly; it's owned by the read/write loops)
//
// Values are compared with == (see removeDerpPeerRoute), so a route
// only matches when both the ID and the client pointer are the same.
// The struct is also built with unkeyed literals, so field order
// matters.
type derpRoute struct {
	derpID int              // DERP ID the peer was seen on
	dc     *derphttp.Client // don't use directly; see comment above
}
|
|
|
|
// removeDerpPeerRoute removes a DERP route entry previously added by addDerpPeerRoute.
|
|
func (c *Conn) removeDerpPeerRoute(peer key.NodePublic, derpID int, dc *derphttp.Client) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
r2 := derpRoute{derpID, dc}
|
|
if r, ok := c.derpRoute[peer]; ok && r == r2 {
|
|
delete(c.derpRoute, peer)
|
|
}
|
|
}
|
|
|
|
// addDerpPeerRoute adds a DERP route entry, noting that peer was seen
|
|
// on DERP node derpID, at least on the connection identified by dc.
|
|
// See issue 150 for details.
|
|
func (c *Conn) addDerpPeerRoute(peer key.NodePublic, derpID int, dc *derphttp.Client) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
mak.Set(&c.derpRoute, peer, derpRoute{derpID, dc})
|
|
}
|
|
|
|
// activeDerp contains fields for an active DERP connection.
//
// Values are stored by value in Conn.activeDerp (a map[int]activeDerp),
// which is why lastWrite is a pointer: it can be updated through a copy
// of the struct without re-storing the map entry.
type activeDerp struct {
	// c is the DERP client for this connection.
	c *derphttp.Client
	// cancel cancels the context handed to this connection's reader
	// and writer goroutines.
	cancel context.CancelFunc
	// writeCh is the send side of the queue drained by runDerpWriter.
	writeCh chan<- derpWriteRequest
	// lastWrite is the time of the last request for its write
	// channel (currently even if there was no write).
	// It is always non-nil and initialized to a non-zero Time.
	lastWrite *time.Time
	// createTime is when this entry was created; it is used to report
	// the connection's age in log output.
	createTime time.Time
}
|
|
|
|
// processStartUnixNano is the wall-clock time, in nanoseconds since the
// Unix epoch, sampled once when this package's variables are
// initialized. pickDERPFallback mixes it into the seed used for its
// fallback region choice.
var processStartUnixNano = time.Now().UnixNano()
|
|
|
|
// pickDERPFallback returns a non-zero but deterministic DERP node to
|
|
// connect to. This is only used if netcheck couldn't find the
|
|
// nearest one (for instance, if UDP is blocked and thus STUN latency
|
|
// checks aren't working).
|
|
//
|
|
// c.mu must NOT be held.
|
|
func (c *Conn) pickDERPFallback() int {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
if !c.wantDerpLocked() {
|
|
return 0
|
|
}
|
|
ids := c.derpMap.RegionIDs()
|
|
if len(ids) == 0 {
|
|
// No DERP regions in non-nil map.
|
|
return 0
|
|
}
|
|
|
|
// TODO: figure out which DERP region most of our peers are using,
|
|
// and use that region as our fallback.
|
|
//
|
|
// If we already had selected something in the past and it has any
|
|
// peers, we want to stay on it. If there are no peers at all,
|
|
// stay on whatever DERP we previously picked. If we need to pick
|
|
// one and have no peer info, pick a region randomly.
|
|
//
|
|
// We used to do the above for legacy clients, but never updated
|
|
// it for disco.
|
|
|
|
if c.myDerp != 0 {
|
|
return c.myDerp
|
|
}
|
|
|
|
h := fnv.New64()
|
|
fmt.Fprintf(h, "%p/%d", c, processStartUnixNano) // arbitrary
|
|
return ids[rand.New(rand.NewSource(int64(h.Sum64()))).Intn(len(ids))]
|
|
}
|
|
|
|
func (c *Conn) derpRegionCodeLocked(regionID int) string {
|
|
if c.derpMap == nil {
|
|
return ""
|
|
}
|
|
if dr, ok := c.derpMap.Regions[regionID]; ok {
|
|
return dr.RegionCode
|
|
}
|
|
return ""
|
|
}
|
|
|
|
// c.mu must NOT be held.
|
|
func (c *Conn) setNearestDERP(derpNum int) (wantDERP bool) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
if !c.wantDerpLocked() {
|
|
c.myDerp = 0
|
|
health.SetMagicSockDERPHome(0, c.homeless)
|
|
return false
|
|
}
|
|
if c.homeless {
|
|
c.myDerp = 0
|
|
health.SetMagicSockDERPHome(0, c.homeless)
|
|
return false
|
|
}
|
|
if derpNum == c.myDerp {
|
|
// No change.
|
|
return true
|
|
}
|
|
if c.myDerp != 0 && derpNum != 0 {
|
|
metricDERPHomeChange.Add(1)
|
|
}
|
|
c.myDerp = derpNum
|
|
health.SetMagicSockDERPHome(derpNum, c.homeless)
|
|
|
|
if c.privateKey.IsZero() {
|
|
// No private key yet, so DERP connections won't come up anyway.
|
|
// Return early rather than ultimately log a couple lines of noise.
|
|
return true
|
|
}
|
|
|
|
// On change, notify all currently connected DERP servers and
|
|
// start connecting to our home DERP if we are not already.
|
|
dr := c.derpMap.Regions[derpNum]
|
|
if dr == nil {
|
|
c.logf("[unexpected] magicsock: derpMap.Regions[%v] is nil", derpNum)
|
|
} else {
|
|
c.logf("magicsock: home is now derp-%v (%v)", derpNum, c.derpMap.Regions[derpNum].RegionCode)
|
|
}
|
|
for i, ad := range c.activeDerp {
|
|
go ad.c.NotePreferred(i == c.myDerp)
|
|
}
|
|
c.goDerpConnect(derpNum)
|
|
return true
|
|
}
|
|
|
|
// startDerpHomeConnectLocked starts connecting to our DERP home, if any.
|
|
//
|
|
// c.mu must be held.
|
|
func (c *Conn) startDerpHomeConnectLocked() {
|
|
c.goDerpConnect(c.myDerp)
|
|
}
|
|
|
|
// goDerpConnect starts a goroutine to start connecting to the given
|
|
// DERP node.
|
|
//
|
|
// c.mu may be held, but does not need to be.
|
|
func (c *Conn) goDerpConnect(node int) {
|
|
if node == 0 {
|
|
return
|
|
}
|
|
go c.derpWriteChanOfAddr(netip.AddrPortFrom(tailcfg.DerpMagicIPAddr, uint16(node)), key.NodePublic{})
|
|
}
|
|
|
|
// bufferedDerpWrites caches the queue depth computed by
// bufferedDerpWritesBeforeDrop. It is assigned inside
// bufferedDerpWritesOnce.Do.
var bufferedDerpWrites int

// bufferedDerpWritesOnce guards the one-time computation of
// bufferedDerpWrites.
var bufferedDerpWritesOnce sync.Once
|
|
|
|
// bufferedDerpWritesBeforeDrop returns how many packets writes can be queued
|
|
// up the DERP client to write on the wire before we start dropping.
|
|
func bufferedDerpWritesBeforeDrop() int {
|
|
// For mobile devices, always return the previous minimum value of 32;
|
|
// we can do this outside the sync.Once to avoid that overhead.
|
|
if runtime.GOOS == "ios" || runtime.GOOS == "android" {
|
|
return 32
|
|
}
|
|
|
|
bufferedDerpWritesOnce.Do(func() {
|
|
// Some rough sizing: for the previous fixed value of 32, the
|
|
// total consumed memory can be:
|
|
// = numDerpRegions * messages/region * sizeof(message)
|
|
//
|
|
// For sake of this calculation, assume 100 DERP regions; at
|
|
// time of writing (2023-04-03), we have 24.
|
|
//
|
|
// A reasonable upper bound for the worst-case average size of
|
|
// a message is a *disco.CallMeMaybe message with 16 endpoints;
|
|
// since sizeof(netip.AddrPort) = 32, that's 512 bytes. Thus:
|
|
// = 100 * 32 * 512
|
|
// = 1638400 (1.6MiB)
|
|
//
|
|
// On a reasonably-small node with 4GiB of memory that's
|
|
// connected to each region and handling a lot of load, 1.6MiB
|
|
// is about 0.04% of the total system memory.
|
|
//
|
|
// For sake of this calculation, then, let's double that memory
|
|
// usage to 0.08% and scale based on total system memory.
|
|
//
|
|
// For a 16GiB Linux box, this should buffer just over 256
|
|
// messages.
|
|
systemMemory := sysresources.TotalMemory()
|
|
memoryUsable := float64(systemMemory) * 0.0008
|
|
|
|
const (
|
|
theoreticalDERPRegions = 100
|
|
messageMaximumSizeBytes = 512
|
|
)
|
|
bufferedDerpWrites = int(memoryUsable / (theoreticalDERPRegions * messageMaximumSizeBytes))
|
|
|
|
// Never drop below the previous minimum value.
|
|
if bufferedDerpWrites < 32 {
|
|
bufferedDerpWrites = 32
|
|
}
|
|
})
|
|
return bufferedDerpWrites
|
|
}
|
|
|
|
// derpWriteChanOfAddr returns the write channel of a DERP connection
// for fake UDP addresses that represent DERP servers, creating the
// connection (and its reader/writer goroutines) as necessary. For real
// UDP addresses, it returns nil.
//
// It also returns nil when the network is down, DERP is disabled, the
// Conn is closed, the region is not in the DERP map, or no private key
// is set.
//
// If peer is non-zero, it can be used to find an active reverse
// path, without using addr.
//
// It acquires c.mu itself.
func (c *Conn) derpWriteChanOfAddr(addr netip.AddrPort, peer key.NodePublic) chan<- derpWriteRequest {
	if addr.Addr() != tailcfg.DerpMagicIPAddr {
		return nil
	}
	// For DERP fake addresses the port field carries the region ID.
	regionID := int(addr.Port())

	if c.networkDown() {
		return nil
	}

	c.mu.Lock()
	defer c.mu.Unlock()
	if !c.wantDerpLocked() || c.closed {
		return nil
	}
	if c.derpMap == nil || c.derpMap.Regions[regionID] == nil {
		return nil
	}
	if c.privateKey.IsZero() {
		c.logf("magicsock: DERP lookup of %v with no private key; ignoring", addr)
		return nil
	}

	// See if we have a connection open to that DERP node ID
	// first. If so, might as well use it. (It's a little
	// arbitrary whether we use this one vs. the reverse route
	// below when we have both.)
	ad, ok := c.activeDerp[regionID]
	if ok {
		*ad.lastWrite = time.Now()
		c.setPeerLastDerpLocked(peer, regionID, regionID)
		return ad.writeCh
	}

	// If we don't have an open connection to the peer's home DERP
	// node, see if we have an open connection to a DERP node
	// where we'd heard from that peer already. For instance,
	// perhaps peer's home is Frankfurt, but they dialed our home DERP
	// node in SF to reach us, so we can reply to them using our
	// SF connection rather than dialing Frankfurt. (Issue 150)
	if !peer.IsZero() && c.useDerpRoute() {
		if r, ok := c.derpRoute[peer]; ok {
			// Only trust the route if the connection it was learned
			// on is still the active one for that region.
			if ad, ok := c.activeDerp[r.derpID]; ok && ad.c == r.dc {
				c.setPeerLastDerpLocked(peer, r.derpID, regionID)
				*ad.lastWrite = time.Now()
				return ad.writeCh
			}
		}
	}

	// No usable connection exists: create one.
	why := "home-keep-alive"
	if !peer.IsZero() {
		why = peer.ShortString()
	}
	c.logf("magicsock: adding connection to derp-%v for %v", regionID, why)

	// firstDerp is true only for the first DERP connection this Conn
	// ever creates; it selects c.derpStarted as the start gate below.
	firstDerp := false
	if c.activeDerp == nil {
		firstDerp = true
		c.activeDerp = make(map[int]activeDerp)
		c.prevDerp = make(map[int]*syncs.WaitGroupChan)
	}

	// Note that derphttp.NewRegionClient does not dial the server
	// (it doesn't block) so it is safe to do under the c.mu lock.
	dc := derphttp.NewRegionClient(c.privateKey, c.logf, c.netMon, func() *tailcfg.DERPRegion {
		// Warning: it is not legal to acquire
		// magicsock.Conn.mu from this callback.
		// It's run from derphttp.Client.connect (via Send, etc)
		// and the lock ordering rules are that magicsock.Conn.mu
		// must be acquired before derphttp.Client.mu.
		// See https://github.com/tailscale/tailscale/issues/3726
		if c.connCtx.Err() != nil {
			// We're closing anyway; return nil to stop dialing.
			return nil
		}
		// Read the lock-free copy of the DERP map, since c.mu is
		// off limits here.
		derpMap := c.derpMapAtomic.Load()
		if derpMap == nil {
			return nil
		}
		return derpMap.Regions[regionID]
	})

	dc.SetCanAckPings(true)
	dc.NotePreferred(c.myDerp == regionID)
	dc.SetAddressFamilySelector(derpAddrFamSelector{c})
	dc.DNSCache = dnscache.Get()

	ctx, cancel := context.WithCancel(c.connCtx)
	ch := make(chan derpWriteRequest, bufferedDerpWritesBeforeDrop())

	// ad is the zero value here (the map lookup above missed), so fill
	// in every field before storing it.
	ad.c = dc
	ad.writeCh = ch
	ad.cancel = cancel
	ad.lastWrite = new(time.Time)
	*ad.lastWrite = time.Now()
	ad.createTime = time.Now()
	c.activeDerp[regionID] = ad
	metricNumDERPConns.Set(int64(len(c.activeDerp)))
	c.logActiveDerpLocked()
	c.setPeerLastDerpLocked(peer, regionID, regionID)
	c.scheduleCleanStaleDerpLocked()

	// Build a startGate for the derp reader+writer
	// goroutines, so they don't start running until any
	// previous generation is closed.
	startGate := syncs.ClosedChan()
	if prev := c.prevDerp[regionID]; prev != nil {
		startGate = prev.DoneChan()
	}
	// And register a WaitGroup(Chan) for this generation.
	// The count of 2 covers the reader and the writer goroutine; each
	// calls wg.Decr when it exits.
	wg := syncs.NewWaitGroupChan()
	wg.Add(2)
	c.prevDerp[regionID] = wg

	if firstDerp {
		// Hold the reader and writer back until the initial Connect
		// call has returned, then close c.derpStarted and wake
		// anything waiting on c.muCond.
		startGate = c.derpStarted
		go func() {
			dc.Connect(ctx)
			close(c.derpStarted)
			c.muCond.Broadcast()
		}()
	}

	go c.runDerpReader(ctx, addr, dc, wg, startGate)
	go c.runDerpWriter(ctx, dc, ch, wg, startGate)
	go c.derpActiveFunc()

	return ad.writeCh
}
|
|
|
|
// setPeerLastDerpLocked notes that peer is now being written to via
|
|
// the provided DERP regionID, and that the peer advertises a DERP
|
|
// home region ID of homeID.
|
|
//
|
|
// If there's any change, it logs.
|
|
//
|
|
// c.mu must be held.
|
|
func (c *Conn) setPeerLastDerpLocked(peer key.NodePublic, regionID, homeID int) {
|
|
if peer.IsZero() {
|
|
return
|
|
}
|
|
old := c.peerLastDerp[peer]
|
|
if old == regionID {
|
|
return
|
|
}
|
|
c.peerLastDerp[peer] = regionID
|
|
|
|
var newDesc string
|
|
switch {
|
|
case regionID == homeID && regionID == c.myDerp:
|
|
newDesc = "shared home"
|
|
case regionID == homeID:
|
|
newDesc = "their home"
|
|
case regionID == c.myDerp:
|
|
newDesc = "our home"
|
|
case regionID != homeID:
|
|
newDesc = "alt"
|
|
}
|
|
if old == 0 {
|
|
c.logf("[v1] magicsock: derp route for %s set to derp-%d (%s)", peer.ShortString(), regionID, newDesc)
|
|
} else {
|
|
c.logf("[v1] magicsock: derp route for %s changed from derp-%d => derp-%d (%s)", peer.ShortString(), old, regionID, newDesc)
|
|
}
|
|
}
|
|
|
|
// derpReadResult is the type sent by Conn.runDerpReader to connBind.receiveDERP
// when a derp.ReceivedPacket is available.
//
// Notably, it doesn't include the derp.ReceivedPacket because we
// don't want to give the receiver access to the aliased []byte. To
// get at the packet contents they need to call copyBuf to copy it
// out, which also releases the buffer.
type derpReadResult struct {
	// regionID is the DERP region the packet arrived on.
	regionID int
	n        int // length of data received
	// src is the public key of the packet's sender.
	src key.NodePublic
	// copyBuf is called to copy the data to dst. It returns how
	// much data was copied, which will be n if dst is large
	// enough. copyBuf can only be called once.
	// If copyBuf is nil, that's a signal from the sender to ignore
	// this message.
	copyBuf func(dst []byte) int
}
|
|
|
|
// runDerpReader runs in a goroutine for the life of a DERP
|
|
// connection, handling received packets.
|
|
func (c *Conn) runDerpReader(ctx context.Context, derpFakeAddr netip.AddrPort, dc *derphttp.Client, wg *syncs.WaitGroupChan, startGate <-chan struct{}) {
|
|
defer wg.Decr()
|
|
defer dc.Close()
|
|
|
|
select {
|
|
case <-startGate:
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
|
|
didCopy := make(chan struct{}, 1)
|
|
regionID := int(derpFakeAddr.Port())
|
|
res := derpReadResult{regionID: regionID}
|
|
var pkt derp.ReceivedPacket
|
|
res.copyBuf = func(dst []byte) int {
|
|
n := copy(dst, pkt.Data)
|
|
didCopy <- struct{}{}
|
|
return n
|
|
}
|
|
|
|
defer health.SetDERPRegionConnectedState(regionID, false)
|
|
defer health.SetDERPRegionHealth(regionID, "")
|
|
|
|
// peerPresent is the set of senders we know are present on this
|
|
// connection, based on messages we've received from the server.
|
|
peerPresent := map[key.NodePublic]bool{}
|
|
bo := backoff.NewBackoff(fmt.Sprintf("derp-%d", regionID), c.logf, 5*time.Second)
|
|
var lastPacketTime time.Time
|
|
var lastPacketSrc key.NodePublic
|
|
|
|
for {
|
|
msg, connGen, err := dc.RecvDetail()
|
|
if err != nil {
|
|
health.SetDERPRegionConnectedState(regionID, false)
|
|
// Forget that all these peers have routes.
|
|
for peer := range peerPresent {
|
|
delete(peerPresent, peer)
|
|
c.removeDerpPeerRoute(peer, regionID, dc)
|
|
}
|
|
if err == derphttp.ErrClientClosed {
|
|
return
|
|
}
|
|
if c.networkDown() {
|
|
c.logf("[v1] magicsock: derp.Recv(derp-%d): network down, closing", regionID)
|
|
return
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
c.logf("magicsock: [%p] derp.Recv(derp-%d): %v", dc, regionID, err)
|
|
|
|
// If our DERP connection broke, it might be because our network
|
|
// conditions changed. Start that check.
|
|
c.ReSTUN("derp-recv-error")
|
|
|
|
// Back off a bit before reconnecting.
|
|
bo.BackOff(ctx, err)
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
continue
|
|
}
|
|
bo.BackOff(ctx, nil) // reset
|
|
|
|
now := time.Now()
|
|
if lastPacketTime.IsZero() || now.Sub(lastPacketTime) > 5*time.Second {
|
|
health.NoteDERPRegionReceivedFrame(regionID)
|
|
lastPacketTime = now
|
|
}
|
|
|
|
switch m := msg.(type) {
|
|
case derp.ServerInfoMessage:
|
|
health.SetDERPRegionConnectedState(regionID, true)
|
|
health.SetDERPRegionHealth(regionID, "") // until declared otherwise
|
|
c.logf("magicsock: derp-%d connected; connGen=%v", regionID, connGen)
|
|
continue
|
|
case derp.ReceivedPacket:
|
|
pkt = m
|
|
res.n = len(m.Data)
|
|
res.src = m.Source
|
|
if logDerpVerbose() {
|
|
c.logf("magicsock: got derp-%v packet: %q", regionID, m.Data)
|
|
}
|
|
// If this is a new sender we hadn't seen before, remember it and
|
|
// register a route for this peer.
|
|
if res.src != lastPacketSrc { // avoid map lookup w/ high throughput single peer
|
|
lastPacketSrc = res.src
|
|
if _, ok := peerPresent[res.src]; !ok {
|
|
peerPresent[res.src] = true
|
|
c.addDerpPeerRoute(res.src, regionID, dc)
|
|
}
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case c.derpRecvCh <- res:
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-didCopy:
|
|
continue
|
|
}
|
|
case derp.PingMessage:
|
|
// Best effort reply to the ping.
|
|
pingData := [8]byte(m)
|
|
go func() {
|
|
if err := dc.SendPong(pingData); err != nil {
|
|
c.logf("magicsock: derp-%d SendPong error: %v", regionID, err)
|
|
}
|
|
}()
|
|
continue
|
|
case derp.HealthMessage:
|
|
health.SetDERPRegionHealth(regionID, m.Problem)
|
|
continue
|
|
case derp.PeerGoneMessage:
|
|
switch m.Reason {
|
|
case derp.PeerGoneReasonDisconnected:
|
|
// Do nothing.
|
|
case derp.PeerGoneReasonNotHere:
|
|
metricRecvDiscoDERPPeerNotHere.Add(1)
|
|
c.logf("[unexpected] magicsock: derp-%d does not know about peer %s, removing route",
|
|
regionID, key.NodePublic(m.Peer).ShortString())
|
|
default:
|
|
metricRecvDiscoDERPPeerGoneUnknown.Add(1)
|
|
c.logf("[unexpected] magicsock: derp-%d peer %s gone, reason %v, removing route",
|
|
regionID, key.NodePublic(m.Peer).ShortString(), m.Reason)
|
|
}
|
|
c.removeDerpPeerRoute(key.NodePublic(m.Peer), regionID, dc)
|
|
continue
|
|
default:
|
|
// Ignore.
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
|
|
// derpWriteRequest is one queued packet for runDerpWriter to send over
// a DERP connection.
type derpWriteRequest struct {
	// addr is used by runDerpWriter only when logging a send error.
	addr netip.AddrPort
	// pubKey is the destination passed to the DERP client's Send.
	pubKey key.NodePublic
	b      []byte // copied; ownership passed to receiver
}
|
|
|
|
// runDerpWriter runs in a goroutine for the life of a DERP
|
|
// connection, handling received packets.
|
|
func (c *Conn) runDerpWriter(ctx context.Context, dc *derphttp.Client, ch <-chan derpWriteRequest, wg *syncs.WaitGroupChan, startGate <-chan struct{}) {
|
|
defer wg.Decr()
|
|
select {
|
|
case <-startGate:
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case wr := <-ch:
|
|
err := dc.Send(wr.pubKey, wr.b)
|
|
if err != nil {
|
|
c.logf("magicsock: derp.Send(%v): %v", wr.addr, err)
|
|
metricSendDERPError.Add(1)
|
|
} else {
|
|
metricSendDERP.Add(1)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *connBind) receiveDERP(buffs [][]byte, sizes []int, eps []conn.Endpoint) (int, error) {
|
|
health.ReceiveDERP.Enter()
|
|
defer health.ReceiveDERP.Exit()
|
|
|
|
for dm := range c.derpRecvCh {
|
|
if c.isClosed() {
|
|
break
|
|
}
|
|
n, ep := c.processDERPReadResult(dm, buffs[0])
|
|
if n == 0 {
|
|
// No data read occurred. Wait for another packet.
|
|
continue
|
|
}
|
|
metricRecvDataDERP.Add(1)
|
|
sizes[0] = n
|
|
eps[0] = ep
|
|
return 1, nil
|
|
}
|
|
return 0, net.ErrClosed
|
|
}
|
|
|
|
func (c *Conn) processDERPReadResult(dm derpReadResult, b []byte) (n int, ep *endpoint) {
|
|
if dm.copyBuf == nil {
|
|
return 0, nil
|
|
}
|
|
var regionID int
|
|
n, regionID = dm.n, dm.regionID
|
|
ncopy := dm.copyBuf(b)
|
|
if ncopy != n {
|
|
err := fmt.Errorf("received DERP packet of length %d that's too big for WireGuard buf size %d", n, ncopy)
|
|
c.logf("magicsock: %v", err)
|
|
return 0, nil
|
|
}
|
|
|
|
ipp := netip.AddrPortFrom(tailcfg.DerpMagicIPAddr, uint16(regionID))
|
|
if c.handleDiscoMessage(b[:n], ipp, dm.src, discoRXPathDERP) {
|
|
return 0, nil
|
|
}
|
|
|
|
var ok bool
|
|
c.mu.Lock()
|
|
ep, ok = c.peerMap.endpointForNodeKey(dm.src)
|
|
c.mu.Unlock()
|
|
if !ok {
|
|
// We don't know anything about this node key, nothing to
|
|
// record or process.
|
|
return 0, nil
|
|
}
|
|
|
|
ep.noteRecvActivity(ipp, mono.Now())
|
|
if stats := c.stats.Load(); stats != nil {
|
|
stats.UpdateRxPhysical(ep.nodeAddr, ipp, dm.n)
|
|
}
|
|
return n, ep
|
|
}
|
|
|
|
// SetDERPMap controls which (if any) DERP servers are used.
|
|
// A nil value means to disable DERP; it's disabled by default.
|
|
func (c *Conn) SetDERPMap(dm *tailcfg.DERPMap) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
var derpAddr = debugUseDERPAddr()
|
|
if derpAddr != "" {
|
|
derpPort := 443
|
|
if debugUseDERPHTTP() {
|
|
// Match the port for -dev in derper.go
|
|
derpPort = 3340
|
|
}
|
|
dm = &tailcfg.DERPMap{
|
|
OmitDefaultRegions: true,
|
|
Regions: map[int]*tailcfg.DERPRegion{
|
|
999: {
|
|
RegionID: 999,
|
|
Nodes: []*tailcfg.DERPNode{{
|
|
Name: "999dev",
|
|
RegionID: 999,
|
|
HostName: derpAddr,
|
|
DERPPort: derpPort,
|
|
}},
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
if reflect.DeepEqual(dm, c.derpMap) {
|
|
return
|
|
}
|
|
|
|
c.derpMapAtomic.Store(dm)
|
|
old := c.derpMap
|
|
c.derpMap = dm
|
|
if dm == nil {
|
|
c.closeAllDerpLocked("derp-disabled")
|
|
return
|
|
}
|
|
|
|
// Reconnect any DERP region that changed definitions.
|
|
if old != nil {
|
|
changes := false
|
|
for rid, oldDef := range old.Regions {
|
|
if reflect.DeepEqual(oldDef, dm.Regions[rid]) {
|
|
continue
|
|
}
|
|
changes = true
|
|
if rid == c.myDerp {
|
|
c.myDerp = 0
|
|
}
|
|
c.closeDerpLocked(rid, "derp-region-redefined")
|
|
}
|
|
if changes {
|
|
c.logActiveDerpLocked()
|
|
}
|
|
}
|
|
|
|
go c.ReSTUN("derp-map-update")
|
|
}
|
|
// wantDerpLocked reports whether DERP is enabled, which is the case
// exactly when a DERP map is set.
//
// NOTE(review): the Locked suffix suggests c.mu must be held — confirm
// against callers.
func (c *Conn) wantDerpLocked() bool {
	return c.derpMap != nil
}
|
|
|
|
// c.mu must be held.
|
|
func (c *Conn) closeAllDerpLocked(why string) {
|
|
if len(c.activeDerp) == 0 {
|
|
return // without the useless log statement
|
|
}
|
|
for i := range c.activeDerp {
|
|
c.closeDerpLocked(i, why)
|
|
}
|
|
c.logActiveDerpLocked()
|
|
}
|
|
|
|
// DebugBreakDERPConns breaks all DERP connections for debug/testing reasons.
|
|
func (c *Conn) DebugBreakDERPConns() error {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
if len(c.activeDerp) == 0 {
|
|
c.logf("magicsock: DebugBreakDERPConns: no active DERP connections")
|
|
return nil
|
|
}
|
|
c.closeAllDerpLocked("debug-break-derp")
|
|
c.startDerpHomeConnectLocked()
|
|
return nil
|
|
}
|
|
|
|
// maybeCloseDERPsOnRebind, in response to a rebind, closes all
|
|
// DERP connections that don't have a local address in okayLocalIPs
|
|
// and pings all those that do.
|
|
func (c *Conn) maybeCloseDERPsOnRebind(okayLocalIPs []netip.Prefix) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
for regionID, ad := range c.activeDerp {
|
|
la, err := ad.c.LocalAddr()
|
|
if err != nil {
|
|
c.closeOrReconnectDERPLocked(regionID, "rebind-no-localaddr")
|
|
continue
|
|
}
|
|
if !tsaddr.PrefixesContainsIP(okayLocalIPs, la.Addr()) {
|
|
c.closeOrReconnectDERPLocked(regionID, "rebind-default-route-change")
|
|
continue
|
|
}
|
|
regionID := regionID
|
|
dc := ad.c
|
|
go func() {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
|
defer cancel()
|
|
if err := dc.Ping(ctx); err != nil {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
c.closeOrReconnectDERPLocked(regionID, "rebind-ping-fail")
|
|
return
|
|
}
|
|
c.logf("post-rebind ping of DERP region %d okay", regionID)
|
|
}()
|
|
}
|
|
c.logActiveDerpLocked()
|
|
}
|
|
|
|
// closeOrReconnectDERPLocked closes the DERP connection to the
|
|
// provided regionID and starts reconnecting it if it's our current
|
|
// home DERP.
|
|
//
|
|
// why is a reason for logging.
|
|
//
|
|
// c.mu must be held.
|
|
func (c *Conn) closeOrReconnectDERPLocked(regionID int, why string) {
|
|
c.closeDerpLocked(regionID, why)
|
|
if !c.privateKey.IsZero() && c.myDerp == regionID {
|
|
c.startDerpHomeConnectLocked()
|
|
}
|
|
}
|
|
|
|
// c.mu must be held.
|
|
// It is the responsibility of the caller to call logActiveDerpLocked after any set of closes.
|
|
func (c *Conn) closeDerpLocked(regionID int, why string) {
|
|
if ad, ok := c.activeDerp[regionID]; ok {
|
|
c.logf("magicsock: closing connection to derp-%v (%v), age %v", regionID, why, time.Since(ad.createTime).Round(time.Second))
|
|
go ad.c.Close()
|
|
ad.cancel()
|
|
delete(c.activeDerp, regionID)
|
|
metricNumDERPConns.Set(int64(len(c.activeDerp)))
|
|
}
|
|
}
|
|
|
|
// c.mu must be held.
|
|
func (c *Conn) logActiveDerpLocked() {
|
|
now := time.Now()
|
|
c.logf("magicsock: %v active derp conns%s", len(c.activeDerp), logger.ArgWriter(func(buf *bufio.Writer) {
|
|
if len(c.activeDerp) == 0 {
|
|
return
|
|
}
|
|
buf.WriteString(":")
|
|
c.foreachActiveDerpSortedLocked(func(node int, ad activeDerp) {
|
|
fmt.Fprintf(buf, " derp-%d=cr%v,wr%v", node, simpleDur(now.Sub(ad.createTime)), simpleDur(now.Sub(*ad.lastWrite)))
|
|
})
|
|
}))
|
|
}
|
|
|
|
// c.mu must be held.
|
|
func (c *Conn) foreachActiveDerpSortedLocked(fn func(regionID int, ad activeDerp)) {
|
|
if len(c.activeDerp) < 2 {
|
|
for id, ad := range c.activeDerp {
|
|
fn(id, ad)
|
|
}
|
|
return
|
|
}
|
|
ids := make([]int, 0, len(c.activeDerp))
|
|
for id := range c.activeDerp {
|
|
ids = append(ids, id)
|
|
}
|
|
sort.Ints(ids)
|
|
for _, id := range ids {
|
|
fn(id, c.activeDerp[id])
|
|
}
|
|
}
|
|
|
|
func (c *Conn) cleanStaleDerp() {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
if c.closed {
|
|
return
|
|
}
|
|
c.derpCleanupTimerArmed = false
|
|
|
|
tooOld := time.Now().Add(-derpInactiveCleanupTime)
|
|
dirty := false
|
|
someNonHomeOpen := false
|
|
for i, ad := range c.activeDerp {
|
|
if i == c.myDerp {
|
|
continue
|
|
}
|
|
if ad.lastWrite.Before(tooOld) {
|
|
c.closeDerpLocked(i, "idle")
|
|
dirty = true
|
|
} else {
|
|
someNonHomeOpen = true
|
|
}
|
|
}
|
|
if dirty {
|
|
c.logActiveDerpLocked()
|
|
}
|
|
if someNonHomeOpen {
|
|
c.scheduleCleanStaleDerpLocked()
|
|
}
|
|
}
|
|
|
|
func (c *Conn) scheduleCleanStaleDerpLocked() {
|
|
if c.derpCleanupTimerArmed {
|
|
// Already going to fire soon. Let the existing one
|
|
// fire lest it get infinitely delayed by repeated
|
|
// calls to scheduleCleanStaleDerpLocked.
|
|
return
|
|
}
|
|
c.derpCleanupTimerArmed = true
|
|
if c.derpCleanupTimer != nil {
|
|
c.derpCleanupTimer.Reset(derpCleanStaleInterval)
|
|
} else {
|
|
c.derpCleanupTimer = time.AfterFunc(derpCleanStaleInterval, c.cleanStaleDerp)
|
|
}
|
|
}
|
|
|
|
// DERPs reports the number of active DERP connections.
|
|
func (c *Conn) DERPs() int {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
return len(c.activeDerp)
|
|
}
|
|
|
|
func (c *Conn) derpRegionCodeOfIDLocked(regionID int) string {
|
|
if c.derpMap == nil {
|
|
return ""
|
|
}
|
|
if r, ok := c.derpMap.Regions[regionID]; ok {
|
|
return r.RegionCode
|
|
}
|
|
return ""
|
|
}
|
|
|
|
// derpAddrFamSelector is the derphttp.AddressFamilySelector we pass
|
|
// to derphttp.Client.SetAddressFamilySelector.
|
|
//
|
|
// It provides the hint as to whether in an IPv4-vs-IPv6 race that
|
|
// IPv4 should be held back a bit to give IPv6 a better-than-50/50
|
|
// chance of winning. We only return true when we believe IPv6 will
|
|
// work anyway, so we don't artificially delay the connection speed.
|
|
type derpAddrFamSelector struct{ c *Conn }
|
|
|
|
func (s derpAddrFamSelector) PreferIPv6() bool {
|
|
if r := s.c.lastNetCheckReport.Load(); r != nil {
|
|
return r.IPv6
|
|
}
|
|
return false
|
|
}
|
|
|
|
// Timing for cleaning up idle DERP connections.
const (
	// derpInactiveCleanupTime is how long a non-home DERP connection
	// must go without being written to before it is closed.
	derpInactiveCleanupTime = time.Minute

	// derpCleanStaleInterval is the period at which cleanStaleDerp
	// runs while there may be stale DERP connections to close.
	derpCleanStaleInterval = 15 * time.Second
)
|