diff --git a/wgengine/magicsock/endpoint.go b/wgengine/magicsock/endpoint.go index 9edc6403e..af4666665 100644 --- a/wgengine/magicsock/endpoint.go +++ b/wgengine/magicsock/endpoint.go @@ -100,25 +100,33 @@ type endpoint struct { relayCapable bool // whether the node is capable of speaking via a [tailscale.com/net/udprelay.Server] } -// relayEndpointReady determines whether the given relay addr should be -// installed as de.bestAddr. It is only called by [relayManager] once it has -// determined addr is functional via [disco.Pong] reception. -func (de *endpoint) relayEndpointReady(addr epAddr, latency time.Duration) { +// udpRelayEndpointReady determines whether the given relay [addrQuality] should +// be installed as de.bestAddr. It is only called by [relayManager] once it has +// determined maybeBest is functional via [disco.Pong] reception. +func (de *endpoint) udpRelayEndpointReady(maybeBest addrQuality) { de.c.mu.Lock() defer de.c.mu.Unlock() de.mu.Lock() defer de.mu.Unlock() - maybeBetter := addrQuality{addr, latency, pingSizeToPktLen(0, addr)} - if !betterAddr(maybeBetter, de.bestAddr) { + if maybeBest.relayServerDisco.Compare(de.bestAddr.relayServerDisco) == 0 { + // TODO(jwhited): add some observability for this case, e.g. did we + // flip transports during a de.bestAddr transition from untrusted to + // trusted? + // + // If these are equal we must set maybeBest as bestAddr, otherwise we + // could leave a stale bestAddr if it goes over a different + // address family or src. + } else if !betterAddr(maybeBest, de.bestAddr) { return } - // Promote maybeBetter to bestAddr. + // Promote maybeBest to bestAddr. 
// TODO(jwhited): collapse path change logging with endpoint.handlePongConnLocked() - de.c.logf("magicsock: disco: node %v %v now using %v mtu=%v", de.publicKey.ShortString(), de.discoShort(), maybeBetter.epAddr, maybeBetter.wireMTU) - de.setBestAddrLocked(maybeBetter) - de.c.peerMap.setNodeKeyForEpAddr(addr, de.publicKey) + de.c.logf("magicsock: disco: node %v %v now using %v mtu=%v", de.publicKey.ShortString(), de.discoShort(), maybeBest.epAddr, maybeBest.wireMTU) + de.setBestAddrLocked(maybeBest) + de.trustBestAddrUntil = mono.Now().Add(trustUDPAddrDuration) + de.c.peerMap.setNodeKeyForEpAddr(maybeBest.epAddr, de.publicKey) } func (de *endpoint) setBestAddrLocked(v addrQuality) { @@ -871,7 +879,9 @@ func (de *endpoint) discoverUDPRelayPathsLocked(now mono.Time) { // TODO(jwhited): return early if there are no relay servers set, otherwise // we spin up and down relayManager.runLoop unnecessarily. de.lastUDPRelayPathDiscovery = now - de.c.relayManager.allocateAndHandshakeAllServers(de) + lastBest := de.bestAddr + lastBestIsTrusted := mono.Now().Before(de.trustBestAddrUntil) + de.c.relayManager.startUDPRelayPathDiscoveryFor(de, lastBest, lastBestIsTrusted) } // wantUDPRelayPathDiscoveryLocked reports whether we should kick off UDP relay @@ -1714,7 +1724,16 @@ func (de *endpoint) handlePongConnLocked(m *disco.Pong, di *discoInfo, src epAdd // Promote this pong response to our current best address if it's lower latency. // TODO(bradfitz): decide how latency vs. preference order affects decision if !isDerp { - thisPong := addrQuality{sp.to, latency, tstun.WireMTU(pingSizeToPktLen(sp.size, sp.to))} + thisPong := addrQuality{ + epAddr: sp.to, + latency: latency, + wireMTU: pingSizeToPktLen(sp.size, sp.to), + } + // TODO(jwhited): consider checking de.trustBestAddrUntil as well. 
If + // de.bestAddr is untrusted we may want to clear it, otherwise we could + // get stuck with a forever untrusted bestAddr that blackholes, since + // we don't clear direct UDP paths on disco ping timeout (see + // discoPingTimeout). if betterAddr(thisPong, de.bestAddr) { if src.vni.isSet() { // This would be unexpected. Switching to a Geneve-encapsulated @@ -1765,14 +1784,17 @@ func (e epAddr) String() string { return fmt.Sprintf("%v:vni:%d", e.ap.String(), e.vni.get()) } -// addrQuality is an [epAddr] with an associated latency and path mtu. +// addrQuality is an [epAddr], an optional [key.DiscoPublic] if a relay server +// is associated, a round-trip latency measurement, and path mtu. type addrQuality struct { epAddr - latency time.Duration - wireMTU tstun.WireMTU + relayServerDisco key.DiscoPublic // only relevant if epAddr.vni.isSet(), otherwise zero value + latency time.Duration + wireMTU tstun.WireMTU } func (a addrQuality) String() string { + // TODO(jwhited): consider including relayServerDisco return fmt.Sprintf("%v@%v+%v", a.epAddr, a.latency, a.wireMTU) } diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go index 553543b0f..0933c5be2 100644 --- a/wgengine/magicsock/magicsock.go +++ b/wgengine/magicsock/magicsock.go @@ -2137,6 +2137,8 @@ func (c *Conn) handleDiscoMessage(msg []byte, src epAddr, shouldBeRelayHandshake } ep.mu.Lock() relayCapable := ep.relayCapable + lastBest := ep.bestAddr + lastBestIsTrusted := mono.Now().Before(ep.trustBestAddrUntil) ep.mu.Unlock() if isVia && !relayCapable { c.logf("magicsock: disco: ignoring %s from %v; %v is not known to be relay capable", msgType, sender.ShortString(), sender.ShortString()) @@ -2156,7 +2158,7 @@ func (c *Conn) handleDiscoMessage(msg []byte, src epAddr, shouldBeRelayHandshake c.discoShort, epDisco.short, via.ServerDisco.ShortString(), ep.publicKey.ShortString(), derpStr(src.String()), len(via.AddrPorts)) - c.relayManager.handleCallMeMaybeVia(ep, via) + 
c.relayManager.handleCallMeMaybeVia(ep, lastBest, lastBestIsTrusted, via) } else { c.dlogf("[v1] magicsock: disco: %v<-%v (%v, %v) got call-me-maybe, %d endpoints", c.discoShort, epDisco.short, @@ -2254,7 +2256,7 @@ func (c *Conn) handlePingLocked(dm *disco.Ping, src epAddr, di *discoInfo, derpN // We have no [endpoint] in the [peerMap] for this relay [epAddr] // using it as a bestAddr. [relayManager] might be in the middle of // probing it or attempting to set it as best via - // [endpoint.relayEndpointReady()]. Make [relayManager] aware. + // [endpoint.udpRelayEndpointReady()]. Make [relayManager] aware. c.relayManager.handleGeneveEncapDiscoMsgNotBestAddr(c, dm, di, src) return } diff --git a/wgengine/magicsock/relaymanager.go b/wgengine/magicsock/relaymanager.go index 6418a4364..1c173c01a 100644 --- a/wgengine/magicsock/relaymanager.go +++ b/wgengine/magicsock/relaymanager.go @@ -50,7 +50,7 @@ type relayManager struct { // =================================================================== // The following chan fields serve event inputs to a single goroutine, // runLoop(). - allocateHandshakeCh chan *endpoint + startDiscoveryCh chan endpointWithLastBest allocateWorkDoneCh chan relayEndpointAllocWorkDoneEvent handshakeWorkDoneCh chan relayEndpointHandshakeWorkDoneEvent cancelWorkCh chan *endpoint @@ -77,8 +77,8 @@ type serverDiscoVNI struct { // relayHandshakeWork serves to track in-progress relay handshake work for a // [udprelay.ServerEndpoint]. This structure is immutable once initialized. type relayHandshakeWork struct { - ep *endpoint - se udprelay.ServerEndpoint + wlb endpointWithLastBest + se udprelay.ServerEndpoint // handshakeServerEndpoint() always writes to doneCh (len 1) when it // returns. It may end up writing the same event afterward to @@ -97,7 +97,7 @@ type relayHandshakeWork struct { // [disco.CallMeMaybeVia] reception. This structure is immutable once // initialized. 
type newRelayServerEndpointEvent struct { - ep *endpoint + wlb endpointWithLastBest se udprelay.ServerEndpoint server netip.AddrPort // zero value if learned via [disco.CallMeMaybeVia] } @@ -142,9 +142,9 @@ func (r *relayManager) runLoop() { for { select { - case ep := <-r.allocateHandshakeCh: - if !r.hasActiveWorkForEndpointRunLoop(ep) { - r.allocateAllServersRunLoop(ep) + case startDiscovery := <-r.startDiscoveryCh: + if !r.hasActiveWorkForEndpointRunLoop(startDiscovery.ep) { + r.allocateAllServersRunLoop(startDiscovery) } if !r.hasActiveWorkRunLoop() { return @@ -153,7 +153,7 @@ func (r *relayManager) runLoop() { work, ok := r.allocWorkByEndpoint[done.work.ep] if ok && work == done.work { // Verify the work in the map is the same as the one that we're - // cleaning up. New events on r.allocateHandshakeCh can + // cleaning up. New events on r.startDiscoveryCh can // overwrite pre-existing keys. delete(r.allocWorkByEndpoint, done.work.ep) } @@ -237,7 +237,7 @@ func (r *relayManager) init() { r.handshakeWorkByServerDiscoVNI = make(map[serverDiscoVNI]*relayHandshakeWork) r.handshakeWorkAwaitingPong = make(map[*relayHandshakeWork]addrPortVNI) r.addrPortVNIToHandshakeWork = make(map[addrPortVNI]*relayHandshakeWork) - r.allocateHandshakeCh = make(chan *endpoint) + r.startDiscoveryCh = make(chan endpointWithLastBest) r.allocateWorkDoneCh = make(chan relayEndpointAllocWorkDoneEvent) r.handshakeWorkDoneCh = make(chan relayEndpointHandshakeWorkDoneEvent) r.cancelWorkCh = make(chan *endpoint) @@ -273,7 +273,7 @@ func (r *relayManager) ensureDiscoInfoFor(work *relayHandshakeWork) { di.di = &discoInfo{ discoKey: work.se.ServerDisco, discoShort: work.se.ServerDisco.ShortString(), - sharedKey: work.ep.c.discoPrivate.Shared(work.se.ServerDisco), + sharedKey: work.wlb.ep.c.discoPrivate.Shared(work.se.ServerDisco), } } } @@ -306,7 +306,7 @@ func (r *relayManager) discoInfo(serverDisco key.DiscoPublic) (_ *discoInfo, ok return nil, false } -func (r *relayManager) 
handleCallMeMaybeVia(ep *endpoint, dm *disco.CallMeMaybeVia) { +func (r *relayManager) handleCallMeMaybeVia(ep *endpoint, lastBest addrQuality, lastBestIsTrusted bool, dm *disco.CallMeMaybeVia) { se := udprelay.ServerEndpoint{ ServerDisco: dm.ServerDisco, LamportID: dm.LamportID, @@ -316,7 +316,11 @@ func (r *relayManager) handleCallMeMaybeVia(ep *endpoint, dm *disco.CallMeMaybeV se.BindLifetime.Duration = dm.BindLifetime se.SteadyStateLifetime.Duration = dm.SteadyStateLifetime relayManagerInputEvent(r, nil, &r.newServerEndpointCh, newRelayServerEndpointEvent{ - ep: ep, + wlb: endpointWithLastBest{ + ep: ep, + lastBest: lastBest, + lastBestIsTrusted: lastBestIsTrusted, + }, se: se, }) } @@ -360,11 +364,19 @@ func relayManagerInputEvent[T any](r *relayManager, ctx context.Context, eventCh } } -// allocateAndHandshakeAllServers kicks off allocation and handshaking of relay -// endpoints for 'ep' on all known relay servers if there is no outstanding -// work. -func (r *relayManager) allocateAndHandshakeAllServers(ep *endpoint) { - relayManagerInputEvent(r, nil, &r.allocateHandshakeCh, ep) +// endpointWithLastBest represents an [*endpoint], its last bestAddr, and if +// the last bestAddr was trusted (see endpoint.trustBestAddrUntil) at the time +// of init. This structure is immutable once initialized. +type endpointWithLastBest struct { + ep *endpoint + lastBest addrQuality + lastBestIsTrusted bool +} + +// startUDPRelayPathDiscoveryFor starts UDP relay path discovery for ep on all +// known relay servers if ep has no in-progress work. +func (r *relayManager) startUDPRelayPathDiscoveryFor(ep *endpoint, lastBest addrQuality, lastBestIsTrusted bool) { + relayManagerInputEvent(r, nil, &r.startDiscoveryCh, endpointWithLastBest{ep, lastBest, lastBestIsTrusted}) } // stopWork stops all outstanding allocation & handshaking work for 'ep'. 
@@ -432,7 +444,7 @@ func (r *relayManager) handleRxHandshakeDiscoMsgRunLoop(event relayHandshakeDisc r.addrPortVNIToHandshakeWork[apv] = work case *disco.Ping: // Always TX a pong. We might not have any associated work if ping - // reception raced with our call to [endpoint.relayEndpointReady()], so + // reception raced with our call to [endpoint.udpRelayEndpointReady()], so // err on the side of enabling the remote side to use this path. // // Conn.handlePingLocked() makes efforts to suppress duplicate pongs @@ -473,7 +485,7 @@ func (r *relayManager) handleRxHandshakeDiscoMsgRunLoop(event relayHandshakeDisc } func (r *relayManager) handleHandshakeWorkDoneRunLoop(done relayEndpointHandshakeWorkDoneEvent) { - byServerDisco, ok := r.handshakeWorkByEndpointByServerDisco[done.work.ep] + byServerDisco, ok := r.handshakeWorkByEndpointByServerDisco[done.work.wlb.ep] if !ok { return } @@ -483,7 +495,7 @@ func (r *relayManager) handleHandshakeWorkDoneRunLoop(done relayEndpointHandshak } delete(byServerDisco, done.work.se.ServerDisco) if len(byServerDisco) == 0 { - delete(r.handshakeWorkByEndpointByServerDisco, done.work.ep) + delete(r.handshakeWorkByEndpointByServerDisco, done.work.wlb.ep) } delete(r.handshakeWorkByServerDiscoVNI, serverDiscoVNI{done.work.se.ServerDisco, done.work.se.VNI}) apv, ok := r.handshakeWorkAwaitingPong[work] @@ -499,10 +511,15 @@ func (r *relayManager) handleHandshakeWorkDoneRunLoop(done relayEndpointHandshak vni := virtualNetworkID{} vni.set(done.work.se.VNI) addr := epAddr{ap: done.pongReceivedFrom, vni: vni} - // ep.relayEndpointReady() must be called in a new goroutine to prevent + // ep.udpRelayEndpointReady() must be called in a new goroutine to prevent // deadlocks as it acquires [endpoint] & [Conn] mutexes. See [relayManager] // docs for details. 
- go done.work.ep.relayEndpointReady(addr, done.latency) + go done.work.wlb.ep.udpRelayEndpointReady(addrQuality{ + epAddr: addr, + relayServerDisco: done.work.se.ServerDisco, + latency: done.latency, + wireMTU: pingSizeToPktLen(0, addr), + }) } func (r *relayManager) handleNewServerEndpointRunLoop(newServerEndpoint newRelayServerEndpointEvent) { @@ -525,7 +542,7 @@ func (r *relayManager) handleNewServerEndpointRunLoop(newServerEndpoint newRelay } // Check for duplicate work by [*endpoint] + server disco. - byServerDisco, ok := r.handshakeWorkByEndpointByServerDisco[newServerEndpoint.ep] + byServerDisco, ok := r.handshakeWorkByEndpointByServerDisco[newServerEndpoint.wlb.ep] if ok { existingWork, ok := byServerDisco[newServerEndpoint.se.ServerDisco] if ok { @@ -569,10 +586,40 @@ func (r *relayManager) handleNewServerEndpointRunLoop(newServerEndpoint newRelay } } + if newServerEndpoint.server.IsValid() { + // Send a [disco.CallMeMaybeVia] to the remote peer if we allocated this + // endpoint, regardless of if we start a handshake below. + go r.sendCallMeMaybeVia(newServerEndpoint.wlb.ep, newServerEndpoint.se) + } + + lastBestMatchingServer := newServerEndpoint.se.ServerDisco.Compare(newServerEndpoint.wlb.lastBest.relayServerDisco) == 0 + if lastBestMatchingServer && newServerEndpoint.wlb.lastBestIsTrusted { + // This relay server endpoint is the same as [endpoint]'s bestAddr at + // the time UDP relay path discovery was started, and it was also a + // trusted path (see endpoint.trustBestAddrUntil), so return early. + // + // If we were to start a new handshake, there is a chance that we + // cause [endpoint] to blackhole some packets on its bestAddr if we end + // up shifting to a new address family or src, e.g. IPv4 to IPv6, due to + // the window of time between the handshake completing, and our call to + // udpRelayEndpointReady(). The relay server can only forward packets + // from us on a single [epAddr]. 
+ return + } + + // TODO(jwhited): if lastBest is untrusted, consider some strategies + // to reduce the chance we blackhole if it were to transition to + // trusted during/before the new handshake: + // 1. Start by attempting a handshake with only lastBest.epAddr. If + // that fails then try the remaining [epAddr]s. + // 2. Signal bestAddr trust transitions between [endpoint] and + // [relayManager] in order to prevent a handshake from starting + // and/or stop one that is running. + // We're ready to start a new handshake. ctx, cancel := context.WithCancel(context.Background()) work := &relayHandshakeWork{ - ep: newServerEndpoint.ep, + wlb: newServerEndpoint.wlb, se: newServerEndpoint.se, rxDiscoMsgCh: make(chan relayHandshakeDiscoMsgEvent), doneCh: make(chan relayEndpointHandshakeWorkDoneEvent, 1), @@ -581,16 +628,11 @@ func (r *relayManager) handleNewServerEndpointRunLoop(newServerEndpoint newRelay } if byServerDisco == nil { byServerDisco = make(map[key.DiscoPublic]*relayHandshakeWork) - r.handshakeWorkByEndpointByServerDisco[newServerEndpoint.ep] = byServerDisco + r.handshakeWorkByEndpointByServerDisco[newServerEndpoint.wlb.ep] = byServerDisco } byServerDisco[newServerEndpoint.se.ServerDisco] = work r.handshakeWorkByServerDiscoVNI[sdv] = work - if newServerEndpoint.server.IsValid() { - // Send CallMeMaybeVia to the remote peer if we allocated this endpoint. 
- go r.sendCallMeMaybeVia(work.ep, work.se) - } - r.handshakeGeneration++ if r.handshakeGeneration == 0 { // generation must be nonzero r.handshakeGeneration++ @@ -633,7 +675,8 @@ func (r *relayManager) handshakeServerEndpoint(work *relayHandshakeWork, generat work.cancel() }() - epDisco := work.ep.disco.Load() + ep := work.wlb.ep + epDisco := ep.disco.Load() if epDisco == nil { return } @@ -653,7 +696,7 @@ func (r *relayManager) handshakeServerEndpoint(work *relayHandshakeWork, generat for _, addrPort := range work.se.AddrPorts { if addrPort.IsValid() { sentBindAny = true - go work.ep.c.sendDiscoMessage(epAddr{ap: addrPort, vni: vni}, key.NodePublic{}, work.se.ServerDisco, bind, discoVerboseLog) + go ep.c.sendDiscoMessage(epAddr{ap: addrPort, vni: vni}, key.NodePublic{}, work.se.ServerDisco, bind, discoVerboseLog) } } if !sentBindAny { @@ -684,15 +727,15 @@ func (r *relayManager) handshakeServerEndpoint(work *relayHandshakeWork, generat sentPingAt[txid] = time.Now() ping := &disco.Ping{ TxID: txid, - NodeKey: work.ep.c.publicKeyAtomic.Load(), + NodeKey: ep.c.publicKeyAtomic.Load(), } go func() { if withAnswer != nil { answer := &disco.BindUDPRelayEndpointAnswer{BindUDPRelayEndpointCommon: common} answer.Challenge = *withAnswer - work.ep.c.sendDiscoMessage(epAddr{ap: to, vni: vni}, key.NodePublic{}, work.se.ServerDisco, answer, discoVerboseLog) + ep.c.sendDiscoMessage(epAddr{ap: to, vni: vni}, key.NodePublic{}, work.se.ServerDisco, answer, discoVerboseLog) } - work.ep.c.sendDiscoMessage(epAddr{ap: to, vni: vni}, key.NodePublic{}, epDisco.key, ping, discoVerboseLog) + ep.c.sendDiscoMessage(epAddr{ap: to, vni: vni}, key.NodePublic{}, epDisco.key, ping, discoVerboseLog) }() } @@ -760,17 +803,17 @@ func (r *relayManager) handshakeServerEndpoint(work *relayHandshakeWork, generat } } -func (r *relayManager) allocateAllServersRunLoop(ep *endpoint) { +func (r *relayManager) allocateAllServersRunLoop(wlb endpointWithLastBest) { if len(r.serversByAddrPort) == 0 { return } 
ctx, cancel := context.WithCancel(context.Background()) - started := &relayEndpointAllocWork{ep: ep, cancel: cancel, wg: &sync.WaitGroup{}} + started := &relayEndpointAllocWork{ep: wlb.ep, cancel: cancel, wg: &sync.WaitGroup{}} for k := range r.serversByAddrPort { started.wg.Add(1) - go r.allocateSingleServer(ctx, started.wg, k, ep) + go r.allocateSingleServer(ctx, started.wg, k, wlb) } - r.allocWorkByEndpoint[ep] = started + r.allocWorkByEndpoint[wlb.ep] = started go func() { started.wg.Wait() relayManagerInputEvent(r, ctx, &r.allocateWorkDoneCh, relayEndpointAllocWorkDoneEvent{work: started}) @@ -829,25 +872,25 @@ func doAllocate(ctx context.Context, server netip.AddrPort, discoKeys [2]key.Dis } } -func (r *relayManager) allocateSingleServer(ctx context.Context, wg *sync.WaitGroup, server netip.AddrPort, ep *endpoint) { +func (r *relayManager) allocateSingleServer(ctx context.Context, wg *sync.WaitGroup, server netip.AddrPort, wlb endpointWithLastBest) { // TODO(jwhited): introduce client metrics counters for notable failures defer wg.Done() - remoteDisco := ep.disco.Load() + remoteDisco := wlb.ep.disco.Load() if remoteDisco == nil { return } firstTry := true for { - se, err := doAllocate(ctx, server, [2]key.DiscoPublic{ep.c.discoPublic, remoteDisco.key}) + se, err := doAllocate(ctx, server, [2]key.DiscoPublic{wlb.ep.c.discoPublic, remoteDisco.key}) if err == nil { relayManagerInputEvent(r, ctx, &r.newServerEndpointCh, newRelayServerEndpointEvent{ - ep: ep, + wlb: wlb, se: se, server: server, // we allocated this endpoint (vs CallMeMaybeVia reception), mark it as such }) return } - ep.c.logf("[v1] magicsock: relayManager: error allocating endpoint on %v for %v: %v", server, ep.discoShort(), err) + wlb.ep.c.logf("[v1] magicsock: relayManager: error allocating endpoint on %v for %v: %v", server, wlb.ep.discoShort(), err) var notReady errNotReady if firstTry && errors.As(err, &notReady) { select { diff --git a/wgengine/magicsock/relaymanager_test.go 
b/wgengine/magicsock/relaymanager_test.go index de282b499..8feff2f3d 100644 --- a/wgengine/magicsock/relaymanager_test.go +++ b/wgengine/magicsock/relaymanager_test.go @@ -14,7 +14,7 @@ import ( func TestRelayManagerInitAndIdle(t *testing.T) { rm := relayManager{} - rm.allocateAndHandshakeAllServers(&endpoint{}) + rm.startUDPRelayPathDiscoveryFor(&endpoint{}, addrQuality{}, false) <-rm.runLoopStoppedCh rm = relayManager{} @@ -22,7 +22,7 @@ func TestRelayManagerInitAndIdle(t *testing.T) { <-rm.runLoopStoppedCh rm = relayManager{} - rm.handleCallMeMaybeVia(&endpoint{c: &Conn{discoPrivate: key.NewDisco()}}, &disco.CallMeMaybeVia{ServerDisco: key.NewDisco().Public()}) + rm.handleCallMeMaybeVia(&endpoint{c: &Conn{discoPrivate: key.NewDisco()}}, addrQuality{}, false, &disco.CallMeMaybeVia{ServerDisco: key.NewDisco().Public()}) <-rm.runLoopStoppedCh rm = relayManager{}