wgengine/magicsock: implement relayManager endpoint probing (#16029)

relayManager is responsible for disco ping/pong probing of relay
endpoints once a handshake is complete.

Future work will enable relayManager to set a relay endpoint as the best
UDP path on an endpoint if appropriate.

Updates tailscale/corp#27502

Signed-off-by: Jordan Whited <jordan@tailscale.com>
Jordan Whited 2025-05-28 10:45:59 -07:00 committed by GitHub
parent 842df37803
commit ffc8ec289b
4 changed files with 215 additions and 126 deletions
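Before the per-file diffs, here is a compact, standalone Go sketch of the probing flow this change introduces: the handshake worker answers the relay server's first challenge, sends disco pings (re-pinging if the remote peer pings first), declares the relay endpoint functional once a pong matching one of its own pings arrives, and gives up when the timer fires. Every name below is an invented stand-in (message types, receive channel, transmit path); only the control flow mirrors handshakeServerEndpoint in the relayManager diff further down.

// probe_sketch.go: simplified model of relay endpoint probing (not magicsock code).
package main

import (
	"fmt"
	"time"
)

// Stand-ins for the disco messages the worker can receive.
type challengeMsg struct{ challenge [32]byte }
type pingMsg struct{ txID [12]byte }
type pongMsg struct{ txID [12]byte }

func probeRelayEndpoint(rx <-chan any, lifetime time.Duration) (latency time.Duration, ok bool) {
	const limitPings = 10 // inbound pings trigger outbound pings, so stay defensive
	sentPingAt := make(map[[12]byte]time.Time)
	answered := false // stand-in for the bind-sent / answer-sent handshake state

	txPing := func() {
		if len(sentPingAt) == limitPings {
			return
		}
		var txid [12]byte // a real implementation uses a random disco TxID
		txid[0] = byte(len(sentPingAt))
		sentPingAt[txid] = time.Now()
		// ...transmit the (answer followed by a) ping toward the relay endpoint here...
	}

	timer := time.NewTimer(lifetime)
	defer timer.Stop()
	for {
		select {
		case msg := <-rx:
			switch m := msg.(type) {
			case challengeMsg:
				if answered {
					continue // only the first challenge matters
				}
				txPing() // answer the challenge, then ping
				answered = true
			case pingMsg:
				if !answered {
					continue // handshake not complete yet
				}
				txPing() // remote peer is up; our earlier ping may have been lost
			case pongMsg:
				at, known := sentPingAt[m.txID]
				if !known {
					continue
				}
				return time.Since(at), true // endpoint is functional
			}
		case <-timer.C:
			return 0, false // handshake or ping/pong probing timed out
		}
	}
}

func main() {
	rx := make(chan any, 2)
	rx <- challengeMsg{}
	rx <- pongMsg{} // zero TxID matches the first ping sent by this sketch
	fmt.Println(probeRelayEndpoint(rx, time.Second))
}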

View File

@@ -1562,10 +1562,18 @@ func pktLenToPingSize(mtu tstun.WireMTU, is6 bool) int {
 // It should be called with the Conn.mu held.
 //
 // It reports whether m.TxID corresponds to a ping that this endpoint sent.
-func (de *endpoint) handlePongConnLocked(m *disco.Pong, di *discoInfo, src netip.AddrPort) (knownTxID bool) {
+func (de *endpoint) handlePongConnLocked(m *disco.Pong, di *discoInfo, src netip.AddrPort, vni virtualNetworkID) (knownTxID bool) {
 	de.mu.Lock()
 	defer de.mu.Unlock()
+	if vni.isSet() {
+		// TODO(jwhited): check for matching [endpoint.bestAddr] once that data
+		// structure is VNI-aware and [relayManager] can mutate it. We do not
+		// need to reference any [endpointState] for Geneve-encapsulated disco,
+		// we store nothing about them there.
+		return false
+	}
 	isDerp := src.Addr() == tailcfg.DerpMagicIPAddr
 	sp, ok := de.sentPing[m.TxID]
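These diffs track Geneve-encapsulated disco with a small optional-value wrapper (vni.set, vni.isSet, vni.get). The real virtualNetworkID type already exists in magicsock; the following is only an assumed, minimal equivalent to make those call sites easy to read, not the actual implementation.

// vni_sketch.go: assumed shape of an "optional uint32" VNI wrapper.
package main

import "fmt"

// virtualNetworkID carries a Geneve virtual network identifier (24 bits on
// the wire) that may or may not be present on a received packet.
type virtualNetworkID struct {
	vni uint32
	ok  bool
}

func (v *virtualNetworkID) set(vni uint32) { v.vni, v.ok = vni, true }

func (v virtualNetworkID) isSet() bool { return v.ok }

// get is only meaningful after isSet() returns true, mirroring the call sites above.
func (v virtualNetworkID) get() uint32 {
	if !v.ok {
		panic("virtualNetworkID: get of unset value")
	}
	return v.vni
}

func main() {
	var vni virtualNetworkID
	fmt.Println(vni.isSet()) // false: plain, non-Geneve disco
	vni.set(7)
	fmt.Println(vni.isSet(), vni.get()) // true 7
}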

View File

@@ -1802,6 +1802,7 @@ func (c *Conn) handleDiscoMessage(msg []byte, src netip.AddrPort, derpNodeSrc ke
 		return
 	}
 	var geneve packet.GeneveHeader
+	var vni virtualNetworkID
 	if isGeneveEncap {
 		err := geneve.Decode(msg)
 		if err != nil {
@@ -1810,6 +1811,7 @@ func (c *Conn) handleDiscoMessage(msg []byte, src netip.AddrPort, derpNodeSrc ke
 			c.logf("[unexpected] geneve header decoding error: %v", err)
 			return
 		}
+		vni.set(geneve.VNI)
 		msg = msg[packet.GeneveFixedHeaderLength:]
 	}
 	// The control bit should only be set for relay handshake messages
@@ -1923,33 +1925,30 @@ func (c *Conn) handleDiscoMessage(msg []byte, src netip.AddrPort, derpNodeSrc ke
 			c.logf("[unexpected] %T packets should not come from a relay server with Geneve control bit set", dm)
 			return
 		}
-		c.relayManager.handleBindUDPRelayEndpointChallenge(challenge, di, src, geneve.VNI)
+		c.relayManager.handleGeneveEncapDiscoMsgNotBestAddr(challenge, di, src, geneve.VNI)
 		return
 	}
 	switch dm := dm.(type) {
 	case *disco.Ping:
 		metricRecvDiscoPing.Add(1)
-		if isGeneveEncap {
-			// TODO(jwhited): handle Geneve-encapsulated disco ping.
-			return
-		}
-		c.handlePingLocked(dm, src, di, derpNodeSrc)
+		c.handlePingLocked(dm, src, vni, di, derpNodeSrc)
 	case *disco.Pong:
 		metricRecvDiscoPong.Add(1)
-		if isGeneveEncap {
-			// TODO(jwhited): handle Geneve-encapsulated disco pong.
-			return
-		}
 		// There might be multiple nodes for the sender's DiscoKey.
 		// Ask each to handle it, stopping once one reports that
 		// the Pong's TxID was theirs.
+		knownTxID := false
 		c.peerMap.forEachEndpointWithDiscoKey(sender, func(ep *endpoint) (keepGoing bool) {
-			if ep.handlePongConnLocked(dm, di, src) {
+			if ep.handlePongConnLocked(dm, di, src, vni) {
+				knownTxID = true
 				return false
 			}
 			return true
 		})
+		if !knownTxID && vni.isSet() {
+			c.relayManager.handleGeneveEncapDiscoMsgNotBestAddr(dm, di, src, vni.get())
+		}
 	case *disco.CallMeMaybe, *disco.CallMeMaybeVia:
 		var via *disco.CallMeMaybeVia
 		isVia := false
@@ -2048,12 +2047,21 @@ func (c *Conn) unambiguousNodeKeyOfPingLocked(dm *disco.Ping, dk key.DiscoPublic
 // di is the discoInfo of the source of the ping.
 // derpNodeSrc is non-zero if the ping arrived via DERP.
-func (c *Conn) handlePingLocked(dm *disco.Ping, src netip.AddrPort, di *discoInfo, derpNodeSrc key.NodePublic) {
+func (c *Conn) handlePingLocked(dm *disco.Ping, src netip.AddrPort, vni virtualNetworkID, di *discoInfo, derpNodeSrc key.NodePublic) {
 	likelyHeartBeat := src == di.lastPingFrom && time.Since(di.lastPingTime) < 5*time.Second
 	di.lastPingFrom = src
 	di.lastPingTime = time.Now()
 	isDerp := src.Addr() == tailcfg.DerpMagicIPAddr
+	if vni.isSet() {
+		// TODO(jwhited): check for matching [endpoint.bestAddr] once that data
+		// structure is VNI-aware and [relayManager] can mutate it. We do not
+		// need to reference any [endpointState] for Geneve-encapsulated disco,
+		// we store nothing about them there.
+		c.relayManager.handleGeneveEncapDiscoMsgNotBestAddr(dm, di, src, vni.get())
+		return
+	}
 	// If we can figure out with certainty which node key this disco
 	// message is for, eagerly update our IP:port<>node and disco<>node
 	// mappings to make p2p path discovery faster in simple
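The "not best addr" hooks above hand Geneve-encapsulated disco to the relayManager, which routes it to in-flight handshake work keyed by source ip:port plus VNI. A simplified sketch of that keying follows (the router type and helper names here are invented stand-ins for the addrPortVNI maps added in the relayManager diff below; the real code also tracks per-work state and Lamport IDs):

// addrportvni_sketch.go: illustrative (src, VNI) routing, not magicsock code.
package main

import (
	"fmt"
	"net/netip"
)

type addrPortVNI struct {
	addrPort netip.AddrPort
	vni      uint32
}

type handshakeWork struct{ server string }

type router struct {
	// Populated when the first challenge for a handshake arrives; later
	// pings/pongs from the same (src, VNI) are routed to the same work.
	byAddrPortVNI map[addrPortVNI]*handshakeWork
}

func (r *router) noteChallenge(src netip.AddrPort, vni uint32, w *handshakeWork) {
	apv := addrPortVNI{src, vni}
	if _, busy := r.byAddrPortVNI[apv]; busy {
		return // another handshake already claimed this (src, VNI): discard
	}
	r.byAddrPortVNI[apv] = w
}

func (r *router) routePingPong(src netip.AddrPort, vni uint32) *handshakeWork {
	return r.byAddrPortVNI[addrPortVNI{src, vni}] // nil means: discard
}

func main() {
	r := &router{byAddrPortVNI: make(map[addrPortVNI]*handshakeWork)}
	relay := netip.MustParseAddrPort("192.0.2.10:3478")
	w := &handshakeWork{server: "relay-1"}

	r.noteChallenge(relay, 7, w)
	if got := r.routePingPong(relay, 7); got != nil {
		fmt.Println("pong routed to handshake with", got.server)
	}
	if got := r.routePingPong(relay, 8); got == nil {
		fmt.Println("unknown (addr, VNI): discarded")
	}
}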

View File

@@ -14,15 +14,16 @@ import (
 	"time"

 	"tailscale.com/disco"
+	"tailscale.com/net/stun"
 	udprelay "tailscale.com/net/udprelay/endpoint"
 	"tailscale.com/types/key"
 	"tailscale.com/util/httpm"
 	"tailscale.com/util/set"
 )

-// relayManager manages allocation and handshaking of
-// [tailscale.com/net/udprelay.Server] endpoints. The zero value is ready for
-// use.
+// relayManager manages allocation, handshaking, and initial probing (disco
+// ping/pong) of [tailscale.com/net/udprelay.Server] endpoints. The zero value
+// is ready for use.
 type relayManager struct {
 	initOnce sync.Once
@@ -33,16 +34,18 @@ type relayManager struct {
 	allocWorkByEndpoint map[*endpoint]*relayEndpointAllocWork
 	handshakeWorkByEndpointByServerDisco map[*endpoint]map[key.DiscoPublic]*relayHandshakeWork
 	handshakeWorkByServerDiscoVNI map[serverDiscoVNI]*relayHandshakeWork
+	handshakeWorkAwaitingPong map[*relayHandshakeWork]addrPortVNI
+	addrPortVNIToHandshakeWork map[addrPortVNI]*relayHandshakeWork

 	// ===================================================================
 	// The following chan fields serve event inputs to a single goroutine,
 	// runLoop().
 	allocateHandshakeCh chan *endpoint
 	allocateWorkDoneCh chan relayEndpointAllocWorkDoneEvent
 	handshakeWorkDoneCh chan relayEndpointHandshakeWorkDoneEvent
 	cancelWorkCh chan *endpoint
 	newServerEndpointCh chan newRelayServerEndpointEvent
-	rxChallengeCh chan relayHandshakeChallengeEvent
+	rxHandshakeDiscoMsgCh chan relayHandshakeDiscoMsgEvent

 	discoInfoMu sync.Mutex // guards the following field
 	discoInfoByServerDisco map[key.DiscoPublic]*relayHandshakeDiscoInfo
@@ -66,16 +69,16 @@ type relayHandshakeWork struct {
 	ep *endpoint
 	se udprelay.ServerEndpoint

-	// In order to not deadlock, runLoop() must select{} read doneCh when
-	// attempting to write into rxChallengeCh, and the handshake work goroutine
-	// must close(doneCh) before attempting to write to
-	// relayManager.handshakeWorkDoneCh.
-	rxChallengeCh chan relayHandshakeChallengeEvent
-	doneCh chan struct{}
+	// handshakeServerEndpoint() always writes to doneCh (len 1) when it
+	// returns. It may end up writing the same event afterward to
+	// relayManager.handshakeWorkDoneCh if runLoop() can receive it. runLoop()
+	// must select{} read on doneCh to prevent deadlock when attempting to write
+	// to rxDiscoMsgCh.
+	rxDiscoMsgCh chan relayHandshakeDiscoMsgEvent
+	doneCh chan relayEndpointHandshakeWorkDoneEvent

 	ctx context.Context
 	cancel context.CancelFunc
-	wg *sync.WaitGroup
 }

 // newRelayServerEndpointEvent indicates a new [udprelay.ServerEndpoint] has
@@ -99,8 +102,9 @@ type relayEndpointAllocWorkDoneEvent struct {
 // work for an [*endpoint] has completed. This structure is immutable once
 // initialized.
 type relayEndpointHandshakeWorkDoneEvent struct {
 	work *relayHandshakeWork
-	answerSentTo netip.AddrPort // zero value if answer was not transmitted
+	pongReceivedFrom netip.AddrPort // or zero value if handshake or ping/pong did not complete
+	latency time.Duration // only relevant if pongReceivedFrom.IsValid()
 }

 // activeWorkRunLoop returns true if there is outstanding allocation or
@@ -150,8 +154,8 @@ func (r *relayManager) runLoop() {
 			if !r.activeWorkRunLoop() {
 				return
 			}
-		case challenge := <-r.rxChallengeCh:
-			r.handleRxChallengeRunLoop(challenge)
+		case discoMsgEvent := <-r.rxHandshakeDiscoMsgCh:
+			r.handleRxHandshakeDiscoMsgRunLoop(discoMsgEvent)
 			if !r.activeWorkRunLoop() {
 				return
 			}
@@ -159,12 +163,12 @@
 	}
 }

-type relayHandshakeChallengeEvent struct {
-	challenge [32]byte
+type relayHandshakeDiscoMsgEvent struct {
+	msg disco.Message
 	disco key.DiscoPublic
 	from netip.AddrPort
 	vni uint32
 	at time.Time
 }

 // relayEndpointAllocWork serves to track in-progress relay endpoint allocation
@@ -187,12 +191,14 @@ func (r *relayManager) init() {
 		r.allocWorkByEndpoint = make(map[*endpoint]*relayEndpointAllocWork)
 		r.handshakeWorkByEndpointByServerDisco = make(map[*endpoint]map[key.DiscoPublic]*relayHandshakeWork)
 		r.handshakeWorkByServerDiscoVNI = make(map[serverDiscoVNI]*relayHandshakeWork)
+		r.handshakeWorkAwaitingPong = make(map[*relayHandshakeWork]addrPortVNI)
+		r.addrPortVNIToHandshakeWork = make(map[addrPortVNI]*relayHandshakeWork)
 		r.allocateHandshakeCh = make(chan *endpoint)
 		r.allocateWorkDoneCh = make(chan relayEndpointAllocWorkDoneEvent)
 		r.handshakeWorkDoneCh = make(chan relayEndpointHandshakeWorkDoneEvent)
 		r.cancelWorkCh = make(chan *endpoint)
 		r.newServerEndpointCh = make(chan newRelayServerEndpointEvent)
-		r.rxChallengeCh = make(chan relayHandshakeChallengeEvent)
+		r.rxHandshakeDiscoMsgCh = make(chan relayHandshakeDiscoMsgEvent)
 		r.runLoopStoppedCh = make(chan struct{}, 1)
 		go r.runLoop()
 	})
@@ -270,8 +276,11 @@ func (r *relayManager) handleCallMeMaybeVia(ep *endpoint, dm *disco.CallMeMaybeV
 	})
 }

-func (r *relayManager) handleBindUDPRelayEndpointChallenge(dm *disco.BindUDPRelayEndpointChallenge, di *discoInfo, src netip.AddrPort, vni uint32) {
-	relayManagerInputEvent(r, nil, &r.rxChallengeCh, relayHandshakeChallengeEvent{challenge: dm.Challenge, disco: di.discoKey, from: src, vni: vni, at: time.Now()})
+// handleGeneveEncapDiscoMsgNotBestAddr handles reception of Geneve-encapsulated
+// disco messages if they are not associated with any known
+// [*endpoint.bestAddr].
+func (r *relayManager) handleGeneveEncapDiscoMsgNotBestAddr(dm disco.Message, di *discoInfo, src netip.AddrPort, vni uint32) {
+	relayManagerInputEvent(r, nil, &r.rxHandshakeDiscoMsgCh, relayHandshakeDiscoMsgEvent{msg: dm, disco: di.discoKey, from: src, vni: vni, at: time.Now()})
 }

 // relayManagerInputEvent initializes [relayManager] if necessary, starts
@@ -337,26 +346,68 @@ func (r *relayManager) stopWorkRunLoop(ep *endpoint, f stopHandshakeWorkFilter)
 			_, knownServer := r.serversByDisco[disco]
 			if knownServer || f == stopHandshakeWorkAllServers {
 				handshakeWork.cancel()
-				handshakeWork.wg.Wait()
-				delete(byServerDisco, disco)
-				delete(r.handshakeWorkByServerDiscoVNI, serverDiscoVNI{handshakeWork.se.ServerDisco, handshakeWork.se.VNI})
+				done := <-handshakeWork.doneCh
+				r.handleHandshakeWorkDoneRunLoop(done)
 			}
 		}
-		if len(byServerDisco) == 0 {
-			delete(r.handshakeWorkByEndpointByServerDisco, ep)
-		}
 	}
 }

-func (r *relayManager) handleRxChallengeRunLoop(challenge relayHandshakeChallengeEvent) {
-	work, ok := r.handshakeWorkByServerDiscoVNI[serverDiscoVNI{challenge.disco, challenge.vni}]
-	if !ok {
+// addrPortVNI represents a combined netip.AddrPort and Geneve header virtual
+// network identifier.
+type addrPortVNI struct {
+	addrPort netip.AddrPort
+	vni uint32
+}
+
+func (r *relayManager) handleRxHandshakeDiscoMsgRunLoop(event relayHandshakeDiscoMsgEvent) {
+	var (
+		work *relayHandshakeWork
+		ok bool
+	)
+	apv := addrPortVNI{event.from, event.vni}
+	switch event.msg.(type) {
+	case *disco.BindUDPRelayEndpointChallenge:
+		work, ok = r.handshakeWorkByServerDiscoVNI[serverDiscoVNI{event.disco, event.vni}]
+		if !ok {
+			// No outstanding work tied to this challenge, discard.
+			return
+		}
+		_, ok = r.handshakeWorkAwaitingPong[work]
+		if ok {
+			// We've seen a challenge for this relay endpoint previously,
+			// discard. Servers only respond to the first src ip:port they see
+			// binds from.
+			return
+		}
+		_, ok = r.addrPortVNIToHandshakeWork[apv]
+		if ok {
+			// There is existing work for the same [addrPortVNI] that is not
+			// 'work'. If both instances happen to be on the same server we
+			// could attempt to resolve event order using LamportID. For now
+			// just leave both work instances alone and take no action other
+			// than to discard this challenge msg.
+			return
+		}
+		// Update state so that future ping/pong will route to 'work'.
+		r.handshakeWorkAwaitingPong[work] = apv
+		r.addrPortVNIToHandshakeWork[apv] = work
+	case *disco.Ping, *disco.Pong:
+		work, ok = r.addrPortVNIToHandshakeWork[apv]
+		if !ok {
+			// No outstanding work tied to this [addrPortVNI], discard.
+			return
+		}
+	default:
+		// Unexpected message type, discard.
 		return
 	}
 	select {
-	case <-work.doneCh:
+	case done := <-work.doneCh:
+		// handshakeServerEndpoint() returned, clean up its state.
+		r.handleHandshakeWorkDoneRunLoop(done)
 		return
-	case work.rxChallengeCh <- challenge:
+	case work.rxDiscoMsgCh <- event:
 		return
 	}
 }
@@ -375,14 +426,17 @@ func (r *relayManager) handleHandshakeWorkDoneRunLoop(done relayEndpointHandshak
 		delete(r.handshakeWorkByEndpointByServerDisco, done.work.ep)
 	}
 	delete(r.handshakeWorkByServerDiscoVNI, serverDiscoVNI{done.work.se.ServerDisco, done.work.se.VNI})
-	if !done.answerSentTo.IsValid() {
-		// The handshake timed out.
+	apv, ok := r.handshakeWorkAwaitingPong[work]
+	if ok {
+		delete(r.handshakeWorkAwaitingPong, work)
+		delete(r.addrPortVNIToHandshakeWork, apv)
+	}
+	if !done.pongReceivedFrom.IsValid() {
+		// The handshake or ping/pong probing timed out.
 		return
 	}
-	// We received a challenge from and transmitted an answer towards the relay
-	// server.
-	// TODO(jwhited): Make the associated [*endpoint] aware of this
-	// [tailscale.com/net/udprelay.ServerEndpoint].
+	// This relay endpoint is functional.
+	// TODO(jwhited): Set it on done.work.ep.bestAddr if it is a betterAddr().
 }

 func (r *relayManager) handleNewServerEndpointRunLoop(newServerEndpoint newRelayServerEndpointEvent) {
@@ -398,19 +452,10 @@ func (r *relayManager) handleNewServerEndpointRunLoop(newServerEndpoint newRelay
 			return
 		}
-		// The existing work is no longer valid, clean it up. Be sure to lookup
-		// by the existing work's [*endpoint], not the incoming "new" work as
-		// they are not necessarily matching.
+		// The existing work is no longer valid, clean it up.
 		existingWork.cancel()
-		existingWork.wg.Wait()
-		delete(r.handshakeWorkByServerDiscoVNI, sdv)
-		byServerDisco, ok := r.handshakeWorkByEndpointByServerDisco[existingWork.ep]
-		if ok {
-			delete(byServerDisco, sdv.serverDisco)
-			if len(byServerDisco) == 0 {
-				delete(r.handshakeWorkByEndpointByServerDisco, existingWork.ep)
-			}
-		}
+		done := <-existingWork.doneCh
+		r.handleHandshakeWorkDoneRunLoop(done)
 	}

 	// Check for duplicate work by [*endpoint] + server disco.
@@ -425,12 +470,8 @@ func (r *relayManager) handleNewServerEndpointRunLoop(newServerEndpoint newRelay
 		}
 		// Cancel existing handshake that has a lower lamport ID.
 		existingWork.cancel()
-		existingWork.wg.Wait()
-		delete(r.handshakeWorkByServerDiscoVNI, sdv)
-		delete(byServerDisco, sdv.serverDisco)
-		if len(byServerDisco) == 0 {
-			delete(r.handshakeWorkByEndpointByServerDisco, existingWork.ep)
-		}
+		done := <-existingWork.doneCh
+		r.handleHandshakeWorkDoneRunLoop(done)
 	}
 }
@@ -464,14 +505,12 @@ func (r *relayManager) handleNewServerEndpointRunLoop(newServerEndpoint newRelay
 	// We're ready to start a new handshake.
 	ctx, cancel := context.WithCancel(context.Background())
-	wg := &sync.WaitGroup{}
 	work := &relayHandshakeWork{
 		ep: newServerEndpoint.ep,
 		se: newServerEndpoint.se,
-		doneCh: make(chan struct{}),
+		doneCh: make(chan relayEndpointHandshakeWorkDoneEvent, 1),
 		ctx: ctx,
 		cancel: cancel,
-		wg: wg,
 	}
 	if byServerDisco == nil {
 		byServerDisco = make(map[key.DiscoPublic]*relayHandshakeWork)
@@ -480,19 +519,16 @@ func (r *relayManager) handleNewServerEndpointRunLoop(newServerEndpoint newRelay
 	byServerDisco[newServerEndpoint.se.ServerDisco] = work
 	r.handshakeWorkByServerDiscoVNI[sdv] = work

-	wg.Add(1)
 	go r.handshakeServerEndpoint(work)
 }

 func (r *relayManager) handshakeServerEndpoint(work *relayHandshakeWork) {
-	defer work.wg.Done()
 	done := relayEndpointHandshakeWorkDoneEvent{work: work}

 	r.ensureDiscoInfoFor(work)
 	defer func() {
 		r.derefDiscoInfoFor(work)
-		close(work.doneCh)
+		work.doneCh <- done
 		relayManagerInputEvent(r, work.ctx, &r.handshakeWorkDoneCh, done)
 		work.cancel()
 	}()
@@ -504,7 +540,7 @@ func (r *relayManager) handshakeServerEndpoint(work *relayHandshakeWork) {
 	for _, addrPort := range work.se.AddrPorts {
 		if addrPort.IsValid() {
 			sentBindAny = true
-			go work.ep.c.sendDiscoMessage(addrPort, vni, key.NodePublic{}, work.se.ServerDisco, bind, discoLog)
+			go work.ep.c.sendDiscoMessage(addrPort, vni, key.NodePublic{}, work.se.ServerDisco, bind, discoVerboseLog)
 		}
 	}
 	if !sentBindAny {
@@ -518,46 +554,83 @@ func (r *relayManager) handshakeServerEndpoint(work *relayHandshakeWork) {
 	timer := time.NewTimer(min(work.se.BindLifetime.Duration, maxHandshakeLifetime))
 	defer timer.Stop()

-	// Wait for cancellation, a challenge to be rx'd, or handshake lifetime to
-	// expire. Our initial implementation values simplicity over other aspects,
-	// e.g. it is not resilient to any packet loss.
-	//
-	// We may want to eventually consider [disc.BindUDPRelayEndpoint]
-	// retransmission lacking challenge rx, and
-	// [disco.BindUDPRelayEndpointAnswer] duplication in front of
-	// [disco.Ping] until [disco.Ping] or [disco.Pong] is received.
-	select {
-	case <-work.ctx.Done():
-		return
-	case challenge := <-work.rxChallengeCh:
-		answer := &disco.BindUDPRelayEndpointAnswer{Answer: challenge.challenge}
-		done.answerSentTo = challenge.from
-		// Send answer back to relay server. Typically sendDiscoMessage() calls
-		// are invoked via a new goroutine in attempt to limit crypto+syscall
-		// time contributing to system backpressure, and to fire roundtrip
-		// latency-relevant messages as closely together as possible. We
-		// intentionally don't do that here, because:
-		//  1. The primary backpressure concern is around the work.rxChallengeCh
-		//     writer on the [Conn] packet rx path, who is already unblocked
-		//     since we read from the channel. Relay servers only ever tx one
-		//     challenge per rx'd bind message for a given (the first seen) src.
-		//  2. runLoop() may be waiting for this 'work' to complete if
-		//     explicitly canceled for some reason elsewhere, but this is
-		//     typically only around [*endpoint] and/or [Conn] shutdown.
-		//  3. It complicates the defer()'d [*discoInfo] deref and 'work'
-		//     completion event order. sendDiscoMessage() assumes the related
-		//     [*discoInfo] is still available. We also don't want the
-		//     [*endpoint] to send a [disco.Ping] before the
-		//     [disco.BindUDPRelayEndpointAnswer] has gone out, otherwise the
-		//     remote side will never see the ping, delaying/preventing the
-		//     [udprelay.ServerEndpoint] from becoming fully operational.
-		//  4. This is a singular tx with no roundtrip latency measurements
-		//     involved.
-		work.ep.c.sendDiscoMessage(challenge.from, vni, key.NodePublic{}, work.se.ServerDisco, answer, discoLog)
-		return
-	case <-timer.C:
-		// The handshake timed out.
-		return
+	// Limit the number of pings we will transmit. Inbound pings trigger
+	// outbound pings, so we want to be a little defensive.
+	const limitPings = 10
+
+	var (
+		handshakeState disco.BindUDPRelayHandshakeState = disco.BindUDPRelayHandshakeStateBindSent
+		sentPingAt = make(map[stun.TxID]time.Time)
+	)
+
+	txPing := func(to netip.AddrPort, withAnswer *[32]byte) {
+		if len(sentPingAt) == limitPings {
+			return
+		}
+		epDisco := work.ep.disco.Load()
+		if epDisco == nil {
+			return
+		}
+		txid := stun.NewTxID()
+		sentPingAt[txid] = time.Now()
+		ping := &disco.Ping{
+			TxID: txid,
+			NodeKey: work.ep.c.publicKeyAtomic.Load(),
+		}
+		go func() {
+			if withAnswer != nil {
+				answer := &disco.BindUDPRelayEndpointAnswer{Answer: *withAnswer}
+				work.ep.c.sendDiscoMessage(to, vni, key.NodePublic{}, work.se.ServerDisco, answer, discoVerboseLog)
+			}
+			work.ep.c.sendDiscoMessage(to, vni, key.NodePublic{}, epDisco.key, ping, discoVerboseLog)
+		}()
+	}
+
+	// This for{select{}} is responsible for handshaking and tx'ing ping/pong
+	// when the handshake is complete.
+	for {
+		select {
+		case <-work.ctx.Done():
+			return
+		case msgEvent := <-work.rxDiscoMsgCh:
+			switch msg := msgEvent.msg.(type) {
+			case *disco.BindUDPRelayEndpointChallenge:
+				if handshakeState >= disco.BindUDPRelayHandshakeStateAnswerSent {
+					continue
+				}
+				txPing(msgEvent.from, &msg.Challenge)
+				handshakeState = disco.BindUDPRelayHandshakeStateAnswerSent
+			case *disco.Ping:
+				if handshakeState < disco.BindUDPRelayHandshakeStateAnswerSent {
+					continue
+				}
+				// An inbound ping from the remote peer indicates we completed a
+				// handshake with the relay server (our answer msg was
+				// received). Chances are our ping was dropped before the remote
+				// handshake was complete. We need to rx a pong to determine
+				// latency, so send another ping. Since the handshake is
+				// complete we do not need to send an answer in front of this
+				// one.
+				txPing(msgEvent.from, nil)
+			case *disco.Pong:
+				at, ok := sentPingAt[msg.TxID]
+				if !ok {
+					continue
+				}
+				// The relay server endpoint is functional! Record the
+				// round-trip latency and return.
+				done.pongReceivedFrom = msgEvent.from
+				done.latency = time.Since(at)
+				return
+			default:
+				// unexpected message type, silently discard
+				continue
+			}
+			return
+		case <-timer.C:
+			// The handshake timed out.
+			return
+		}
 	}
 }
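The subtlest part of the relayManager changes is the doneCh handoff that replaces the old close(doneCh) plus WaitGroup pair: the worker always deposits exactly one completion event in a buffered doneCh, so the run loop can either drain it synchronously after cancel() or race it against a channel send without deadlocking. Below is a minimal, self-contained sketch of that discipline; the worker and message types are invented stand-ins for relayHandshakeWork and the disco events, not the magicsock code.

// donech_sketch.go: illustrative doneCh handoff pattern under assumed types.
package main

import (
	"context"
	"fmt"
)

type doneEvent struct{ handled int }

type worker struct {
	ctx    context.Context
	cancel context.CancelFunc
	inCh   chan string    // analogous to relayHandshakeWork.rxDiscoMsgCh
	doneCh chan doneEvent // buffered (len 1), analogous to relayHandshakeWork.doneCh
}

// run always deposits exactly one doneEvent into doneCh on exit, so the event
// loop can stop it synchronously with cancel() followed by <-doneCh.
func run(w *worker) {
	done := doneEvent{}
	defer func() { w.doneCh <- done }() // capacity 1, never blocks
	for {
		select {
		case <-w.ctx.Done():
			return
		case msg := <-w.inCh:
			done.handled++
			_ = msg
		}
	}
}

// forward mimics the run loop's rx routing: it must select on doneCh while
// writing to inCh, otherwise a worker that just exited would block the loop.
func forward(w *worker, msg string) {
	select {
	case w.inCh <- msg:
	case done := <-w.doneCh:
		fmt.Println("worker already finished, handled", done.handled)
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	w := &worker{ctx: ctx, cancel: cancel, inCh: make(chan string), doneCh: make(chan doneEvent, 1)}
	go run(w)

	forward(w, "challenge") // delivered to the running worker

	// Synchronous teardown, as stopWorkRunLoop and
	// handleNewServerEndpointRunLoop now do: cancel, then drain doneCh.
	w.cancel()
	done := <-w.doneCh
	fmt.Println("worker stopped after handling", done.handled, "message(s)")
}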

View File

@@ -25,6 +25,6 @@ func TestRelayManagerInitAndIdle(t *testing.T) {
 	<-rm.runLoopStoppedCh
 	rm = relayManager{}
-	rm.handleBindUDPRelayEndpointChallenge(&disco.BindUDPRelayEndpointChallenge{}, &discoInfo{}, netip.AddrPort{}, 0)
+	rm.handleGeneveEncapDiscoMsgNotBestAddr(&disco.BindUDPRelayEndpointChallenge{}, &discoInfo{}, netip.AddrPort{}, 0)
 	<-rm.runLoopStoppedCh
 }