// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause

package magicsock

import (
	"context"
	"errors"
	"net/netip"
	"sync"
	"time"

	"tailscale.com/disco"
	"tailscale.com/net/stun"
	udprelay "tailscale.com/net/udprelay/endpoint"
	"tailscale.com/tailcfg"
	"tailscale.com/tstime"
	"tailscale.com/types/key"
	"tailscale.com/util/set"
)

// relayManager manages allocation, handshaking, and initial probing (disco
// ping/pong) of [tailscale.com/net/udprelay.Server] endpoints. The zero value
// is ready for use.
//
// [relayManager] methods can be called by [Conn] and [endpoint] while their .mu
// mutexes are held. Therefore, in order to avoid deadlocks, [relayManager] must
// never attempt to acquire those mutexes synchronously from its runLoop(),
// including synchronous calls back towards [Conn] or [endpoint] methods that
// acquire them.
type relayManager struct {
	initOnce sync.Once

	// ===================================================================
	// The following fields are owned by a single goroutine, runLoop().
	serversByNodeKey                        map[key.NodePublic]candidatePeerRelay
	allocWorkByCandidatePeerRelayByEndpoint map[*endpoint]map[candidatePeerRelay]*relayEndpointAllocWork
	allocWorkByDiscoKeysByServerNodeKey     map[key.NodePublic]map[key.SortedPairOfDiscoPublic]*relayEndpointAllocWork
	handshakeWorkByServerDiscoByEndpoint    map[*endpoint]map[key.DiscoPublic]*relayHandshakeWork
	handshakeWorkByServerDiscoVNI           map[serverDiscoVNI]*relayHandshakeWork
	handshakeWorkAwaitingPong               map[*relayHandshakeWork]addrPortVNI
	addrPortVNIToHandshakeWork              map[addrPortVNI]*relayHandshakeWork
	handshakeGeneration                     uint32
	allocGeneration                         uint32

	// ===================================================================
	// The following chan fields serve event inputs to a single goroutine,
	// runLoop().
	startDiscoveryCh    chan endpointWithLastBest
	allocateWorkDoneCh  chan relayEndpointAllocWorkDoneEvent
	handshakeWorkDoneCh chan relayEndpointHandshakeWorkDoneEvent
	cancelWorkCh        chan *endpoint
	newServerEndpointCh chan newRelayServerEndpointEvent
	rxDiscoMsgCh        chan relayDiscoMsgEvent
	serversCh           chan set.Set[candidatePeerRelay]
	getServersCh        chan chan set.Set[candidatePeerRelay]
	derpHomeChangeCh    chan derpHomeChangeEvent

	discoInfoMu            sync.Mutex // guards the following field
	discoInfoByServerDisco map[key.DiscoPublic]*relayHandshakeDiscoInfo

	// runLoopStoppedCh is written to by runLoop() upon return, enabling event
	// writers to restart it when they are blocked (see
	// relayManagerInputEvent()).
	runLoopStoppedCh chan struct{}
}

// serverDiscoVNI represents a [tailscale.com/net/udprelay.Server] disco key
// and Geneve header VNI value for a given [udprelay.ServerEndpoint].
type serverDiscoVNI struct {
	serverDisco key.DiscoPublic
	vni         uint32
}

// relayHandshakeWork serves to track in-progress relay handshake work for a
// [udprelay.ServerEndpoint]. This structure is immutable once initialized.
type relayHandshakeWork struct {
	wlb endpointWithLastBest
	se  udprelay.ServerEndpoint

	// handshakeServerEndpoint() always writes to doneCh (len 1) when it
	// returns. It may end up writing the same event afterward to
	// relayManager.handshakeWorkDoneCh if runLoop() can receive it. runLoop()
	// must select{} read on doneCh to prevent deadlock when attempting to
	// write to rxDiscoMsgCh.
	rxDiscoMsgCh chan relayDiscoMsgEvent
	doneCh       chan relayEndpointHandshakeWorkDoneEvent

	ctx    context.Context
	cancel context.CancelFunc
}

// newRelayServerEndpointEvent indicates a new [udprelay.ServerEndpoint] has
// become known either via allocation with a relay server, or via
// [disco.CallMeMaybeVia] reception. This structure is immutable once
// initialized.
type newRelayServerEndpointEvent struct {
	wlb    endpointWithLastBest
	se     udprelay.ServerEndpoint
	server candidatePeerRelay // zero value if learned via [disco.CallMeMaybeVia]
}

// relayEndpointAllocWorkDoneEvent indicates relay server endpoint allocation
// work for an [*endpoint] has completed. This structure is immutable once
// initialized.
type relayEndpointAllocWorkDoneEvent struct {
	work      *relayEndpointAllocWork
	allocated udprelay.ServerEndpoint // !allocated.ServerDisco.IsZero() if allocation succeeded
}

// relayEndpointHandshakeWorkDoneEvent indicates relay server endpoint
// handshake work for an [*endpoint] has completed. This structure is
// immutable once initialized.
type relayEndpointHandshakeWorkDoneEvent struct {
	work             *relayHandshakeWork
	pongReceivedFrom netip.AddrPort // or zero value if handshake or ping/pong did not complete
	latency          time.Duration  // only relevant if pongReceivedFrom.IsValid()
}

// hasActiveWorkRunLoop returns true if there is outstanding allocation or
// handshaking work for any endpoint, otherwise it returns false.
func (r *relayManager) hasActiveWorkRunLoop() bool {
	return len(r.allocWorkByCandidatePeerRelayByEndpoint) > 0 || len(r.handshakeWorkByServerDiscoByEndpoint) > 0
}

// hasActiveWorkForEndpointRunLoop returns true if there is outstanding
// allocation or handshaking work for the provided endpoint, otherwise it
// returns false.
func (r *relayManager) hasActiveWorkForEndpointRunLoop(ep *endpoint) bool {
	_, handshakeWork := r.handshakeWorkByServerDiscoByEndpoint[ep]
	_, allocWork := r.allocWorkByCandidatePeerRelayByEndpoint[ep]
	return handshakeWork || allocWork
}

// derpHomeChangeEvent represents a change in the DERP home region for the
// node identified by nodeKey. This structure is immutable once initialized.
type derpHomeChangeEvent struct {
	nodeKey  key.NodePublic
	regionID uint16
}

// handleDERPHomeChange handles a DERP home change event for nodeKey and
// regionID.
func (r *relayManager) handleDERPHomeChange(nodeKey key.NodePublic, regionID uint16) {
	relayManagerInputEvent(r, nil, &r.derpHomeChangeCh, derpHomeChangeEvent{
		nodeKey:  nodeKey,
		regionID: regionID,
	})
}

// handleDERPHomeChangeRunLoop updates the cached DERP home region for the
// relay server identified by event.nodeKey, if that server is known.
func (r *relayManager) handleDERPHomeChangeRunLoop(event derpHomeChangeEvent) {
	c, ok := r.serversByNodeKey[event.nodeKey]
	if ok {
		c.derpHomeRegionID = event.regionID
		r.serversByNodeKey[event.nodeKey] = c
	}
}

// runLoop is a form of event loop. It ensures exclusive access to most of
// [relayManager] state.
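// runLoop returns whenever there is no outstanding allocation or handshake
// work remaining; relayManagerInputEvent() restarts it on demand via
// runLoopStoppedCh.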
func (r *relayManager) runLoop() {
	defer func() {
		r.runLoopStoppedCh <- struct{}{}
	}()

	for {
		select {
		case startDiscovery := <-r.startDiscoveryCh:
			if !r.hasActiveWorkForEndpointRunLoop(startDiscovery.ep) {
				r.allocateAllServersRunLoop(startDiscovery)
			}
			if !r.hasActiveWorkRunLoop() {
				return
			}
		case done := <-r.allocateWorkDoneCh:
			r.handleAllocWorkDoneRunLoop(done)
			if !r.hasActiveWorkRunLoop() {
				return
			}
		case ep := <-r.cancelWorkCh:
			r.stopWorkRunLoop(ep)
			if !r.hasActiveWorkRunLoop() {
				return
			}
		case newServerEndpoint := <-r.newServerEndpointCh:
			r.handleNewServerEndpointRunLoop(newServerEndpoint)
			if !r.hasActiveWorkRunLoop() {
				return
			}
		case done := <-r.handshakeWorkDoneCh:
			r.handleHandshakeWorkDoneRunLoop(done)
			if !r.hasActiveWorkRunLoop() {
				return
			}
		case discoMsgEvent := <-r.rxDiscoMsgCh:
			r.handleRxDiscoMsgRunLoop(discoMsgEvent)
			if !r.hasActiveWorkRunLoop() {
				return
			}
		case serversUpdate := <-r.serversCh:
			r.handleServersUpdateRunLoop(serversUpdate)
			if !r.hasActiveWorkRunLoop() {
				return
			}
		case getServersCh := <-r.getServersCh:
			r.handleGetServersRunLoop(getServersCh)
			if !r.hasActiveWorkRunLoop() {
				return
			}
		case derpHomeChange := <-r.derpHomeChangeCh:
			r.handleDERPHomeChangeRunLoop(derpHomeChange)
			if !r.hasActiveWorkRunLoop() {
				return
			}
		}
	}
}

// handleGetServersRunLoop snapshots the currently known relay servers and
// delivers the snapshot on getServersCh.
func (r *relayManager) handleGetServersRunLoop(getServersCh chan set.Set[candidatePeerRelay]) {
	servers := make(set.Set[candidatePeerRelay], len(r.serversByNodeKey))
	for _, v := range r.serversByNodeKey {
		servers.Add(v)
	}
	getServersCh <- servers
}

// getServers returns a snapshot of the currently known relay servers.
func (r *relayManager) getServers() set.Set[candidatePeerRelay] {
	ch := make(chan set.Set[candidatePeerRelay])
	relayManagerInputEvent(r, nil, &r.getServersCh, ch)
	return <-ch
}

// handleServersUpdateRunLoop replaces the known relay server set with
// 'update', deleting servers that are no longer present.
func (r *relayManager) handleServersUpdateRunLoop(update set.Set[candidatePeerRelay]) {
	for _, v := range r.serversByNodeKey {
		if !update.Contains(v) {
			delete(r.serversByNodeKey, v.nodeKey)
		}
	}
	for _, v := range update.Slice() {
		r.serversByNodeKey[v.nodeKey] = v
	}
}

// relayDiscoMsgEvent represents a disco message of interest to [relayManager]
// along with its reception metadata.
type relayDiscoMsgEvent struct {
	conn               *Conn // for access to [Conn] if there is no associated [relayHandshakeWork]
	msg                disco.Message
	relayServerNodeKey key.NodePublic // nonzero if msg is a [*disco.AllocateUDPRelayEndpointResponse]
	disco              key.DiscoPublic
	from               netip.AddrPort
	vni                uint32
	at                 time.Time
}

// relayEndpointAllocWork serves to track in-progress relay endpoint allocation
// for an [*endpoint]. This structure is immutable once initialized.
type relayEndpointAllocWork struct {
	wlb                endpointWithLastBest
	discoKeys          key.SortedPairOfDiscoPublic
	candidatePeerRelay candidatePeerRelay

	// allocateServerEndpoint() always writes to doneCh (len 1) when it
	// returns. It may end up writing the same event afterward to
	// [relayManager.allocateWorkDoneCh] if runLoop() can receive it. runLoop()
	// must select{} read on doneCh to prevent deadlock when attempting to
	// write to rxDiscoMsgCh.
	rxDiscoMsgCh chan *disco.AllocateUDPRelayEndpointResponse
	doneCh       chan relayEndpointAllocWorkDoneEvent

	ctx    context.Context
	cancel context.CancelFunc
}

// init initializes [relayManager] if it is not already initialized.
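// init is safe for concurrent use; all setup is guarded by initOnce. It also
// primes runLoopStoppedCh so the first relayManagerInputEvent() call can
// start runLoop().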
func (r *relayManager) init() {
	r.initOnce.Do(func() {
		r.discoInfoByServerDisco = make(map[key.DiscoPublic]*relayHandshakeDiscoInfo)
		r.serversByNodeKey = make(map[key.NodePublic]candidatePeerRelay)
		r.allocWorkByCandidatePeerRelayByEndpoint = make(map[*endpoint]map[candidatePeerRelay]*relayEndpointAllocWork)
		r.allocWorkByDiscoKeysByServerNodeKey = make(map[key.NodePublic]map[key.SortedPairOfDiscoPublic]*relayEndpointAllocWork)
		r.handshakeWorkByServerDiscoByEndpoint = make(map[*endpoint]map[key.DiscoPublic]*relayHandshakeWork)
		r.handshakeWorkByServerDiscoVNI = make(map[serverDiscoVNI]*relayHandshakeWork)
		r.handshakeWorkAwaitingPong = make(map[*relayHandshakeWork]addrPortVNI)
		r.addrPortVNIToHandshakeWork = make(map[addrPortVNI]*relayHandshakeWork)
		r.startDiscoveryCh = make(chan endpointWithLastBest)
		r.allocateWorkDoneCh = make(chan relayEndpointAllocWorkDoneEvent)
		r.handshakeWorkDoneCh = make(chan relayEndpointHandshakeWorkDoneEvent)
		r.cancelWorkCh = make(chan *endpoint)
		r.newServerEndpointCh = make(chan newRelayServerEndpointEvent)
		r.rxDiscoMsgCh = make(chan relayDiscoMsgEvent)
		r.serversCh = make(chan set.Set[candidatePeerRelay])
		r.getServersCh = make(chan chan set.Set[candidatePeerRelay])
		r.derpHomeChangeCh = make(chan derpHomeChangeEvent)
		r.runLoopStoppedCh = make(chan struct{}, 1)
		r.runLoopStoppedCh <- struct{}{}
	})
}

// relayHandshakeDiscoInfo serves to cache a [*discoInfo] for outstanding
// [*relayHandshakeWork] against a given relay server.
type relayHandshakeDiscoInfo struct {
	work set.Set[*relayHandshakeWork] // guarded by relayManager.discoInfoMu
	di   *discoInfo                   // immutable once initialized
}

// ensureDiscoInfoFor ensures a [*discoInfo] will be returned by discoInfo() for
// the server disco key associated with 'work'. Callers must also call
// derefDiscoInfoFor() when 'work' is complete.
func (r *relayManager) ensureDiscoInfoFor(work *relayHandshakeWork) {
	r.discoInfoMu.Lock()
	defer r.discoInfoMu.Unlock()
	di, ok := r.discoInfoByServerDisco[work.se.ServerDisco]
	if !ok {
		di = &relayHandshakeDiscoInfo{}
		di.work.Make()
		r.discoInfoByServerDisco[work.se.ServerDisco] = di
	}
	di.work.Add(work)
	if di.di == nil {
		di.di = &discoInfo{
			discoKey:   work.se.ServerDisco,
			discoShort: work.se.ServerDisco.ShortString(),
			sharedKey:  work.wlb.ep.c.discoPrivate.Shared(work.se.ServerDisco),
		}
	}
}

// derefDiscoInfoFor decrements the reference count of the [*discoInfo]
// associated with 'work'.
func (r *relayManager) derefDiscoInfoFor(work *relayHandshakeWork) {
	r.discoInfoMu.Lock()
	defer r.discoInfoMu.Unlock()
	di, ok := r.discoInfoByServerDisco[work.se.ServerDisco]
	if !ok {
		// TODO(jwhited): unexpected
		return
	}
	di.work.Delete(work)
	if di.work.Len() == 0 {
		delete(r.discoInfoByServerDisco, work.se.ServerDisco)
	}
}

// discoInfo returns a [*discoInfo] for 'serverDisco' if there is an
// active/ongoing handshake with it, otherwise it returns nil, false.
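// The returned [*discoInfo] is shared by all outstanding handshake work
// toward the same server disco key and is dropped once derefDiscoInfoFor()
// releases the final reference.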
func (r *relayManager) discoInfo(serverDisco key.DiscoPublic) (_ *discoInfo, ok bool) {
	r.discoInfoMu.Lock()
	defer r.discoInfoMu.Unlock()
	di, ok := r.discoInfoByServerDisco[serverDisco]
	if ok {
		return di.di, ok
	}
	return nil, false
}

// handleCallMeMaybeVia handles a [disco.CallMeMaybeVia] received from the
// remote peer ep, converting it into a [newRelayServerEndpointEvent] for
// runLoop().
func (r *relayManager) handleCallMeMaybeVia(ep *endpoint, lastBest addrQuality, lastBestIsTrusted bool, dm *disco.CallMeMaybeVia) {
	se := udprelay.ServerEndpoint{
		ServerDisco: dm.ServerDisco,
		ClientDisco: dm.ClientDisco,
		LamportID:   dm.LamportID,
		AddrPorts:   dm.AddrPorts,
		VNI:         dm.VNI,
	}
	se.BindLifetime.Duration = dm.BindLifetime
	se.SteadyStateLifetime.Duration = dm.SteadyStateLifetime
	relayManagerInputEvent(r, nil, &r.newServerEndpointCh, newRelayServerEndpointEvent{
		wlb: endpointWithLastBest{
			ep:                ep,
			lastBest:          lastBest,
			lastBestIsTrusted: lastBestIsTrusted,
		},
		se: se,
	})
}

// handleRxDiscoMsg handles reception of disco messages that [relayManager]
// may be interested in. This includes all Geneve-encapsulated disco messages
// and [*disco.AllocateUDPRelayEndpointResponse]. If dm is a
// [*disco.AllocateUDPRelayEndpointResponse] then relayServerNodeKey must be
// nonzero.
func (r *relayManager) handleRxDiscoMsg(conn *Conn, dm disco.Message, relayServerNodeKey key.NodePublic, discoKey key.DiscoPublic, src epAddr) {
	relayManagerInputEvent(r, nil, &r.rxDiscoMsgCh, relayDiscoMsgEvent{
		conn:               conn,
		msg:                dm,
		relayServerNodeKey: relayServerNodeKey,
		disco:              discoKey,
		from:               src.ap,
		vni:                src.vni.get(),
		at:                 time.Now(),
	})
}

// handleRelayServersSet handles an update of the complete relay server set.
func (r *relayManager) handleRelayServersSet(servers set.Set[candidatePeerRelay]) {
	relayManagerInputEvent(r, nil, &r.serversCh, servers)
}

// relayManagerInputEvent initializes [relayManager] if necessary, starts
// relayManager.runLoop() if it is not running, and writes 'event' on 'eventCh'.
//
// [relayManager] initialization will make `*eventCh`, so it must be passed as
// a pointer to a channel.
//
// 'ctx' can be used to return early when runLoop is itself waiting for the
// calling goroutine to return, i.e. the calling goroutine was spawned by
// runLoop and is cancelable via 'ctx'. 'ctx' may be nil.
func relayManagerInputEvent[T any](r *relayManager, ctx context.Context, eventCh *chan T, event T) {
	r.init()
	var ctxDoneCh <-chan struct{}
	if ctx != nil {
		ctxDoneCh = ctx.Done()
	}
	for {
		select {
		case <-ctxDoneCh:
			return
		case *eventCh <- event:
			return
		case <-r.runLoopStoppedCh:
			go r.runLoop()
		}
	}
}

// endpointWithLastBest represents an [*endpoint], its last bestAddr, and
// whether the last bestAddr was trusted (see endpoint.trustBestAddrUntil) at
// the time of init. This structure is immutable once initialized.
type endpointWithLastBest struct {
	ep                *endpoint
	lastBest          addrQuality
	lastBestIsTrusted bool
}

// startUDPRelayPathDiscoveryFor starts UDP relay path discovery for ep on all
// known relay servers if ep has no in-progress work.
func (r *relayManager) startUDPRelayPathDiscoveryFor(ep *endpoint, lastBest addrQuality, lastBestIsTrusted bool) {
	relayManagerInputEvent(r, nil, &r.startDiscoveryCh, endpointWithLastBest{
		ep:                ep,
		lastBest:          lastBest,
		lastBestIsTrusted: lastBestIsTrusted,
	})
}

// stopWork stops all outstanding allocation & handshaking work for 'ep'.
func (r *relayManager) stopWork(ep *endpoint) {
	relayManagerInputEvent(r, nil, &r.cancelWorkCh, ep)
}

// stopWorkRunLoop cancels & clears outstanding allocation and handshaking
// work for 'ep'.
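// It synchronously awaits each canceled worker's done event before handling
// it, so all related state has been cleaned up by the time stopWorkRunLoop
// returns.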
func (r *relayManager) stopWorkRunLoop(ep *endpoint) {
	byDiscoKeys, ok := r.allocWorkByCandidatePeerRelayByEndpoint[ep]
	if ok {
		for _, work := range byDiscoKeys {
			work.cancel()
			done := <-work.doneCh
			r.handleAllocWorkDoneRunLoop(done)
		}
	}
	byServerDisco, ok := r.handshakeWorkByServerDiscoByEndpoint[ep]
	if ok {
		for _, handshakeWork := range byServerDisco {
			handshakeWork.cancel()
			done := <-handshakeWork.doneCh
			r.handleHandshakeWorkDoneRunLoop(done)
		}
	}
}

// addrPortVNI represents a combined netip.AddrPort and Geneve header virtual
// network identifier.
type addrPortVNI struct {
	addrPort netip.AddrPort
	vni      uint32
}

// handleRxDiscoMsgRunLoop routes a received disco message to any outstanding
// allocation or handshake work that is interested in it, and replies to
// Geneve-encapsulated pings.
func (r *relayManager) handleRxDiscoMsgRunLoop(event relayDiscoMsgEvent) {
	var (
		work *relayHandshakeWork
		ok   bool
	)
	apv := addrPortVNI{event.from, event.vni}
	switch msg := event.msg.(type) {
	case *disco.AllocateUDPRelayEndpointResponse:
		sorted := key.NewSortedPairOfDiscoPublic(msg.ClientDisco[0], msg.ClientDisco[1])
		byDiscoKeys, ok := r.allocWorkByDiscoKeysByServerNodeKey[event.relayServerNodeKey]
		if !ok {
			// No outstanding work tied to this relay server, discard.
			return
		}
		allocWork, ok := byDiscoKeys[sorted]
		if !ok {
			// No outstanding work tied to these disco keys, discard.
			return
		}
		select {
		case done := <-allocWork.doneCh:
			// allocateServerEndpoint() returned, clean up its state.
			r.handleAllocWorkDoneRunLoop(done)
			return
		case allocWork.rxDiscoMsgCh <- msg:
			return
		}
	case *disco.BindUDPRelayEndpointChallenge:
		work, ok = r.handshakeWorkByServerDiscoVNI[serverDiscoVNI{event.disco, event.vni}]
		if !ok {
			// No outstanding work tied to this challenge, discard.
			return
		}
		_, ok = r.handshakeWorkAwaitingPong[work]
		if ok {
			// We've seen a challenge for this relay endpoint previously,
			// discard. Servers only respond to the first src ip:port they see
			// binds from.
			return
		}
		_, ok = r.addrPortVNIToHandshakeWork[apv]
		if ok {
			// There is existing work for the same [addrPortVNI] that is not
			// 'work'. If both instances happen to be on the same server we
			// could attempt to resolve event order using LamportID. For now
			// just leave both work instances alone and take no action other
			// than to discard this challenge msg.
			return
		}
		// Update state so that future ping/pong will route to 'work'.
		r.handshakeWorkAwaitingPong[work] = apv
		r.addrPortVNIToHandshakeWork[apv] = work
	case *disco.Ping:
		// Always TX a pong. We might not have any associated work if ping
		// reception raced with our call to [endpoint.udpRelayEndpointReady()],
		// so err on the side of enabling the remote side to use this path.
		//
		// Conn.handlePingLocked() makes efforts to suppress duplicate pongs
		// where the same ping can be received both via raw socket and UDP
		// socket on Linux. We make no such efforts here as the raw socket BPF
		// program does not support Geneve-encapsulated disco, and is also
		// disabled by default.
		vni := virtualNetworkID{}
		vni.set(event.vni)
		go event.conn.sendDiscoMessage(epAddr{ap: event.from, vni: vni}, key.NodePublic{}, event.disco, &disco.Pong{
			TxID: msg.TxID,
			Src:  event.from,
		}, discoVerboseLog)
		work, ok = r.addrPortVNIToHandshakeWork[apv]
		if !ok {
			// No outstanding work tied to this [addrPortVNI], return early.
			return
		}
	case *disco.Pong:
		work, ok = r.addrPortVNIToHandshakeWork[apv]
		if !ok {
			// No outstanding work tied to this [addrPortVNI], discard.
			return
		}
	default:
		// Unexpected message type, discard.
		return
	}
	select {
	case done := <-work.doneCh:
		// handshakeServerEndpoint() returned, clean up its state.
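		// Receiving on doneCh in this select, rather than blocking solely on
		// the write to work.rxDiscoMsgCh, honors the ordering contract
		// documented on [relayHandshakeWork] and avoids deadlocking against a
		// worker that has already returned.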
		r.handleHandshakeWorkDoneRunLoop(done)
		return
	case work.rxDiscoMsgCh <- event:
		return
	}
}

// handleAllocWorkDoneRunLoop clears the runLoop() state associated with
// completed allocation work, and hands any successfully allocated
// [udprelay.ServerEndpoint] to handleNewServerEndpointRunLoop().
func (r *relayManager) handleAllocWorkDoneRunLoop(done relayEndpointAllocWorkDoneEvent) {
	byCandidatePeerRelay, ok := r.allocWorkByCandidatePeerRelayByEndpoint[done.work.wlb.ep]
	if !ok {
		return
	}
	work, ok := byCandidatePeerRelay[done.work.candidatePeerRelay]
	if !ok || work != done.work {
		return
	}
	delete(byCandidatePeerRelay, done.work.candidatePeerRelay)
	if len(byCandidatePeerRelay) == 0 {
		delete(r.allocWorkByCandidatePeerRelayByEndpoint, done.work.wlb.ep)
	}
	byDiscoKeys, ok := r.allocWorkByDiscoKeysByServerNodeKey[done.work.candidatePeerRelay.nodeKey]
	if !ok {
		// unexpected
		return
	}
	delete(byDiscoKeys, done.work.discoKeys)
	if len(byDiscoKeys) == 0 {
		delete(r.allocWorkByDiscoKeysByServerNodeKey, done.work.candidatePeerRelay.nodeKey)
	}
	if !done.allocated.ServerDisco.IsZero() {
		r.handleNewServerEndpointRunLoop(newRelayServerEndpointEvent{
			wlb:    done.work.wlb,
			se:     done.allocated,
			server: done.work.candidatePeerRelay,
		})
	}
}

// handleHandshakeWorkDoneRunLoop clears the runLoop() state associated with
// completed handshake work. If the handshake produced a functional path, it
// notifies the [*endpoint] via udpRelayEndpointReady().
func (r *relayManager) handleHandshakeWorkDoneRunLoop(done relayEndpointHandshakeWorkDoneEvent) {
	byServerDisco, ok := r.handshakeWorkByServerDiscoByEndpoint[done.work.wlb.ep]
	if !ok {
		return
	}
	work, ok := byServerDisco[done.work.se.ServerDisco]
	if !ok || work != done.work {
		return
	}
	delete(byServerDisco, done.work.se.ServerDisco)
	if len(byServerDisco) == 0 {
		delete(r.handshakeWorkByServerDiscoByEndpoint, done.work.wlb.ep)
	}
	delete(r.handshakeWorkByServerDiscoVNI, serverDiscoVNI{done.work.se.ServerDisco, done.work.se.VNI})
	apv, ok := r.handshakeWorkAwaitingPong[work]
	if ok {
		delete(r.handshakeWorkAwaitingPong, work)
		delete(r.addrPortVNIToHandshakeWork, apv)
	}
	if !done.pongReceivedFrom.IsValid() {
		// The handshake or ping/pong probing timed out.
		return
	}
	// This relay endpoint is functional.
	vni := virtualNetworkID{}
	vni.set(done.work.se.VNI)
	addr := epAddr{ap: done.pongReceivedFrom, vni: vni}
	// ep.udpRelayEndpointReady() must be called in a new goroutine to prevent
	// deadlocks as it acquires [endpoint] & [Conn] mutexes. See [relayManager]
	// docs for details.
	go done.work.wlb.ep.udpRelayEndpointReady(addrQuality{
		epAddr:           addr,
		relayServerDisco: done.work.se.ServerDisco,
		latency:          done.latency,
		wireMTU:          pingSizeToPktLen(0, addr),
	})
}

// handleNewServerEndpointRunLoop deduplicates a newly learned
// [udprelay.ServerEndpoint] against in-progress work, advertises it to the
// remote peer if we allocated it, and starts handshake work when appropriate.
func (r *relayManager) handleNewServerEndpointRunLoop(newServerEndpoint newRelayServerEndpointEvent) {
	// Check for duplicate work by server disco + VNI.
	sdv := serverDiscoVNI{newServerEndpoint.se.ServerDisco, newServerEndpoint.se.VNI}
	existingWork, ok := r.handshakeWorkByServerDiscoVNI[sdv]
	if ok {
		// There's in-progress handshake work for the server disco + VNI, which
		// uniquely identify a [udprelay.ServerEndpoint]. Compare Lamport IDs
		// to determine which is newer.
		if existingWork.se.LamportID >= newServerEndpoint.se.LamportID {
			// The existing work is a duplicate or newer. Return early.
			return
		}
		// The existing work is no longer valid, clean it up.
		existingWork.cancel()
		done := <-existingWork.doneCh
		r.handleHandshakeWorkDoneRunLoop(done)
	}

	// Check for duplicate work by [*endpoint] + server disco.
	byServerDisco, ok := r.handshakeWorkByServerDiscoByEndpoint[newServerEndpoint.wlb.ep]
	if ok {
		existingWork, ok := byServerDisco[newServerEndpoint.se.ServerDisco]
		if ok {
			if newServerEndpoint.se.LamportID <= existingWork.se.LamportID {
				// The "new" server endpoint is outdated or a duplicate of the
				// existing handshake work. Return early.
				return
			}
			// Cancel the existing handshake, which has a lower Lamport ID.
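			// The receive below cannot block indefinitely: the worker always
			// writes its done event to doneCh (buffered, len 1) when it
			// returns, including on cancellation.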
			existingWork.cancel()
			done := <-existingWork.doneCh
			r.handleHandshakeWorkDoneRunLoop(done)
		}
	}

	// We're now reasonably sure we're dealing with the latest
	// [udprelay.ServerEndpoint] from a server event order perspective
	// (LamportID).

	if newServerEndpoint.server.isValid() {
		// Send a [disco.CallMeMaybeVia] to the remote peer if we allocated
		// this endpoint, regardless of whether we start a handshake below.
		go r.sendCallMeMaybeVia(newServerEndpoint.wlb.ep, newServerEndpoint.se)
	}

	lastBestMatchingServer := newServerEndpoint.se.ServerDisco.Compare(newServerEndpoint.wlb.lastBest.relayServerDisco) == 0
	if lastBestMatchingServer && newServerEndpoint.wlb.lastBestIsTrusted {
		// This relay server endpoint is the same as [endpoint]'s bestAddr at
		// the time UDP relay path discovery was started, and it was also a
		// trusted path (see endpoint.trustBestAddrUntil), so return early.
		//
		// If we were to start a new handshake, there is a chance that we
		// cause [endpoint] to blackhole some packets on its bestAddr if we
		// end up shifting to a new address family or src, e.g. IPv4 to IPv6,
		// due to the window of time between the handshake completing and our
		// call to udpRelayEndpointReady(). The relay server can only forward
		// packets from us on a single [epAddr].
		return
	}

	// TODO(jwhited): if lastBest is untrusted, consider some strategies
	// to reduce the chance we blackhole if it were to transition to
	// trusted during/before the new handshake:
	//  1. Start by attempting a handshake with only lastBest.epAddr. If
	//     that fails then try the remaining [epAddr]s.
	//  2. Signal bestAddr trust transitions between [endpoint] and
	//     [relayManager] in order to prevent a handshake from starting
	//     and/or stop one that is running.

	// We're ready to start a new handshake.
	ctx, cancel := context.WithCancel(context.Background())
	work := &relayHandshakeWork{
		wlb:          newServerEndpoint.wlb,
		se:           newServerEndpoint.se,
		rxDiscoMsgCh: make(chan relayDiscoMsgEvent),
		doneCh:       make(chan relayEndpointHandshakeWorkDoneEvent, 1),
		ctx:          ctx,
		cancel:       cancel,
	}
	if byServerDisco == nil {
		byServerDisco = make(map[key.DiscoPublic]*relayHandshakeWork)
		r.handshakeWorkByServerDiscoByEndpoint[newServerEndpoint.wlb.ep] = byServerDisco
	}
	byServerDisco[newServerEndpoint.se.ServerDisco] = work
	r.handshakeWorkByServerDiscoVNI[sdv] = work

	r.handshakeGeneration++
	if r.handshakeGeneration == 0 { // generation must be nonzero
		r.handshakeGeneration++
	}
	go r.handshakeServerEndpoint(work, r.handshakeGeneration)
}

// sendCallMeMaybeVia sends a [disco.CallMeMaybeVia] to ep over DERP. It must
// be called as part of a goroutine independent from runLoop(), for 2 reasons:
//  1. it acquires ep.mu (refer to [relayManager] docs for reasoning)
//  2. it makes a networking syscall, which can introduce unwanted backpressure
func (r *relayManager) sendCallMeMaybeVia(ep *endpoint, se udprelay.ServerEndpoint) {
	ep.mu.Lock()
	derpAddr := ep.derpAddr
	ep.mu.Unlock()
	epDisco := ep.disco.Load()
	if epDisco == nil || !derpAddr.IsValid() {
		return
	}
	callMeMaybeVia := &disco.CallMeMaybeVia{
		UDPRelayEndpoint: disco.UDPRelayEndpoint{
			ServerDisco:         se.ServerDisco,
			ClientDisco:         se.ClientDisco,
			LamportID:           se.LamportID,
			VNI:                 se.VNI,
			BindLifetime:        se.BindLifetime.Duration,
			SteadyStateLifetime: se.SteadyStateLifetime.Duration,
			AddrPorts:           se.AddrPorts,
		},
	}
	ep.c.sendDiscoMessage(epAddr{ap: derpAddr}, ep.publicKey, epDisco.key, callMeMaybeVia, discoVerboseLog)
}

// handshakeServerEndpoint performs the Geneve-encapsulated disco handshake
// with the relay server described by work.se, then probes the resulting path
// with disco ping/pong. It reports its outcome on work.doneCh and
// [relayManager.handshakeWorkDoneCh].
func (r *relayManager) handshakeServerEndpoint(work *relayHandshakeWork, generation uint32) {
	done := relayEndpointHandshakeWorkDoneEvent{work: work}
	r.ensureDiscoInfoFor(work)

	defer func() {
		r.derefDiscoInfoFor(work)
		work.doneCh <- done
		relayManagerInputEvent(r, work.ctx, &r.handshakeWorkDoneCh, done)
		work.cancel()
	}()

	ep := work.wlb.ep
	epDisco := ep.disco.Load()
	if epDisco == nil {
		return
	}

	common := disco.BindUDPRelayEndpointCommon{
		VNI:        work.se.VNI,
		Generation: generation,
		RemoteKey:  epDisco.key,
	}

	sentBindAny := false
	bind := &disco.BindUDPRelayEndpoint{
		BindUDPRelayEndpointCommon: common,
	}
	vni := virtualNetworkID{}
	vni.set(work.se.VNI)
	for _, addrPort := range work.se.AddrPorts {
		if addrPort.IsValid() {
			sentBindAny = true
			go ep.c.sendDiscoMessage(epAddr{ap: addrPort, vni: vni}, key.NodePublic{}, work.se.ServerDisco, bind, discoVerboseLog)
		}
	}
	if !sentBindAny {
		return
	}

	// Limit goroutine lifetime to a reasonable duration. This is intentionally
	// detached and independent of 'BindLifetime' to prevent relay server
	// (mis)configuration from negatively impacting client resource usage.
	const maxHandshakeLifetime = time.Second * 30
	timer := time.NewTimer(min(work.se.BindLifetime.Duration, maxHandshakeLifetime))
	defer timer.Stop()

	// Limit the number of pings we will transmit. Inbound pings trigger
	// outbound pings, so we want to be a little defensive.
	const limitPings = 10

	var (
		handshakeState disco.BindUDPRelayHandshakeState = disco.BindUDPRelayHandshakeStateBindSent
		sentPingAt                                      = make(map[stun.TxID]time.Time)
	)
	txPing := func(to netip.AddrPort, withAnswer *[32]byte) {
		if len(sentPingAt) == limitPings {
			return
		}
		txid := stun.NewTxID()
		sentPingAt[txid] = time.Now()
		ping := &disco.Ping{
			TxID:    txid,
			NodeKey: ep.c.publicKeyAtomic.Load(),
		}
		go func() {
			if withAnswer != nil {
				answer := &disco.BindUDPRelayEndpointAnswer{BindUDPRelayEndpointCommon: common}
				answer.Challenge = *withAnswer
				ep.c.sendDiscoMessage(epAddr{ap: to, vni: vni}, key.NodePublic{}, work.se.ServerDisco, answer, discoVerboseLog)
			}
			ep.c.sendDiscoMessage(epAddr{ap: to, vni: vni}, key.NodePublic{}, epDisco.key, ping, discoVerboseLog)
		}()
	}

	validateVNIAndRemoteKey := func(common disco.BindUDPRelayEndpointCommon) error {
		if common.VNI != work.se.VNI {
			return errors.New("mismatching VNI")
		}
		if common.RemoteKey.Compare(epDisco.key) != 0 {
			return errors.New("mismatching RemoteKey")
		}
		return nil
	}

	// This for{select{}} is responsible for handshaking and tx'ing ping/pong
	// when the handshake is complete.
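	// In outline: binds were already sent to every candidate AddrPort above.
	// A server challenge moves us from BindSent to AnswerSent (we reply with
	// an answer immediately followed by a ping). A pong matching one of our
	// ping TxIDs completes probing and records latency. Inbound pings after
	// AnswerSent simply trigger another ping of ours.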
	for {
		select {
		case <-work.ctx.Done():
			return
		case msgEvent := <-work.rxDiscoMsgCh:
			switch msg := msgEvent.msg.(type) {
			case *disco.BindUDPRelayEndpointChallenge:
				err := validateVNIAndRemoteKey(msg.BindUDPRelayEndpointCommon)
				if err != nil {
					continue
				}
				if handshakeState >= disco.BindUDPRelayHandshakeStateAnswerSent {
					continue
				}
				txPing(msgEvent.from, &msg.Challenge)
				handshakeState = disco.BindUDPRelayHandshakeStateAnswerSent
			case *disco.Ping:
				if handshakeState < disco.BindUDPRelayHandshakeStateAnswerSent {
					continue
				}
				// An inbound ping from the remote peer indicates we completed
				// a handshake with the relay server (our answer msg was
				// received). Chances are our ping was dropped before the
				// remote handshake was complete. We need to rx a pong to
				// determine latency, so send another ping. Since the
				// handshake is complete we do not need to send an answer in
				// front of this one.
				//
				// We don't need to TX a pong, that was already handled for us
				// in handleRxDiscoMsgRunLoop().
				txPing(msgEvent.from, nil)
			case *disco.Pong:
				at, ok := sentPingAt[msg.TxID]
				if !ok {
					continue
				}
				// The relay server endpoint is functional! Record the
				// round-trip latency and return.
				done.pongReceivedFrom = msgEvent.from
				done.latency = time.Since(at)
				return
			default:
				// Unexpected message type, silently discard.
				continue
			}
		case <-timer.C:
			// The handshake timed out.
			return
		}
	}
}

const allocateUDPRelayEndpointRequestTimeout = time.Second * 10

// allocateServerEndpoint requests a [udprelay.ServerEndpoint] allocation from
// work.candidatePeerRelay over DERP and waits for a matching response. It
// reports its outcome on work.doneCh and [relayManager.allocateWorkDoneCh].
func (r *relayManager) allocateServerEndpoint(work *relayEndpointAllocWork, generation uint32) {
	done := relayEndpointAllocWorkDoneEvent{work: work}

	defer func() {
		work.doneCh <- done
		relayManagerInputEvent(r, work.ctx, &r.allocateWorkDoneCh, done)
		work.cancel()
	}()

	dm := &disco.AllocateUDPRelayEndpointRequest{
		ClientDisco: work.discoKeys.Get(),
		Generation:  generation,
	}
	sendAllocReq := func() {
		work.wlb.ep.c.sendDiscoAllocateUDPRelayEndpointRequest(
			epAddr{
				ap: netip.AddrPortFrom(tailcfg.DerpMagicIPAddr, work.candidatePeerRelay.derpHomeRegionID),
			},
			work.candidatePeerRelay.nodeKey,
			work.candidatePeerRelay.discoKey,
			dm,
			discoVerboseLog,
		)
	}
	go sendAllocReq()

	returnAfterTimer := time.NewTimer(allocateUDPRelayEndpointRequestTimeout)
	defer returnAfterTimer.Stop()

	// While connections to DERP are over TCP, they can be lossy on the DERP
	// server when data moves between the two independent streams. Also, the
	// peer relay server may not be "ready" (see
	// [tailscale.com/net/udprelay.ErrServerNotReady]). So, start a timer to
	// retry once if needed.
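	// Note: retryAfterTimer is never reset, so at most one retransmission is
	// attempted during this allocation attempt.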
	retryAfterTimer := time.NewTimer(udprelay.ServerRetryAfter)
	defer retryAfterTimer.Stop()

	for {
		select {
		case <-work.ctx.Done():
			return
		case <-returnAfterTimer.C:
			return
		case <-retryAfterTimer.C:
			go sendAllocReq()
		case resp := <-work.rxDiscoMsgCh:
			if resp.Generation != generation ||
				!work.discoKeys.Equal(key.NewSortedPairOfDiscoPublic(resp.ClientDisco[0], resp.ClientDisco[1])) {
				continue
			}
			done.allocated = udprelay.ServerEndpoint{
				ServerDisco:         resp.ServerDisco,
				ClientDisco:         resp.ClientDisco,
				LamportID:           resp.LamportID,
				AddrPorts:           resp.AddrPorts,
				VNI:                 resp.VNI,
				BindLifetime:        tstime.GoDuration{Duration: resp.BindLifetime},
				SteadyStateLifetime: tstime.GoDuration{Duration: resp.SteadyStateLifetime},
			}
			return
		}
	}
}

// allocateAllServersRunLoop starts allocation work on every known relay
// server for the [*endpoint] in wlb.
func (r *relayManager) allocateAllServersRunLoop(wlb endpointWithLastBest) {
	if len(r.serversByNodeKey) == 0 {
		return
	}
	remoteDisco := wlb.ep.disco.Load()
	if remoteDisco == nil {
		return
	}
	discoKeys := key.NewSortedPairOfDiscoPublic(wlb.ep.c.discoPublic, remoteDisco.key)
	for _, v := range r.serversByNodeKey {
		byDiscoKeys, ok := r.allocWorkByDiscoKeysByServerNodeKey[v.nodeKey]
		if !ok {
			byDiscoKeys = make(map[key.SortedPairOfDiscoPublic]*relayEndpointAllocWork)
			r.allocWorkByDiscoKeysByServerNodeKey[v.nodeKey] = byDiscoKeys
		} else {
			_, ok = byDiscoKeys[discoKeys]
			if ok {
				// If there is an existing key, a disco key collision may have
				// occurred across peers ([*endpoint]). Do not overwrite the
				// existing work, let it finish.
				wlb.ep.c.logf("[unexpected] magicsock: relayManager: suspected disco key collision on server %v for keys: %v", v.nodeKey.ShortString(), discoKeys)
				continue
			}
		}
		ctx, cancel := context.WithCancel(context.Background())
		started := &relayEndpointAllocWork{
			wlb:                wlb,
			discoKeys:          discoKeys,
			candidatePeerRelay: v,
			rxDiscoMsgCh:       make(chan *disco.AllocateUDPRelayEndpointResponse),
			doneCh:             make(chan relayEndpointAllocWorkDoneEvent, 1),
			ctx:                ctx,
			cancel:             cancel,
		}
		byDiscoKeys[discoKeys] = started
		byCandidatePeerRelay, ok := r.allocWorkByCandidatePeerRelayByEndpoint[wlb.ep]
		if !ok {
			byCandidatePeerRelay = make(map[candidatePeerRelay]*relayEndpointAllocWork)
			r.allocWorkByCandidatePeerRelayByEndpoint[wlb.ep] = byCandidatePeerRelay
		}
		byCandidatePeerRelay[v] = started
		r.allocGeneration++
		go r.allocateServerEndpoint(started, r.allocGeneration)
	}
}