From 6de4a021bb45e24aece07c3bf64dda15da49cfb0 Mon Sep 17 00:00:00 2001 From: Jordan Whited Date: Fri, 16 May 2025 13:51:40 -0700 Subject: [PATCH] wgengine/magicsock: implement relayManager handshaking (#15977) CallMeMaybeVia reception and endpoint allocation have been collapsed to a single event channel. discoInfo caching for active relay handshakes is now implemented. Updates tailscale/corp#27502 Signed-off-by: Jordan Whited --- wgengine/magicsock/endpoint.go | 2 +- wgengine/magicsock/magicsock.go | 2 +- wgengine/magicsock/relaymanager.go | 445 +++++++++++++++++++++--- wgengine/magicsock/relaymanager_test.go | 5 +- 4 files changed, 396 insertions(+), 58 deletions(-) diff --git a/wgengine/magicsock/endpoint.go b/wgengine/magicsock/endpoint.go index f88dab29d..e834c277c 100644 --- a/wgengine/magicsock/endpoint.go +++ b/wgengine/magicsock/endpoint.go @@ -1871,7 +1871,7 @@ func (de *endpoint) resetLocked() { } } de.probeUDPLifetime.resetCycleEndpointLocked() - de.c.relayManager.cancelOutstandingWork(de) + de.c.relayManager.stopWork(de) } func (de *endpoint) numStopAndReset() int64 { diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go index cf3ef2352..05f4cf56d 100644 --- a/wgengine/magicsock/magicsock.go +++ b/wgengine/magicsock/magicsock.go @@ -1960,7 +1960,7 @@ func (c *Conn) handleDiscoMessage(msg []byte, src netip.AddrPort, derpNodeSrc ke c.discoShort, epDisco.short, via.ServerDisco.ShortString(), ep.publicKey.ShortString(), derpStr(src.String()), len(via.AddrPorts)) - c.relayManager.handleCallMeMaybeVia(via) + c.relayManager.handleCallMeMaybeVia(ep, via) } else { c.dlogf("[v1] magicsock: disco: %v<-%v (%v, %v) got call-me-maybe, %d endpoints", c.discoShort, epDisco.short, diff --git a/wgengine/magicsock/relaymanager.go b/wgengine/magicsock/relaymanager.go index b1732ff41..a63754371 100644 --- a/wgengine/magicsock/relaymanager.go +++ b/wgengine/magicsock/relaymanager.go @@ -16,6 +16,7 @@ import ( "tailscale.com/disco" udprelay "tailscale.com/net/udprelay/endpoint" "tailscale.com/types/key" + "tailscale.com/types/ptr" "tailscale.com/util/httpm" "tailscale.com/util/set" ) @@ -28,21 +29,24 @@ type relayManager struct { // =================================================================== // The following fields are owned by a single goroutine, runLoop(). - serversByAddrPort set.Set[netip.AddrPort] - allocWorkByEndpoint map[*endpoint]*relayEndpointAllocWork + serversByAddrPort map[netip.AddrPort]key.DiscoPublic + serversByDisco map[key.DiscoPublic]netip.AddrPort + allocWorkByEndpoint map[*endpoint]*relayEndpointAllocWork + handshakeWorkByEndpointByServerDisco map[*endpoint]map[key.DiscoPublic]*relayHandshakeWork + handshakeWorkByServerDiscoVNI map[serverDiscoVNI]*relayHandshakeWork // =================================================================== // The following chan fields serve event inputs to a single goroutine, // runLoop(). 
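+	// Writes into these channels go through relayManagerInputEvent(), which
+	// restarts runLoop() via runLoopStoppedCh if it has already returned.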
allocateHandshakeCh chan *endpoint allocateWorkDoneCh chan relayEndpointAllocWorkDoneEvent + handshakeWorkDoneCh chan relayEndpointHandshakeWorkDoneEvent cancelWorkCh chan *endpoint newServerEndpointCh chan newRelayServerEndpointEvent rxChallengeCh chan relayHandshakeChallengeEvent - rxCallMeMaybeViaCh chan *disco.CallMeMaybeVia discoInfoMu sync.Mutex // guards the following field - discoInfoByServerDisco map[key.DiscoPublic]*discoInfo + discoInfoByServerDisco map[key.DiscoPublic]*relayHandshakeDiscoInfo // runLoopStoppedCh is written to by runLoop() upon return, enabling event // writers to restart it when they are blocked (see @@ -50,21 +54,60 @@ type relayManager struct { runLoopStoppedCh chan struct{} } -type newRelayServerEndpointEvent struct { - ep *endpoint - se udprelay.ServerEndpoint +// serverDiscoVNI represents a [tailscale.com/net/udprelay.Server] disco key +// and Geneve header VNI value for a given [udprelay.ServerEndpoint]. +type serverDiscoVNI struct { + serverDisco key.DiscoPublic + vni uint32 } +// relayHandshakeWork serves to track in-progress relay handshake work for a +// [udprelay.ServerEndpoint]. This structure is immutable once initialized. +type relayHandshakeWork struct { + ep *endpoint + se udprelay.ServerEndpoint + + // In order to not deadlock, runLoop() must select{} read doneCh when + // attempting to write into rxChallengeCh, and the handshake work goroutine + // must close(doneCh) before attempting to write to + // relayManager.handshakeWorkDoneCh. + rxChallengeCh chan relayHandshakeChallengeEvent + doneCh chan struct{} + + ctx context.Context + cancel context.CancelFunc + wg *sync.WaitGroup +} + +// newRelayServerEndpointEvent indicates a new [udprelay.ServerEndpoint] has +// become known either via allocation with a relay server, or via +// [disco.CallMeMaybeVia] reception. This structure is immutable once +// initialized. +type newRelayServerEndpointEvent struct { + ep *endpoint + se udprelay.ServerEndpoint + server netip.AddrPort // zero value if learned via [disco.CallMeMaybeVia] +} + +// relayEndpointAllocWorkDoneEvent indicates relay server endpoint allocation +// work for an [*endpoint] has completed. This structure is immutable once +// initialized. type relayEndpointAllocWorkDoneEvent struct { - ep *endpoint work *relayEndpointAllocWork } -// activeWork returns true if there is outstanding allocation or handshaking -// work, otherwise it returns false. -func (r *relayManager) activeWork() bool { - return len(r.allocWorkByEndpoint) > 0 - // TODO(jwhited): consider handshaking work +// relayEndpointHandshakeWorkDoneEvent indicates relay server endpoint handshake +// work for an [*endpoint] has completed. This structure is immutable once +// initialized. +type relayEndpointHandshakeWorkDoneEvent struct { + work *relayHandshakeWork + answerSentTo netip.AddrPort // zero value if answer was not transmitted +} + +// activeWorkRunLoop returns true if there is outstanding allocation or +// handshaking work, otherwise it returns false. +func (r *relayManager) activeWorkRunLoop() bool { + return len(r.allocWorkByEndpoint) > 0 || len(r.handshakeWorkByEndpointByServerDisco) > 0 } // runLoop is a form of event loop. 
It ensures exclusive access to most of @@ -77,43 +120,40 @@ func (r *relayManager) runLoop() { for { select { case ep := <-r.allocateHandshakeCh: - r.cancelAndClearWork(ep) - r.allocateAllServersForEndpoint(ep) - if !r.activeWork() { + r.stopWorkRunLoop(ep, stopHandshakeWorkOnlyKnownServers) + r.allocateAllServersRunLoop(ep) + if !r.activeWorkRunLoop() { return } - case msg := <-r.allocateWorkDoneCh: - work, ok := r.allocWorkByEndpoint[msg.ep] - if ok && work == msg.work { + case done := <-r.allocateWorkDoneCh: + work, ok := r.allocWorkByEndpoint[done.work.ep] + if ok && work == done.work { // Verify the work in the map is the same as the one that we're // cleaning up. New events on r.allocateHandshakeCh can // overwrite pre-existing keys. - delete(r.allocWorkByEndpoint, msg.ep) + delete(r.allocWorkByEndpoint, done.work.ep) } - if !r.activeWork() { + if !r.activeWorkRunLoop() { return } case ep := <-r.cancelWorkCh: - r.cancelAndClearWork(ep) - if !r.activeWork() { + r.stopWorkRunLoop(ep, stopHandshakeWorkAllServers) + if !r.activeWorkRunLoop() { return } - case newEndpoint := <-r.newServerEndpointCh: - _ = newEndpoint - // TODO(jwhited): implement - if !r.activeWork() { + case newServerEndpoint := <-r.newServerEndpointCh: + r.handleNewServerEndpointRunLoop(newServerEndpoint) + if !r.activeWorkRunLoop() { + return + } + case done := <-r.handshakeWorkDoneCh: + r.handleHandshakeWorkDoneRunLoop(done) + if !r.activeWorkRunLoop() { return } case challenge := <-r.rxChallengeCh: - _ = challenge - // TODO(jwhited): implement - if !r.activeWork() { - return - } - case via := <-r.rxCallMeMaybeViaCh: - _ = via - // TODO(jwhited): implement - if !r.activeWork() { + r.handleRxChallengeRunLoop(challenge) + if !r.activeWorkRunLoop() { return } } @@ -142,30 +182,93 @@ type relayEndpointAllocWork struct { // init initializes [relayManager] if it is not already initialized. func (r *relayManager) init() { r.initOnce.Do(func() { - r.discoInfoByServerDisco = make(map[key.DiscoPublic]*discoInfo) + r.discoInfoByServerDisco = make(map[key.DiscoPublic]*relayHandshakeDiscoInfo) + r.serversByDisco = make(map[key.DiscoPublic]netip.AddrPort) + r.serversByAddrPort = make(map[netip.AddrPort]key.DiscoPublic) r.allocWorkByEndpoint = make(map[*endpoint]*relayEndpointAllocWork) + r.handshakeWorkByEndpointByServerDisco = make(map[*endpoint]map[key.DiscoPublic]*relayHandshakeWork) + r.handshakeWorkByServerDiscoVNI = make(map[serverDiscoVNI]*relayHandshakeWork) r.allocateHandshakeCh = make(chan *endpoint) r.allocateWorkDoneCh = make(chan relayEndpointAllocWorkDoneEvent) + r.handshakeWorkDoneCh = make(chan relayEndpointHandshakeWorkDoneEvent) r.cancelWorkCh = make(chan *endpoint) r.newServerEndpointCh = make(chan newRelayServerEndpointEvent) r.rxChallengeCh = make(chan relayHandshakeChallengeEvent) - r.rxCallMeMaybeViaCh = make(chan *disco.CallMeMaybeVia) r.runLoopStoppedCh = make(chan struct{}, 1) go r.runLoop() }) } +// relayHandshakeDiscoInfo serves to cache a [*discoInfo] for outstanding +// [*relayHandshakeWork] against a given relay server. +type relayHandshakeDiscoInfo struct { + work set.Set[*relayHandshakeWork] // guarded by relayManager.discoInfoMu + di *discoInfo // immutable once initialized +} + +// ensureDiscoInfoFor ensures a [*discoInfo] will be returned by discoInfo() for +// the server disco key associated with 'work'. Callers must also call +// derefDiscoInfoFor() when 'work' is complete. 
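+//
+// A typical pairing, mirroring handshakeServerEndpoint() (simplified sketch):
+//
+//	r.ensureDiscoInfoFor(work)
+//	defer r.derefDiscoInfoFor(work)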
+func (r *relayManager) ensureDiscoInfoFor(work *relayHandshakeWork) { + r.discoInfoMu.Lock() + defer r.discoInfoMu.Unlock() + di, ok := r.discoInfoByServerDisco[work.se.ServerDisco] + if !ok { + di = &relayHandshakeDiscoInfo{} + di.work.Make() + r.discoInfoByServerDisco[work.se.ServerDisco] = di + } + di.work.Add(work) + if di.di == nil { + di.di = &discoInfo{ + discoKey: work.se.ServerDisco, + discoShort: work.se.ServerDisco.ShortString(), + sharedKey: work.ep.c.discoPrivate.Shared(work.se.ServerDisco), + } + } +} + +// derefDiscoInfoFor decrements the reference count of the [*discoInfo] +// associated with 'work'. +func (r *relayManager) derefDiscoInfoFor(work *relayHandshakeWork) { + r.discoInfoMu.Lock() + defer r.discoInfoMu.Unlock() + di, ok := r.discoInfoByServerDisco[work.se.ServerDisco] + if !ok { + // TODO(jwhited): unexpected + return + } + di.work.Delete(work) + if di.work.Len() == 0 { + delete(r.discoInfoByServerDisco, work.se.ServerDisco) + } +} + // discoInfo returns a [*discoInfo] for 'serverDisco' if there is an // active/ongoing handshake with it, otherwise it returns nil, false. func (r *relayManager) discoInfo(serverDisco key.DiscoPublic) (_ *discoInfo, ok bool) { r.discoInfoMu.Lock() defer r.discoInfoMu.Unlock() di, ok := r.discoInfoByServerDisco[serverDisco] - return di, ok + if ok { + return di.di, ok + } + return nil, false } -func (r *relayManager) handleCallMeMaybeVia(dm *disco.CallMeMaybeVia) { - relayManagerInputEvent(r, nil, &r.rxCallMeMaybeViaCh, dm) +func (r *relayManager) handleCallMeMaybeVia(ep *endpoint, dm *disco.CallMeMaybeVia) { + se := udprelay.ServerEndpoint{ + ServerDisco: dm.ServerDisco, + LamportID: dm.LamportID, + AddrPorts: dm.AddrPorts, + VNI: dm.VNI, + } + se.BindLifetime.Duration = dm.BindLifetime + se.SteadyStateLifetime.Duration = dm.SteadyStateLifetime + relayManagerInputEvent(r, nil, &r.newServerEndpointCh, newRelayServerEndpointEvent{ + ep: ep, + se: se, + }) } func (r *relayManager) handleBindUDPRelayEndpointChallenge(dm *disco.BindUDPRelayEndpointChallenge, di *discoInfo, src netip.AddrPort, vni uint32) { @@ -178,9 +281,9 @@ func (r *relayManager) handleBindUDPRelayEndpointChallenge(dm *disco.BindUDPRela // [relayManager] initialization will make `*eventCh`, so it must be passed as // a pointer to a channel. // -// 'ctx' can be used for returning when runLoop is waiting for the caller to -// return, i.e. the calling goroutine was birthed by runLoop and is cancelable -// via 'ctx'. 'ctx' may be nil. +// 'ctx' can be used for returning when runLoop is waiting for the calling +// goroutine to return, i.e. the calling goroutine was birthed by runLoop and is +// cancelable via 'ctx'. 'ctx' may be nil. func relayManagerInputEvent[T any](r *relayManager, ctx context.Context, eventCh *chan T, event T) { r.init() var ctxDoneCh <-chan struct{} @@ -206,24 +309,258 @@ func (r *relayManager) allocateAndHandshakeAllServers(ep *endpoint) { relayManagerInputEvent(r, nil, &r.allocateHandshakeCh, ep) } -// cancelOutstandingWork cancels all outstanding allocation & handshaking work -// for 'ep'. -func (r *relayManager) cancelOutstandingWork(ep *endpoint) { +// stopWork stops all outstanding allocation & handshaking work for 'ep'. +func (r *relayManager) stopWork(ep *endpoint) { relayManagerInputEvent(r, nil, &r.cancelWorkCh, ep) } -// cancelAndClearWork cancels & clears any outstanding work for 'ep'. 
-func (r *relayManager) cancelAndClearWork(ep *endpoint) { +// stopHandshakeWorkFilter represents filters for handshake work cancellation +type stopHandshakeWorkFilter bool + +const ( + stopHandshakeWorkAllServers stopHandshakeWorkFilter = false + stopHandshakeWorkOnlyKnownServers = true +) + +// stopWorkRunLoop cancels & clears outstanding allocation and handshaking +// work for 'ep'. Handshake work cancellation is subject to the filter supplied +// in 'f'. +func (r *relayManager) stopWorkRunLoop(ep *endpoint, f stopHandshakeWorkFilter) { allocWork, ok := r.allocWorkByEndpoint[ep] if ok { allocWork.cancel() allocWork.wg.Wait() delete(r.allocWorkByEndpoint, ep) } - // TODO(jwhited): cancel & clear handshake work + byServerDisco, ok := r.handshakeWorkByEndpointByServerDisco[ep] + if ok { + for disco, handshakeWork := range byServerDisco { + _, knownServer := r.serversByDisco[disco] + if knownServer || f == stopHandshakeWorkAllServers { + handshakeWork.cancel() + handshakeWork.wg.Wait() + delete(byServerDisco, disco) + delete(r.handshakeWorkByServerDiscoVNI, serverDiscoVNI{handshakeWork.se.ServerDisco, handshakeWork.se.VNI}) + } + } + if len(byServerDisco) == 0 { + delete(r.handshakeWorkByEndpointByServerDisco, ep) + } + } } -func (r *relayManager) allocateAllServersForEndpoint(ep *endpoint) { +func (r *relayManager) handleRxChallengeRunLoop(challenge relayHandshakeChallengeEvent) { + work, ok := r.handshakeWorkByServerDiscoVNI[serverDiscoVNI{challenge.disco, challenge.vni}] + if !ok { + return + } + select { + case <-work.doneCh: + return + case work.rxChallengeCh <- challenge: + return + } +} + +func (r *relayManager) handleHandshakeWorkDoneRunLoop(done relayEndpointHandshakeWorkDoneEvent) { + byServerDisco, ok := r.handshakeWorkByEndpointByServerDisco[done.work.ep] + if !ok { + return + } + work, ok := byServerDisco[done.work.se.ServerDisco] + if !ok || work != done.work { + return + } + delete(byServerDisco, done.work.se.ServerDisco) + if len(byServerDisco) == 0 { + delete(r.handshakeWorkByEndpointByServerDisco, done.work.ep) + } + delete(r.handshakeWorkByServerDiscoVNI, serverDiscoVNI{done.work.se.ServerDisco, done.work.se.VNI}) + if !done.answerSentTo.IsValid() { + // The handshake timed out. + return + } + // We received a challenge from and transmitted an answer towards the relay + // server. + // TODO(jwhited): Make the associated [*endpoint] aware of this + // [tailscale.com/net/udprelay.ServerEndpoint]. +} + +func (r *relayManager) handleNewServerEndpointRunLoop(newServerEndpoint newRelayServerEndpointEvent) { + // Check for duplicate work by server disco + VNI. + sdv := serverDiscoVNI{newServerEndpoint.se.ServerDisco, newServerEndpoint.se.VNI} + existingWork, ok := r.handshakeWorkByServerDiscoVNI[sdv] + if ok { + // There's in-progress handshake work for the server disco + VNI, which + // uniquely identify a [udprelay.ServerEndpoint]. Compare Lamport + // IDs to determine which is newer. + if existingWork.se.LamportID >= newServerEndpoint.se.LamportID { + // The existing work is a duplicate or newer. Return early. + return + } + + // The existing work is no longer valid, clean it up. Be sure to lookup + // by the existing work's [*endpoint], not the incoming "new" work as + // they are not necessarily matching. 
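+		// cancel() stops the existing handshake goroutine, and wg.Wait()
+		// ensures it has exited before we drop our references to its work.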
+		existingWork.cancel()
+		existingWork.wg.Wait()
+		delete(r.handshakeWorkByServerDiscoVNI, sdv)
+		byServerDisco, ok := r.handshakeWorkByEndpointByServerDisco[existingWork.ep]
+		if ok {
+			delete(byServerDisco, sdv.serverDisco)
+			if len(byServerDisco) == 0 {
+				delete(r.handshakeWorkByEndpointByServerDisco, existingWork.ep)
+			}
+		}
+	}
+
+	// Check for duplicate work by [*endpoint] + server disco.
+	byServerDisco, ok := r.handshakeWorkByEndpointByServerDisco[newServerEndpoint.ep]
+	if ok {
+		existingWork, ok := byServerDisco[newServerEndpoint.se.ServerDisco]
+		if ok {
+			if newServerEndpoint.se.LamportID <= existingWork.se.LamportID {
+				// The "new" server endpoint is outdated or a duplicate when
+				// compared against the existing handshake work. Return early.
+				return
+			}
+			// Cancel the existing handshake, which has a lower Lamport ID.
+			existingWork.cancel()
+			existingWork.wg.Wait()
+			delete(r.handshakeWorkByServerDiscoVNI, sdv)
+			delete(byServerDisco, sdv.serverDisco)
+			if len(byServerDisco) == 0 {
+				delete(r.handshakeWorkByEndpointByServerDisco, existingWork.ep)
+			}
+		}
+	}
+
+	// We're now reasonably sure we're dealing with the latest
+	// [udprelay.ServerEndpoint] from a server event order perspective
+	// (LamportID). Update server disco key tracking if appropriate.
+	if newServerEndpoint.server.IsValid() {
+		serverDisco, ok := r.serversByAddrPort[newServerEndpoint.server]
+		if !ok {
+			// Allocation raced with an update to our known servers set. This
+			// server is no longer known. Return early.
+			return
+		}
+		if serverDisco.Compare(newServerEndpoint.se.ServerDisco) != 0 {
+			// The server's disco key has either changed, or simply become
+			// known for the first time. In the former case we end up detaching
+			// any in-progress handshake work from a "known" relay server.
+			// Practically speaking we expect the detached work to fail
+			// if the server key did in fact change (server restart) while we
+			// were attempting to handshake with it. It is possible, though
+			// unlikely, for a server addr:port to effectively move between
+			// nodes. Either way, there is no harm in detaching existing work,
+			// and we explicitly let that happen for the rare case the detached
+			// handshake would complete and remain functional.
+			delete(r.serversByDisco, serverDisco)
+			delete(r.serversByAddrPort, newServerEndpoint.server)
+			r.serversByDisco[newServerEndpoint.se.ServerDisco] = newServerEndpoint.server
+			r.serversByAddrPort[newServerEndpoint.server] = newServerEndpoint.se.ServerDisco
+		}
+	}
+
+	// We're ready to start a new handshake.
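+	// handshakeServerEndpoint() (below) sends [disco.BindUDPRelayEndpoint] to
+	// every advertised AddrPort, waits for a challenge from the relay server,
+	// and answers it before signaling completion back to runLoop().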
+	ctx, cancel := context.WithCancel(context.Background())
+	wg := &sync.WaitGroup{}
+	work := &relayHandshakeWork{
+		ep:            newServerEndpoint.ep,
+		se:            newServerEndpoint.se,
+		rxChallengeCh: make(chan relayHandshakeChallengeEvent),
+		doneCh:        make(chan struct{}),
+		ctx:           ctx,
+		cancel:        cancel,
+		wg:            wg,
+	}
+	if byServerDisco == nil {
+		byServerDisco = make(map[key.DiscoPublic]*relayHandshakeWork)
+		r.handshakeWorkByEndpointByServerDisco[newServerEndpoint.ep] = byServerDisco
+	}
+	byServerDisco[newServerEndpoint.se.ServerDisco] = work
+	r.handshakeWorkByServerDiscoVNI[sdv] = work
+
+	wg.Add(1)
+	go r.handshakeServerEndpoint(work)
+}
+
+func (r *relayManager) handshakeServerEndpoint(work *relayHandshakeWork) {
+	defer work.wg.Done()
+
+	done := relayEndpointHandshakeWorkDoneEvent{work: work}
+	r.ensureDiscoInfoFor(work)
+
+	defer func() {
+		r.derefDiscoInfoFor(work)
+		close(work.doneCh)
+		relayManagerInputEvent(r, work.ctx, &r.handshakeWorkDoneCh, done)
+		work.cancel()
+	}()
+
+	sentBindAny := false
+	bind := &disco.BindUDPRelayEndpoint{}
+	for _, addrPort := range work.se.AddrPorts {
+		if addrPort.IsValid() {
+			sentBindAny = true
+			go work.ep.c.sendDiscoMessage(addrPort, ptr.To(work.se.VNI), key.NodePublic{}, work.se.ServerDisco, bind, discoLog)
+		}
+	}
+	if !sentBindAny {
+		return
+	}
+
+	// Limit goroutine lifetime to a reasonable duration. This is intentionally
+	// detached and independent of 'BindLifetime' to prevent relay server
+	// (mis)configuration from negatively impacting client resource usage.
+	const maxHandshakeLifetime = time.Second * 30
+	timer := time.NewTimer(min(work.se.BindLifetime.Duration, maxHandshakeLifetime))
+	defer timer.Stop()
+
+	// Wait for cancellation, a challenge to be rx'd, or handshake lifetime to
+	// expire. Our initial implementation values simplicity over other aspects,
+	// e.g. it is not resilient to any packet loss.
+	//
+	// We may want to eventually consider [disco.BindUDPRelayEndpoint]
+	// retransmission when no challenge is rx'd, and
+	// [disco.BindUDPRelayEndpointAnswer] duplication in front of
+	// [disco.Ping] until [disco.Ping] or [disco.Pong] is received.
+	select {
+	case <-work.ctx.Done():
+		return
+	case challenge := <-work.rxChallengeCh:
+		answer := &disco.BindUDPRelayEndpointAnswer{Answer: challenge.challenge}
+		done.answerSentTo = challenge.from
+		// Send answer back to relay server. Typically sendDiscoMessage() calls
+		// are invoked via a new goroutine in an attempt to limit crypto+syscall
+		// time contributing to system backpressure, and to fire roundtrip
+		// latency-relevant messages as closely together as possible. We
+		// intentionally don't do that here, because:
+		//  1. The primary backpressure concern is around the work.rxChallengeCh
+		//     writer on the [Conn] packet rx path, who is already unblocked
+		//     since we read from the channel. Relay servers only ever tx one
+		//     challenge per rx'd bind message for a given (the first seen) src.
+		//  2. runLoop() may be waiting for this 'work' to complete if
+		//     explicitly canceled for some reason elsewhere, but this is
+		//     typically only around [*endpoint] and/or [Conn] shutdown.
+		//  3. It complicates the defer()'d [*discoInfo] deref and 'work'
+		//     completion event order. sendDiscoMessage() assumes the related
+		//     [*discoInfo] is still available. We also don't want the
+		//     [*endpoint] to send a [disco.Ping] before the
+		//     [disco.BindUDPRelayEndpointAnswer] has gone out, otherwise the
+		//     remote side will never see the ping, delaying/preventing the
+		//     [udprelay.ServerEndpoint] from becoming fully operational.
+		//  4. This is a singular tx with no roundtrip latency measurements
+		//     involved.
+		work.ep.c.sendDiscoMessage(challenge.from, ptr.To(work.se.VNI), key.NodePublic{}, work.se.ServerDisco, answer, discoLog)
+		return
+	case <-timer.C:
+		// The handshake timed out.
+		return
+	}
+}
+
+func (r *relayManager) allocateAllServersRunLoop(ep *endpoint) {
 	if len(r.serversByAddrPort) == 0 {
 		return
 	}
@@ -231,17 +568,17 @@
 	started := &relayEndpointAllocWork{ep: ep, cancel: cancel, wg: &sync.WaitGroup{}}
 	for k := range r.serversByAddrPort {
 		started.wg.Add(1)
-		go r.allocateEndpoint(ctx, started.wg, k, ep)
+		go r.allocateSingleServer(ctx, started.wg, k, ep)
 	}
 	r.allocWorkByEndpoint[ep] = started
 	go func() {
 		started.wg.Wait()
 		started.cancel()
-		relayManagerInputEvent(r, ctx, &r.allocateWorkDoneCh, relayEndpointAllocWorkDoneEvent{ep: ep, work: started})
+		relayManagerInputEvent(r, ctx, &r.allocateWorkDoneCh, relayEndpointAllocWorkDoneEvent{work: started})
 	}()
 }
 
-func (r *relayManager) allocateEndpoint(ctx context.Context, wg *sync.WaitGroup, server netip.AddrPort, ep *endpoint) {
+func (r *relayManager) allocateSingleServer(ctx context.Context, wg *sync.WaitGroup, server netip.AddrPort, ep *endpoint) {
 	// TODO(jwhited): introduce client metrics counters for notable failures
 	defer wg.Done()
 	var b bytes.Buffer
diff --git a/wgengine/magicsock/relaymanager_test.go b/wgengine/magicsock/relaymanager_test.go
index 579dceb53..3b75db9f6 100644
--- a/wgengine/magicsock/relaymanager_test.go
+++ b/wgengine/magicsock/relaymanager_test.go
@@ -8,6 +8,7 @@ import (
 	"testing"
 
 	"tailscale.com/disco"
+	"tailscale.com/types/key"
 )
 
 func TestRelayManagerInitAndIdle(t *testing.T) {
@@ -16,11 +17,11 @@ func TestRelayManagerInitAndIdle(t *testing.T) {
 	<-rm.runLoopStoppedCh
 
 	rm = relayManager{}
-	rm.cancelOutstandingWork(&endpoint{})
+	rm.stopWork(&endpoint{})
 	<-rm.runLoopStoppedCh
 
 	rm = relayManager{}
-	rm.handleCallMeMaybeVia(&disco.CallMeMaybeVia{})
+	rm.handleCallMeMaybeVia(&endpoint{c: &Conn{discoPrivate: key.NewDisco()}}, &disco.CallMeMaybeVia{ServerDisco: key.NewDisco().Public()})
 	<-rm.runLoopStoppedCh
 
 	rm = relayManager{}
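
Note: the deadlock-avoidance contract documented on relayHandshakeWork above (runLoop() must select on doneCh when delivering into rxChallengeCh, and the handshake goroutine must close doneCh before reporting completion) is the subtlest part of this change. The standalone sketch below reproduces that handoff with simplified stand-in types; worker, deliver, and run are illustrative names and are not part of the patch.

package main

import (
	"fmt"
	"time"
)

// worker mirrors relayHandshakeWork's channel contract in miniature:
// rxCh delivers challenges, doneCh is closed when the worker has finished.
type worker struct {
	rxCh   chan string
	doneCh chan struct{}
}

// deliver plays the runLoop() role: it must not block forever on rxCh, so it
// also watches doneCh, which the worker closes on exit.
func deliver(w *worker, challenge string) {
	select {
	case <-w.doneCh:
		// Worker already finished; drop the challenge.
	case w.rxCh <- challenge:
	}
}

// run plays the handshake-goroutine role: it closes doneCh before reporting
// completion, so a concurrent deliver() cannot deadlock against an event loop
// that is busy consuming the completion event.
func run(w *worker, doneEvents chan<- string) {
	defer func() {
		close(w.doneCh)
		doneEvents <- "handshake done"
	}()
	select {
	case c := <-w.rxCh:
		fmt.Println("answered challenge:", c)
	case <-time.After(100 * time.Millisecond):
		fmt.Println("handshake timed out")
	}
}

func main() {
	w := &worker{rxCh: make(chan string), doneCh: make(chan struct{})}
	doneEvents := make(chan string)
	go run(w, doneEvents)
	deliver(w, "abc123") // delivered, or safely dropped if the worker already finished
	fmt.Println(<-doneEvents)
}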