From 591e1d8290598de0eaf10cece33083ee49069f65 Mon Sep 17 00:00:00 2001 From: Dylan Bargatze Date: Fri, 25 Jul 2025 16:09:16 -0400 Subject: [PATCH] client, cmd, feature/relayserver, net/udprelay: initial take on tailscale debug peer-relay-sessions Signed-off-by: Dylan Bargatze --- client/local/local.go | 11 ++ cmd/tailscale/cli/debug-peer-relay.go | 47 ++++++ cmd/tailscale/cli/debug.go | 4 +- feature/relayserver/relayserver.go | 93 +++++++++++ net/udprelay/endpoint/endpoint.go | 213 ++++++++++++++++++++++++++ net/udprelay/server.go | 81 ++++++++++ 6 files changed, 448 insertions(+), 1 deletion(-) create mode 100644 cmd/tailscale/cli/debug-peer-relay.go diff --git a/client/local/local.go b/client/local/local.go index 55d14f95e..91ab9cbc3 100644 --- a/client/local/local.go +++ b/client/local/local.go @@ -35,6 +35,7 @@ import ( "tailscale.com/ipn" "tailscale.com/ipn/ipnstate" "tailscale.com/net/netutil" + "tailscale.com/net/udprelay/endpoint" "tailscale.com/paths" "tailscale.com/safesocket" "tailscale.com/tailcfg" @@ -1638,6 +1639,16 @@ func (lc *Client) DebugSetExpireIn(ctx context.Context, d time.Duration) error { return err } +// DebugPeerRelaySessions returns debug information about the current peer +// relay sessions running through this node. +func (lc *Client) DebugPeerRelaySessions(ctx context.Context) ([]endpoint.PeerRelayServerSession, error) { + body, err := lc.send(ctx, "GET", "/localapi/v0/debug-peer-relay-sessions", 200, nil) + if err != nil { + return nil, fmt.Errorf("error %w: %s", err, body) + } + return decodeJSON[[]endpoint.PeerRelayServerSession](body) +} + // StreamDebugCapture streams a pcap-formatted packet capture. // // The provided context does not determine the lifetime of the diff --git a/cmd/tailscale/cli/debug-peer-relay.go b/cmd/tailscale/cli/debug-peer-relay.go new file mode 100644 index 000000000..9052e8db1 --- /dev/null +++ b/cmd/tailscale/cli/debug-peer-relay.go @@ -0,0 +1,47 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +//go:build !ios && !ts_omit_relayserver + +package cli + +import ( + "context" + + "github.com/peterbourgon/ff/v3/ffcli" +) + +func init() { + debugPeerRelayCmd = mkDebugPeerRelaySessionsCmd +} + +func mkDebugPeerRelaySessionsCmd() *ffcli.Command { + return &ffcli.Command{ + Name: "peer-relay-sessions", + ShortUsage: "tailscale debug peer-relay-sessions", + Exec: runPeerRelaySessions, + ShortHelp: "Print the current set of active peer relay sessions relayed through this node", + } +} + +func runPeerRelaySessions(ctx context.Context, args []string) error { + v, err := localClient.DebugPeerRelaySessions(ctx) + if err != nil { + return err + } + + if len(v) == 0 { + println("This peer relay server is not relaying any sessions.") + return nil + } + + println("Sessions relayed by this peer relay server:") + for _, s := range v { + printf("- Session %v: %v <-> %v <-> %v\n", s.VNI, s.ClientEndpoint[0], s.ServerEndpoint, s.ClientEndpoint[1]) + printf(" Server : disco=%v | endpoint=%v | status=%v\n", s.ServerShortDisco, s.ServerEndpoint, s.Status.OverallStatus) + printf(" Client 1: disco=%v | endpoint=%v | status=%v, %v\n", s.ClientShortDisco[0], s.ClientEndpoint[0], s.Status.ClientBindStatus[0], s.Status.ClientPingStatus[0]) + printf(" Client 2: disco=%v | endpoint=%v | status=%v, %v\n", s.ClientShortDisco[1], s.ClientEndpoint[1], s.Status.ClientBindStatus[1], s.Status.ClientPingStatus[1]) + } + + return nil +} diff --git a/cmd/tailscale/cli/debug.go b/cmd/tailscale/cli/debug.go index fb062fd17..4fbde7e67 100644 --- a/cmd/tailscale/cli/debug.go +++ b/cmd/tailscale/cli/debug.go @@ -49,7 +49,8 @@ import ( ) var ( - debugCaptureCmd func() *ffcli.Command // or nil + debugCaptureCmd func() *ffcli.Command // or nil + debugPeerRelayCmd func() *ffcli.Command // or nil ) func debugCmd() *ffcli.Command { @@ -374,6 +375,7 @@ func debugCmd() *ffcli.Command { ShortHelp: "Print the current set of candidate peer relay servers", Exec: runPeerRelayServers, }, + ccall(debugPeerRelayCmd), }...), } } diff --git a/feature/relayserver/relayserver.go b/feature/relayserver/relayserver.go index b90a62345..27e13efc7 100644 --- a/feature/relayserver/relayserver.go +++ b/feature/relayserver/relayserver.go @@ -6,12 +6,17 @@ package relayserver import ( + "encoding/json" + "errors" + "fmt" + "net/http" "sync" "tailscale.com/disco" "tailscale.com/feature" "tailscale.com/ipn" "tailscale.com/ipn/ipnext" + "tailscale.com/ipn/localapi" "tailscale.com/net/udprelay" "tailscale.com/net/udprelay/endpoint" "tailscale.com/tailcfg" @@ -29,6 +34,62 @@ const featureName = "relayserver" func init() { feature.Register(featureName) ipnext.RegisterExtension(featureName, newExtension) + localapi.Register("debug-peer-relay-sessions", servePeerRelayDebugSessions) +} + +func servePeerRelayDebugSessions(h *localapi.Handler, w http.ResponseWriter, r *http.Request) { + if r.Method != "GET" { + http.Error(w, "GET required", http.StatusMethodNotAllowed) + return + } + + var e *extension + if ok := h.LocalBackend().FindMatchingExtension(&e); !ok { + http.Error(w, "Peer relay extension unavailable", http.StatusInternalServerError) + return + } + + e.mu.Lock() + running := e.busDoneCh != nil + shutdown := e.shutdown + port := e.port + disabled := e.hasNodeAttrDisableRelayServer + e.mu.Unlock() + + if !running { + http.Error(w, "peer relay server is not running", http.StatusServiceUnavailable) + return + } else if shutdown { + http.Error(w, "peer relay server has been shut down", http.StatusServiceUnavailable) + return + } else if disabled { + http.Error(w, "peer relay server is disabled", http.StatusServiceUnavailable) + return + } else if port == nil { + http.Error(w, "peer relay server port is not configured", http.StatusPreconditionFailed) + return + } + + // h.Logf("peer relay server is available, running=%v shutdown=%v disabled=%v port=%v", running, shutdown, disabled, *port) + + client := e.bus.Client("relayserver.debug-peer-relay-sessions") + defer client.Close() + debugReqPub := eventbus.Publish[PeerRelaySessionsReq](client) + debugRespSub := eventbus.Subscribe[PeerRelaySessionsResp](client) + + debugReqPub.Publish(PeerRelaySessionsReq{}) + // TODO (dylan): remove this message + // h.Logf("relayserver: waiting for run loop to publish peer relay sessions...") + resp := <-debugRespSub.Events() + + // TODO (dylan): check resp.Error (or move it into PeerRelaySessions instead of leaving it in PeerRelaySessionsResp) + // TODO (dylan): what status to return if the peer relay server isn't running/configured? + j, err := json.Marshal(resp.Sessions) + if err != nil { + http.Error(w, fmt.Sprintf("failed to marshal json: %v", err), http.StatusInternalServerError) + return + } + w.Write(j) } // newExtension is an [ipnext.NewExtensionFn] that creates a new relay server @@ -59,6 +120,16 @@ type extension struct { type relayServer interface { AllocateEndpoint(discoA key.DiscoPublic, discoB key.DiscoPublic) (endpoint.ServerEndpoint, error) Close() error + GetSessions() ([]endpoint.PeerRelayServerSession, error) +} + +// TODO (dylan): doc comments +type PeerRelaySessionsReq struct{} + +// TODO (dylan): doc comments +type PeerRelaySessionsResp struct { + Sessions []endpoint.PeerRelayServerSession + Error error } // Name implements [ipnext.Extension]. @@ -119,6 +190,8 @@ func (e *extension) consumeEventbusTopics(port int) { defer close(e.busDoneCh) eventClient := e.bus.Client("relayserver.extension") + debugReqSub := eventbus.Subscribe[PeerRelaySessionsReq](eventClient) + debugRespPub := eventbus.Publish[PeerRelaySessionsResp](eventClient) reqSub := eventbus.Subscribe[magicsock.UDPRelayAllocReq](eventClient) respPub := eventbus.Publish[magicsock.UDPRelayAllocResp](eventClient) defer eventClient.Close() @@ -137,6 +210,26 @@ func (e *extension) consumeEventbusTopics(port int) { // If reqSub is done, the eventClient has been closed, which is a // signal to return. return + case <-debugReqSub.Events(): + // TODO (dylan): This is where we want to send debug session info back to the CLI. + if rs == nil { + // TODO (dylan): should we initialize the server here too + // TODO (dylan): what is the pattern for sending error values back over the event bus? + // TODO (dylan): this isn't even an error condition, expected when nobody has tried to + // allocate an endpoint...rethink, maybe add a "Status string" field to PeerRelaySessionsResp? + resp := PeerRelaySessionsResp{Error: errors.New("no peer relay sessions: server has not been contacted yet")} + debugRespPub.Publish(resp) + continue + } + sessions, err := rs.GetSessions() + if err != nil { + // TODO (dylan): should this be an errors.Join() instead with err? + prs_err := fmt.Errorf("error retrieving peer relay sessions: %v", err) + e.logf(prs_err.Error()) + debugRespPub.Publish(PeerRelaySessionsResp{Error: prs_err}) + continue + } + debugRespPub.Publish(PeerRelaySessionsResp{sessions, nil}) case req := <-reqSub.Events(): if rs == nil { var err error diff --git a/net/udprelay/endpoint/endpoint.go b/net/udprelay/endpoint/endpoint.go index 0d2a14e96..8687242b4 100644 --- a/net/udprelay/endpoint/endpoint.go +++ b/net/udprelay/endpoint/endpoint.go @@ -62,3 +62,216 @@ type ServerEndpoint struct { // bidirectional data flow. SteadyStateLifetime tstime.GoDuration } + +// TODO (dylan): doc comments +type PeerRelayServerAllocStatus int + +const ( + EndpointAllocNotStarted PeerRelayServerAllocStatus = iota + // EndpointAllocRequestReceived by the peer relay server from the allocating client + EndpointAllocRequestReceived + // EndpointAllocated on the peer relay server, but response not yet sent to allocating client + EndpointAllocated + // EndpointAllocResponseSent from the peer relay server to allocating client + EndpointAllocResponseSent + + // TODO (dylan): Should we have a status here for dead allocs that weren't bound before the + // BindLifetime timer expired? + EndpointAllocExpired +) + +func (s PeerRelayServerAllocStatus) String() string { + switch s { + case EndpointAllocNotStarted: + return "alloc not started" + case EndpointAllocRequestReceived: + return "alloc request received" + case EndpointAllocated: + return "endpoint allocated" + case EndpointAllocResponseSent: + return "alloc complete" + case EndpointAllocExpired: + return "expired" + default: + return "unknown" + } +} + +// PeerRelayServerBindStatus is the current status of the endpoint binding +// handshake between the peer relay server and a SINGLE peer relay client. Both +// clients need to bind into an endpoint for a peer relay session to be bound, +// so a peer relay server will have two PeerRelayServerBindStatus fields to +// track per session. +type PeerRelayServerBindStatus int + +// TODO (dylan): doc comments +const ( + EndpointBindNotStarted PeerRelayServerBindStatus = iota + EndpointBindRequestReceived + EndpointBindChallengeSent + EndpointBindAnswerReceived +) + +func (s PeerRelayServerBindStatus) String() string { + switch s { + case EndpointBindNotStarted: + return "binding not started" + case EndpointBindRequestReceived: + return "bind request received" + case EndpointBindChallengeSent: + return "bind challenge sent" + case EndpointBindAnswerReceived: + return "bind complete" + default: + return "unknown" + } +} + +// PeerRelayServerPingStatus is the current status of a SINGLE SIDE of the +// bidirectional disco ping exchange between two peer relay clients, as seen by +// the peer relay server. As each client will send a disco ping and should +// receive a disco pong from the other client in response, a peer relay server +// will have two PeerRelayServerPingStatus fields to track per session. +type PeerRelayServerPingStatus int + +// TODO (dylan): doc comments +const ( + DiscoPingNotStarted PeerRelayServerPingStatus = iota + DiscoPingSeen + DiscoPongSeen +) + +func (s PeerRelayServerPingStatus) String() string { + switch s { + case DiscoPingNotStarted: + return "ping not started" + case DiscoPingSeen: + return "disco ping seen" + case DiscoPongSeen: + return "disco pong seen" + default: + return "unknown" + } +} + +// TODO (dylan): doc comments +type PeerRelayServerStatus int + +// TODO (dylan): doc comments +const ( + AllocatingEndpoint PeerRelayServerStatus = iota + BindingEndpoint + BidirectionalPinging + ServerSessionEstablished +) + +func (s PeerRelayServerStatus) String() string { + switch s { + case AllocatingEndpoint: + return "allocating endpoint allocation" + case BindingEndpoint: + return "binding endpoint" + case BidirectionalPinging: + return "clients pinging" + case ServerSessionEstablished: + return "session established" + default: + return "unknown" + } +} + +// TODO (dylan): doc comments +type PeerRelayServerSessionStatus struct { + AllocStatus PeerRelayServerAllocStatus + ClientBindStatus [2]PeerRelayServerBindStatus + ClientPingStatus [2]PeerRelayServerPingStatus + ClientPacketsRx [2]uint64 + ClientPacketsFwd [2]uint64 + + OverallStatus PeerRelayServerStatus +} + +func NewPeerRelayServerSessionStatus() PeerRelayServerSessionStatus { + return PeerRelayServerSessionStatus{ + AllocStatus: EndpointAllocNotStarted, + ClientBindStatus: [2]PeerRelayServerBindStatus{EndpointBindNotStarted, EndpointBindNotStarted}, + ClientPingStatus: [2]PeerRelayServerPingStatus{DiscoPingNotStarted, DiscoPingNotStarted}, + OverallStatus: AllocatingEndpoint, + } +} + +// TODO (dylan): doc comments +type PeerRelayClientAllocStatus int + +const ( + // EndpointAllocRequestSent from the allocating client to the peer relay server via DERP + EndpointAllocRequestSent PeerRelayClientAllocStatus = iota + // EndpointAllocResponseReceived by the allocating client from the peer relay server via DERP + EndpointAllocResponseReceived + // CallMeMaybeViaSent from the allocating client to the target client via DERP + CallMeMaybeViaSent + // CallMeMaybeViaReceived by the target client from the allocating client via DERP + CallMeMaybeViaReceived +) + +// TODO (dylan): doc comments +type PeerRelayClientBindStatus int + +const ( + // EndpointBindHandshakeSent from this client to the peer relay server + EndpointBindHandshakeSent PeerRelayClientBindStatus = iota + // EndpointBindChallengeReceived by this client from the peer relay server + EndpointBindChallengeReceived + // EndpointBindAnswerSent from this client to the peer relay server + EndpointBindAnswerSent +) + +// TODO (dylan): doc comments +type PeerRelayClientPingStatus int + +// TODO (dylan): doc comments +const ( + DiscoPingSent PeerRelayClientPingStatus = iota + DiscoPingReceived +) + +// TODO (dylan): doc comments +type PeerRelayClientStatus int + +// TODO (dylan): doc comments +const ( + EndpointAllocation PeerRelayClientStatus = iota + EndpointBinding + Pinging + ClientSessionEstablished +) + +// TODO (dylan): doc comments +type PeerRelayClientSessionStatus struct { + AllocStatus PeerRelayClientAllocStatus + BindStatus PeerRelayClientBindStatus + PingStatus PeerRelayClientPingStatus + + OverallStatus PeerRelayClientStatus +} + +// TODO (dylan): doc comments +type PeerRelaySessionBaseStatus struct { + VNI uint32 + ClientShortDisco [2]string + ClientEndpoint [2]netip.AddrPort + ServerShortDisco string + ServerEndpoint netip.AddrPort +} + +// TODO (dylan): doc comments +type PeerRelayServerSession struct { + Status PeerRelayServerSessionStatus + PeerRelaySessionBaseStatus +} + +// TODO (dylan): doc comments +type PeerRelayClientSession struct { + Status PeerRelayClientStatus + PeerRelaySessionBaseStatus +} diff --git a/net/udprelay/server.go b/net/udprelay/server.go index c34a4b5f6..95840edb6 100644 --- a/net/udprelay/server.go +++ b/net/udprelay/server.go @@ -94,6 +94,8 @@ type serverEndpoint struct { lamportID uint64 vni uint32 allocatedAt time.Time + + status endpoint.PeerRelayServerSessionStatus } func (e *serverEndpoint) handleDiscoControlMsg(from netip.AddrPort, senderIndex int, discoMsg disco.Message, conn *net.UDPConn, serverDisco key.DiscoPublic) { @@ -133,6 +135,9 @@ func (e *serverEndpoint) handleDiscoControlMsg(from netip.AddrPort, senderIndex } e.handshakeGeneration[senderIndex] = discoMsg.Generation e.handshakeAddrPorts[senderIndex] = from + // TODO (dylan): assert current e.status.AllocStatus is EndpointAllocated + // TODO (dylan): assert e.status.ClientBindStatus[senderIndex] is not already EndpointBindRequestReceived or later + e.status.ClientBindStatus[senderIndex] = endpoint.EndpointBindRequestReceived m := new(disco.BindUDPRelayEndpointChallenge) m.VNI = e.vni m.Generation = discoMsg.Generation @@ -150,6 +155,8 @@ func (e *serverEndpoint) handleDiscoControlMsg(from netip.AddrPort, senderIndex box := e.discoSharedSecrets[senderIndex].Seal(m.AppendMarshal(nil)) reply = append(reply, box...) conn.WriteMsgUDPAddrPort(reply, nil, from) + e.status.ClientBindStatus[senderIndex] = endpoint.EndpointBindChallengeSent + e.status.OverallStatus = endpoint.BindingEndpoint return case *disco.BindUDPRelayEndpointAnswer: err := validateVNIAndRemoteKey(discoMsg.BindUDPRelayEndpointCommon) @@ -167,6 +174,14 @@ func (e *serverEndpoint) handleDiscoControlMsg(from netip.AddrPort, senderIndex } // Handshake complete. Update the binding for this sender. e.boundAddrPorts[senderIndex] = from + + // TODO (dylan): assert e.status.AllocStatus is EndpointAllocated + // TODO (dylan): assert e.status.ClientBindStatus[senderIndex] is endpoint.EndpointBindChallengeSent + // TODO (dylan): assert e.status.ClientBindStatus[senderIndex] is not already EndpointBindAnswerReceived or later + e.status.ClientBindStatus[senderIndex] = endpoint.EndpointBindAnswerReceived + if e.isBound() { + e.status.OverallStatus = endpoint.BidirectionalPinging + } e.lastSeen[senderIndex] = time.Now() // record last seen as bound time return default: @@ -220,13 +235,41 @@ func (e *serverEndpoint) handlePacket(from netip.AddrPort, gh packet.GeneveHeade case from == e.boundAddrPorts[0]: e.lastSeen[0] = time.Now() to = e.boundAddrPorts[1] + e.status.ClientPacketsRx[0]++ + switch e.status.ClientPingStatus[0] { + case endpoint.DiscoPingNotStarted: + e.status.ClientPingStatus[0] = endpoint.DiscoPingSeen + break + case endpoint.DiscoPingSeen: + e.status.ClientPingStatus[0] = endpoint.DiscoPongSeen + break + default: + break + } + e.status.ClientPacketsFwd[1]++ case from == e.boundAddrPorts[1]: e.lastSeen[1] = time.Now() to = e.boundAddrPorts[0] + e.status.ClientPacketsRx[1]++ + switch e.status.ClientPingStatus[1] { + case endpoint.DiscoPingNotStarted: + e.status.ClientPingStatus[1] = endpoint.DiscoPingSeen + break + case endpoint.DiscoPingSeen: + e.status.ClientPingStatus[1] = endpoint.DiscoPongSeen + break + default: + break + } + e.status.ClientPacketsFwd[0]++ default: // unrecognized source return } + + if e.status.OverallStatus == endpoint.BidirectionalPinging && e.status.ClientPingStatus[0] == endpoint.DiscoPongSeen && e.status.ClientPingStatus[1] == endpoint.DiscoPongSeen { + e.status.OverallStatus = endpoint.ServerSessionEstablished + } // Relay the packet towards the other party via the socket associated // with the destination's address family. If source and destination // address families are matching we tx on the same socket the packet @@ -237,6 +280,7 @@ func (e *serverEndpoint) handlePacket(from netip.AddrPort, gh packet.GeneveHeade } else if otherAFSocket != nil { otherAFSocket.WriteMsgUDPAddrPort(b, nil, to) } + return } @@ -637,10 +681,13 @@ func (s *Server) AllocateEndpoint(discoA, discoB key.DiscoPublic) (endpoint.Serv } s.lamportID++ + status := endpoint.NewPeerRelayServerSessionStatus() + status.AllocStatus = endpoint.EndpointAllocRequestReceived e = &serverEndpoint{ discoPubKeys: pair, lamportID: s.lamportID, allocatedAt: time.Now(), + status: status, } e.discoSharedSecrets[0] = s.disco.Shared(e.discoPubKeys.Get()[0]) e.discoSharedSecrets[1] = s.disco.Shared(e.discoPubKeys.Get()[1]) @@ -660,3 +707,37 @@ func (s *Server) AllocateEndpoint(discoA, discoB key.DiscoPublic) (endpoint.Serv SteadyStateLifetime: tstime.GoDuration{Duration: s.steadyStateLifetime}, }, nil } + +func (s *Server) GetSessions() ([]endpoint.PeerRelayServerSession, error) { + var sessions = make([]endpoint.PeerRelayServerSession, 0) + for k, v := range s.byDisco { + var c1Ep, c2Ep netip.AddrPort + + c1Disco := k.Get()[0].ShortString() + if v.boundAddrPorts[0].IsValid() { + c1Ep = v.boundAddrPorts[0] + } else if v.handshakeAddrPorts[0].IsValid() { + c1Ep = v.handshakeAddrPorts[0] + } + + c2Disco := k.Get()[1].ShortString() + if v.boundAddrPorts[1].IsValid() { + c2Ep = v.boundAddrPorts[1] + } else if v.handshakeAddrPorts[1].IsValid() { + c2Ep = v.handshakeAddrPorts[1] + } + sessions = append(sessions, endpoint.PeerRelayServerSession{ + // TODO (dylan): fix overall status + Status: v.status, + PeerRelaySessionBaseStatus: endpoint.PeerRelaySessionBaseStatus{ + VNI: v.vni, + ClientShortDisco: [2]string{c1Disco, c2Disco}, + ClientEndpoint: [2]netip.AddrPort{c1Ep, c2Ep}, + ServerShortDisco: s.discoPublic.ShortString(), + // TODO (dylan): disambiguate which addrPort to use here + ServerEndpoint: s.addrPorts[0], + }, + }) + } + return sessions, nil +}