diff --git a/feature/relayserver/relayserver.go b/feature/relayserver/relayserver.go index a38587aa3..4634f3ac2 100644 --- a/feature/relayserver/relayserver.go +++ b/feature/relayserver/relayserver.go @@ -8,9 +8,11 @@ package relayserver import ( "encoding/json" "errors" + "fmt" "io" "net/http" "sync" + "time" "tailscale.com/envknob" "tailscale.com/feature" @@ -184,6 +186,12 @@ func handlePeerAPIRelayAllocateEndpoint(h ipnlocal.PeerAPIHandler, w http.Respon } ep, err := rs.AllocateEndpoint(allocateEndpointReq.DiscoKeys[0], allocateEndpointReq.DiscoKeys[1]) if err != nil { + var notReady udprelay.ErrServerNotReady + if errors.As(err, &notReady) { + w.Header().Set("Retry-After", fmt.Sprintf("%d", notReady.RetryAfter.Round(time.Second)/time.Second)) + httpErrAndLog(err.Error(), http.StatusServiceUnavailable) + return + } httpErrAndLog(err.Error(), http.StatusInternalServerError) return } diff --git a/net/udprelay/server.go b/net/udprelay/server.go index f7f5868c0..8b9e95fb1 100644 --- a/net/udprelay/server.go +++ b/net/udprelay/server.go @@ -63,13 +63,14 @@ type Server struct { closeCh chan struct{} netChecker *netcheck.Client - mu sync.Mutex // guards the following fields - addrPorts []netip.AddrPort // the ip:port pairs returned as candidate endpoints - closed bool - lamportID uint64 - vniPool []uint32 // the pool of available VNIs - byVNI map[uint32]*serverEndpoint - byDisco map[pairOfDiscoPubKeys]*serverEndpoint + mu sync.Mutex // guards the following fields + addrDiscoveryOnce bool // addrDiscovery completed once (successfully or unsuccessfully) + addrPorts []netip.AddrPort // the ip:port pairs returned as candidate endpoints + closed bool + lamportID uint64 + vniPool []uint32 // the pool of available VNIs + byVNI map[uint32]*serverEndpoint + byDisco map[pairOfDiscoPubKeys]*serverEndpoint } // pairOfDiscoPubKeys is a pair of key.DiscoPublic. 
It must be constructed via @@ -321,8 +322,7 @@ func NewServer(logf logger.Logf, port int, overrideAddrs []netip.Addr) (s *Serve s.wg.Add(1) go s.endpointGCLoop() if len(overrideAddrs) > 0 { - var addrPorts set.Set[netip.AddrPort] - addrPorts.Make() + addrPorts := make(set.Set[netip.AddrPort], len(overrideAddrs)) for _, addr := range overrideAddrs { if addr.IsValid() { addrPorts.Add(netip.AddrPortFrom(addr, boundPort)) @@ -401,12 +401,12 @@ func (s *Server) addrDiscoveryLoop() { } s.mu.Lock() s.addrPorts = addrPorts + s.addrDiscoveryOnce = true s.mu.Unlock() case <-s.closeCh: return } } - } func (s *Server) listenOn(port int) (uint16, error) { @@ -521,10 +521,22 @@ func (s *Server) packetReadLoop() { var ErrServerClosed = errors.New("server closed") +// ErrServerNotReady indicates the server is not ready. Allocation should be +// requested after waiting for at least RetryAfter duration. +type ErrServerNotReady struct { + RetryAfter time.Duration +} + +func (e ErrServerNotReady) Error() string { + return fmt.Sprintf("server not ready, retry after %v", e.RetryAfter) +} + // AllocateEndpoint allocates an [endpoint.ServerEndpoint] for the provided pair // of [key.DiscoPublic]'s. If an allocation already exists for discoA and discoB // it is returned without modification/reallocation. AllocateEndpoint returns -// [ErrServerClosed] if the server has been closed. +// the following notable errors: +// 1. [ErrServerClosed] if the server has been closed. +// 2. [ErrServerNotReady] if the server is not ready. 
func (s *Server) AllocateEndpoint(discoA, discoB key.DiscoPublic) (endpoint.ServerEndpoint, error) { s.mu.Lock() defer s.mu.Unlock() @@ -533,6 +545,9 @@ func (s *Server) AllocateEndpoint(discoA, discoB key.DiscoPublic) (endpoint.Serv } if len(s.addrPorts) == 0 { + if !s.addrDiscoveryOnce { + return endpoint.ServerEndpoint{}, ErrServerNotReady{RetryAfter: 3 * time.Second} + } return endpoint.ServerEndpoint{}, errors.New("server addrPorts are not yet known") } diff --git a/wgengine/magicsock/relaymanager.go b/wgengine/magicsock/relaymanager.go index 4ccfbb501..d149d0c59 100644 --- a/wgengine/magicsock/relaymanager.go +++ b/wgengine/magicsock/relaymanager.go @@ -7,9 +7,12 @@ import ( "bytes" "context" "encoding/json" + "errors" + "fmt" "io" "net/http" "net/netip" + "strconv" "sync" "time" @@ -716,46 +719,82 @@ func (r *relayManager) allocateAllServersRunLoop(ep *endpoint) { }() } -func (r *relayManager) allocateSingleServer(ctx context.Context, wg *sync.WaitGroup, server netip.AddrPort, ep *endpoint) { - // TODO(jwhited): introduce client metrics counters for notable failures - defer wg.Done() - var b bytes.Buffer - remoteDisco := ep.disco.Load() - if remoteDisco == nil { - return - } +type errNotReady struct{ retryAfter time.Duration } + +func (e errNotReady) Error() string { + return fmt.Sprintf("server not ready, retry after %v", e.retryAfter) +} + +const reqTimeout = time.Second * 10 + +func doAllocate(ctx context.Context, server netip.AddrPort, discoKeys [2]key.DiscoPublic) (udprelay.ServerEndpoint, error) { + var reqBody bytes.Buffer type allocateRelayEndpointReq struct { DiscoKeys []key.DiscoPublic } a := &allocateRelayEndpointReq{ - DiscoKeys: []key.DiscoPublic{ep.c.discoPublic, remoteDisco.key}, + DiscoKeys: []key.DiscoPublic{discoKeys[0], discoKeys[1]}, } - err := json.NewEncoder(&b).Encode(a) + err := json.NewEncoder(&reqBody).Encode(a) if err != nil { - return + return udprelay.ServerEndpoint{}, err } - const reqTimeout = time.Second * 10 reqCtx, cancel := 
context.WithTimeout(ctx, reqTimeout) defer cancel() - req, err := http.NewRequestWithContext(reqCtx, httpm.POST, "http://"+server.String()+"/v0/relay/endpoint", &b) + req, err := http.NewRequestWithContext(reqCtx, httpm.POST, "http://"+server.String()+"/v0/relay/endpoint", &reqBody) if err != nil { - return + return udprelay.ServerEndpoint{}, err } resp, err := http.DefaultClient.Do(req) if err != nil { - return + return udprelay.ServerEndpoint{}, err } defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - return + switch resp.StatusCode { + case http.StatusOK: + var se udprelay.ServerEndpoint + err = json.NewDecoder(io.LimitReader(resp.Body, 4096)).Decode(&se) + return se, err + case http.StatusServiceUnavailable: + raHeader := resp.Header.Get("Retry-After") + raSeconds, err := strconv.ParseUint(raHeader, 10, 32) + if err == nil { + return udprelay.ServerEndpoint{}, errNotReady{retryAfter: time.Second * time.Duration(raSeconds)} + } + fallthrough + default: + return udprelay.ServerEndpoint{}, fmt.Errorf("non-200 status: %d", resp.StatusCode) + } +} + +func (r *relayManager) allocateSingleServer(ctx context.Context, wg *sync.WaitGroup, server netip.AddrPort, ep *endpoint) { + // TODO(jwhited): introduce client metrics counters for notable failures + defer wg.Done() + remoteDisco := ep.disco.Load() + if remoteDisco == nil { + return + } + firstTry := true + for { + se, err := doAllocate(ctx, server, [2]key.DiscoPublic{ep.c.discoPublic, remoteDisco.key}) + if err == nil { + relayManagerInputEvent(r, ctx, &r.newServerEndpointCh, newRelayServerEndpointEvent{ + ep: ep, + se: se, + }) + return + } + ep.c.logf("[v1] magicsock: relayManager: error allocating endpoint on %v for %v: %v", server, ep.discoShort(), err) + var notReady errNotReady + if firstTry && errors.As(err, &notReady) { + select { + case <-ctx.Done(): + return + case <-time.After(min(notReady.retryAfter, reqTimeout)): + firstTry = false + continue + } + } + return } - var se udprelay.ServerEndpoint 
- err = json.NewDecoder(io.LimitReader(resp.Body, 4096)).Decode(&se) - if err != nil { - return - } - relayManagerInputEvent(r, ctx, &r.newServerEndpointCh, newRelayServerEndpointEvent{ - ep: ep, - se: se, - }) }