client/local, cmd/tailscale/cli, feature/relayserver, net/udprelay: interim commit

Signed-off-by: Dylan Bargatze <dylan@tailscale.com>
This commit is contained in:
Dylan Bargatze 2025-07-29 19:50:48 -04:00
parent 70569bb937
commit 0bc2f5ad7c
No known key found for this signature in database
5 changed files with 188 additions and 69 deletions

View File

@ -1641,12 +1641,12 @@ func (lc *Client) DebugSetExpireIn(ctx context.Context, d time.Duration) error {
// DebugPeerRelaySessions returns debug information about the current peer
// relay sessions running through this node.
func (lc *Client) DebugPeerRelaySessions(ctx context.Context) ([]status.ServerSession, error) {
func (lc *Client) DebugPeerRelaySessions(ctx context.Context) (*status.ServerStatus, error) {
body, err := lc.send(ctx, "GET", "/localapi/v0/debug-peer-relay-sessions", 200, nil)
if err != nil {
return nil, fmt.Errorf("error %w: %s", err, body)
}
return decodeJSON[[]status.ServerSession](body)
return decodeJSON[*status.ServerStatus](body)
}
// StreamDebugCapture streams a pcap-formatted packet capture.

View File

@ -6,9 +6,14 @@
package cli
import (
"bytes"
"cmp"
"context"
"fmt"
"slices"
"github.com/peterbourgon/ff/v3/ffcli"
"tailscale.com/net/udprelay/status"
)
func init() {
@ -25,23 +30,75 @@ func mkDebugPeerRelaySessionsCmd() *ffcli.Command {
}
func runPeerRelaySessions(ctx context.Context, args []string) error {
v, err := localClient.DebugPeerRelaySessions(ctx)
srv, err := localClient.DebugPeerRelaySessions(ctx)
if err != nil {
return err
}
if len(v) == 0 {
println("This peer relay server is not relaying any sessions.")
var buf bytes.Buffer
f := func(format string, a ...any) { fmt.Fprintf(&buf, format, a...) }
valid_state := false
f("Server status : ")
switch srv.State {
case status.Disabled:
f("disabled (via node capability attribute 'disable-relay-server')")
case status.ShutDown:
f("shut down")
case status.NotConfigured:
f("not configured (you can configure the port with 'sudo tailscale set --relay-server-port=<PORT>')")
case status.Uninitialized:
valid_state = true
f("listening on port %v", srv.UDPPort)
case status.Running:
valid_state = true
f("running on port %v", srv.UDPPort)
default:
panic(fmt.Sprintf("unexpected status.ServerState: %#v", srv.State))
}
f("\n")
if !valid_state {
Stdout.Write(buf.Bytes())
return nil
}
println("Sessions relayed by this peer relay server:")
for _, s := range v {
printf("- Session %v: %v <-> %v <-> %v\n", s.VNI, s.ClientEndpoint[0], s.ServerEndpoint, s.ClientEndpoint[1])
printf(" Server : disco=%v | endpoint=%v | status=%v\n", s.ServerShortDisco, s.ServerEndpoint, s.Status.OverallStatus)
printf(" Client 1: disco=%v | endpoint=%v | status=%v, %v\n", s.ClientShortDisco[0], s.ClientEndpoint[0], s.Status.ClientBindStatus[0], s.Status.ClientPingStatus[0])
printf(" Client 2: disco=%v | endpoint=%v | status=%v, %v\n", s.ClientShortDisco[1], s.ClientEndpoint[1], s.Status.ClientBindStatus[1], s.Status.ClientPingStatus[1])
f("Active sessions: %d\n", len(srv.Sessions))
if len(srv.Sessions) == 0 {
Stdout.Write(buf.Bytes())
return nil
}
srvStr := func(s status.ServerSession) string {
return fmt.Sprintf("%v[%s]", s.ServerEndpoint, s.ServerShortDisco)
}
cliStr := func(s status.ServerSession, idx int) string {
return fmt.Sprintf("%v[%s]", s.ClientEndpoint[idx], s.ClientShortDisco[idx])
}
pktStr := func(s status.ServerSession, idx int) string {
return fmt.Sprintf("tx %d rx %d", s.Status.ClientPacketsRx[idx], s.Status.ClientPacketsFwd[idx])
}
byteStr := func(s status.ServerSession, idx int) string {
return fmt.Sprintf("tx %dB rx %dB", s.Status.ClientBytesRx[idx], s.Status.ClientBytesFwd[idx])
}
slices.SortFunc(srv.Sessions, func(s1, s2 status.ServerSession) int { return cmp.Compare(s1.VNI, s2.VNI) })
f("\n%-8s %-41s %-41s %-41s\n", "VNI", "Server", "Client 1", "Client 2")
for _, s := range srv.Sessions {
f("%-8d %-41s %-41s %-41s\n",
s.VNI,
srvStr(s),
cliStr(s, 0),
cliStr(s, 1),
// TODO (dylan): Status
)
f("%-8s %-41s %-41s %-41s\n", "", "", pktStr(s, 0), pktStr(s, 1))
f("%-8s %-41s %-41s %-41s\n", "", "", byteStr(s, 0), byteStr(s, 1))
}
Stdout.Write(buf.Bytes())
return nil
}

View File

@ -7,7 +7,6 @@ package relayserver
import (
"encoding/json"
"errors"
"fmt"
"net/http"
"sync"
@ -38,6 +37,7 @@ func init() {
localapi.Register("debug-peer-relay-sessions", servePeerRelayDebugSessions)
}
// TODO (dylan): doc comments
func servePeerRelayDebugSessions(h *localapi.Handler, w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" {
http.Error(w, "GET required", http.StatusMethodNotAllowed)
@ -46,46 +46,17 @@ func servePeerRelayDebugSessions(h *localapi.Handler, w http.ResponseWriter, r *
var e *extension
if ok := h.LocalBackend().FindMatchingExtension(&e); !ok {
http.Error(w, "Peer relay extension unavailable", http.StatusInternalServerError)
http.Error(w, "peer relay server extension unavailable", http.StatusInternalServerError)
return
}
e.mu.Lock()
running := e.busDoneCh != nil
shutdown := e.shutdown
port := e.port
disabled := e.hasNodeAttrDisableRelayServer
e.mu.Unlock()
if !running {
http.Error(w, "peer relay server is not running", http.StatusServiceUnavailable)
return
} else if shutdown {
http.Error(w, "peer relay server has been shut down", http.StatusServiceUnavailable)
return
} else if disabled {
http.Error(w, "peer relay server is disabled", http.StatusServiceUnavailable)
return
} else if port == nil {
http.Error(w, "peer relay server port is not configured", http.StatusPreconditionFailed)
st, err := e.status()
if err != nil {
http.Error(w, fmt.Sprintf("failed to retrieve peer relay server status: %v", err), http.StatusInternalServerError)
return
}
// h.Logf("peer relay server is available, running=%v shutdown=%v disabled=%v port=%v", running, shutdown, disabled, *port)
client := e.bus.Client("relayserver.debug-peer-relay-sessions")
defer client.Close()
debugReqPub := eventbus.Publish[PeerRelaySessionsReq](client)
debugRespSub := eventbus.Subscribe[PeerRelaySessionsResp](client)
debugReqPub.Publish(PeerRelaySessionsReq{})
// TODO (dylan): remove this message
// h.Logf("relayserver: waiting for run loop to publish peer relay sessions...")
resp := <-debugRespSub.Events()
// TODO (dylan): check resp.Error (or move it into PeerRelaySessions instead of leaving it in PeerRelaySessionsResp)
// TODO (dylan): what status to return if the peer relay server isn't running/configured?
j, err := json.Marshal(resp.Sessions)
j, err := json.Marshal(st)
if err != nil {
http.Error(w, fmt.Sprintf("failed to marshal json: %v", err), http.StatusInternalServerError)
return
@ -129,8 +100,8 @@ type PeerRelaySessionsReq struct{}
// TODO (dylan): doc comments
type PeerRelaySessionsResp struct {
Sessions []status.ServerSession
Error error
Status status.ServerStatus
Error error
}
// Name implements [ipnext.Extension].
@ -212,16 +183,18 @@ func (e *extension) consumeEventbusTopics(port int) {
// signal to return.
return
case <-debugReqSub.Events():
// TODO (dylan): This is where we want to send debug session info back to the CLI.
st := status.ServerStatus{
State: status.Uninitialized,
UDPPort: port,
Sessions: nil,
}
if rs == nil {
// TODO (dylan): should we initialize the server here too
// TODO (dylan): what is the pattern for sending error values back over the event bus?
// TODO (dylan): this isn't even an error condition, expected when nobody has tried to
// allocate an endpoint...rethink, maybe add a "Status string" field to PeerRelaySessionsResp?
resp := PeerRelaySessionsResp{Error: errors.New("no peer relay sessions: server has not been contacted yet")}
resp := PeerRelaySessionsResp{st, nil}
debugRespPub.Publish(resp)
continue
}
st.State = status.Running
sessions, err := rs.GetSessions()
if err != nil {
// TODO (dylan): should this be an errors.Join() instead with err?
@ -230,7 +203,8 @@ func (e *extension) consumeEventbusTopics(port int) {
debugRespPub.Publish(PeerRelaySessionsResp{Error: prs_err})
continue
}
debugRespPub.Publish(PeerRelaySessionsResp{sessions, nil})
st.Sessions = sessions
debugRespPub.Publish(PeerRelaySessionsResp{st, nil})
case req := <-reqSub.Events():
if rs == nil {
var err error
@ -282,3 +256,49 @@ func (e *extension) Shutdown() error {
e.shutdown = true
return nil
}
// TODO (dylan): doc comments.
func (e *extension) status() (status.ServerStatus, error) {
st := status.ServerStatus{
State: status.Uninitialized,
UDPPort: -1,
Sessions: nil,
}
e.mu.Lock()
running := e.busDoneCh != nil
shutdown := e.shutdown
port := e.port
disabled := e.hasNodeAttrDisableRelayServer
e.mu.Unlock()
if port == nil {
st.State = status.NotConfigured
return st, nil
}
st.UDPPort = *port
if disabled {
st.State = status.Disabled
return st, nil
}
if shutdown {
st.State = status.ShutDown
return st, nil
}
if !running {
// Leave state as Uninitialized.
return st, nil
}
client := e.bus.Client("relayserver.debug-peer-relay-sessions")
defer client.Close()
debugReqPub := eventbus.Publish[PeerRelaySessionsReq](client)
debugRespSub := eventbus.Subscribe[PeerRelaySessionsResp](client)
debugReqPub.Publish(PeerRelaySessionsReq{})
resp := <-debugRespSub.Events()
return resp.Status, resp.Error
}

View File

@ -237,6 +237,7 @@ func (e *serverEndpoint) handlePacket(from netip.AddrPort, gh packet.GeneveHeade
e.lastSeen[0] = time.Now()
to = e.boundAddrPorts[1]
e.status.ClientPacketsRx[0]++
e.status.ClientBytesRx[0] += uint64(len(b))
switch e.status.ClientPingStatus[0] {
case status.DiscoPingNotStarted:
e.status.ClientPingStatus[0] = status.DiscoPingSeen
@ -246,10 +247,12 @@ func (e *serverEndpoint) handlePacket(from netip.AddrPort, gh packet.GeneveHeade
break
}
e.status.ClientPacketsFwd[1]++
e.status.ClientBytesFwd[1] += uint64(len(b))
case from == e.boundAddrPorts[1]:
e.lastSeen[1] = time.Now()
to = e.boundAddrPorts[0]
e.status.ClientPacketsRx[1]++
e.status.ClientBytesRx[1] += uint64(len(b))
switch e.status.ClientPingStatus[1] {
case status.DiscoPingNotStarted:
e.status.ClientPingStatus[1] = status.DiscoPingSeen
@ -259,6 +262,7 @@ func (e *serverEndpoint) handlePacket(from netip.AddrPort, gh packet.GeneveHeade
break
}
e.status.ClientPacketsFwd[0]++
e.status.ClientBytesFwd[0] += uint64(len(b))
default:
// unrecognized source
return
@ -711,23 +715,24 @@ func (s *Server) AllocateEndpoint(discoA, discoB key.DiscoPublic) (endpoint.Serv
// TODO (dylan): doc comments
func (s *Server) GetSessions() ([]status.ServerSession, error) {
extractClient := func(idx int, keys key.SortedPairOfDiscoPublic, ep *serverEndpoint) (netip.AddrPort, string) {
// TODO (dylan): assert idx == 0 || idx == 1
disco := keys.Get()[idx].ShortString()
if ep.boundAddrPorts[0].IsValid() {
return ep.boundAddrPorts[idx], disco
} else if ep.handshakeAddrPorts[idx].IsValid() {
return ep.handshakeAddrPorts[idx], disco
}
return netip.AddrPort{}, disco
}
// lc := &local.Client{}
// lc.Status()
var sessions = make([]status.ServerSession, 0)
for k, v := range s.byDisco {
var c1Ep, c2Ep netip.AddrPort
c1Disco := k.Get()[0].ShortString()
if v.boundAddrPorts[0].IsValid() {
c1Ep = v.boundAddrPorts[0]
} else if v.handshakeAddrPorts[0].IsValid() {
c1Ep = v.handshakeAddrPorts[0]
}
c2Disco := k.Get()[1].ShortString()
if v.boundAddrPorts[1].IsValid() {
c2Ep = v.boundAddrPorts[1]
} else if v.handshakeAddrPorts[1].IsValid() {
c2Ep = v.handshakeAddrPorts[1]
}
c1Ep, c1Disco := extractClient(0, k, v)
c2Ep, c2Disco := extractClient(1, k, v)
sessions = append(sessions, status.ServerSession{
// TODO (dylan): fix overall status
Status: v.status,

View File

@ -7,6 +7,41 @@ package status
import "net/netip"
// Initialized
// Running
// Shut down
// ServerState is the current state of the peer relay server extension.
type ServerState int
const (
// Uninitialized is a placeholder initial state of the peer relay server
// until we can determine its actual state. It can transition to
// [Disabled], [Running], or [ShutDown], but nothing can transition to
// [Uninitialized].
Uninitialized ServerState = iota
// TODO (dylan): doc-comment
NotConfigured
// Disabled indicates the peer relay server has been disabled by a node
// attribute pushed via C2N. It can transition to [Running] or [ShutDown].
Disabled
// Running indicates the peer relay server has been initialized and can
// relay sessions between peers on the configured UDP port. It can
// transition to [Disabled] or [ShutDown].
Running
// ShutDown indicates the peer relay server extension has been told to
// shut down, and can no longer relay sessions between peers. It cannot
// transition to any other state.
ShutDown
)
// TODO (dylan): doc comments
type ServerStatus struct {
State ServerState
UDPPort int
Sessions []ServerSession
}
// ServerSession contains status information for a single session between two
// peer relay clients relayed via a peer relay server. This is the status as
// seen by the peer relay server; each client node may have a different view of
@ -46,7 +81,9 @@ type SessionStatus struct {
ClientBindStatus [2]BindStatus
ClientPingStatus [2]PingStatus
ClientPacketsRx [2]uint64
ClientBytesRx [2]uint64
ClientPacketsFwd [2]uint64
ClientBytesFwd [2]uint64
OverallStatus OverallSessionStatus
}