client, cmd, feature/relayserver, net/udprelay: initial take on tailscale debug peer-relay-sessions

Signed-off-by: Dylan Bargatze <dylan@tailscale.com>
This commit is contained in:
Dylan Bargatze 2025-07-25 16:09:16 -04:00
parent bfebf870ae
commit 591e1d8290
No known key found for this signature in database
6 changed files with 448 additions and 1 deletions

View File

@ -35,6 +35,7 @@ import (
"tailscale.com/ipn"
"tailscale.com/ipn/ipnstate"
"tailscale.com/net/netutil"
"tailscale.com/net/udprelay/endpoint"
"tailscale.com/paths"
"tailscale.com/safesocket"
"tailscale.com/tailcfg"
@ -1638,6 +1639,16 @@ func (lc *Client) DebugSetExpireIn(ctx context.Context, d time.Duration) error {
return err
}
// DebugPeerRelaySessions returns debug information about the current peer
// relay sessions running through this node.
func (lc *Client) DebugPeerRelaySessions(ctx context.Context) ([]endpoint.PeerRelayServerSession, error) {
body, err := lc.send(ctx, "GET", "/localapi/v0/debug-peer-relay-sessions", 200, nil)
if err != nil {
return nil, fmt.Errorf("error %w: %s", err, body)
}
return decodeJSON[[]endpoint.PeerRelayServerSession](body)
}
// StreamDebugCapture streams a pcap-formatted packet capture.
//
// The provided context does not determine the lifetime of the

View File

@ -0,0 +1,47 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build !ios && !ts_omit_relayserver
package cli
import (
"context"
"github.com/peterbourgon/ff/v3/ffcli"
)
func init() {
debugPeerRelayCmd = mkDebugPeerRelaySessionsCmd
}
func mkDebugPeerRelaySessionsCmd() *ffcli.Command {
return &ffcli.Command{
Name: "peer-relay-sessions",
ShortUsage: "tailscale debug peer-relay-sessions",
Exec: runPeerRelaySessions,
ShortHelp: "Print the current set of active peer relay sessions relayed through this node",
}
}
func runPeerRelaySessions(ctx context.Context, args []string) error {
v, err := localClient.DebugPeerRelaySessions(ctx)
if err != nil {
return err
}
if len(v) == 0 {
println("This peer relay server is not relaying any sessions.")
return nil
}
println("Sessions relayed by this peer relay server:")
for _, s := range v {
printf("- Session %v: %v <-> %v <-> %v\n", s.VNI, s.ClientEndpoint[0], s.ServerEndpoint, s.ClientEndpoint[1])
printf(" Server : disco=%v | endpoint=%v | status=%v\n", s.ServerShortDisco, s.ServerEndpoint, s.Status.OverallStatus)
printf(" Client 1: disco=%v | endpoint=%v | status=%v, %v\n", s.ClientShortDisco[0], s.ClientEndpoint[0], s.Status.ClientBindStatus[0], s.Status.ClientPingStatus[0])
printf(" Client 2: disco=%v | endpoint=%v | status=%v, %v\n", s.ClientShortDisco[1], s.ClientEndpoint[1], s.Status.ClientBindStatus[1], s.Status.ClientPingStatus[1])
}
return nil
}

View File

@ -49,7 +49,8 @@ import (
)
var (
debugCaptureCmd func() *ffcli.Command // or nil
debugCaptureCmd func() *ffcli.Command // or nil
debugPeerRelayCmd func() *ffcli.Command // or nil
)
func debugCmd() *ffcli.Command {
@ -374,6 +375,7 @@ func debugCmd() *ffcli.Command {
ShortHelp: "Print the current set of candidate peer relay servers",
Exec: runPeerRelayServers,
},
ccall(debugPeerRelayCmd),
}...),
}
}

View File

@ -6,12 +6,17 @@
package relayserver
import (
"encoding/json"
"errors"
"fmt"
"net/http"
"sync"
"tailscale.com/disco"
"tailscale.com/feature"
"tailscale.com/ipn"
"tailscale.com/ipn/ipnext"
"tailscale.com/ipn/localapi"
"tailscale.com/net/udprelay"
"tailscale.com/net/udprelay/endpoint"
"tailscale.com/tailcfg"
@ -29,6 +34,62 @@ const featureName = "relayserver"
func init() {
feature.Register(featureName)
ipnext.RegisterExtension(featureName, newExtension)
localapi.Register("debug-peer-relay-sessions", servePeerRelayDebugSessions)
}
func servePeerRelayDebugSessions(h *localapi.Handler, w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" {
http.Error(w, "GET required", http.StatusMethodNotAllowed)
return
}
var e *extension
if ok := h.LocalBackend().FindMatchingExtension(&e); !ok {
http.Error(w, "Peer relay extension unavailable", http.StatusInternalServerError)
return
}
e.mu.Lock()
running := e.busDoneCh != nil
shutdown := e.shutdown
port := e.port
disabled := e.hasNodeAttrDisableRelayServer
e.mu.Unlock()
if !running {
http.Error(w, "peer relay server is not running", http.StatusServiceUnavailable)
return
} else if shutdown {
http.Error(w, "peer relay server has been shut down", http.StatusServiceUnavailable)
return
} else if disabled {
http.Error(w, "peer relay server is disabled", http.StatusServiceUnavailable)
return
} else if port == nil {
http.Error(w, "peer relay server port is not configured", http.StatusPreconditionFailed)
return
}
// h.Logf("peer relay server is available, running=%v shutdown=%v disabled=%v port=%v", running, shutdown, disabled, *port)
client := e.bus.Client("relayserver.debug-peer-relay-sessions")
defer client.Close()
debugReqPub := eventbus.Publish[PeerRelaySessionsReq](client)
debugRespSub := eventbus.Subscribe[PeerRelaySessionsResp](client)
debugReqPub.Publish(PeerRelaySessionsReq{})
// TODO (dylan): remove this message
// h.Logf("relayserver: waiting for run loop to publish peer relay sessions...")
resp := <-debugRespSub.Events()
// TODO (dylan): check resp.Error (or move it into PeerRelaySessions instead of leaving it in PeerRelaySessionsResp)
// TODO (dylan): what status to return if the peer relay server isn't running/configured?
j, err := json.Marshal(resp.Sessions)
if err != nil {
http.Error(w, fmt.Sprintf("failed to marshal json: %v", err), http.StatusInternalServerError)
return
}
w.Write(j)
}
// newExtension is an [ipnext.NewExtensionFn] that creates a new relay server
@ -59,6 +120,16 @@ type extension struct {
type relayServer interface {
AllocateEndpoint(discoA key.DiscoPublic, discoB key.DiscoPublic) (endpoint.ServerEndpoint, error)
Close() error
GetSessions() ([]endpoint.PeerRelayServerSession, error)
}
// TODO (dylan): doc comments
type PeerRelaySessionsReq struct{}
// TODO (dylan): doc comments
type PeerRelaySessionsResp struct {
Sessions []endpoint.PeerRelayServerSession
Error error
}
// Name implements [ipnext.Extension].
@ -119,6 +190,8 @@ func (e *extension) consumeEventbusTopics(port int) {
defer close(e.busDoneCh)
eventClient := e.bus.Client("relayserver.extension")
debugReqSub := eventbus.Subscribe[PeerRelaySessionsReq](eventClient)
debugRespPub := eventbus.Publish[PeerRelaySessionsResp](eventClient)
reqSub := eventbus.Subscribe[magicsock.UDPRelayAllocReq](eventClient)
respPub := eventbus.Publish[magicsock.UDPRelayAllocResp](eventClient)
defer eventClient.Close()
@ -137,6 +210,26 @@ func (e *extension) consumeEventbusTopics(port int) {
// If reqSub is done, the eventClient has been closed, which is a
// signal to return.
return
case <-debugReqSub.Events():
// TODO (dylan): This is where we want to send debug session info back to the CLI.
if rs == nil {
// TODO (dylan): should we initialize the server here too
// TODO (dylan): what is the pattern for sending error values back over the event bus?
// TODO (dylan): this isn't even an error condition, expected when nobody has tried to
// allocate an endpoint...rethink, maybe add a "Status string" field to PeerRelaySessionsResp?
resp := PeerRelaySessionsResp{Error: errors.New("no peer relay sessions: server has not been contacted yet")}
debugRespPub.Publish(resp)
continue
}
sessions, err := rs.GetSessions()
if err != nil {
// TODO (dylan): should this be an errors.Join() instead with err?
prs_err := fmt.Errorf("error retrieving peer relay sessions: %v", err)
e.logf(prs_err.Error())
debugRespPub.Publish(PeerRelaySessionsResp{Error: prs_err})
continue
}
debugRespPub.Publish(PeerRelaySessionsResp{sessions, nil})
case req := <-reqSub.Events():
if rs == nil {
var err error

View File

@ -62,3 +62,216 @@ type ServerEndpoint struct {
// bidirectional data flow.
SteadyStateLifetime tstime.GoDuration
}
// TODO (dylan): doc comments
type PeerRelayServerAllocStatus int
const (
EndpointAllocNotStarted PeerRelayServerAllocStatus = iota
// EndpointAllocRequestReceived by the peer relay server from the allocating client
EndpointAllocRequestReceived
// EndpointAllocated on the peer relay server, but response not yet sent to allocating client
EndpointAllocated
// EndpointAllocResponseSent from the peer relay server to allocating client
EndpointAllocResponseSent
// TODO (dylan): Should we have a status here for dead allocs that weren't bound before the
// BindLifetime timer expired?
EndpointAllocExpired
)
func (s PeerRelayServerAllocStatus) String() string {
switch s {
case EndpointAllocNotStarted:
return "alloc not started"
case EndpointAllocRequestReceived:
return "alloc request received"
case EndpointAllocated:
return "endpoint allocated"
case EndpointAllocResponseSent:
return "alloc complete"
case EndpointAllocExpired:
return "expired"
default:
return "unknown"
}
}
// PeerRelayServerBindStatus is the current status of the endpoint binding
// handshake between the peer relay server and a SINGLE peer relay client. Both
// clients need to bind into an endpoint for a peer relay session to be bound,
// so a peer relay server will have two PeerRelayServerBindStatus fields to
// track per session.
type PeerRelayServerBindStatus int
// TODO (dylan): doc comments
const (
EndpointBindNotStarted PeerRelayServerBindStatus = iota
EndpointBindRequestReceived
EndpointBindChallengeSent
EndpointBindAnswerReceived
)
func (s PeerRelayServerBindStatus) String() string {
switch s {
case EndpointBindNotStarted:
return "binding not started"
case EndpointBindRequestReceived:
return "bind request received"
case EndpointBindChallengeSent:
return "bind challenge sent"
case EndpointBindAnswerReceived:
return "bind complete"
default:
return "unknown"
}
}
// PeerRelayServerPingStatus is the current status of a SINGLE SIDE of the
// bidirectional disco ping exchange between two peer relay clients, as seen by
// the peer relay server. As each client will send a disco ping and should
// receive a disco pong from the other client in response, a peer relay server
// will have two PeerRelayServerPingStatus fields to track per session.
type PeerRelayServerPingStatus int
// TODO (dylan): doc comments
const (
DiscoPingNotStarted PeerRelayServerPingStatus = iota
DiscoPingSeen
DiscoPongSeen
)
func (s PeerRelayServerPingStatus) String() string {
switch s {
case DiscoPingNotStarted:
return "ping not started"
case DiscoPingSeen:
return "disco ping seen"
case DiscoPongSeen:
return "disco pong seen"
default:
return "unknown"
}
}
// TODO (dylan): doc comments
type PeerRelayServerStatus int
// TODO (dylan): doc comments
const (
AllocatingEndpoint PeerRelayServerStatus = iota
BindingEndpoint
BidirectionalPinging
ServerSessionEstablished
)
func (s PeerRelayServerStatus) String() string {
switch s {
case AllocatingEndpoint:
return "allocating endpoint allocation"
case BindingEndpoint:
return "binding endpoint"
case BidirectionalPinging:
return "clients pinging"
case ServerSessionEstablished:
return "session established"
default:
return "unknown"
}
}
// TODO (dylan): doc comments
type PeerRelayServerSessionStatus struct {
AllocStatus PeerRelayServerAllocStatus
ClientBindStatus [2]PeerRelayServerBindStatus
ClientPingStatus [2]PeerRelayServerPingStatus
ClientPacketsRx [2]uint64
ClientPacketsFwd [2]uint64
OverallStatus PeerRelayServerStatus
}
func NewPeerRelayServerSessionStatus() PeerRelayServerSessionStatus {
return PeerRelayServerSessionStatus{
AllocStatus: EndpointAllocNotStarted,
ClientBindStatus: [2]PeerRelayServerBindStatus{EndpointBindNotStarted, EndpointBindNotStarted},
ClientPingStatus: [2]PeerRelayServerPingStatus{DiscoPingNotStarted, DiscoPingNotStarted},
OverallStatus: AllocatingEndpoint,
}
}
// TODO (dylan): doc comments
type PeerRelayClientAllocStatus int
const (
// EndpointAllocRequestSent from the allocating client to the peer relay server via DERP
EndpointAllocRequestSent PeerRelayClientAllocStatus = iota
// EndpointAllocResponseReceived by the allocating client from the peer relay server via DERP
EndpointAllocResponseReceived
// CallMeMaybeViaSent from the allocating client to the target client via DERP
CallMeMaybeViaSent
// CallMeMaybeViaReceived by the target client from the allocating client via DERP
CallMeMaybeViaReceived
)
// TODO (dylan): doc comments
type PeerRelayClientBindStatus int
const (
// EndpointBindHandshakeSent from this client to the peer relay server
EndpointBindHandshakeSent PeerRelayClientBindStatus = iota
// EndpointBindChallengeReceived by this client from the peer relay server
EndpointBindChallengeReceived
// EndpointBindAnswerSent from this client to the peer relay server
EndpointBindAnswerSent
)
// TODO (dylan): doc comments
type PeerRelayClientPingStatus int
// TODO (dylan): doc comments
const (
DiscoPingSent PeerRelayClientPingStatus = iota
DiscoPingReceived
)
// TODO (dylan): doc comments
type PeerRelayClientStatus int
// TODO (dylan): doc comments
const (
EndpointAllocation PeerRelayClientStatus = iota
EndpointBinding
Pinging
ClientSessionEstablished
)
// TODO (dylan): doc comments
type PeerRelayClientSessionStatus struct {
AllocStatus PeerRelayClientAllocStatus
BindStatus PeerRelayClientBindStatus
PingStatus PeerRelayClientPingStatus
OverallStatus PeerRelayClientStatus
}
// TODO (dylan): doc comments
type PeerRelaySessionBaseStatus struct {
VNI uint32
ClientShortDisco [2]string
ClientEndpoint [2]netip.AddrPort
ServerShortDisco string
ServerEndpoint netip.AddrPort
}
// TODO (dylan): doc comments
type PeerRelayServerSession struct {
Status PeerRelayServerSessionStatus
PeerRelaySessionBaseStatus
}
// TODO (dylan): doc comments
type PeerRelayClientSession struct {
Status PeerRelayClientStatus
PeerRelaySessionBaseStatus
}

View File

@ -94,6 +94,8 @@ type serverEndpoint struct {
lamportID uint64
vni uint32
allocatedAt time.Time
status endpoint.PeerRelayServerSessionStatus
}
func (e *serverEndpoint) handleDiscoControlMsg(from netip.AddrPort, senderIndex int, discoMsg disco.Message, conn *net.UDPConn, serverDisco key.DiscoPublic) {
@ -133,6 +135,9 @@ func (e *serverEndpoint) handleDiscoControlMsg(from netip.AddrPort, senderIndex
}
e.handshakeGeneration[senderIndex] = discoMsg.Generation
e.handshakeAddrPorts[senderIndex] = from
// TODO (dylan): assert current e.status.AllocStatus is EndpointAllocated
// TODO (dylan): assert e.status.ClientBindStatus[senderIndex] is not already EndpointBindRequestReceived or later
e.status.ClientBindStatus[senderIndex] = endpoint.EndpointBindRequestReceived
m := new(disco.BindUDPRelayEndpointChallenge)
m.VNI = e.vni
m.Generation = discoMsg.Generation
@ -150,6 +155,8 @@ func (e *serverEndpoint) handleDiscoControlMsg(from netip.AddrPort, senderIndex
box := e.discoSharedSecrets[senderIndex].Seal(m.AppendMarshal(nil))
reply = append(reply, box...)
conn.WriteMsgUDPAddrPort(reply, nil, from)
e.status.ClientBindStatus[senderIndex] = endpoint.EndpointBindChallengeSent
e.status.OverallStatus = endpoint.BindingEndpoint
return
case *disco.BindUDPRelayEndpointAnswer:
err := validateVNIAndRemoteKey(discoMsg.BindUDPRelayEndpointCommon)
@ -167,6 +174,14 @@ func (e *serverEndpoint) handleDiscoControlMsg(from netip.AddrPort, senderIndex
}
// Handshake complete. Update the binding for this sender.
e.boundAddrPorts[senderIndex] = from
// TODO (dylan): assert e.status.AllocStatus is EndpointAllocated
// TODO (dylan): assert e.status.ClientBindStatus[senderIndex] is endpoint.EndpointBindChallengeSent
// TODO (dylan): assert e.status.ClientBindStatus[senderIndex] is not already EndpointBindAnswerReceived or later
e.status.ClientBindStatus[senderIndex] = endpoint.EndpointBindAnswerReceived
if e.isBound() {
e.status.OverallStatus = endpoint.BidirectionalPinging
}
e.lastSeen[senderIndex] = time.Now() // record last seen as bound time
return
default:
@ -220,13 +235,41 @@ func (e *serverEndpoint) handlePacket(from netip.AddrPort, gh packet.GeneveHeade
case from == e.boundAddrPorts[0]:
e.lastSeen[0] = time.Now()
to = e.boundAddrPorts[1]
e.status.ClientPacketsRx[0]++
switch e.status.ClientPingStatus[0] {
case endpoint.DiscoPingNotStarted:
e.status.ClientPingStatus[0] = endpoint.DiscoPingSeen
break
case endpoint.DiscoPingSeen:
e.status.ClientPingStatus[0] = endpoint.DiscoPongSeen
break
default:
break
}
e.status.ClientPacketsFwd[1]++
case from == e.boundAddrPorts[1]:
e.lastSeen[1] = time.Now()
to = e.boundAddrPorts[0]
e.status.ClientPacketsRx[1]++
switch e.status.ClientPingStatus[1] {
case endpoint.DiscoPingNotStarted:
e.status.ClientPingStatus[1] = endpoint.DiscoPingSeen
break
case endpoint.DiscoPingSeen:
e.status.ClientPingStatus[1] = endpoint.DiscoPongSeen
break
default:
break
}
e.status.ClientPacketsFwd[0]++
default:
// unrecognized source
return
}
if e.status.OverallStatus == endpoint.BidirectionalPinging && e.status.ClientPingStatus[0] == endpoint.DiscoPongSeen && e.status.ClientPingStatus[1] == endpoint.DiscoPongSeen {
e.status.OverallStatus = endpoint.ServerSessionEstablished
}
// Relay the packet towards the other party via the socket associated
// with the destination's address family. If source and destination
// address families are matching we tx on the same socket the packet
@ -237,6 +280,7 @@ func (e *serverEndpoint) handlePacket(from netip.AddrPort, gh packet.GeneveHeade
} else if otherAFSocket != nil {
otherAFSocket.WriteMsgUDPAddrPort(b, nil, to)
}
return
}
@ -637,10 +681,13 @@ func (s *Server) AllocateEndpoint(discoA, discoB key.DiscoPublic) (endpoint.Serv
}
s.lamportID++
status := endpoint.NewPeerRelayServerSessionStatus()
status.AllocStatus = endpoint.EndpointAllocRequestReceived
e = &serverEndpoint{
discoPubKeys: pair,
lamportID: s.lamportID,
allocatedAt: time.Now(),
status: status,
}
e.discoSharedSecrets[0] = s.disco.Shared(e.discoPubKeys.Get()[0])
e.discoSharedSecrets[1] = s.disco.Shared(e.discoPubKeys.Get()[1])
@ -660,3 +707,37 @@ func (s *Server) AllocateEndpoint(discoA, discoB key.DiscoPublic) (endpoint.Serv
SteadyStateLifetime: tstime.GoDuration{Duration: s.steadyStateLifetime},
}, nil
}
func (s *Server) GetSessions() ([]endpoint.PeerRelayServerSession, error) {
var sessions = make([]endpoint.PeerRelayServerSession, 0)
for k, v := range s.byDisco {
var c1Ep, c2Ep netip.AddrPort
c1Disco := k.Get()[0].ShortString()
if v.boundAddrPorts[0].IsValid() {
c1Ep = v.boundAddrPorts[0]
} else if v.handshakeAddrPorts[0].IsValid() {
c1Ep = v.handshakeAddrPorts[0]
}
c2Disco := k.Get()[1].ShortString()
if v.boundAddrPorts[1].IsValid() {
c2Ep = v.boundAddrPorts[1]
} else if v.handshakeAddrPorts[1].IsValid() {
c2Ep = v.handshakeAddrPorts[1]
}
sessions = append(sessions, endpoint.PeerRelayServerSession{
// TODO (dylan): fix overall status
Status: v.status,
PeerRelaySessionBaseStatus: endpoint.PeerRelaySessionBaseStatus{
VNI: v.vni,
ClientShortDisco: [2]string{c1Disco, c2Disco},
ClientEndpoint: [2]netip.AddrPort{c1Ep, c2Ep},
ServerShortDisco: s.discoPublic.ShortString(),
// TODO (dylan): disambiguate which addrPort to use here
ServerEndpoint: s.addrPorts[0],
},
})
}
return sessions, nil
}