net/udprelay: add tailscaled_peer_relay_endpoints gauge

New gauge reflects endpoint state via the `state` label: open, semi_bound, bound.

Updates tailnet/corp#30820

Change-Id: Idb1baa90a38c97847e14f9b2390093262ad0ea23
Signed-off-by: Alex Valiushko <alexvaliushko@tailscale.com>
This commit is contained in:
Alex Valiushko
2025-12-19 14:28:56 -08:00
parent 90b4358113
commit 51c6d443ed
3 changed files with 96 additions and 19 deletions

View File

@@ -24,8 +24,10 @@ var (
cMetricForwarded66Bytes = clientmetric.NewAggregateCounter("udprelay_forwarded_bytes_udp6_udp6")
// [clientmetric.Gauge] does not let us embed existing counters,
// [metrics.addEndpoints] records data into client and user gauges independently.
cMetricEndpoints = clientmetric.NewGauge("udprelay_endpoints")
// [metrics.updateEndpoint] records data into client and user gauges independently.
cMetricEndpointsBound = clientmetric.NewGauge("udprelay_endpoints_bound")
cMetricEndpointsSemiBound = clientmetric.NewGauge("udprelay_endpoints_semi_bound")
cMetricEndpointsOpen = clientmetric.NewGauge("udprelay_endpoints_open")
)
type transport string
@@ -35,12 +37,15 @@ const (
transportUDP6 transport = "udp6"
)
// endpointTransition describes a change of endpoint state as a pair:
// index 0 is the state before the change, index 1 is the state after it.
type endpointTransition [2]endpointState
// forwardedLabel is a pair of transports tagged with the "transport_in" and
// "transport_out" label names.
// NOTE(review): presumably the label set of the forwarded packets/bytes
// user metrics — confirm at the registration site.
type forwardedLabel struct {
transportIn transport `prom:"transport_in"`
transportOut transport `prom:"transport_out"`
}
// endpointLabel is the label set of the endpoints user metric; it carries a
// single endpoint state tagged with the "state" label name.
type endpointLabel struct {
state endpointState `prom:"state"`
}
type metrics struct {
@@ -54,7 +59,9 @@ type metrics struct {
forwarded64Bytes expvar.Int
forwarded66Bytes expvar.Int
endpoints expvar.Int
endpointsOpen expvar.Int
endpointsSemiBound expvar.Int
endpointsBound expvar.Int
}
// registerMetrics publishes user and client metric counters for peer relay server.
@@ -76,7 +83,7 @@ func registerMetrics(reg *usermetric.Registry) *metrics {
)
uMetricEndpoints = usermetric.NewMultiLabelMapWithRegistry[endpointLabel](
reg,
"tailscaled_peer_relay_endpoints_total",
"tailscaled_peer_relay_endpoints",
"gauge",
"Number of allocated Peer Relay endpoints",
)
@@ -98,7 +105,9 @@ func registerMetrics(reg *usermetric.Registry) *metrics {
uMetricForwardedBytes.Set(forwarded64, &m.forwarded64Bytes)
uMetricForwardedBytes.Set(forwarded66, &m.forwarded66Bytes)
uMetricEndpoints.Set(endpointLabel{}, &m.endpoints)
uMetricEndpoints.Set(endpointLabel{endpointBound}, &m.endpointsBound)
uMetricEndpoints.Set(endpointLabel{endpointSemiBound}, &m.endpointsSemiBound)
uMetricEndpoints.Set(endpointLabel{endpointOpen}, &m.endpointsOpen)
// Publish client metrics.
cMetricForwarded44Packets.Register(&m.forwarded44Packets)
@@ -113,11 +122,42 @@ func registerMetrics(reg *usermetric.Registry) *metrics {
return m
}
// addEndpoints updates the total endpoints gauge. Value can be negative.
// updateEndpoint adjusts the per-state endpoint gauges for the state
// transition tx. Transitions it does not recognize leave the gauges unchanged.
// It records user and client gauges independently, see [cMetricEndpointsBound] doc.
func (m *metrics) addEndpoints(value int64) {
m.endpoints.Add(value)
cMetricEndpoints.Add(value)
func (m *metrics) updateEndpoint(tx endpointTransition) {
	// Each helper applies the same delta to the user gauge held in m and
	// then to the matching package-level client gauge.
	open := func(delta int64) {
		m.endpointsOpen.Add(delta)
		cMetricEndpointsOpen.Add(delta)
	}
	semiBound := func(delta int64) {
		m.endpointsSemiBound.Add(delta)
		cMetricEndpointsSemiBound.Add(delta)
	}
	bound := func(delta int64) {
		m.endpointsBound.Add(delta)
		cMetricEndpointsBound.Add(delta)
	}

	// Dispatch on the previous state first, then on the new one. Any
	// pair not listed below is deliberately a no-op.
	from, to := tx[0], tx[1]
	switch from {
	case endpointClosed:
		if to == endpointOpen {
			open(1)
		}
	case endpointOpen:
		switch to {
		case endpointSemiBound:
			open(-1)
			semiBound(1)
		case endpointClosed:
			open(-1)
		}
	case endpointSemiBound:
		switch to {
		case endpointBound:
			semiBound(-1)
			bound(1)
		case endpointClosed:
			semiBound(-1)
		}
	case endpointBound:
		if to == endpointClosed {
			bound(-1)
		}
	}
}
// resetEndpoints sets every per-state endpoint gauge back to zero, for both
// the user gauges held in m and the package-level client gauges.
func (m *metrics) resetEndpoints() {
	// User gauges.
	m.endpointsOpen.Set(0)
	m.endpointsSemiBound.Set(0)
	m.endpointsBound.Set(0)

	// Client gauges.
	cMetricEndpointsOpen.Set(0)
	cMetricEndpointsSemiBound.Set(0)
	cMetricEndpointsBound.Set(0)
}
// countForwarded records user and client metrics according to the
@@ -149,5 +189,7 @@ func deregisterMetrics() {
cMetricForwarded46Bytes.UnregisterAll()
cMetricForwarded64Bytes.UnregisterAll()
cMetricForwarded66Bytes.UnregisterAll()
cMetricEndpoints.Set(0)
cMetricEndpointsOpen.Set(0)
cMetricEndpointsSemiBound.Set(0)
cMetricEndpointsBound.Set(0)
}

View File

@@ -22,19 +22,19 @@ func TestMetrics(t *testing.T) {
want := []string{
"tailscaled_peer_relay_forwarded_packets_total",
"tailscaled_peer_relay_forwarded_bytes_total",
"tailscaled_peer_relay_endpoints_total",
"tailscaled_peer_relay_endpoints",
}
slices.Sort(have)
slices.Sort(want)
c.Assert(have, qt.CmpEquals(), want)
// Validate updateEndpoint.
m.addEndpoints(1)
c.Assert(m.endpoints.Value(), qt.Equals, int64(1))
c.Assert(cMetricEndpoints.Value(), qt.Equals, int64(1))
m.addEndpoints(-1)
c.Assert(m.endpoints.Value(), qt.Equals, int64(0))
c.Assert(cMetricEndpoints.Value(), qt.Equals, int64(0))
m.updateEndpoint(endpointTransition{endpointClosed, endpointOpen})
c.Assert(m.endpointsOpen.Value(), qt.Equals, int64(1))
c.Assert(cMetricEndpointsOpen.Value(), qt.Equals, int64(1))
m.updateEndpoint(endpointTransition{endpointOpen, endpointClosed})
c.Assert(m.endpointsOpen.Value(), qt.Equals, int64(0))
c.Assert(cMetricEndpointsOpen.Value(), qt.Equals, int64(0))
// Validate countForwarded.
m.countForwarded(true, true, 1, 1)

View File

@@ -115,6 +115,7 @@ type serverEndpoint struct {
lamportID uint64
vni uint32
allocatedAt mono.Time
metrics *metrics
mu sync.Mutex // guards the following fields
inProgressGeneration [2]uint32 // or zero if a handshake has never started, or has just completed
@@ -146,6 +147,28 @@ func blakeMACFromBindMsg(blakeKey [blake2s.Size]byte, src netip.AddrPort, msg di
return out, nil
}
// endpointState names the state of a [serverEndpoint] as reported by
// [serverEndpoint.stateLocked].
type endpointState string
const (
// endpointClosed is the state reported for a nil endpoint. It also appears
// as the before/after side of a transition when an endpoint is created or
// removed.
endpointClosed endpointState = "closed"
// endpointOpen is the state of an endpoint that is neither semi-bound
// nor bound.
endpointOpen endpointState = "open"
// endpointSemiBound is the state of an endpoint where exactly one of the
// two bound addr:ports is valid.
endpointSemiBound endpointState = "semi_bound"
// endpointBound is the state of an endpoint for which isBoundLocked
// reports true.
endpointBound endpointState = "bound"
)
// stateLocked reports the current [endpointState] of e. A nil receiver is
// reported as endpointClosed; an endpoint that is neither semi-bound nor
// bound is reported as endpointOpen.
// NOTE(review): per the "Locked" suffix, callers are presumably expected to
// hold e.mu — confirm at each call site.
func (e *serverEndpoint) stateLocked() endpointState {
	if e == nil {
		return endpointClosed
	}
	if e.isSemiBoundLocked() {
		return endpointSemiBound
	}
	if e.isBoundLocked() {
		return endpointBound
	}
	return endpointOpen
}
func (e *serverEndpoint) handleDiscoControlMsg(from netip.AddrPort, senderIndex int, discoMsg disco.Message, serverDisco key.DiscoPublic, macSecrets views.Slice[[blake2s.Size]byte], now mono.Time) (write []byte, to netip.AddrPort) {
e.mu.Lock()
defer e.mu.Unlock()
@@ -224,9 +247,11 @@ func (e *serverEndpoint) handleDiscoControlMsg(from netip.AddrPort, senderIndex
// already authenticated via disco.
if bytes.Equal(mac[:], discoMsg.Challenge[:]) {
// Handshake complete. Update the binding for this sender.
oldState := e.stateLocked()
e.boundAddrPorts[senderIndex] = from
e.lastSeen[senderIndex] = now // record last seen as bound time
e.inProgressGeneration[senderIndex] = 0 // reset to zero, which indicates there is no in-progress handshake
e.metrics.updateEndpoint(endpointTransition{oldState, e.stateLocked()})
return nil, netip.AddrPort{}
}
}
@@ -318,6 +343,14 @@ func (e *serverEndpoint) isBoundLocked() bool {
e.boundAddrPorts[1].IsValid()
}
// isSemiBoundLocked reports whether exactly one of the two entries in
// e.boundAddrPorts is valid. It returns false when both or neither are valid.
func (e *serverEndpoint) isSemiBoundLocked() bool {
	firstValid := e.boundAddrPorts[0].IsValid()
	secondValid := e.boundAddrPorts[1].IsValid()
	// Exactly one side valid is the same as the two flags differing.
	return firstValid != secondValid
}
// NewServer constructs a [Server] listening on port. If port is zero, then
// port selection is left up to the host networking stack. If
// onlyStaticAddrPorts is true, then dynamic addr:port discovery will be
@@ -653,6 +686,7 @@ func (s *Server) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
s.serverEndpointByVNI.Clear()
s.metrics.resetEndpoints()
clear(s.serverEndpointByDisco)
s.closed = true
s.bus.Close()
@@ -673,7 +707,7 @@ func (s *Server) endpointGCLoop() {
defer s.mu.Unlock()
for k, v := range s.serverEndpointByDisco {
if v.isExpired(now, s.bindLifetime, s.steadyStateLifetime) {
s.metrics.addEndpoints(-1)
// The expired endpoint is being removed, so its target state is
// endpointClosed. updateEndpoint has no case for a transition into
// endpointOpen from open, semi-bound or bound, so using endpointOpen here
// would leave the gauges un-decremented for every collected endpoint.
// NOTE(review): v.mu is not visibly held around stateLocked here — confirm.
s.metrics.updateEndpoint(endpointTransition{v.stateLocked(), endpointClosed})
delete(s.serverEndpointByDisco, k)
s.serverEndpointByVNI.Delete(v.vni)
}
@@ -961,6 +995,7 @@ func (s *Server) AllocateEndpoint(discoA, discoB key.DiscoPublic) (endpoint.Serv
lamportID: s.lamportID,
allocatedAt: mono.Now(),
vni: vni,
metrics: s.metrics,
}
e.discoSharedSecrets[0] = s.disco.Shared(e.discoPubKeys.Get()[0])
e.discoSharedSecrets[1] = s.disco.Shared(e.discoPubKeys.Get()[1])
@@ -969,7 +1004,7 @@ func (s *Server) AllocateEndpoint(discoA, discoB key.DiscoPublic) (endpoint.Serv
s.serverEndpointByVNI.Store(e.vni, e)
s.logf("allocated endpoint vni=%d lamportID=%d disco[0]=%v disco[1]=%v", e.vni, e.lamportID, pair.Get()[0].ShortString(), pair.Get()[1].ShortString())
s.metrics.addEndpoints(1)
s.metrics.updateEndpoint(endpointTransition{endpointClosed, endpointOpen})
return endpoint.ServerEndpoint{
ServerDisco: s.discoPublic,
ClientDisco: pair.Get(),