From 06cc075ed9ecff7a04b2e606fd287fd7bbac936b Mon Sep 17 00:00:00 2001
From: Jordan Whited
Date: Mon, 5 May 2025 15:32:00 -0700
Subject: [PATCH] wgengine/magicsock: hook relay server alloc into path discovery

This is currently a no-op as the relayCapable bool is never set. This
commit also begins to shape allocation and handshaking within
relayManager.

Updates tailscale/corp#27502

Signed-off-by: Jordan Whited
---
 cmd/k8s-operator/depaware.txt      |   1 +
 cmd/tailscaled/depaware.txt        |   2 +-
 tsnet/depaware.txt                 |   1 +
 wgengine/magicsock/endpoint.go     |   9 ++
 wgengine/magicsock/relaymanager.go | 157 ++++++++++++++++++++++++++---
 5 files changed, 153 insertions(+), 17 deletions(-)

diff --git a/cmd/k8s-operator/depaware.txt b/cmd/k8s-operator/depaware.txt
index 700085b39..67cd76f5c 100644
--- a/cmd/k8s-operator/depaware.txt
+++ b/cmd/k8s-operator/depaware.txt
@@ -872,6 +872,7 @@ tailscale.com/cmd/k8s-operator dependencies: (generated by github.com/tailscale/
      tailscale.com/net/tsdial         from tailscale.com/control/controlclient+
 💣   tailscale.com/net/tshttpproxy    from tailscale.com/clientupdate/distsign+
      tailscale.com/net/tstun          from tailscale.com/tsd+
+     tailscale.com/net/udprelay       from tailscale.com/wgengine/magicsock
      tailscale.com/omit               from tailscale.com/ipn/conffile
      tailscale.com/paths              from tailscale.com/client/local+
 💣   tailscale.com/portlist           from tailscale.com/ipn/ipnlocal
diff --git a/cmd/tailscaled/depaware.txt b/cmd/tailscaled/depaware.txt
index 1af828f75..b51cffc9d 100644
--- a/cmd/tailscaled/depaware.txt
+++ b/cmd/tailscaled/depaware.txt
@@ -348,7 +348,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de
      tailscale.com/net/tsdial         from tailscale.com/cmd/tailscaled+
 💣   tailscale.com/net/tshttpproxy    from tailscale.com/clientupdate/distsign+
      tailscale.com/net/tstun          from tailscale.com/cmd/tailscaled+
-     tailscale.com/net/udprelay       from tailscale.com/feature/relayserver
+     tailscale.com/net/udprelay       from tailscale.com/feature/relayserver+
      tailscale.com/omit               from tailscale.com/ipn/conffile
      tailscale.com/paths              from tailscale.com/client/local+
 💣   tailscale.com/portlist           from tailscale.com/ipn/ipnlocal
diff --git a/tsnet/depaware.txt b/tsnet/depaware.txt
index 8bc93cd2f..786da0af6 100644
--- a/tsnet/depaware.txt
+++ b/tsnet/depaware.txt
@@ -303,6 +303,7 @@ tailscale.com/tsnet dependencies: (generated by github.com/tailscale/depaware)
      tailscale.com/net/tsdial         from tailscale.com/control/controlclient+
 💣   tailscale.com/net/tshttpproxy    from tailscale.com/clientupdate/distsign+
      tailscale.com/net/tstun          from tailscale.com/tsd+
+     tailscale.com/net/udprelay       from tailscale.com/wgengine/magicsock
      tailscale.com/omit               from tailscale.com/ipn/conffile
      tailscale.com/paths              from tailscale.com/client/local+
 💣   tailscale.com/portlist           from tailscale.com/ipn/ipnlocal
diff --git a/wgengine/magicsock/endpoint.go b/wgengine/magicsock/endpoint.go
index 5f4f0bd8c..f88dab29d 100644
--- a/wgengine/magicsock/endpoint.go
+++ b/wgengine/magicsock/endpoint.go
@@ -95,6 +95,7 @@ type endpoint struct {
 
 	expired         bool // whether the node has expired
 	isWireguardOnly bool // whether the endpoint is WireGuard only
+	relayCapable    bool // whether the node is capable of speaking via a [tailscale.com/net/udprelay.Server]
 }
 
 func (de *endpoint) setBestAddrLocked(v addrQuality) {
@@ -1249,6 +1250,13 @@ func (de *endpoint) sendDiscoPingsLocked(now mono.Time, sendCallMeMaybe bool) {
 		// sent so our firewall ports are probably open and now
 		// would be a good time for them to connect.
 		go de.c.enqueueCallMeMaybe(derpAddr, de)
+
+		// Schedule allocation of relay endpoints. For now we make no
+		// attempt to consider existing relay endpoints or best UDP path
+		// state; keep it simple.
+		if de.relayCapable {
+			go de.c.relayManager.allocateAndHandshakeAllServers(de)
+		}
 	}
 }
 
@@ -1863,6 +1871,7 @@ func (de *endpoint) resetLocked() {
 		}
 	}
 	de.probeUDPLifetime.resetCycleEndpointLocked()
+	de.c.relayManager.cancelOutstandingWork(de)
 }
 
 func (de *endpoint) numStopAndReset() int64 {
diff --git a/wgengine/magicsock/relaymanager.go b/wgengine/magicsock/relaymanager.go
index bf737b078..9f1fbbceb 100644
--- a/wgengine/magicsock/relaymanager.go
+++ b/wgengine/magicsock/relaymanager.go
@@ -4,11 +4,19 @@
 package magicsock
 
 import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"io"
+	"net/http"
 	"net/netip"
 	"sync"
+	"time"
 
 	"tailscale.com/disco"
+	"tailscale.com/net/udprelay"
 	"tailscale.com/types/key"
+	"tailscale.com/util/httpm"
 )
 
 // relayManager manages allocation and handshaking of
@@ -17,35 +25,152 @@
 type relayManager struct {
 	mu                     sync.Mutex // guards the following fields
 	discoInfoByServerDisco map[key.DiscoPublic]*discoInfo
+	// serversByAddrPort maps a relay server's address to its disco key,
+	// which is discovered at relay endpoint allocation time. The value is
+	// zero (key.DiscoPublic.IsZero()) if no endpoints have been
+	// successfully allocated on the server yet.
+	serversByAddrPort        map[netip.AddrPort]key.DiscoPublic
+	relaySetupWorkByEndpoint map[*endpoint]*relaySetupWork
 }
 
-func (h *relayManager) initLocked() {
-	if h.discoInfoByServerDisco != nil {
+// relaySetupWork serves to track in-progress relay endpoint allocation and
+// handshaking work for an [*endpoint]. This structure is immutable once
+// initialized.
+type relaySetupWork struct {
+	// ep is the [*endpoint] associated with the work
+	ep *endpoint
+	// cancel() will signal all associated goroutines to return
+	cancel context.CancelFunc
+	// wg.Wait() will return once all associated goroutines have returned
+	wg *sync.WaitGroup
+}
+
+func (r *relayManager) initLocked() {
+	if r.discoInfoByServerDisco != nil {
 		return
 	}
-	h.discoInfoByServerDisco = make(map[key.DiscoPublic]*discoInfo)
+	r.discoInfoByServerDisco = make(map[key.DiscoPublic]*discoInfo)
+	r.serversByAddrPort = make(map[netip.AddrPort]key.DiscoPublic)
+	r.relaySetupWorkByEndpoint = make(map[*endpoint]*relaySetupWork)
 }
 
 // discoInfo returns a [*discoInfo] for 'serverDisco' if there is an
 // active/ongoing handshake with it, otherwise it returns nil, false.
-func (h *relayManager) discoInfo(serverDisco key.DiscoPublic) (_ *discoInfo, ok bool) {
-	h.mu.Lock()
-	defer h.mu.Unlock()
-	h.initLocked()
-	di, ok := h.discoInfoByServerDisco[serverDisco]
+func (r *relayManager) discoInfo(serverDisco key.DiscoPublic) (_ *discoInfo, ok bool) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	r.initLocked()
+	di, ok := r.discoInfoByServerDisco[serverDisco]
 	return di, ok
 }
 
-func (h *relayManager) handleCallMeMaybeVia(dm *disco.CallMeMaybeVia) {
-	h.mu.Lock()
-	defer h.mu.Unlock()
-	h.initLocked()
+func (r *relayManager) handleCallMeMaybeVia(dm *disco.CallMeMaybeVia) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	r.initLocked()
 	// TODO(jwhited): implement
 }
 
-func (h *relayManager) handleBindUDPRelayEndpointChallenge(dm *disco.BindUDPRelayEndpointChallenge, di *discoInfo, src netip.AddrPort, vni uint32) {
-	h.mu.Lock()
-	defer h.mu.Unlock()
-	h.initLocked()
+func (r *relayManager) handleBindUDPRelayEndpointChallenge(dm *disco.BindUDPRelayEndpointChallenge, di *discoInfo, src netip.AddrPort, vni uint32) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	r.initLocked()
 	// TODO(jwhited): implement
 }
+
+// cancelOutstandingWork cancels any in-progress work for 'ep'.
+func (r *relayManager) cancelOutstandingWork(ep *endpoint) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	existing, ok := r.relaySetupWorkByEndpoint[ep]
+	if ok {
+		existing.cancel()
+		existing.wg.Wait()
+		delete(r.relaySetupWorkByEndpoint, ep)
+	}
+}
+
+// allocateAndHandshakeAllServers kicks off allocation and handshaking of relay
+// endpoints for 'ep' on all known relay servers, canceling any existing
+// in-progress work.
+func (r *relayManager) allocateAndHandshakeAllServers(ep *endpoint) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	r.initLocked()
+	existing, ok := r.relaySetupWorkByEndpoint[ep]
+	if ok {
+		existing.cancel()
+		existing.wg.Wait()
+		delete(r.relaySetupWorkByEndpoint, ep)
+	}
+	if len(r.serversByAddrPort) == 0 {
+		return
+	}
+	ctx, cancel := context.WithCancel(context.Background())
+	started := &relaySetupWork{ep: ep, cancel: cancel, wg: &sync.WaitGroup{}}
+	for k := range r.serversByAddrPort {
+		started.wg.Add(1)
+		go r.allocateAndHandshakeForServer(ctx, started.wg, k, ep)
+	}
+	r.relaySetupWorkByEndpoint[ep] = started
+
+	go func() {
+		started.wg.Wait()
+		started.cancel()
+		r.mu.Lock()
+		defer r.mu.Unlock()
+		maybeCleanup, ok := r.relaySetupWorkByEndpoint[ep]
+		if ok && maybeCleanup == started {
+			// A subsequent call to allocateAndHandshakeAllServers may have raced to
+			// delete the associated key/value, so ensure the work we are
+			// cleaning up from the map is the same as the one we were waiting
+			// to finish.
+			delete(r.relaySetupWorkByEndpoint, ep)
+		}
+	}()
+}
+
+func (r *relayManager) handleNewServerEndpoint(ctx context.Context, wg *sync.WaitGroup, server netip.AddrPort, se udprelay.ServerEndpoint) {
+	// TODO(jwhited): implement
+}
+
+func (r *relayManager) allocateAndHandshakeForServer(ctx context.Context, wg *sync.WaitGroup, server netip.AddrPort, ep *endpoint) {
+	// TODO(jwhited): introduce client metrics counters for notable failures
+	defer wg.Done()
+	var b bytes.Buffer
+	remoteDisco := ep.disco.Load()
+	if remoteDisco == nil {
+		return
+	}
+	type allocateRelayEndpointReq struct {
+		DiscoKeys []key.DiscoPublic
+	}
+	a := &allocateRelayEndpointReq{
+		DiscoKeys: []key.DiscoPublic{ep.c.discoPublic, remoteDisco.key},
+	}
+	err := json.NewEncoder(&b).Encode(a)
+	if err != nil {
+		return
+	}
+	const reqTimeout = time.Second * 10
+	reqCtx, cancel := context.WithTimeout(ctx, reqTimeout)
+	defer cancel()
+	req, err := http.NewRequestWithContext(reqCtx, httpm.POST, "http://"+server.String()+"/relay/endpoint", &b)
+	if err != nil {
+		return
+	}
+	resp, err := http.DefaultClient.Do(req)
+	if err != nil {
+		return
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode != http.StatusOK {
+		return
+	}
+	var se udprelay.ServerEndpoint
+	err = json.NewDecoder(io.LimitReader(resp.Body, 4096)).Decode(&se)
+	if err != nil {
+		return
+	}
+	r.handleNewServerEndpoint(ctx, wg, server, se)
+}
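
Note on the allocation request shape: on the wire, allocateAndHandshakeForServer
reduces to a single JSON POST against the relay server's HTTP listener (no
request headers are set beyond what net/http adds by default). A sketch of the
exchange, assuming a relay server reachable at 192.0.2.10:7777 (the address is
hypothetical; the discokey: form is how key.DiscoPublic marshals to text):

    POST http://192.0.2.10:7777/relay/endpoint

    {"DiscoKeys":["discokey:<local node disco key hex>","discokey:<peer disco key hex>"]}

A 200 response body is decoded (capped at 4096 bytes) into a
udprelay.ServerEndpoint and handed to handleNewServerEndpoint; any transport
error, non-200 status, or decode failure silently abandons the attempt, which
is what the TODO about client metrics counters is meant to eventually surface.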
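Note on the concurrency shape: relaySetupWork pairs a context.CancelFunc with a
*sync.WaitGroup so that cancelOutstandingWork is synchronous; once it returns,
no goroutine for that endpoint is still running. A minimal self-contained
sketch of the same pattern (all names here are illustrative, not part of the
patch):

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// setupWork mirrors relaySetupWork: cancel signals workers to return, and
// wg tracks them so teardown can block until they have all exited.
type setupWork struct {
	cancel context.CancelFunc
	wg     *sync.WaitGroup
}

// start launches n workers tied to a shared cancellable context.
func start(n int) *setupWork {
	ctx, cancel := context.WithCancel(context.Background())
	w := &setupWork{cancel: cancel, wg: &sync.WaitGroup{}}
	for i := 0; i < n; i++ {
		w.wg.Add(1)
		go func(id int) {
			defer w.wg.Done()
			select {
			case <-ctx.Done():
				// Canceled by teardown; return promptly.
			case <-time.After(time.Duration(id+1) * time.Second):
				// Simulated allocation/handshake work completing on its own.
			}
		}(i)
	}
	return w
}

// teardown mirrors cancelOutstandingWork: signal, then wait, so no worker
// outlives the call.
func (w *setupWork) teardown() {
	w.cancel()
	w.wg.Wait()
}

func main() {
	w := start(3)
	w.teardown()
	fmt.Println("all workers returned")
}

The extra cleanup goroutine in allocateAndHandshakeAllServers adds one wrinkle
on top of this: when all workers finish on their own, it removes the map entry
itself, double-checking identity under the lock since a newer relaySetupWork
may have replaced it in the meantime.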