diff --git a/wgengine/magicsock/endpoint.go b/wgengine/magicsock/endpoint.go index 3788708a8..c2d18d707 100644 --- a/wgengine/magicsock/endpoint.go +++ b/wgengine/magicsock/endpoint.go @@ -1562,10 +1562,18 @@ func pktLenToPingSize(mtu tstun.WireMTU, is6 bool) int { // It should be called with the Conn.mu held. // // It reports whether m.TxID corresponds to a ping that this endpoint sent. -func (de *endpoint) handlePongConnLocked(m *disco.Pong, di *discoInfo, src netip.AddrPort) (knownTxID bool) { +func (de *endpoint) handlePongConnLocked(m *disco.Pong, di *discoInfo, src netip.AddrPort, vni virtualNetworkID) (knownTxID bool) { de.mu.Lock() defer de.mu.Unlock() + if vni.isSet() { + // TODO(jwhited): check for matching [endpoint.bestAddr] once that data + // structure is VNI-aware and [relayManager] can mutate it. We do not + // need to reference any [endpointState] for Geneve-encapsulated disco, + // we store nothing about them there. + return false + } + isDerp := src.Addr() == tailcfg.DerpMagicIPAddr sp, ok := de.sentPing[m.TxID] diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go index 61cdf4954..5b0f28a33 100644 --- a/wgengine/magicsock/magicsock.go +++ b/wgengine/magicsock/magicsock.go @@ -1802,6 +1802,7 @@ func (c *Conn) handleDiscoMessage(msg []byte, src netip.AddrPort, derpNodeSrc ke return } var geneve packet.GeneveHeader + var vni virtualNetworkID if isGeneveEncap { err := geneve.Decode(msg) if err != nil { @@ -1810,6 +1811,7 @@ func (c *Conn) handleDiscoMessage(msg []byte, src netip.AddrPort, derpNodeSrc ke c.logf("[unexpected] geneve header decoding error: %v", err) return } + vni.set(geneve.VNI) msg = msg[packet.GeneveFixedHeaderLength:] } // The control bit should only be set for relay handshake messages @@ -1923,33 +1925,30 @@ func (c *Conn) handleDiscoMessage(msg []byte, src netip.AddrPort, derpNodeSrc ke c.logf("[unexpected] %T packets should not come from a relay server with Geneve control bit set", dm) return } - c.relayManager.handleBindUDPRelayEndpointChallenge(challenge, di, src, geneve.VNI) + c.relayManager.handleGeneveEncapDiscoMsgNotBestAddr(challenge, di, src, geneve.VNI) return } switch dm := dm.(type) { case *disco.Ping: metricRecvDiscoPing.Add(1) - if isGeneveEncap { - // TODO(jwhited): handle Geneve-encapsulated disco ping. - return - } - c.handlePingLocked(dm, src, di, derpNodeSrc) + c.handlePingLocked(dm, src, vni, di, derpNodeSrc) case *disco.Pong: metricRecvDiscoPong.Add(1) - if isGeneveEncap { - // TODO(jwhited): handle Geneve-encapsulated disco pong. - return - } // There might be multiple nodes for the sender's DiscoKey. // Ask each to handle it, stopping once one reports that // the Pong's TxID was theirs. + knownTxID := false c.peerMap.forEachEndpointWithDiscoKey(sender, func(ep *endpoint) (keepGoing bool) { - if ep.handlePongConnLocked(dm, di, src) { + if ep.handlePongConnLocked(dm, di, src, vni) { + knownTxID = true return false } return true }) + if !knownTxID && vni.isSet() { + c.relayManager.handleGeneveEncapDiscoMsgNotBestAddr(dm, di, src, vni.get()) + } case *disco.CallMeMaybe, *disco.CallMeMaybeVia: var via *disco.CallMeMaybeVia isVia := false @@ -2048,12 +2047,21 @@ func (c *Conn) unambiguousNodeKeyOfPingLocked(dm *disco.Ping, dk key.DiscoPublic // di is the discoInfo of the source of the ping. // derpNodeSrc is non-zero if the ping arrived via DERP. -func (c *Conn) handlePingLocked(dm *disco.Ping, src netip.AddrPort, di *discoInfo, derpNodeSrc key.NodePublic) { +func (c *Conn) handlePingLocked(dm *disco.Ping, src netip.AddrPort, vni virtualNetworkID, di *discoInfo, derpNodeSrc key.NodePublic) { likelyHeartBeat := src == di.lastPingFrom && time.Since(di.lastPingTime) < 5*time.Second di.lastPingFrom = src di.lastPingTime = time.Now() isDerp := src.Addr() == tailcfg.DerpMagicIPAddr + if vni.isSet() { + // TODO(jwhited): check for matching [endpoint.bestAddr] once that data + // structure is VNI-aware and [relayManager] can mutate it. We do not + // need to reference any [endpointState] for Geneve-encapsulated disco, + // we store nothing about them there. + c.relayManager.handleGeneveEncapDiscoMsgNotBestAddr(dm, di, src, vni.get()) + return + } + // If we can figure out with certainty which node key this disco // message is for, eagerly update our IP:port<>node and disco<>node // mappings to make p2p path discovery faster in simple diff --git a/wgengine/magicsock/relaymanager.go b/wgengine/magicsock/relaymanager.go index 0b19bb83f..d9fd1fa24 100644 --- a/wgengine/magicsock/relaymanager.go +++ b/wgengine/magicsock/relaymanager.go @@ -14,15 +14,16 @@ import ( "time" "tailscale.com/disco" + "tailscale.com/net/stun" udprelay "tailscale.com/net/udprelay/endpoint" "tailscale.com/types/key" "tailscale.com/util/httpm" "tailscale.com/util/set" ) -// relayManager manages allocation and handshaking of -// [tailscale.com/net/udprelay.Server] endpoints. The zero value is ready for -// use. +// relayManager manages allocation, handshaking, and initial probing (disco +// ping/pong) of [tailscale.com/net/udprelay.Server] endpoints. The zero value +// is ready for use. type relayManager struct { initOnce sync.Once @@ -33,16 +34,18 @@ type relayManager struct { allocWorkByEndpoint map[*endpoint]*relayEndpointAllocWork handshakeWorkByEndpointByServerDisco map[*endpoint]map[key.DiscoPublic]*relayHandshakeWork handshakeWorkByServerDiscoVNI map[serverDiscoVNI]*relayHandshakeWork + handshakeWorkAwaitingPong map[*relayHandshakeWork]addrPortVNI + addrPortVNIToHandshakeWork map[addrPortVNI]*relayHandshakeWork // =================================================================== // The following chan fields serve event inputs to a single goroutine, // runLoop(). - allocateHandshakeCh chan *endpoint - allocateWorkDoneCh chan relayEndpointAllocWorkDoneEvent - handshakeWorkDoneCh chan relayEndpointHandshakeWorkDoneEvent - cancelWorkCh chan *endpoint - newServerEndpointCh chan newRelayServerEndpointEvent - rxChallengeCh chan relayHandshakeChallengeEvent + allocateHandshakeCh chan *endpoint + allocateWorkDoneCh chan relayEndpointAllocWorkDoneEvent + handshakeWorkDoneCh chan relayEndpointHandshakeWorkDoneEvent + cancelWorkCh chan *endpoint + newServerEndpointCh chan newRelayServerEndpointEvent + rxHandshakeDiscoMsgCh chan relayHandshakeDiscoMsgEvent discoInfoMu sync.Mutex // guards the following field discoInfoByServerDisco map[key.DiscoPublic]*relayHandshakeDiscoInfo @@ -66,16 +69,16 @@ type relayHandshakeWork struct { ep *endpoint se udprelay.ServerEndpoint - // In order to not deadlock, runLoop() must select{} read doneCh when - // attempting to write into rxChallengeCh, and the handshake work goroutine - // must close(doneCh) before attempting to write to - // relayManager.handshakeWorkDoneCh. - rxChallengeCh chan relayHandshakeChallengeEvent - doneCh chan struct{} + // handshakeServerEndpoint() always writes to doneCh (len 1) when it + // returns. It may end up writing the same event afterward to + // relayManager.handshakeWorkDoneCh if runLoop() can receive it. runLoop() + // must select{} read on doneCh to prevent deadlock when attempting to write + // to rxDiscoMsgCh. + rxDiscoMsgCh chan relayHandshakeDiscoMsgEvent + doneCh chan relayEndpointHandshakeWorkDoneEvent ctx context.Context cancel context.CancelFunc - wg *sync.WaitGroup } // newRelayServerEndpointEvent indicates a new [udprelay.ServerEndpoint] has @@ -99,8 +102,9 @@ type relayEndpointAllocWorkDoneEvent struct { // work for an [*endpoint] has completed. This structure is immutable once // initialized. type relayEndpointHandshakeWorkDoneEvent struct { - work *relayHandshakeWork - answerSentTo netip.AddrPort // zero value if answer was not transmitted + work *relayHandshakeWork + pongReceivedFrom netip.AddrPort // or zero value if handshake or ping/pong did not complete + latency time.Duration // only relevant if pongReceivedFrom.IsValid() } // activeWorkRunLoop returns true if there is outstanding allocation or @@ -150,8 +154,8 @@ func (r *relayManager) runLoop() { if !r.activeWorkRunLoop() { return } - case challenge := <-r.rxChallengeCh: - r.handleRxChallengeRunLoop(challenge) + case discoMsgEvent := <-r.rxHandshakeDiscoMsgCh: + r.handleRxHandshakeDiscoMsgRunLoop(discoMsgEvent) if !r.activeWorkRunLoop() { return } @@ -159,12 +163,12 @@ func (r *relayManager) runLoop() { } } -type relayHandshakeChallengeEvent struct { - challenge [32]byte - disco key.DiscoPublic - from netip.AddrPort - vni uint32 - at time.Time +type relayHandshakeDiscoMsgEvent struct { + msg disco.Message + disco key.DiscoPublic + from netip.AddrPort + vni uint32 + at time.Time } // relayEndpointAllocWork serves to track in-progress relay endpoint allocation @@ -187,12 +191,14 @@ func (r *relayManager) init() { r.allocWorkByEndpoint = make(map[*endpoint]*relayEndpointAllocWork) r.handshakeWorkByEndpointByServerDisco = make(map[*endpoint]map[key.DiscoPublic]*relayHandshakeWork) r.handshakeWorkByServerDiscoVNI = make(map[serverDiscoVNI]*relayHandshakeWork) + r.handshakeWorkAwaitingPong = make(map[*relayHandshakeWork]addrPortVNI) + r.addrPortVNIToHandshakeWork = make(map[addrPortVNI]*relayHandshakeWork) r.allocateHandshakeCh = make(chan *endpoint) r.allocateWorkDoneCh = make(chan relayEndpointAllocWorkDoneEvent) r.handshakeWorkDoneCh = make(chan relayEndpointHandshakeWorkDoneEvent) r.cancelWorkCh = make(chan *endpoint) r.newServerEndpointCh = make(chan newRelayServerEndpointEvent) - r.rxChallengeCh = make(chan relayHandshakeChallengeEvent) + r.rxHandshakeDiscoMsgCh = make(chan relayHandshakeDiscoMsgEvent) r.runLoopStoppedCh = make(chan struct{}, 1) go r.runLoop() }) @@ -270,8 +276,11 @@ func (r *relayManager) handleCallMeMaybeVia(ep *endpoint, dm *disco.CallMeMaybeV }) } -func (r *relayManager) handleBindUDPRelayEndpointChallenge(dm *disco.BindUDPRelayEndpointChallenge, di *discoInfo, src netip.AddrPort, vni uint32) { - relayManagerInputEvent(r, nil, &r.rxChallengeCh, relayHandshakeChallengeEvent{challenge: dm.Challenge, disco: di.discoKey, from: src, vni: vni, at: time.Now()}) +// handleGeneveEncapDiscoMsgNotBestAddr handles reception of Geneve-encapsulated +// disco messages if they are not associated with any known +// [*endpoint.bestAddr]. +func (r *relayManager) handleGeneveEncapDiscoMsgNotBestAddr(dm disco.Message, di *discoInfo, src netip.AddrPort, vni uint32) { + relayManagerInputEvent(r, nil, &r.rxHandshakeDiscoMsgCh, relayHandshakeDiscoMsgEvent{msg: dm, disco: di.discoKey, from: src, vni: vni, at: time.Now()}) } // relayManagerInputEvent initializes [relayManager] if necessary, starts @@ -337,26 +346,68 @@ func (r *relayManager) stopWorkRunLoop(ep *endpoint, f stopHandshakeWorkFilter) _, knownServer := r.serversByDisco[disco] if knownServer || f == stopHandshakeWorkAllServers { handshakeWork.cancel() - handshakeWork.wg.Wait() - delete(byServerDisco, disco) - delete(r.handshakeWorkByServerDiscoVNI, serverDiscoVNI{handshakeWork.se.ServerDisco, handshakeWork.se.VNI}) + done := <-handshakeWork.doneCh + r.handleHandshakeWorkDoneRunLoop(done) } } - if len(byServerDisco) == 0 { - delete(r.handshakeWorkByEndpointByServerDisco, ep) - } } } -func (r *relayManager) handleRxChallengeRunLoop(challenge relayHandshakeChallengeEvent) { - work, ok := r.handshakeWorkByServerDiscoVNI[serverDiscoVNI{challenge.disco, challenge.vni}] - if !ok { +// addrPortVNI represents a combined netip.AddrPort and Geneve header virtual +// network identifier. +type addrPortVNI struct { + addrPort netip.AddrPort + vni uint32 +} + +func (r *relayManager) handleRxHandshakeDiscoMsgRunLoop(event relayHandshakeDiscoMsgEvent) { + var ( + work *relayHandshakeWork + ok bool + ) + apv := addrPortVNI{event.from, event.vni} + switch event.msg.(type) { + case *disco.BindUDPRelayEndpointChallenge: + work, ok = r.handshakeWorkByServerDiscoVNI[serverDiscoVNI{event.disco, event.vni}] + if !ok { + // No outstanding work tied to this challenge, discard. + return + } + _, ok = r.handshakeWorkAwaitingPong[work] + if ok { + // We've seen a challenge for this relay endpoint previously, + // discard. Servers only respond to the first src ip:port they see + // binds from. + return + } + _, ok = r.addrPortVNIToHandshakeWork[apv] + if ok { + // There is existing work for the same [addrPortVNI] that is not + // 'work'. If both instances happen to be on the same server we + // could attempt to resolve event order using LamportID. For now + // just leave both work instances alone and take no action other + // than to discard this challenge msg. + return + } + // Update state so that future ping/pong will route to 'work'. + r.handshakeWorkAwaitingPong[work] = apv + r.addrPortVNIToHandshakeWork[apv] = work + case *disco.Ping, *disco.Pong: + work, ok = r.addrPortVNIToHandshakeWork[apv] + if !ok { + // No outstanding work tied to this [addrPortVNI], discard. + return + } + default: + // Unexpected message type, discard. return } select { - case <-work.doneCh: + case done := <-work.doneCh: + // handshakeServerEndpoint() returned, clean up its state. + r.handleHandshakeWorkDoneRunLoop(done) return - case work.rxChallengeCh <- challenge: + case work.rxDiscoMsgCh <- event: return } } @@ -375,14 +426,17 @@ func (r *relayManager) handleHandshakeWorkDoneRunLoop(done relayEndpointHandshak delete(r.handshakeWorkByEndpointByServerDisco, done.work.ep) } delete(r.handshakeWorkByServerDiscoVNI, serverDiscoVNI{done.work.se.ServerDisco, done.work.se.VNI}) - if !done.answerSentTo.IsValid() { - // The handshake timed out. + apv, ok := r.handshakeWorkAwaitingPong[work] + if ok { + delete(r.handshakeWorkAwaitingPong, work) + delete(r.addrPortVNIToHandshakeWork, apv) + } + if !done.pongReceivedFrom.IsValid() { + // The handshake or ping/pong probing timed out. return } - // We received a challenge from and transmitted an answer towards the relay - // server. - // TODO(jwhited): Make the associated [*endpoint] aware of this - // [tailscale.com/net/udprelay.ServerEndpoint]. + // This relay endpoint is functional. + // TODO(jwhited): Set it on done.work.ep.bestAddr if it is a betterAddr(). } func (r *relayManager) handleNewServerEndpointRunLoop(newServerEndpoint newRelayServerEndpointEvent) { @@ -398,19 +452,10 @@ func (r *relayManager) handleNewServerEndpointRunLoop(newServerEndpoint newRelay return } - // The existing work is no longer valid, clean it up. Be sure to lookup - // by the existing work's [*endpoint], not the incoming "new" work as - // they are not necessarily matching. + // The existing work is no longer valid, clean it up. existingWork.cancel() - existingWork.wg.Wait() - delete(r.handshakeWorkByServerDiscoVNI, sdv) - byServerDisco, ok := r.handshakeWorkByEndpointByServerDisco[existingWork.ep] - if ok { - delete(byServerDisco, sdv.serverDisco) - if len(byServerDisco) == 0 { - delete(r.handshakeWorkByEndpointByServerDisco, existingWork.ep) - } - } + done := <-existingWork.doneCh + r.handleHandshakeWorkDoneRunLoop(done) } // Check for duplicate work by [*endpoint] + server disco. @@ -425,12 +470,8 @@ func (r *relayManager) handleNewServerEndpointRunLoop(newServerEndpoint newRelay } // Cancel existing handshake that has a lower lamport ID. existingWork.cancel() - existingWork.wg.Wait() - delete(r.handshakeWorkByServerDiscoVNI, sdv) - delete(byServerDisco, sdv.serverDisco) - if len(byServerDisco) == 0 { - delete(r.handshakeWorkByEndpointByServerDisco, existingWork.ep) - } + done := <-existingWork.doneCh + r.handleHandshakeWorkDoneRunLoop(done) } } @@ -464,14 +505,12 @@ func (r *relayManager) handleNewServerEndpointRunLoop(newServerEndpoint newRelay // We're ready to start a new handshake. ctx, cancel := context.WithCancel(context.Background()) - wg := &sync.WaitGroup{} work := &relayHandshakeWork{ ep: newServerEndpoint.ep, se: newServerEndpoint.se, - doneCh: make(chan struct{}), + doneCh: make(chan relayEndpointHandshakeWorkDoneEvent, 1), ctx: ctx, cancel: cancel, - wg: wg, } if byServerDisco == nil { byServerDisco = make(map[key.DiscoPublic]*relayHandshakeWork) @@ -480,19 +519,16 @@ func (r *relayManager) handleNewServerEndpointRunLoop(newServerEndpoint newRelay byServerDisco[newServerEndpoint.se.ServerDisco] = work r.handshakeWorkByServerDiscoVNI[sdv] = work - wg.Add(1) go r.handshakeServerEndpoint(work) } func (r *relayManager) handshakeServerEndpoint(work *relayHandshakeWork) { - defer work.wg.Done() - done := relayEndpointHandshakeWorkDoneEvent{work: work} r.ensureDiscoInfoFor(work) defer func() { r.derefDiscoInfoFor(work) - close(work.doneCh) + work.doneCh <- done relayManagerInputEvent(r, work.ctx, &r.handshakeWorkDoneCh, done) work.cancel() }() @@ -504,7 +540,7 @@ func (r *relayManager) handshakeServerEndpoint(work *relayHandshakeWork) { for _, addrPort := range work.se.AddrPorts { if addrPort.IsValid() { sentBindAny = true - go work.ep.c.sendDiscoMessage(addrPort, vni, key.NodePublic{}, work.se.ServerDisco, bind, discoLog) + go work.ep.c.sendDiscoMessage(addrPort, vni, key.NodePublic{}, work.se.ServerDisco, bind, discoVerboseLog) } } if !sentBindAny { @@ -518,46 +554,83 @@ func (r *relayManager) handshakeServerEndpoint(work *relayHandshakeWork) { timer := time.NewTimer(min(work.se.BindLifetime.Duration, maxHandshakeLifetime)) defer timer.Stop() - // Wait for cancellation, a challenge to be rx'd, or handshake lifetime to - // expire. Our initial implementation values simplicity over other aspects, - // e.g. it is not resilient to any packet loss. - // - // We may want to eventually consider [disc.BindUDPRelayEndpoint] - // retransmission lacking challenge rx, and - // [disco.BindUDPRelayEndpointAnswer] duplication in front of - // [disco.Ping] until [disco.Ping] or [disco.Pong] is received. - select { - case <-work.ctx.Done(): - return - case challenge := <-work.rxChallengeCh: - answer := &disco.BindUDPRelayEndpointAnswer{Answer: challenge.challenge} - done.answerSentTo = challenge.from - // Send answer back to relay server. Typically sendDiscoMessage() calls - // are invoked via a new goroutine in attempt to limit crypto+syscall - // time contributing to system backpressure, and to fire roundtrip - // latency-relevant messages as closely together as possible. We - // intentionally don't do that here, because: - // 1. The primary backpressure concern is around the work.rxChallengeCh - // writer on the [Conn] packet rx path, who is already unblocked - // since we read from the channel. Relay servers only ever tx one - // challenge per rx'd bind message for a given (the first seen) src. - // 2. runLoop() may be waiting for this 'work' to complete if - // explicitly canceled for some reason elsewhere, but this is - // typically only around [*endpoint] and/or [Conn] shutdown. - // 3. It complicates the defer()'d [*discoInfo] deref and 'work' - // completion event order. sendDiscoMessage() assumes the related - // [*discoInfo] is still available. We also don't want the - // [*endpoint] to send a [disco.Ping] before the - // [disco.BindUDPRelayEndpointAnswer] has gone out, otherwise the - // remote side will never see the ping, delaying/preventing the - // [udprelay.ServerEndpoint] from becoming fully operational. - // 4. This is a singular tx with no roundtrip latency measurements - // involved. - work.ep.c.sendDiscoMessage(challenge.from, vni, key.NodePublic{}, work.se.ServerDisco, answer, discoLog) - return - case <-timer.C: - // The handshake timed out. - return + // Limit the number of pings we will transmit. Inbound pings trigger + // outbound pings, so we want to be a little defensive. + const limitPings = 10 + + var ( + handshakeState disco.BindUDPRelayHandshakeState = disco.BindUDPRelayHandshakeStateBindSent + sentPingAt = make(map[stun.TxID]time.Time) + ) + + txPing := func(to netip.AddrPort, withAnswer *[32]byte) { + if len(sentPingAt) == limitPings { + return + } + epDisco := work.ep.disco.Load() + if epDisco == nil { + return + } + txid := stun.NewTxID() + sentPingAt[txid] = time.Now() + ping := &disco.Ping{ + TxID: txid, + NodeKey: work.ep.c.publicKeyAtomic.Load(), + } + go func() { + if withAnswer != nil { + answer := &disco.BindUDPRelayEndpointAnswer{Answer: *withAnswer} + work.ep.c.sendDiscoMessage(to, vni, key.NodePublic{}, work.se.ServerDisco, answer, discoVerboseLog) + } + work.ep.c.sendDiscoMessage(to, vni, key.NodePublic{}, epDisco.key, ping, discoVerboseLog) + }() + } + + // This for{select{}} is responsible for handshaking and tx'ing ping/pong + // when the handshake is complete. + for { + select { + case <-work.ctx.Done(): + return + case msgEvent := <-work.rxDiscoMsgCh: + switch msg := msgEvent.msg.(type) { + case *disco.BindUDPRelayEndpointChallenge: + if handshakeState >= disco.BindUDPRelayHandshakeStateAnswerSent { + continue + } + txPing(msgEvent.from, &msg.Challenge) + handshakeState = disco.BindUDPRelayHandshakeStateAnswerSent + case *disco.Ping: + if handshakeState < disco.BindUDPRelayHandshakeStateAnswerSent { + continue + } + // An inbound ping from the remote peer indicates we completed a + // handshake with the relay server (our answer msg was + // received). Chances are our ping was dropped before the remote + // handshake was complete. We need to rx a pong to determine + // latency, so send another ping. Since the handshake is + // complete we do not need to send an answer in front of this + // one. + txPing(msgEvent.from, nil) + case *disco.Pong: + at, ok := sentPingAt[msg.TxID] + if !ok { + continue + } + // The relay server endpoint is functional! Record the + // round-trip latency and return. + done.pongReceivedFrom = msgEvent.from + done.latency = time.Since(at) + return + default: + // unexpected message type, silently discard + continue + } + return + case <-timer.C: + // The handshake timed out. + return + } } } diff --git a/wgengine/magicsock/relaymanager_test.go b/wgengine/magicsock/relaymanager_test.go index 3b75db9f6..8276849aa 100644 --- a/wgengine/magicsock/relaymanager_test.go +++ b/wgengine/magicsock/relaymanager_test.go @@ -25,6 +25,6 @@ func TestRelayManagerInitAndIdle(t *testing.T) { <-rm.runLoopStoppedCh rm = relayManager{} - rm.handleBindUDPRelayEndpointChallenge(&disco.BindUDPRelayEndpointChallenge{}, &discoInfo{}, netip.AddrPort{}, 0) + rm.handleGeneveEncapDiscoMsgNotBestAddr(&disco.BindUDPRelayEndpointChallenge{}, &discoInfo{}, netip.AddrPort{}, 0) <-rm.runLoopStoppedCh }