From a589863d61725bf027bb03a1389c7900dce611b8 Mon Sep 17 00:00:00 2001
From: Jordan Whited
Date: Mon, 23 Jun 2025 15:50:43 -0700
Subject: [PATCH] feature/relayserver,net/udprelay,wgengine/magicsock:
 implement retry (#16347)

udprelay.Server is lazily initialized when the first request is
received over peerAPI. These early requests have a high chance of
failure until the first address discovery cycle has completed. Return
an ErrServerNotReady error until the first address discovery cycle has
completed, and plumb retry handling for this error all the way back to
the client in relayManager. relayManager can now retry after a few
seconds instead of waiting for the next path discovery cycle, which
could take another minute or longer.

Updates tailscale/corp#27502

Signed-off-by: Jordan Whited
---
 feature/relayserver/relayserver.go |  8 +++
 net/udprelay/server.go             | 37 ++++++++----
 wgengine/magicsock/relaymanager.go | 91 +++++++++++++++++++++---------
 3 files changed, 99 insertions(+), 37 deletions(-)

diff --git a/feature/relayserver/relayserver.go b/feature/relayserver/relayserver.go
index a38587aa3..4634f3ac2 100644
--- a/feature/relayserver/relayserver.go
+++ b/feature/relayserver/relayserver.go
@@ -8,9 +8,11 @@ package relayserver
 import (
 	"encoding/json"
 	"errors"
+	"fmt"
 	"io"
 	"net/http"
 	"sync"
+	"time"
 
 	"tailscale.com/envknob"
 	"tailscale.com/feature"
@@ -184,6 +186,12 @@ func handlePeerAPIRelayAllocateEndpoint(h ipnlocal.PeerAPIHandler, w http.Respon
 	}
 	ep, err := rs.AllocateEndpoint(allocateEndpointReq.DiscoKeys[0], allocateEndpointReq.DiscoKeys[1])
 	if err != nil {
+		var notReady udprelay.ErrServerNotReady
+		if errors.As(err, &notReady) {
+			w.Header().Set("Retry-After", fmt.Sprintf("%d", notReady.RetryAfter.Round(time.Second)/time.Second))
+			httpErrAndLog(err.Error(), http.StatusServiceUnavailable)
+			return
+		}
 		httpErrAndLog(err.Error(), http.StatusInternalServerError)
 		return
 	}
diff --git a/net/udprelay/server.go b/net/udprelay/server.go
index f7f5868c0..8b9e95fb1 100644
--- a/net/udprelay/server.go
+++ b/net/udprelay/server.go
@@ -63,13 +63,14 @@ type Server struct {
 	closeCh    chan struct{}
 	netChecker *netcheck.Client
 
-	mu        sync.Mutex       // guards the following fields
-	addrPorts []netip.AddrPort // the ip:port pairs returned as candidate endpoints
-	closed    bool
-	lamportID uint64
-	vniPool   []uint32 // the pool of available VNIs
-	byVNI     map[uint32]*serverEndpoint
-	byDisco   map[pairOfDiscoPubKeys]*serverEndpoint
+	mu                sync.Mutex       // guards the following fields
+	addrDiscoveryOnce bool             // addrDiscovery completed once (successfully or unsuccessfully)
+	addrPorts         []netip.AddrPort // the ip:port pairs returned as candidate endpoints
+	closed            bool
+	lamportID         uint64
+	vniPool           []uint32 // the pool of available VNIs
+	byVNI             map[uint32]*serverEndpoint
+	byDisco           map[pairOfDiscoPubKeys]*serverEndpoint
 }
 
 // pairOfDiscoPubKeys is a pair of key.DiscoPublic. It must be constructed via
@@ -321,8 +322,7 @@ func NewServer(logf logger.Logf, port int, overrideAddrs []netip.Addr) (s *Serve
 	s.wg.Add(1)
 	go s.endpointGCLoop()
 	if len(overrideAddrs) > 0 {
-		var addrPorts set.Set[netip.AddrPort]
-		addrPorts.Make()
+		addrPorts := make(set.Set[netip.AddrPort], len(overrideAddrs))
 		for _, addr := range overrideAddrs {
 			if addr.IsValid() {
 				addrPorts.Add(netip.AddrPortFrom(addr, boundPort))
@@ -401,12 +401,12 @@ func (s *Server) addrDiscoveryLoop() {
 			}
 			s.mu.Lock()
 			s.addrPorts = addrPorts
+			s.addrDiscoveryOnce = true
 			s.mu.Unlock()
 		case <-s.closeCh:
 			return
 		}
 	}
-
 }
 
 func (s *Server) listenOn(port int) (uint16, error) {
@@ -521,10 +521,22 @@ func (s *Server) packetReadLoop() {
 
 var ErrServerClosed = errors.New("server closed")
 
+// ErrServerNotReady indicates the server is not ready. Allocation should be
+// requested after waiting for at least RetryAfter duration.
+type ErrServerNotReady struct {
+	RetryAfter time.Duration
+}
+
+func (e ErrServerNotReady) Error() string {
+	return fmt.Sprintf("server not ready, retry after %v", e.RetryAfter)
+}
+
 // AllocateEndpoint allocates an [endpoint.ServerEndpoint] for the provided pair
 // of [key.DiscoPublic]'s. If an allocation already exists for discoA and discoB
 // it is returned without modification/reallocation. AllocateEndpoint returns
-// [ErrServerClosed] if the server has been closed.
+// the following notable errors:
+//  1. [ErrServerClosed] if the server has been closed.
+//  2. [ErrServerNotReady] if the server is not ready.
 func (s *Server) AllocateEndpoint(discoA, discoB key.DiscoPublic) (endpoint.ServerEndpoint, error) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
@@ -533,6 +545,9 @@ func (s *Server) AllocateEndpoint(discoA, discoB key.DiscoPublic) (endpoint.Serv
 	}
 
 	if len(s.addrPorts) == 0 {
+		if !s.addrDiscoveryOnce {
+			return endpoint.ServerEndpoint{}, ErrServerNotReady{RetryAfter: 3 * time.Second}
+		}
 		return endpoint.ServerEndpoint{}, errors.New("server addrPorts are not yet known")
 	}
 
diff --git a/wgengine/magicsock/relaymanager.go b/wgengine/magicsock/relaymanager.go
index 4ccfbb501..d149d0c59 100644
--- a/wgengine/magicsock/relaymanager.go
+++ b/wgengine/magicsock/relaymanager.go
@@ -7,9 +7,12 @@ import (
 	"bytes"
 	"context"
 	"encoding/json"
+	"errors"
+	"fmt"
 	"io"
 	"net/http"
 	"net/netip"
+	"strconv"
 	"sync"
 	"time"
 
@@ -716,46 +719,82 @@ func (r *relayManager) allocateAllServersRunLoop(ep *endpoint) {
 	}()
 }
 
-func (r *relayManager) allocateSingleServer(ctx context.Context, wg *sync.WaitGroup, server netip.AddrPort, ep *endpoint) {
-	// TODO(jwhited): introduce client metrics counters for notable failures
-	defer wg.Done()
-	var b bytes.Buffer
-	remoteDisco := ep.disco.Load()
-	if remoteDisco == nil {
-		return
-	}
+type errNotReady struct{ retryAfter time.Duration }
+
+func (e errNotReady) Error() string {
+	return fmt.Sprintf("server not ready, retry after %v", e.retryAfter)
+}
+
+const reqTimeout = time.Second * 10
+
+func doAllocate(ctx context.Context, server netip.AddrPort, discoKeys [2]key.DiscoPublic) (udprelay.ServerEndpoint, error) {
+	var reqBody bytes.Buffer
 	type allocateRelayEndpointReq struct {
 		DiscoKeys []key.DiscoPublic
 	}
 	a := &allocateRelayEndpointReq{
-		DiscoKeys: []key.DiscoPublic{ep.c.discoPublic, remoteDisco.key},
+		DiscoKeys: []key.DiscoPublic{discoKeys[0], discoKeys[1]},
 	}
-	err := json.NewEncoder(&b).Encode(a)
+	err := json.NewEncoder(&reqBody).Encode(a)
 	if err != nil {
-		return
+		return udprelay.ServerEndpoint{}, err
 	}
-	const reqTimeout = time.Second * 10
 	reqCtx, cancel := context.WithTimeout(ctx, reqTimeout)
 	defer cancel()
-	req, err := http.NewRequestWithContext(reqCtx, httpm.POST, "http://"+server.String()+"/v0/relay/endpoint", &b)
+	req, err := http.NewRequestWithContext(reqCtx, httpm.POST, "http://"+server.String()+"/v0/relay/endpoint", &reqBody)
 	if err != nil {
-		return
+		return udprelay.ServerEndpoint{}, err
 	}
 	resp, err := http.DefaultClient.Do(req)
 	if err != nil {
-		return
+		return udprelay.ServerEndpoint{}, err
 	}
 	defer resp.Body.Close()
-	if resp.StatusCode != http.StatusOK {
-		return
+	switch resp.StatusCode {
+	case http.StatusOK:
+		var se udprelay.ServerEndpoint
+		err = json.NewDecoder(io.LimitReader(resp.Body, 4096)).Decode(&se)
+		return se, err
+	case http.StatusServiceUnavailable:
+		raHeader := resp.Header.Get("Retry-After")
+		raSeconds, err := strconv.ParseUint(raHeader, 10, 32)
+		if err == nil {
+			return udprelay.ServerEndpoint{}, errNotReady{retryAfter: time.Second * time.Duration(raSeconds)}
+		}
+		fallthrough
+	default:
+		return udprelay.ServerEndpoint{}, fmt.Errorf("non-200 status: %d", resp.StatusCode)
+	}
+}
+
+func (r *relayManager) allocateSingleServer(ctx context.Context, wg *sync.WaitGroup, server netip.AddrPort, ep *endpoint) {
+	// TODO(jwhited): introduce client metrics counters for notable failures
+	defer wg.Done()
+	remoteDisco := ep.disco.Load()
+	if remoteDisco == nil {
+		return
+	}
+	firstTry := true
+	for {
+		se, err := doAllocate(ctx, server, [2]key.DiscoPublic{ep.c.discoPublic, remoteDisco.key})
+		if err == nil {
+			relayManagerInputEvent(r, ctx, &r.newServerEndpointCh, newRelayServerEndpointEvent{
+				ep: ep,
+				se: se,
+			})
+			return
+		}
+		ep.c.logf("[v1] magicsock: relayManager: error allocating endpoint on %v for %v: %v", server, ep.discoShort(), err)
+		var notReady errNotReady
+		if firstTry && errors.As(err, &notReady) {
+			select {
+			case <-ctx.Done():
+				return
+			case <-time.After(min(notReady.retryAfter, reqTimeout)):
+				firstTry = false
+				continue
+			}
+		}
+		return
 	}
-	var se udprelay.ServerEndpoint
-	err = json.NewDecoder(io.LimitReader(resp.Body, 4096)).Decode(&se)
-	if err != nil {
-		return
-	}
-	relayManagerInputEvent(r, ctx, &r.newServerEndpointCh, newRelayServerEndpointEvent{
-		ep:	ep,
-		se:	se,
-	})
 }
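
Reviewer note: below is a minimal, self-contained sketch (not part of this
patch) of the not-ready/Retry-After round trip the change implements, using
net/http/httptest as a stand-in for the lazily initialized udprelay.Server.
The names allocateOnce and ready are illustrative assumptions; the real logic
lives in doAllocate and allocateSingleServer above.

package main

import (
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strconv"
	"sync/atomic"
	"time"
)

// errNotReady mirrors the patch's client-side sentinel: it carries the
// server-advertised delay parsed from a Retry-After header.
type errNotReady struct{ retryAfter time.Duration }

func (e errNotReady) Error() string {
	return fmt.Sprintf("server not ready, retry after %v", e.retryAfter)
}

// allocateOnce performs one request and converts a 503 response with a
// numeric Retry-After header into an errNotReady, as doAllocate does.
func allocateOnce(url string) error {
	resp, err := http.Get(url)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	switch resp.StatusCode {
	case http.StatusOK:
		return nil
	case http.StatusServiceUnavailable:
		if s, err := strconv.ParseUint(resp.Header.Get("Retry-After"), 10, 32); err == nil {
			return errNotReady{retryAfter: time.Duration(s) * time.Second}
		}
		fallthrough
	default:
		return fmt.Errorf("non-200 status: %d", resp.StatusCode)
	}
}

func main() {
	// Stub server standing in for udprelay.Server: not ready on the first
	// request, ready afterwards.
	var ready atomic.Bool
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if !ready.Swap(true) {
			w.Header().Set("Retry-After", "1")
			http.Error(w, "server not ready", http.StatusServiceUnavailable)
			return
		}
		w.WriteHeader(http.StatusOK)
	}))
	defer srv.Close()

	// Single-retry loop shaped like the new allocateSingleServer.
	firstTry := true
	for {
		err := allocateOnce(srv.URL)
		if err == nil {
			fmt.Println("allocated")
			return
		}
		var notReady errNotReady
		if firstTry && errors.As(err, &notReady) {
			time.Sleep(notReady.retryAfter)
			firstTry = false
			continue
		}
		fmt.Println("giving up:", err)
		return
	}
}

Two details worth noting: the server side computes the header value as
RetryAfter.Round(time.Second)/time.Second, a Duration/Duration division that
yields a whole number of seconds for %d; and capping at a single retry mirrors
allocateSingleServer, so a server that is still not ready after one wait falls
back to the next path discovery cycle rather than looping here.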