wgengine/magicsock: hook relay server alloc into path discovery

This is currently a no-op as the relayCapable bool is never set. This
commit also begins to shape allocation and handshaking within
relayManager.

Updates tailscale/corp#27502

Signed-off-by: Jordan Whited <jordan@tailscale.com>
Jordan Whited 2025-05-05 15:32:00 -07:00
parent 7e2630235f
commit 06cc075ed9
5 changed files with 152 additions and 17 deletions
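
In outline, the flow this commit wires up is: when sendDiscoPingsLocked decides to also send a CallMeMaybe via DERP and the peer's endpoint has relayCapable set, relayManager kicks off one allocation-and-handshake attempt per known relay server, and resetting the endpoint tears any in-flight attempt back down. A condensed, illustrative sketch of that flow (simplified from the diffs below, not literal code from this commit):

// Illustrative sketch only; see the endpoint.go and relay_manager.go diffs below.
if de.relayCapable {
	// Fire and forget: the call below fans out one goroutine per known relay server.
	go de.c.relayManager.allocateAndHandshakeAllServers(de)
}

// ...and on endpoint reset, any outstanding relay setup work is canceled:
de.c.relayManager.cancelOutstandingWork(de)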


@@ -872,6 +872,7 @@ tailscale.com/cmd/k8s-operator dependencies: (generated by github.com/tailscale/depaware)
tailscale.com/net/tsdial from tailscale.com/control/controlclient+
💣 tailscale.com/net/tshttpproxy from tailscale.com/clientupdate/distsign+
tailscale.com/net/tstun from tailscale.com/tsd+
tailscale.com/net/udprelay from tailscale.com/wgengine/magicsock
tailscale.com/omit from tailscale.com/ipn/conffile
tailscale.com/paths from tailscale.com/client/local+
💣 tailscale.com/portlist from tailscale.com/ipn/ipnlocal


@@ -348,7 +348,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/depaware)
tailscale.com/net/tsdial from tailscale.com/cmd/tailscaled+
💣 tailscale.com/net/tshttpproxy from tailscale.com/clientupdate/distsign+
tailscale.com/net/tstun from tailscale.com/cmd/tailscaled+
tailscale.com/net/udprelay from tailscale.com/feature/relayserver
tailscale.com/net/udprelay from tailscale.com/feature/relayserver+
tailscale.com/omit from tailscale.com/ipn/conffile
tailscale.com/paths from tailscale.com/client/local+
💣 tailscale.com/portlist from tailscale.com/ipn/ipnlocal


@@ -303,6 +303,7 @@ tailscale.com/tsnet dependencies: (generated by github.com/tailscale/depaware)
tailscale.com/net/tsdial from tailscale.com/control/controlclient+
💣 tailscale.com/net/tshttpproxy from tailscale.com/clientupdate/distsign+
tailscale.com/net/tstun from tailscale.com/tsd+
tailscale.com/net/udprelay from tailscale.com/wgengine/magicsock
tailscale.com/omit from tailscale.com/ipn/conffile
tailscale.com/paths from tailscale.com/client/local+
💣 tailscale.com/portlist from tailscale.com/ipn/ipnlocal


@@ -95,6 +95,7 @@ type endpoint struct {
expired bool // whether the node has expired
isWireguardOnly bool // whether the endpoint is WireGuard only
relayCapable bool // whether the node is capable of speaking via a [tailscale.com/net/udprelay.Server]
}
func (de *endpoint) setBestAddrLocked(v addrQuality) {
@@ -1249,6 +1250,13 @@ func (de *endpoint) sendDiscoPingsLocked(now mono.Time, sendCallMeMaybe bool) {
// sent so our firewall ports are probably open and now
// would be a good time for them to connect.
go de.c.enqueueCallMeMaybe(derpAddr, de)
// Schedule allocation of relay endpoints. We make no consideration of
// current relay endpoints or best UDP path state for now; keep it
// simple.
if de.relayCapable {
go de.c.relayManager.allocateAndHandshakeAllServers(de)
}
}
}
@@ -1863,6 +1871,7 @@ func (de *endpoint) resetLocked() {
}
}
de.probeUDPLifetime.resetCycleEndpointLocked()
de.c.relayManager.cancelOutstandingWork(de)
}
func (de *endpoint) numStopAndReset() int64 {
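
Both hooks above hand per-endpoint work to relayManager, which (per the relay_manager.go diff below) tracks it as a context.CancelFunc plus a sync.WaitGroup, so a reset or a newer allocation pass can cancel the old goroutines and wait for them to exit before dropping the bookkeeping. That pattern, reduced to a self-contained sketch with illustrative names (not code from this commit):

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// work bundles the cancel/wait pair the same way relaySetupWork does below.
type work struct {
	cancel context.CancelFunc
	wg     *sync.WaitGroup
}

func start(n int) *work {
	ctx, cancel := context.WithCancel(context.Background())
	w := &work{cancel: cancel, wg: &sync.WaitGroup{}}
	for i := 0; i < n; i++ {
		w.wg.Add(1)
		go func(i int) {
			defer w.wg.Done()
			select {
			case <-ctx.Done(): // canceled by a reset or a newer allocation pass
				fmt.Println("worker", i, "canceled")
			case <-time.After(time.Second): // stand-in for allocate + handshake
				fmt.Println("worker", i, "finished")
			}
		}(i)
	}
	return w
}

func main() {
	w := start(3)
	// Cancel and wait, mirroring cancelOutstandingWork: once Wait returns it
	// is safe to drop all references to the old work.
	w.cancel()
	w.wg.Wait()
}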


@@ -4,11 +4,19 @@
package magicsock
import (
"bytes"
"context"
"encoding/json"
"io"
"net/http"
"net/netip"
"sync"
"time"
"tailscale.com/disco"
"tailscale.com/net/udprelay"
"tailscale.com/types/key"
"tailscale.com/util/httpm"
)
// relayManager manages allocation and handshaking of
@@ -17,35 +25,151 @@ import (
type relayManager struct {
mu sync.Mutex // guards the following fields
discoInfoByServerDisco map[key.DiscoPublic]*discoInfo
// serversByAddrPort value is the disco key of the relay server, which is
// discovered at relay endpoint allocation time. The map value is the zero
// value (key.DiscoPublic.IsZero()) if no endpoints have been successfully
// allocated on the server yet.
serversByAddrPort map[netip.AddrPort]key.DiscoPublic
relaySetupWorkByEndpoint map[*endpoint]*relaySetupWork
}
func (h *relayManager) initLocked() {
if h.discoInfoByServerDisco != nil {
// relaySetupWork serves to track in-progress relay endpoint allocation and
// handshaking work for an [*endpoint]. This structure is immutable once
// initialized.
type relaySetupWork struct {
// ep is the [*endpoint] associated with the work
ep *endpoint
// cancel() will signal all associated goroutines to return
cancel context.CancelFunc
// wg.Wait() will return once all associated goroutines have returned
wg *sync.WaitGroup
}
func (r *relayManager) initLocked() {
if r.discoInfoByServerDisco != nil {
return
}
h.discoInfoByServerDisco = make(map[key.DiscoPublic]*discoInfo)
r.discoInfoByServerDisco = make(map[key.DiscoPublic]*discoInfo)
r.serversByAddrPort = make(map[netip.AddrPort]key.DiscoPublic)
}
// discoInfo returns a [*discoInfo] for 'serverDisco' if there is an
// active/ongoing handshake with it, otherwise it returns nil, false.
func (h *relayManager) discoInfo(serverDisco key.DiscoPublic) (_ *discoInfo, ok bool) {
h.mu.Lock()
defer h.mu.Unlock()
h.initLocked()
di, ok := h.discoInfoByServerDisco[serverDisco]
func (r *relayManager) discoInfo(serverDisco key.DiscoPublic) (_ *discoInfo, ok bool) {
r.mu.Lock()
defer r.mu.Unlock()
r.initLocked()
di, ok := r.discoInfoByServerDisco[serverDisco]
return di, ok
}
func (h *relayManager) handleCallMeMaybeVia(dm *disco.CallMeMaybeVia) {
h.mu.Lock()
defer h.mu.Unlock()
h.initLocked()
func (r *relayManager) handleCallMeMaybeVia(dm *disco.CallMeMaybeVia) {
r.mu.Lock()
defer r.mu.Unlock()
r.initLocked()
// TODO(jwhited): implement
}
func (h *relayManager) handleBindUDPRelayEndpointChallenge(dm *disco.BindUDPRelayEndpointChallenge, di *discoInfo, src netip.AddrPort, vni uint32) {
h.mu.Lock()
defer h.mu.Unlock()
h.initLocked()
func (r *relayManager) handleBindUDPRelayEndpointChallenge(dm *disco.BindUDPRelayEndpointChallenge, di *discoInfo, src netip.AddrPort, vni uint32) {
r.mu.Lock()
defer r.mu.Unlock()
r.initLocked()
// TODO(jwhited): implement
}
// cancelOutstandingWork cancels any in-progress work for 'ep'.
func (r *relayManager) cancelOutstandingWork(ep *endpoint) {
r.mu.Lock()
defer r.mu.Unlock()
existing, ok := r.relaySetupWorkByEndpoint[ep]
if ok {
existing.cancel()
existing.wg.Wait()
delete(r.relaySetupWorkByEndpoint, ep)
}
}
// allocateAndHandshakeAllServers kicks off allocation and handshaking of relay
// endpoints for 'ep' on all known relay servers, canceling any existing
// in-progress work.
func (r *relayManager) allocateAndHandshakeAllServers(ep *endpoint) {
r.mu.Lock()
defer r.mu.Unlock()
r.initLocked()
existing, ok := r.relaySetupWorkByEndpoint[ep]
if ok {
existing.cancel()
existing.wg.Wait()
delete(r.relaySetupWorkByEndpoint, ep)
}
if len(r.serversByAddrPort) == 0 {
return
}
ctx, cancel := context.WithCancel(context.Background())
started := &relaySetupWork{ep: ep, cancel: cancel, wg: &sync.WaitGroup{}}
for k := range r.serversByAddrPort {
started.wg.Add(1)
go r.allocateAndHandshakeForServer(ctx, started.wg, k, ep)
}
r.relaySetupWorkByEndpoint[ep] = started
go func() {
started.wg.Wait()
started.cancel()
r.mu.Lock()
defer r.mu.Unlock()
maybeCleanup, ok := r.relaySetupWorkByEndpoint[ep]
if ok && maybeCleanup == started {
// A subsequent call to allocateAndHandshakeAllServers may have raced to
// delete the associated key/value, so ensure the work we are
// cleaning up from the map is the same as the one we were waiting
// to finish.
delete(r.relaySetupWorkByEndpoint, ep)
}
}()
}
func (r *relayManager) handleNewServerEndpoint(ctx context.Context, wg *sync.WaitGroup, server netip.AddrPort, se udprelay.ServerEndpoint) {
// TODO(jwhited): implement
}
func (r *relayManager) allocateAndHandshakeForServer(ctx context.Context, wg *sync.WaitGroup, server netip.AddrPort, ep *endpoint) {
// TODO(jwhited): introduce client metrics counters for notable failures
defer wg.Done()
var b bytes.Buffer
remoteDisco := ep.disco.Load()
if remoteDisco == nil {
return
}
type allocateRelayEndpointReq struct {
DiscoKeys []key.DiscoPublic
}
a := &allocateRelayEndpointReq{
DiscoKeys: []key.DiscoPublic{ep.c.discoPublic, remoteDisco.key},
}
err := json.NewEncoder(&b).Encode(a)
if err != nil {
return
}
const reqTimeout = time.Second * 10
reqCtx, cancel := context.WithTimeout(ctx, reqTimeout)
defer cancel()
req, err := http.NewRequestWithContext(reqCtx, httpm.POST, "http://"+server.String()+"/relay/endpoint", &b)
if err != nil {
return
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return
}
var se udprelay.ServerEndpoint
err = json.NewDecoder(io.LimitReader(resp.Body, 4096)).Decode(&se)
if err != nil {
return
}
r.handleNewServerEndpoint(ctx, wg, server, se)
}
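
For reference, the allocation half of allocateAndHandshakeForServer is a plain JSON POST: both disco keys (ours and the peer's) are sent to the server's /relay/endpoint route, and a successful response decodes into a udprelay.ServerEndpoint. Below is the same exchange reduced to a standalone helper that surfaces errors instead of silently returning, as an illustrative reduction of the code above (same imports as relay_manager.go plus fmt; allocateOnce is an invented name, not part of the commit):

// allocateOnce is an illustrative reduction of the HTTP exchange performed by
// allocateAndHandshakeForServer above. It assumes the same /relay/endpoint
// route and JSON request shape used by this commit.
func allocateOnce(ctx context.Context, server netip.AddrPort, local, remote key.DiscoPublic) (udprelay.ServerEndpoint, error) {
	var se udprelay.ServerEndpoint
	body, err := json.Marshal(struct {
		DiscoKeys []key.DiscoPublic
	}{DiscoKeys: []key.DiscoPublic{local, remote}})
	if err != nil {
		return se, err
	}
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	req, err := http.NewRequestWithContext(ctx, httpm.POST,
		"http://"+server.String()+"/relay/endpoint", bytes.NewReader(body))
	if err != nil {
		return se, err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return se, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return se, fmt.Errorf("relay endpoint allocation: unexpected status %s", resp.Status)
	}
	// Bound the response size, as the code above does, before decoding.
	return se, json.NewDecoder(io.LimitReader(resp.Body, 4096)).Decode(&se)
}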