mirror of
https://github.com/tailscale/tailscale.git
synced 2025-07-30 15:53:45 +00:00
wgengine/magicsock: enable setting relay epAddr's as bestAddr (#16229)
relayManager can now hand endpoint a relay epAddr for it to consider as bestAddr. endpoint and Conn disco ping/pong handling are now VNI-aware. Updates tailscale/corp#27502 Updates tailscale/corp#29422 Signed-off-by: Jordan Whited <jordan@tailscale.com>
This commit is contained in:
parent
4456f77af7
commit
67b1693c13
@ -99,6 +99,27 @@ type endpoint struct {
|
|||||||
relayCapable bool // whether the node is capable of speaking via a [tailscale.com/net/udprelay.Server]
|
relayCapable bool // whether the node is capable of speaking via a [tailscale.com/net/udprelay.Server]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// relayEndpointReady determines whether the given relay addr should be
|
||||||
|
// installed as de.bestAddr. It is only called by [relayManager] once it has
|
||||||
|
// determined addr is functional via [disco.Pong] reception.
|
||||||
|
func (de *endpoint) relayEndpointReady(addr epAddr, latency time.Duration) {
|
||||||
|
de.c.mu.Lock()
|
||||||
|
defer de.c.mu.Unlock()
|
||||||
|
de.mu.Lock()
|
||||||
|
defer de.mu.Unlock()
|
||||||
|
|
||||||
|
maybeBetter := addrQuality{addr, latency, pingSizeToPktLen(0, addr)}
|
||||||
|
if !betterAddr(maybeBetter, de.bestAddr) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Promote maybeBetter to bestAddr.
|
||||||
|
// TODO(jwhited): collapse path change logging with endpoint.handlePongConnLocked()
|
||||||
|
de.c.logf("magicsock: disco: node %v %v now using %v mtu=%v", de.publicKey.ShortString(), de.discoShort(), maybeBetter.epAddr, maybeBetter.wireMTU)
|
||||||
|
de.setBestAddrLocked(maybeBetter)
|
||||||
|
de.c.peerMap.setNodeKeyForEpAddr(addr, de.publicKey)
|
||||||
|
}
|
||||||
|
|
||||||
func (de *endpoint) setBestAddrLocked(v addrQuality) {
|
func (de *endpoint) setBestAddrLocked(v addrQuality) {
|
||||||
if v.epAddr != de.bestAddr.epAddr {
|
if v.epAddr != de.bestAddr.epAddr {
|
||||||
de.probeUDPLifetime.resetCycleEndpointLocked()
|
de.probeUDPLifetime.resetCycleEndpointLocked()
|
||||||
@ -1575,11 +1596,10 @@ func (de *endpoint) handlePongConnLocked(m *disco.Pong, di *discoInfo, src epAdd
|
|||||||
de.mu.Lock()
|
de.mu.Lock()
|
||||||
defer de.mu.Unlock()
|
defer de.mu.Unlock()
|
||||||
|
|
||||||
if src.vni.isSet() {
|
if src.vni.isSet() && src != de.bestAddr.epAddr {
|
||||||
// TODO(jwhited): fall through once [relayManager] is able to set an
|
// "src" is not our bestAddr, but [relayManager] might be in the
|
||||||
// [epAddr] as de.bestAddr. We do not need to reference any
|
// middle of probing it, awaiting pong reception. Make it aware.
|
||||||
// [endpointState] for Geneve-encapsulated disco, we store nothing
|
de.c.relayManager.handleGeneveEncapDiscoMsgNotBestAddr(m, di, src)
|
||||||
// about them there.
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1605,7 +1625,9 @@ func (de *endpoint) handlePongConnLocked(m *disco.Pong, di *discoInfo, src epAdd
|
|||||||
now := mono.Now()
|
now := mono.Now()
|
||||||
latency := now.Sub(sp.at)
|
latency := now.Sub(sp.at)
|
||||||
|
|
||||||
if !isDerp {
|
if !isDerp && !src.vni.isSet() {
|
||||||
|
// Note: we check vni.isSet() as relay [epAddr]'s are not stored in
|
||||||
|
// endpointState, they are either de.bestAddr or not.
|
||||||
st, ok := de.endpointState[sp.to.ap]
|
st, ok := de.endpointState[sp.to.ap]
|
||||||
if !ok {
|
if !ok {
|
||||||
// This is no longer an endpoint we care about.
|
// This is no longer an endpoint we care about.
|
||||||
@ -1643,6 +1665,11 @@ func (de *endpoint) handlePongConnLocked(m *disco.Pong, di *discoInfo, src epAdd
|
|||||||
if !isDerp {
|
if !isDerp {
|
||||||
thisPong := addrQuality{sp.to, latency, tstun.WireMTU(pingSizeToPktLen(sp.size, sp.to))}
|
thisPong := addrQuality{sp.to, latency, tstun.WireMTU(pingSizeToPktLen(sp.size, sp.to))}
|
||||||
if betterAddr(thisPong, de.bestAddr) {
|
if betterAddr(thisPong, de.bestAddr) {
|
||||||
|
if src.vni.isSet() {
|
||||||
|
// This would be unexpected. Switching to a Geneve-encapsulated
|
||||||
|
// path should only happen in de.relayEndpointReady().
|
||||||
|
de.c.logf("[unexpected] switching to Geneve-encapsulated path %v from %v", thisPong, de.bestAddr)
|
||||||
|
}
|
||||||
de.c.logf("magicsock: disco: node %v %v now using %v mtu=%v tx=%x", de.publicKey.ShortString(), de.discoShort(), sp.to, thisPong.wireMTU, m.TxID[:6])
|
de.c.logf("magicsock: disco: node %v %v now using %v mtu=%v tx=%x", de.publicKey.ShortString(), de.discoShort(), sp.to, thisPong.wireMTU, m.TxID[:6])
|
||||||
de.debugUpdates.Add(EndpointChange{
|
de.debugUpdates.Add(EndpointChange{
|
||||||
When: time.Now(),
|
When: time.Now(),
|
||||||
|
@ -2132,28 +2132,10 @@ func (c *Conn) handlePingLocked(dm *disco.Ping, src epAddr, di *discoInfo, derpN
|
|||||||
di.lastPingTime = time.Now()
|
di.lastPingTime = time.Now()
|
||||||
isDerp := src.ap.Addr() == tailcfg.DerpMagicIPAddr
|
isDerp := src.ap.Addr() == tailcfg.DerpMagicIPAddr
|
||||||
|
|
||||||
if src.vni.isSet() {
|
// numNodes tracks how many nodes (node keys) are associated with the disco
|
||||||
// TODO(jwhited): check for matching [endpoint.bestAddr] once that data
|
// key tied to this inbound ping. Multiple nodes may share the same disco
|
||||||
// structure is VNI-aware and [relayManager] can mutate it. We do not
|
// key in the case of node sharing and users switching accounts.
|
||||||
// need to reference any [endpointState] for Geneve-encapsulated disco,
|
var numNodes int
|
||||||
// we store nothing about them there.
|
|
||||||
c.relayManager.handleGeneveEncapDiscoMsgNotBestAddr(dm, di, src)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// If we can figure out with certainty which node key this disco
|
|
||||||
// message is for, eagerly update our IP:port<>node and disco<>node
|
|
||||||
// mappings to make p2p path discovery faster in simple
|
|
||||||
// cases. Without this, disco would still work, but would be
|
|
||||||
// reliant on DERP call-me-maybe to establish the disco<>node
|
|
||||||
// mapping, and on subsequent disco handlePongConnLocked to establish
|
|
||||||
// the IP:port<>disco mapping.
|
|
||||||
if nk, ok := c.unambiguousNodeKeyOfPingLocked(dm, di.discoKey, derpNodeSrc); ok {
|
|
||||||
if !isDerp {
|
|
||||||
c.peerMap.setNodeKeyForEpAddr(src, nk)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// If we got a ping over DERP, then derpNodeSrc is non-zero and we reply
|
// If we got a ping over DERP, then derpNodeSrc is non-zero and we reply
|
||||||
// over DERP (in which case ipDst is also a DERP address).
|
// over DERP (in which case ipDst is also a DERP address).
|
||||||
// But if the ping was over UDP (ipDst is not a DERP address), then dstKey
|
// But if the ping was over UDP (ipDst is not a DERP address), then dstKey
|
||||||
@ -2161,35 +2143,81 @@ func (c *Conn) handlePingLocked(dm *disco.Ping, src epAddr, di *discoInfo, derpN
|
|||||||
// a dstKey if the dst ip:port is DERP.
|
// a dstKey if the dst ip:port is DERP.
|
||||||
dstKey := derpNodeSrc
|
dstKey := derpNodeSrc
|
||||||
|
|
||||||
// Remember this route if not present.
|
switch {
|
||||||
var numNodes int
|
case src.vni.isSet():
|
||||||
var dup bool
|
if isDerp {
|
||||||
if isDerp {
|
c.logf("[unexpected] got Geneve-encapsulated disco ping from %v/%v over DERP", src, derpNodeSrc)
|
||||||
if ep, ok := c.peerMap.endpointForNodeKey(derpNodeSrc); ok {
|
|
||||||
if ep.addCandidateEndpoint(src.ap, dm.TxID) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
numNodes = 1
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
c.peerMap.forEachEndpointWithDiscoKey(di.discoKey, func(ep *endpoint) (keepGoing bool) {
|
|
||||||
if ep.addCandidateEndpoint(src.ap, dm.TxID) {
|
|
||||||
dup = true
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
numNodes++
|
|
||||||
if numNodes == 1 && dstKey.IsZero() {
|
|
||||||
dstKey = ep.publicKey
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
if dup {
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if numNodes > 1 {
|
|
||||||
// Zero it out if it's ambiguous, so sendDiscoMessage logging
|
var bestEpAddr epAddr
|
||||||
// isn't confusing.
|
var discoKey key.DiscoPublic
|
||||||
dstKey = key.NodePublic{}
|
ep, ok := c.peerMap.endpointForEpAddr(src)
|
||||||
|
if ok {
|
||||||
|
ep.mu.Lock()
|
||||||
|
bestEpAddr = ep.bestAddr.epAddr
|
||||||
|
ep.mu.Unlock()
|
||||||
|
disco := ep.disco.Load()
|
||||||
|
if disco != nil {
|
||||||
|
discoKey = disco.key
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if src == bestEpAddr && discoKey == di.discoKey {
|
||||||
|
// We have an associated endpoint with src as its bestAddr. Set
|
||||||
|
// numNodes so we TX a pong further down.
|
||||||
|
numNodes = 1
|
||||||
|
} else {
|
||||||
|
// We have no [endpoint] in the [peerMap] for this relay [epAddr]
|
||||||
|
// using it as a bestAddr. [relayManager] might be in the middle of
|
||||||
|
// probing it or attempting to set it as best via
|
||||||
|
// [endpoint.relayEndpointReady()]. Make [relayManager] aware.
|
||||||
|
c.relayManager.handleGeneveEncapDiscoMsgNotBestAddr(dm, di, src)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
default: // no VNI
|
||||||
|
// If we can figure out with certainty which node key this disco
|
||||||
|
// message is for, eagerly update our [epAddr]<>node and disco<>node
|
||||||
|
// mappings to make p2p path discovery faster in simple
|
||||||
|
// cases. Without this, disco would still work, but would be
|
||||||
|
// reliant on DERP call-me-maybe to establish the disco<>node
|
||||||
|
// mapping, and on subsequent disco handlePongConnLocked to establish
|
||||||
|
// the IP:port<>disco mapping.
|
||||||
|
if nk, ok := c.unambiguousNodeKeyOfPingLocked(dm, di.discoKey, derpNodeSrc); ok {
|
||||||
|
if !isDerp {
|
||||||
|
c.peerMap.setNodeKeyForEpAddr(src, nk)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remember this route if not present.
|
||||||
|
var dup bool
|
||||||
|
if isDerp {
|
||||||
|
if ep, ok := c.peerMap.endpointForNodeKey(derpNodeSrc); ok {
|
||||||
|
if ep.addCandidateEndpoint(src.ap, dm.TxID) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
numNodes = 1
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
c.peerMap.forEachEndpointWithDiscoKey(di.discoKey, func(ep *endpoint) (keepGoing bool) {
|
||||||
|
if ep.addCandidateEndpoint(src.ap, dm.TxID) {
|
||||||
|
dup = true
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
numNodes++
|
||||||
|
if numNodes == 1 && dstKey.IsZero() {
|
||||||
|
dstKey = ep.publicKey
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
if dup {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if numNodes > 1 {
|
||||||
|
// Zero it out if it's ambiguous, so sendDiscoMessage logging
|
||||||
|
// isn't confusing.
|
||||||
|
dstKey = key.NodePublic{}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -36,6 +36,18 @@ type peerMap struct {
|
|||||||
byEpAddr map[epAddr]*peerInfo
|
byEpAddr map[epAddr]*peerInfo
|
||||||
byNodeID map[tailcfg.NodeID]*peerInfo
|
byNodeID map[tailcfg.NodeID]*peerInfo
|
||||||
|
|
||||||
|
// relayEpAddrByNodeKey ensures we only hold a single relay
|
||||||
|
// [epAddr] (vni.isSet()) for a given node key in byEpAddr, vs letting them
|
||||||
|
// grow unbounded. Relay [epAddr]'s are dynamically created by
|
||||||
|
// [relayManager] during path discovery, and are only useful to track in
|
||||||
|
// peerMap so long as they are the endpoint.bestAddr. [relayManager] handles
|
||||||
|
// all creation and initial probing responsibilities otherwise, and it does
|
||||||
|
// not depend on [peerMap].
|
||||||
|
//
|
||||||
|
// Note: This doesn't address unbounded growth of non-relay epAddr's in
|
||||||
|
// byEpAddr. That issue is being tracked in http://go/corp/29422.
|
||||||
|
relayEpAddrByNodeKey map[key.NodePublic]epAddr
|
||||||
|
|
||||||
// nodesOfDisco contains the set of nodes that are using a
|
// nodesOfDisco contains the set of nodes that are using a
|
||||||
// DiscoKey. Usually those sets will be just one node.
|
// DiscoKey. Usually those sets will be just one node.
|
||||||
nodesOfDisco map[key.DiscoPublic]set.Set[key.NodePublic]
|
nodesOfDisco map[key.DiscoPublic]set.Set[key.NodePublic]
|
||||||
@ -43,10 +55,11 @@ type peerMap struct {
|
|||||||
|
|
||||||
func newPeerMap() peerMap {
|
func newPeerMap() peerMap {
|
||||||
return peerMap{
|
return peerMap{
|
||||||
byNodeKey: map[key.NodePublic]*peerInfo{},
|
byNodeKey: map[key.NodePublic]*peerInfo{},
|
||||||
byEpAddr: map[epAddr]*peerInfo{},
|
byEpAddr: map[epAddr]*peerInfo{},
|
||||||
byNodeID: map[tailcfg.NodeID]*peerInfo{},
|
byNodeID: map[tailcfg.NodeID]*peerInfo{},
|
||||||
nodesOfDisco: map[key.DiscoPublic]set.Set[key.NodePublic]{},
|
relayEpAddrByNodeKey: map[key.NodePublic]epAddr{},
|
||||||
|
nodesOfDisco: map[key.DiscoPublic]set.Set[key.NodePublic]{},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -171,8 +184,19 @@ func (m *peerMap) setNodeKeyForEpAddr(addr epAddr, nk key.NodePublic) {
|
|||||||
if pi := m.byEpAddr[addr]; pi != nil {
|
if pi := m.byEpAddr[addr]; pi != nil {
|
||||||
delete(pi.epAddrs, addr)
|
delete(pi.epAddrs, addr)
|
||||||
delete(m.byEpAddr, addr)
|
delete(m.byEpAddr, addr)
|
||||||
|
if addr.vni.isSet() {
|
||||||
|
delete(m.relayEpAddrByNodeKey, pi.ep.publicKey)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if pi, ok := m.byNodeKey[nk]; ok {
|
if pi, ok := m.byNodeKey[nk]; ok {
|
||||||
|
if addr.vni.isSet() {
|
||||||
|
relay, ok := m.relayEpAddrByNodeKey[nk]
|
||||||
|
if ok {
|
||||||
|
delete(pi.epAddrs, relay)
|
||||||
|
delete(m.byEpAddr, relay)
|
||||||
|
}
|
||||||
|
m.relayEpAddrByNodeKey[nk] = addr
|
||||||
|
}
|
||||||
pi.epAddrs.Add(addr)
|
pi.epAddrs.Add(addr)
|
||||||
m.byEpAddr[addr] = pi
|
m.byEpAddr[addr] = pi
|
||||||
}
|
}
|
||||||
@ -204,4 +228,5 @@ func (m *peerMap) deleteEndpoint(ep *endpoint) {
|
|||||||
for ip := range pi.epAddrs {
|
for ip := range pi.epAddrs {
|
||||||
delete(m.byEpAddr, ip)
|
delete(m.byEpAddr, ip)
|
||||||
}
|
}
|
||||||
|
delete(m.relayEpAddrByNodeKey, ep.publicKey)
|
||||||
}
|
}
|
||||||
|
36
wgengine/magicsock/peermap_test.go
Normal file
36
wgengine/magicsock/peermap_test.go
Normal file
@ -0,0 +1,36 @@
|
|||||||
|
// Copyright (c) Tailscale Inc & AUTHORS
|
||||||
|
// SPDX-License-Identifier: BSD-3-Clause
|
||||||
|
|
||||||
|
package magicsock
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/netip"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"tailscale.com/types/key"
|
||||||
|
)
|
||||||
|
|
||||||
|
func Test_peerMap_oneRelayEpAddrPerNK(t *testing.T) {
|
||||||
|
pm := newPeerMap()
|
||||||
|
nk := key.NewNode().Public()
|
||||||
|
ep := &endpoint{
|
||||||
|
nodeID: 1,
|
||||||
|
publicKey: nk,
|
||||||
|
}
|
||||||
|
ed := &endpointDisco{key: key.NewDisco().Public()}
|
||||||
|
ep.disco.Store(ed)
|
||||||
|
pm.upsertEndpoint(ep, key.DiscoPublic{})
|
||||||
|
vni := virtualNetworkID{}
|
||||||
|
vni.set(1)
|
||||||
|
relayEpAddrA := epAddr{ap: netip.MustParseAddrPort("127.0.0.1:1"), vni: vni}
|
||||||
|
relayEpAddrB := epAddr{ap: netip.MustParseAddrPort("127.0.0.1:2"), vni: vni}
|
||||||
|
pm.setNodeKeyForEpAddr(relayEpAddrA, nk)
|
||||||
|
pm.setNodeKeyForEpAddr(relayEpAddrB, nk)
|
||||||
|
if len(pm.byEpAddr) != 1 {
|
||||||
|
t.Fatalf("expected 1 epAddr in byEpAddr, got: %d", len(pm.byEpAddr))
|
||||||
|
}
|
||||||
|
got := pm.relayEpAddrByNodeKey[nk]
|
||||||
|
if got != relayEpAddrB {
|
||||||
|
t.Fatalf("expected relay epAddr %v, got: %v", relayEpAddrB, got)
|
||||||
|
}
|
||||||
|
}
|
@ -24,6 +24,11 @@ import (
|
|||||||
// relayManager manages allocation, handshaking, and initial probing (disco
|
// relayManager manages allocation, handshaking, and initial probing (disco
|
||||||
// ping/pong) of [tailscale.com/net/udprelay.Server] endpoints. The zero value
|
// ping/pong) of [tailscale.com/net/udprelay.Server] endpoints. The zero value
|
||||||
// is ready for use.
|
// is ready for use.
|
||||||
|
//
|
||||||
|
// [relayManager] methods can be called by [Conn] and [endpoint] while their .mu
|
||||||
|
// mutexes are held. Therefore, in order to avoid deadlocks, [relayManager] must
|
||||||
|
// never attempt to acquire those mutexes, including synchronous calls back
|
||||||
|
// towards [Conn] or [endpoint] methods that acquire them.
|
||||||
type relayManager struct {
|
type relayManager struct {
|
||||||
initOnce sync.Once
|
initOnce sync.Once
|
||||||
|
|
||||||
@ -164,6 +169,7 @@ func (r *relayManager) runLoop() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type relayHandshakeDiscoMsgEvent struct {
|
type relayHandshakeDiscoMsgEvent struct {
|
||||||
|
conn *Conn // for access to [Conn] if there is no associated [relayHandshakeWork]
|
||||||
msg disco.Message
|
msg disco.Message
|
||||||
disco key.DiscoPublic
|
disco key.DiscoPublic
|
||||||
from netip.AddrPort
|
from netip.AddrPort
|
||||||
@ -366,7 +372,7 @@ func (r *relayManager) handleRxHandshakeDiscoMsgRunLoop(event relayHandshakeDisc
|
|||||||
ok bool
|
ok bool
|
||||||
)
|
)
|
||||||
apv := addrPortVNI{event.from, event.vni}
|
apv := addrPortVNI{event.from, event.vni}
|
||||||
switch event.msg.(type) {
|
switch msg := event.msg.(type) {
|
||||||
case *disco.BindUDPRelayEndpointChallenge:
|
case *disco.BindUDPRelayEndpointChallenge:
|
||||||
work, ok = r.handshakeWorkByServerDiscoVNI[serverDiscoVNI{event.disco, event.vni}]
|
work, ok = r.handshakeWorkByServerDiscoVNI[serverDiscoVNI{event.disco, event.vni}]
|
||||||
if !ok {
|
if !ok {
|
||||||
@ -392,7 +398,29 @@ func (r *relayManager) handleRxHandshakeDiscoMsgRunLoop(event relayHandshakeDisc
|
|||||||
// Update state so that future ping/pong will route to 'work'.
|
// Update state so that future ping/pong will route to 'work'.
|
||||||
r.handshakeWorkAwaitingPong[work] = apv
|
r.handshakeWorkAwaitingPong[work] = apv
|
||||||
r.addrPortVNIToHandshakeWork[apv] = work
|
r.addrPortVNIToHandshakeWork[apv] = work
|
||||||
case *disco.Ping, *disco.Pong:
|
case *disco.Ping:
|
||||||
|
// Always TX a pong. We might not have any associated work if ping
|
||||||
|
// reception raced with our call to [endpoint.relayEndpointReady()], so
|
||||||
|
// err on the side of enabling the remote side to use this path.
|
||||||
|
//
|
||||||
|
// Conn.handlePingLocked() makes efforts to suppress duplicate pongs
|
||||||
|
// where the same ping can be received both via raw socket and UDP
|
||||||
|
// socket on Linux. We make no such efforts here as the raw socket BPF
|
||||||
|
// program does not support Geneve-encapsulated disco, and is also
|
||||||
|
// disabled by default.
|
||||||
|
vni := virtualNetworkID{}
|
||||||
|
vni.set(event.vni)
|
||||||
|
go event.conn.sendDiscoMessage(epAddr{ap: event.from, vni: vni}, key.NodePublic{}, event.disco, &disco.Pong{
|
||||||
|
TxID: msg.TxID,
|
||||||
|
Src: event.from,
|
||||||
|
}, discoVerboseLog)
|
||||||
|
|
||||||
|
work, ok = r.addrPortVNIToHandshakeWork[apv]
|
||||||
|
if !ok {
|
||||||
|
// No outstanding work tied to this [addrPortVNI], return early.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
case *disco.Pong:
|
||||||
work, ok = r.addrPortVNIToHandshakeWork[apv]
|
work, ok = r.addrPortVNIToHandshakeWork[apv]
|
||||||
if !ok {
|
if !ok {
|
||||||
// No outstanding work tied to this [addrPortVNI], discard.
|
// No outstanding work tied to this [addrPortVNI], discard.
|
||||||
@ -436,9 +464,13 @@ func (r *relayManager) handleHandshakeWorkDoneRunLoop(done relayEndpointHandshak
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
// This relay endpoint is functional.
|
// This relay endpoint is functional.
|
||||||
// TODO(jwhited): Set it on done.work.ep.bestAddr if it is a betterAddr().
|
vni := virtualNetworkID{}
|
||||||
// We also need to conn.peerMap.setNodeKeyForEpAddr(), and ensure we clean
|
vni.set(done.work.se.VNI)
|
||||||
// it up when bestAddr changes, too.
|
addr := epAddr{ap: done.pongReceivedFrom, vni: vni}
|
||||||
|
// ep.relayEndpointReady() must be called in a new goroutine to prevent
|
||||||
|
// deadlocks as it acquires [endpoint] & [Conn] mutexes. See [relayManager]
|
||||||
|
// docs for details.
|
||||||
|
go done.work.ep.relayEndpointReady(addr, done.latency)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *relayManager) handleNewServerEndpointRunLoop(newServerEndpoint newRelayServerEndpointEvent) {
|
func (r *relayManager) handleNewServerEndpointRunLoop(newServerEndpoint newRelayServerEndpointEvent) {
|
||||||
@ -613,6 +645,9 @@ func (r *relayManager) handshakeServerEndpoint(work *relayHandshakeWork) {
|
|||||||
// latency, so send another ping. Since the handshake is
|
// latency, so send another ping. Since the handshake is
|
||||||
// complete we do not need to send an answer in front of this
|
// complete we do not need to send an answer in front of this
|
||||||
// one.
|
// one.
|
||||||
|
//
|
||||||
|
// We don't need to TX a pong, that was already handled for us
|
||||||
|
// in handleRxHandshakeDiscoMsgRunLoop().
|
||||||
txPing(msgEvent.from, nil)
|
txPing(msgEvent.from, nil)
|
||||||
case *disco.Pong:
|
case *disco.Pong:
|
||||||
at, ok := sentPingAt[msg.TxID]
|
at, ok := sentPingAt[msg.TxID]
|
||||||
|
Loading…
x
Reference in New Issue
Block a user