net/udprelay: expose peer relay metrics

Adding both user and client metrics for peer relay forwarded bytes and
packets, and the total endpoints gauge.

User metrics:
tailscaled_relay_forwarded_packets_total{trainsport_in, transport_out}
tailscaled_relay_forwarded_bytes_total{trainsport_in, transport_out}
tailscaled_relay_endpoints_total{}

Where the transport labels can be of "udp4" or "udp6".

Client metrics:
udprelay_forwarded_(packets|bytes)_udp(4|6)_udp(4|6)
udprelay_endpoints

RELNOTE: Expose tailscaled metrics for peer relay.

Updates tailscale/corp#30820

Change-Id: I1a905d15bdc5ee84e28017e0b93210e2d9660259
Signed-off-by: Alex Valiushko <alexvaliushko@tailscale.com>
This commit is contained in:
Alex Valiushko
2025-12-15 19:30:24 -08:00
parent 0fd1670a59
commit 8476aa8dd5
5 changed files with 291 additions and 4 deletions

View File

@@ -70,7 +70,7 @@ func servePeerRelayDebugSessions(h *localapi.Handler, w http.ResponseWriter, r *
func newExtension(logf logger.Logf, sb ipnext.SafeBackend) (ipnext.Extension, error) {
e := &extension{
newServerFn: func(logf logger.Logf, port uint16, onlyStaticAddrPorts bool) (relayServer, error) {
return udprelay.NewServer(logf, port, onlyStaticAddrPorts)
return udprelay.NewServer(logf, port, onlyStaticAddrPorts, sb.Sys().UserMetricsRegistry())
},
logf: logger.WithPrefix(logf, featureName+": "),
}

130
net/udprelay/metrics.go Normal file
View File

@@ -0,0 +1,130 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
package udprelay
import (
"expvar"
"net/netip"
"tailscale.com/util/clientmetric"
"tailscale.com/util/usermetric"
)
var (
metricForwarded44Packets = clientmetric.NewCounter("udprelay_forwarded_packets_udp4_udp4")
metricForwarded46Packets = clientmetric.NewCounter("udprelay_forwarded_packets_udp4_udp6")
metricForwarded64Packets = clientmetric.NewCounter("udprelay_forwarded_packets_udp6_udp4")
metricForwarded66Packets = clientmetric.NewCounter("udprelay_forwarded_packets_udp6_udp6")
metricForwarded44Bytes = clientmetric.NewCounter("udprelay_forwarded_bytes_udp4_udp4")
metricForwarded46Bytes = clientmetric.NewCounter("udprelay_forwarded_bytes_udp4_udp6")
metricForwarded64Bytes = clientmetric.NewCounter("udprelay_forwarded_bytes_udp6_udp4")
metricForwarded66Bytes = clientmetric.NewCounter("udprelay_forwarded_bytes_udp6_udp6")
metricEndpoints = clientmetric.NewGauge("udprelay_endpoints")
)
type transport string
const (
transportUDP4 transport = "udp4"
transportUDP6 transport = "udp6"
)
type forwardedLabel struct {
transportIn transport `prom:"transport_in"`
transportOut transport `prom:"transport_out"`
}
type endpointLabel struct {
}
type metrics struct {
forwarded44Packets expvar.Int
forwarded46Packets expvar.Int
forwarded64Packets expvar.Int
forwarded66Packets expvar.Int
forwarded44Bytes expvar.Int
forwarded46Bytes expvar.Int
forwarded64Bytes expvar.Int
forwarded66Bytes expvar.Int
endpoints expvar.Int
}
// registerMetrics publishes user metric counters for peer relay server.
func registerMetrics(reg *usermetric.Registry) *metrics {
var (
forwardedPackets = usermetric.NewMultiLabelMapWithRegistry[forwardedLabel](
reg,
"tailscaled_relay_forwarded_packets_total",
"counter",
"Counts the number of packets forwarded via Peer Relay",
)
forwardedBytes = usermetric.NewMultiLabelMapWithRegistry[forwardedLabel](
reg,
"tailscaled_relay_forwarded_bytes_total",
"counter",
"Counts the number of bytes forwarded via Peer Relay",
)
endpoints = usermetric.NewMultiLabelMapWithRegistry[endpointLabel](
reg,
"tailscaled_relay_endpoints_total",
"gauge",
"Renders the current number of registered Peer Relay endpoints",
)
forwarded44 = forwardedLabel{transportIn: transportUDP4, transportOut: transportUDP4}
forwarded46 = forwardedLabel{transportIn: transportUDP4, transportOut: transportUDP6}
forwarded64 = forwardedLabel{transportIn: transportUDP6, transportOut: transportUDP4}
forwarded66 = forwardedLabel{transportIn: transportUDP6, transportOut: transportUDP6}
m = new(metrics)
)
// Publish user metrics.
forwardedPackets.Set(forwarded44, &m.forwarded44Packets)
forwardedPackets.Set(forwarded46, &m.forwarded46Packets)
forwardedPackets.Set(forwarded64, &m.forwarded64Packets)
forwardedPackets.Set(forwarded66, &m.forwarded66Packets)
forwardedBytes.Set(forwarded44, &m.forwarded44Bytes)
forwardedBytes.Set(forwarded46, &m.forwarded46Bytes)
forwardedBytes.Set(forwarded64, &m.forwarded64Bytes)
forwardedBytes.Set(forwarded66, &m.forwarded66Bytes)
endpoints.Set(endpointLabel{}, &m.endpoints)
return m
}
func (m *metrics) addEndpoints(value int) {
m.endpoints.Add(int64(value))
metricEndpoints.Add(int64(value))
}
func (m *metrics) countForwarded(from, to netip.Addr, b []byte) {
in4, out4 := from.Is4(), to.Is4()
bytes := int64(len(b))
if in4 && out4 {
m.forwarded44Packets.Add(1)
m.forwarded44Bytes.Add(bytes)
metricForwarded44Packets.Add(1)
metricForwarded44Bytes.Add(bytes)
} else if in4 && !out4 {
m.forwarded46Packets.Add(1)
m.forwarded46Bytes.Add(bytes)
metricForwarded46Packets.Add(1)
metricForwarded46Bytes.Add(bytes)
} else if !in4 && out4 {
m.forwarded64Packets.Add(1)
m.forwarded64Bytes.Add(bytes)
metricForwarded64Packets.Add(1)
metricForwarded64Bytes.Add(bytes)
} else {
m.forwarded66Packets.Add(1)
m.forwarded66Bytes.Add(bytes)
metricForwarded66Packets.Add(1)
metricForwarded66Bytes.Add(bytes)
}
}

View File

@@ -0,0 +1,140 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
package udprelay
import (
"net/netip"
"slices"
"testing"
qt "github.com/frankban/quicktest"
"tailscale.com/util/usermetric"
)
func resetClientMetrics() {
// clientmetrics are global and must be reset between test cases
// for the assertMetricsMatch to work.
metricForwarded44Packets.Set(0)
metricForwarded46Packets.Set(0)
metricForwarded64Packets.Set(0)
metricForwarded66Packets.Set(0)
metricForwarded44Bytes.Set(0)
metricForwarded46Bytes.Set(0)
metricForwarded64Bytes.Set(0)
metricForwarded66Bytes.Set(0)
metricEndpoints.Set(0)
}
func assertMetricsMatch(t *testing.T, s *Server) {
t.Helper()
s.mu.Lock()
defer s.mu.Unlock()
c := qt.New(t)
var (
ps44, ps46, ps64, ps66 uint64
bs44, bs46, bs64, bs66 uint64
es = len(s.serverEndpointByDisco)
)
for _, e := range s.serverEndpointByDisco {
cs := e.extractClientInfo()
a, b := cs[0], cs[1]
a4, b4 := a.Endpoint.Addr().Is4(), b.Endpoint.Addr().Is4()
if a4 && b4 {
ps44 += b.PacketsTx
ps44 += a.PacketsTx
bs44 += b.BytesTx
bs44 += a.BytesTx
} else if a4 && !b4 {
ps46 += b.PacketsTx
ps64 += a.PacketsTx
bs46 += b.BytesTx
bs64 += a.BytesTx
} else if !a4 && b4 {
ps64 += b.PacketsTx
ps46 += a.PacketsTx
bs64 += b.BytesTx
bs46 += a.BytesTx
} else if !a4 && !b4 {
ps66 += b.PacketsTx
ps66 += a.PacketsTx
bs66 += b.BytesTx
bs66 += a.BytesTx
}
}
c.Assert(s.metrics.forwarded44Packets.Value(), qt.Equals, int64(ps44))
c.Assert(s.metrics.forwarded46Packets.Value(), qt.Equals, int64(ps46))
c.Assert(s.metrics.forwarded64Packets.Value(), qt.Equals, int64(ps64))
c.Assert(s.metrics.forwarded66Packets.Value(), qt.Equals, int64(ps66))
c.Assert(s.metrics.forwarded44Bytes.Value(), qt.Equals, int64(bs44))
c.Assert(s.metrics.forwarded46Bytes.Value(), qt.Equals, int64(bs46))
c.Assert(s.metrics.forwarded64Bytes.Value(), qt.Equals, int64(bs64))
c.Assert(s.metrics.forwarded66Bytes.Value(), qt.Equals, int64(bs66))
c.Assert(s.metrics.endpoints.Value(), qt.Equals, int64(es))
c.Assert(metricForwarded44Packets.Value(), qt.Equals, int64(ps44))
c.Assert(metricForwarded46Packets.Value(), qt.Equals, int64(ps46))
c.Assert(metricForwarded64Packets.Value(), qt.Equals, int64(ps64))
c.Assert(metricForwarded66Packets.Value(), qt.Equals, int64(ps66))
c.Assert(metricForwarded44Bytes.Value(), qt.Equals, int64(bs44))
c.Assert(metricForwarded46Bytes.Value(), qt.Equals, int64(bs46))
c.Assert(metricForwarded64Bytes.Value(), qt.Equals, int64(bs64))
c.Assert(metricForwarded66Bytes.Value(), qt.Equals, int64(bs66))
c.Assert(metricEndpoints.Value(), qt.Equals, int64(es))
}
func TestMetrics(t *testing.T) {
c := qt.New(t)
resetClientMetrics()
r := &usermetric.Registry{}
m := registerMetrics(r)
// Expect certain prom names registered.
have := r.MetricNames()
want := []string{
"tailscaled_relay_forwarded_packets_total",
"tailscaled_relay_forwarded_bytes_total",
"tailscaled_relay_endpoints_total",
}
slices.Sort(have)
slices.Sort(want)
c.Assert(have, qt.CmpEquals(), want)
// Validate addEndpoints.
m.addEndpoints(1)
c.Assert(m.endpoints.Value(), qt.Equals, int64(1))
c.Assert(metricEndpoints.Value(), qt.Equals, int64(1))
m.addEndpoints(-1)
c.Assert(m.endpoints.Value(), qt.Equals, int64(0))
c.Assert(metricEndpoints.Value(), qt.Equals, int64(0))
// Validate countForwarded.
var (
ip4 = netip.AddrFrom4([4]byte{1, 1, 1, 1})
ip6 = netip.AddrFrom16([16]byte{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1})
)
m.countForwarded(ip4, ip4, []byte{1})
c.Assert(m.forwarded44Bytes.Value(), qt.Equals, int64(1))
c.Assert(m.forwarded44Packets.Value(), qt.Equals, int64(1))
c.Assert(metricForwarded44Bytes.Value(), qt.Equals, int64(1))
c.Assert(metricForwarded44Packets.Value(), qt.Equals, int64(1))
m.countForwarded(ip4, ip6, []byte{1, 2})
c.Assert(m.forwarded46Bytes.Value(), qt.Equals, int64(2))
c.Assert(m.forwarded46Packets.Value(), qt.Equals, int64(1))
c.Assert(metricForwarded46Bytes.Value(), qt.Equals, int64(2))
c.Assert(metricForwarded46Packets.Value(), qt.Equals, int64(1))
m.countForwarded(ip6, ip4, []byte{1, 2, 3})
c.Assert(m.forwarded64Bytes.Value(), qt.Equals, int64(3))
c.Assert(m.forwarded64Packets.Value(), qt.Equals, int64(1))
c.Assert(metricForwarded64Bytes.Value(), qt.Equals, int64(3))
c.Assert(metricForwarded64Packets.Value(), qt.Equals, int64(1))
m.countForwarded(ip6, ip6, []byte{1, 2, 3, 4})
c.Assert(m.forwarded66Bytes.Value(), qt.Equals, int64(4))
c.Assert(m.forwarded66Packets.Value(), qt.Equals, int64(1))
c.Assert(metricForwarded66Bytes.Value(), qt.Equals, int64(4))
c.Assert(metricForwarded66Packets.Value(), qt.Equals, int64(1))
}

View File

@@ -43,6 +43,7 @@ import (
"tailscale.com/types/views"
"tailscale.com/util/eventbus"
"tailscale.com/util/set"
"tailscale.com/util/usermetric"
)
const (
@@ -76,6 +77,7 @@ type Server struct {
wg sync.WaitGroup
closeCh chan struct{}
netChecker *netcheck.Client
metrics *metrics
mu sync.Mutex // guards the following fields
macSecrets views.Slice[[blake2s.Size]byte] // [0] is most recent, max 2 elements
@@ -321,7 +323,7 @@ func (e *serverEndpoint) isBoundLocked() bool {
// onlyStaticAddrPorts is true, then dynamic addr:port discovery will be
// disabled, and only addr:port's set via [Server.SetStaticAddrPorts] will be
// used.
func NewServer(logf logger.Logf, port uint16, onlyStaticAddrPorts bool) (s *Server, err error) {
func NewServer(logf logger.Logf, port uint16, onlyStaticAddrPorts bool, metrics *usermetric.Registry) (s *Server, err error) {
s = &Server{
logf: logf,
disco: key.NewDisco(),
@@ -333,6 +335,7 @@ func NewServer(logf logger.Logf, port uint16, onlyStaticAddrPorts bool) (s *Serv
nextVNI: minVNI,
}
s.discoPublic = s.disco.Public()
s.metrics = registerMetrics(metrics)
// TODO(creachadair): Find a way to plumb this in during initialization.
// As-written, messages published here will not be seen by other components
@@ -670,6 +673,7 @@ func (s *Server) endpointGCLoop() {
defer s.mu.Unlock()
for k, v := range s.serverEndpointByDisco {
if v.isExpired(now, s.bindLifetime, s.steadyStateLifetime) {
s.metrics.addEndpoints(-1)
delete(s.serverEndpointByDisco, k)
s.serverEndpointByVNI.Delete(v.vni)
}
@@ -715,7 +719,11 @@ func (s *Server) handlePacket(from netip.AddrPort, b []byte) (write []byte, to n
secrets := s.getMACSecrets(now)
return e.(*serverEndpoint).handleSealedDiscoControlMsg(from, msg, s.discoPublic, secrets, now)
}
return e.(*serverEndpoint).handleDataPacket(from, b, now)
write, to = e.(*serverEndpoint).handleDataPacket(from, b, now)
if write != nil {
s.metrics.countForwarded(from.Addr(), to.Addr(), write)
}
return
}
func (s *Server) getMACSecrets(now mono.Time) views.Slice[[blake2s.Size]byte] {
@@ -932,6 +940,7 @@ func (s *Server) AllocateEndpoint(discoA, discoB key.DiscoPublic) (endpoint.Serv
s.serverEndpointByVNI.Store(e.vni, e)
s.logf("allocated endpoint vni=%d lamportID=%d disco[0]=%v disco[1]=%v", e.vni, e.lamportID, pair.Get()[0].ShortString(), pair.Get()[1].ShortString())
s.metrics.addEndpoints(1)
return endpoint.ServerEndpoint{
ServerDisco: s.discoPublic,
ClientDisco: pair.Get(),

View File

@@ -21,6 +21,7 @@ import (
"tailscale.com/tstime/mono"
"tailscale.com/types/key"
"tailscale.com/types/views"
"tailscale.com/util/usermetric"
)
type testClient struct {
@@ -209,7 +210,9 @@ func TestServer(t *testing.T) {
for _, tt := range cases {
t.Run(tt.name, func(t *testing.T) {
server, err := NewServer(t.Logf, 0, true)
reg := new(usermetric.Registry)
resetClientMetrics()
server, err := NewServer(t.Logf, 0, true, reg)
if err != nil {
t.Fatal(err)
}
@@ -234,6 +237,7 @@ func TestServer(t *testing.T) {
}
// We expect the same endpoint details pre-handshake.
assertMetricsMatch(t, server)
if diff := cmp.Diff(dupEndpoint, endpoint, cmpopts.EquateComparable(netip.AddrPort{}, key.DiscoPublic{})); diff != "" {
t.Fatalf("wrong dupEndpoint (-got +want)\n%s", diff)
}
@@ -285,6 +289,7 @@ func TestServer(t *testing.T) {
t.Fatal(err)
}
// We expect the same endpoint details post-handshake.
assertMetricsMatch(t, server)
if diff := cmp.Diff(dupEndpoint, endpoint, cmpopts.EquateComparable(netip.AddrPort{}, key.DiscoPublic{})); diff != "" {
t.Fatalf("wrong dupEndpoint (-got +want)\n%s", diff)
}
@@ -308,6 +313,7 @@ func TestServer(t *testing.T) {
defer tcAOnNewPort.close()
// Handshake client A on a new source IP:port, verify we can send packets on the new binding
assertMetricsMatch(t, server)
tcAOnNewPort.handshake(t)
fromAOnNewPort := []byte{7, 8, 9}
@@ -330,6 +336,8 @@ func TestServer(t *testing.T) {
if !bytes.Equal(fromBOnNewPort, rxFromB) {
t.Fatal("unexpected msg B->A")
}
assertMetricsMatch(t, server)
})
}
}