ipn/ipnlocal: add primary and approved routes metrics

WIP

Updates tailscale/corp#22075

Signed-off-by: Kristoffer Dalby <kristoffer@tailscale.com>
This commit is contained in:
Kristoffer Dalby 2024-08-29 14:47:27 +02:00
parent 209567e7a0
commit 7ed3082e53
No known key found for this signature in database
31 changed files with 860 additions and 208 deletions

View File

@ -677,15 +677,18 @@ var tstunNew = tstun.New
func tryEngine(logf logger.Logf, sys *tsd.System, name string) (onlyNetstack bool, err error) { func tryEngine(logf logger.Logf, sys *tsd.System, name string) (onlyNetstack bool, err error) {
conf := wgengine.Config{ conf := wgengine.Config{
ListenPort: args.port, ListenPort: args.port,
NetMon: sys.NetMon.Get(), NetMon: sys.NetMon.Get(),
HealthTracker: sys.HealthTracker(), HealthTracker: sys.HealthTracker(),
Dialer: sys.Dialer.Get(), UserMetricsRegistry: sys.UserMetricsRegistry(),
SetSubsystem: sys.Set, Dialer: sys.Dialer.Get(),
ControlKnobs: sys.ControlKnobs(), SetSubsystem: sys.Set,
DriveForLocal: driveimpl.NewFileSystemForLocal(logf), ControlKnobs: sys.ControlKnobs(),
DriveForLocal: driveimpl.NewFileSystemForLocal(logf),
} }
sys.HealthTracker().SetMetricsRegistry(sys.UserMetricsRegistry())
onlyNetstack = name == "userspace-networking" onlyNetstack = name == "userspace-networking"
netstackSubnetRouter := onlyNetstack // but mutated later on some platforms netstackSubnetRouter := onlyNetstack // but mutated later on some platforms
netns.SetEnabled(!onlyNetstack) netns.SetEnabled(!onlyNetstack)

2
go.mod
View File

@ -320,7 +320,7 @@ require (
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e // indirect github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/polyfloyd/go-errorlint v1.4.1 // indirect github.com/polyfloyd/go-errorlint v1.4.1 // indirect
github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/client_model v0.5.0
github.com/prometheus/procfs v0.12.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect
github.com/quasilyte/go-ruleguard v0.3.19 // indirect github.com/quasilyte/go-ruleguard v0.3.19 // indirect
github.com/quasilyte/gogrep v0.5.0 // indirect github.com/quasilyte/gogrep v0.5.0 // indirect

View File

@ -20,6 +20,7 @@ import (
"time" "time"
"tailscale.com/envknob" "tailscale.com/envknob"
"tailscale.com/metrics"
"tailscale.com/tailcfg" "tailscale.com/tailcfg"
"tailscale.com/types/opt" "tailscale.com/types/opt"
"tailscale.com/util/cibuild" "tailscale.com/util/cibuild"
@ -111,6 +112,8 @@ type Tracker struct {
lastLoginErr error lastLoginErr error
localLogConfigErr error localLogConfigErr error
tlsConnectionErrors map[string]error // map[ServerName]error tlsConnectionErrors map[string]error // map[ServerName]error
metricHealthMessage *metrics.MultiLabelMap[metricHealthMessageLabel]
} }
// Subsystem is the name of a subsystem whose health can be monitored. // Subsystem is the name of a subsystem whose health can be monitored.
@ -317,6 +320,33 @@ func (w *Warnable) IsVisible(ws *warningState) bool {
return time.Since(ws.BrokenSince) >= w.TimeToVisible return time.Since(ws.BrokenSince) >= w.TimeToVisible
} }
// SetMetricsRegistry sets up the metrics for the Tracker. It takes
// a usermetric.Registry and registers the metrics there.
func (t *Tracker) SetMetricsRegistry(reg *usermetric.Registry) {
if reg == nil || t.metricHealthMessage != nil {
return
}
t.metricHealthMessage = usermetric.NewMultiLabelMap[metricHealthMessageLabel](
reg,
"tailscaled_health_messages",
"gauge",
"Number of health messages broken down by type.",
)
t.metricHealthMessage.Set(metricHealthMessageLabel{
Type: "warning",
}, expvar.Func(func() any {
if t.nil() {
return 0
}
t.mu.Lock()
defer t.mu.Unlock()
t.updateBuiltinWarnablesLocked()
return int64(len(t.stringsLocked()))
}))
}
// SetUnhealthy sets a warningState for the given Warnable with the provided Args, and should be // SetUnhealthy sets a warningState for the given Warnable with the provided Args, and should be
// called when a Warnable becomes unhealthy, or its unhealthy status needs to be updated. // called when a Warnable becomes unhealthy, or its unhealthy status needs to be updated.
// SetUnhealthy takes ownership of args. The args can be nil if no additional information is // SetUnhealthy takes ownership of args. The args can be nil if no additional information is
@ -1205,18 +1235,6 @@ func (t *Tracker) ReceiveFuncStats(which ReceiveFunc) *ReceiveFuncStats {
} }
func (t *Tracker) doOnceInit() { func (t *Tracker) doOnceInit() {
metricHealthMessage.Set(metricHealthMessageLabel{
Type: "warning",
}, expvar.Func(func() any {
if t.nil() {
return 0
}
t.mu.Lock()
defer t.mu.Unlock()
t.updateBuiltinWarnablesLocked()
return int64(len(t.stringsLocked()))
}))
for i := range t.MagicSockReceiveFuncs { for i := range t.MagicSockReceiveFuncs {
f := &t.MagicSockReceiveFuncs[i] f := &t.MagicSockReceiveFuncs[i]
f.name = (ReceiveFunc(i)).String() f.name = (ReceiveFunc(i)).String()
@ -1252,9 +1270,3 @@ type metricHealthMessageLabel struct {
// TODO: break down by warnable.severity as well? // TODO: break down by warnable.severity as well?
Type string Type string
} }
var metricHealthMessage = usermetric.NewMultiLabelMap[metricHealthMessageLabel](
"tailscaled_health_messages",
"gauge",
"Number of health messages broken down by type.",
)

View File

@ -118,9 +118,6 @@ import (
"tailscale.com/wgengine/wgcfg/nmcfg" "tailscale.com/wgengine/wgcfg/nmcfg"
) )
var metricAdvertisedRoutes = usermetric.NewGauge(
"tailscaled_advertised_routes", "Number of advertised network routes (e.g. by a subnet router)")
var controlDebugFlags = getControlDebugFlags() var controlDebugFlags = getControlDebugFlags()
func getControlDebugFlags() []string { func getControlDebugFlags() []string {
@ -183,6 +180,7 @@ type LocalBackend struct {
statsLogf logger.Logf // for printing peers stats on change statsLogf logger.Logf // for printing peers stats on change
sys *tsd.System sys *tsd.System
health *health.Tracker // always non-nil health *health.Tracker // always non-nil
metrics metrics
e wgengine.Engine // non-nil; TODO(bradfitz): remove; use sys e wgengine.Engine // non-nil; TODO(bradfitz): remove; use sys
store ipn.StateStore // non-nil; TODO(bradfitz): remove; use sys store ipn.StateStore // non-nil; TODO(bradfitz): remove; use sys
dialer *tsdial.Dialer // non-nil; TODO(bradfitz): remove; use sys dialer *tsdial.Dialer // non-nil; TODO(bradfitz): remove; use sys
@ -376,6 +374,11 @@ func (b *LocalBackend) HealthTracker() *health.Tracker {
return b.health return b.health
} }
// UserMetricsRegistry returns the usermetrics registry for the backend
func (b *LocalBackend) UserMetricsRegistry() *usermetric.Registry {
return b.sys.UserMetricsRegistry()
}
// NetMon returns the network monitor for the backend. // NetMon returns the network monitor for the backend.
func (b *LocalBackend) NetMon() *netmon.Monitor { func (b *LocalBackend) NetMon() *netmon.Monitor {
return b.sys.NetMon.Get() return b.sys.NetMon.Get()
@ -385,6 +388,21 @@ type updateStatus struct {
started bool started bool
} }
type metrics struct {
// advertisedRoutes is a metric that counts the number of network routes that are advertised by the local node.
// This informs the user of how many routes are being advertised by the local node, excluding exit routes.
advertisedRoutes *usermetric.Gauge
// approvedRoutes is a metric that counts the number of network routes served by the local node and approved
// by the control server.
approvedRoutes *usermetric.Gauge
// primaryRoutes is a metric that counts the number of primary network routes served by the local node.
// A route being a primary route implies that the route is currently served by this node, and not by another
// subnet router in a high availability configuration.
primaryRoutes *usermetric.Gauge
}
// clientGen is a func that creates a control plane client. // clientGen is a func that creates a control plane client.
// It's the type used by LocalBackend.SetControlClientGetterForTesting. // It's the type used by LocalBackend.SetControlClientGetterForTesting.
type clientGen func(controlclient.Options) (controlclient.Client, error) type clientGen func(controlclient.Options) (controlclient.Client, error)
@ -428,6 +446,15 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo
captiveCtx, captiveCancel := context.WithCancel(ctx) captiveCtx, captiveCancel := context.WithCancel(ctx)
captiveCancel() captiveCancel()
m := metrics{
advertisedRoutes: sys.UserMetricsRegistry().NewGauge(
"tailscaled_advertised_routes", "Number of advertised network routes (e.g. by a subnet router)"),
approvedRoutes: sys.UserMetricsRegistry().NewGauge(
"tailscaled_approved_routes", "Number of approved network routes (e.g. by a subnet router)"),
primaryRoutes: sys.UserMetricsRegistry().NewGauge(
"tailscaled_primary_routes", "Number of network routes for which this node is a primary router (in high availability configuration)"),
}
b := &LocalBackend{ b := &LocalBackend{
ctx: ctx, ctx: ctx,
ctxCancel: cancel, ctxCancel: cancel,
@ -436,6 +463,7 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo
statsLogf: logger.LogOnChange(logf, 5*time.Minute, clock.Now), statsLogf: logger.LogOnChange(logf, 5*time.Minute, clock.Now),
sys: sys, sys: sys,
health: sys.HealthTracker(), health: sys.HealthTracker(),
metrics: m,
e: e, e: e,
dialer: dialer, dialer: dialer,
store: store, store: store,
@ -4685,14 +4713,7 @@ func (b *LocalBackend) applyPrefsToHostinfoLocked(hi *tailcfg.Hostinfo, prefs ip
hi.ShieldsUp = prefs.ShieldsUp() hi.ShieldsUp = prefs.ShieldsUp()
hi.AllowsUpdate = envknob.AllowsRemoteUpdate() || prefs.AutoUpdate().Apply.EqualBool(true) hi.AllowsUpdate = envknob.AllowsRemoteUpdate() || prefs.AutoUpdate().Apply.EqualBool(true)
// count routes without exit node routes b.metrics.advertisedRoutes.Set(float64(len(hi.RoutableIPs)))
var routes int64
for _, route := range hi.RoutableIPs {
if route.Bits() != 0 {
routes++
}
}
metricAdvertisedRoutes.Set(float64(routes))
var sshHostKeys []string var sshHostKeys []string
if prefs.RunSSH() && envknob.CanSSHD() { if prefs.RunSSH() && envknob.CanSSHD() {
@ -5317,6 +5338,11 @@ func (b *LocalBackend) setNetMapLocked(nm *netmap.NetworkMap) {
b.setTCPPortsInterceptedFromNetmapAndPrefsLocked(b.pm.CurrentPrefs()) b.setTCPPortsInterceptedFromNetmapAndPrefsLocked(b.pm.CurrentPrefs())
if nm == nil { if nm == nil {
b.nodeByAddr = nil b.nodeByAddr = nil
// If there is no netmap, the client is going into a "turned off"
// state so reset the metrics.
b.metrics.approvedRoutes.Set(0)
b.metrics.primaryRoutes.Set(0)
return return
} }
@ -5337,6 +5363,14 @@ func (b *LocalBackend) setNetMapLocked(nm *netmap.NetworkMap) {
} }
if nm.SelfNode.Valid() { if nm.SelfNode.Valid() {
addNode(nm.SelfNode) addNode(nm.SelfNode)
var approved float64
for _, route := range nm.SelfNode.AllowedIPs().All() {
if !slices.Contains(nm.SelfNode.Addresses().AsSlice(), route) {
approved++
}
}
b.metrics.approvedRoutes.Set(approved)
b.metrics.primaryRoutes.Set(float64(nm.SelfNode.PrimaryRoutes().Len()))
} }
for _, p := range nm.Peers { for _, p := range nm.Peers {
addNode(p) addNode(p)

View File

@ -432,7 +432,7 @@ func newTestLocalBackend(t testing.TB) *LocalBackend {
sys := new(tsd.System) sys := new(tsd.System)
store := new(mem.Store) store := new(mem.Store)
sys.Set(store) sys.Set(store)
eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker()) eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry())
if err != nil { if err != nil {
t.Fatalf("NewFakeUserspaceEngine: %v", err) t.Fatalf("NewFakeUserspaceEngine: %v", err)
} }

View File

@ -50,7 +50,7 @@ func TestLocalLogLines(t *testing.T) {
sys := new(tsd.System) sys := new(tsd.System)
store := new(mem.Store) store := new(mem.Store)
sys.Set(store) sys.Set(store)
e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker()) e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry())
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -35,6 +35,7 @@ import (
"tailscale.com/types/logger" "tailscale.com/types/logger"
"tailscale.com/types/netmap" "tailscale.com/types/netmap"
"tailscale.com/util/must" "tailscale.com/util/must"
"tailscale.com/util/usermetric"
"tailscale.com/wgengine" "tailscale.com/wgengine"
"tailscale.com/wgengine/filter" "tailscale.com/wgengine/filter"
) )
@ -643,7 +644,8 @@ func TestPeerAPIReplyToDNSQueries(t *testing.T) {
h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345") h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345")
ht := new(health.Tracker) ht := new(health.Tracker)
eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht) reg := new(usermetric.Registry)
eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg)
pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht)) pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht))
h.ps = &peerAPIServer{ h.ps = &peerAPIServer{
b: &LocalBackend{ b: &LocalBackend{
@ -694,7 +696,8 @@ func TestPeerAPIPrettyReplyCNAME(t *testing.T) {
h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345") h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345")
ht := new(health.Tracker) ht := new(health.Tracker)
eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht) reg := new(usermetric.Registry)
eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg)
pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht)) pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht))
var a *appc.AppConnector var a *appc.AppConnector
if shouldStore { if shouldStore {
@ -767,7 +770,8 @@ func TestPeerAPIReplyToDNSQueriesAreObserved(t *testing.T) {
rc := &appctest.RouteCollector{} rc := &appctest.RouteCollector{}
ht := new(health.Tracker) ht := new(health.Tracker)
eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht) reg := new(usermetric.Registry)
eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg)
pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht)) pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht))
var a *appc.AppConnector var a *appc.AppConnector
if shouldStore { if shouldStore {
@ -830,8 +834,9 @@ func TestPeerAPIReplyToDNSQueriesAreObservedWithCNAMEFlattening(t *testing.T) {
h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345") h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345")
ht := new(health.Tracker) ht := new(health.Tracker)
reg := new(usermetric.Registry)
rc := &appctest.RouteCollector{} rc := &appctest.RouteCollector{}
eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht) eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg)
pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht)) pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht))
var a *appc.AppConnector var a *appc.AppConnector
if shouldStore { if shouldStore {

View File

@ -682,8 +682,9 @@ func newTestBackend(t *testing.T) *LocalBackend {
sys := &tsd.System{} sys := &tsd.System{}
e, err := wgengine.NewUserspaceEngine(logf, wgengine.Config{ e, err := wgengine.NewUserspaceEngine(logf, wgengine.Config{
SetSubsystem: sys.Set, SetSubsystem: sys.Set,
HealthTracker: sys.HealthTracker(), HealthTracker: sys.HealthTracker(),
UserMetricsRegistry: sys.UserMetricsRegistry(),
}) })
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)

View File

@ -298,7 +298,7 @@ func TestStateMachine(t *testing.T) {
sys := new(tsd.System) sys := new(tsd.System)
store := new(testStateStorage) store := new(testStateStorage)
sys.Set(store) sys.Set(store)
e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker()) e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry())
if err != nil { if err != nil {
t.Fatalf("NewFakeUserspaceEngine: %v", err) t.Fatalf("NewFakeUserspaceEngine: %v", err)
} }
@ -931,7 +931,7 @@ func TestEditPrefsHasNoKeys(t *testing.T) {
logf := tstest.WhileTestRunningLogger(t) logf := tstest.WhileTestRunningLogger(t)
sys := new(tsd.System) sys := new(tsd.System)
sys.Set(new(mem.Store)) sys.Set(new(mem.Store))
e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker()) e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry())
if err != nil { if err != nil {
t.Fatalf("NewFakeUserspaceEngine: %v", err) t.Fatalf("NewFakeUserspaceEngine: %v", err)
} }

View File

@ -62,7 +62,6 @@ import (
"tailscale.com/util/progresstracking" "tailscale.com/util/progresstracking"
"tailscale.com/util/rands" "tailscale.com/util/rands"
"tailscale.com/util/testenv" "tailscale.com/util/testenv"
"tailscale.com/util/usermetric"
"tailscale.com/version" "tailscale.com/version"
"tailscale.com/wgengine/magicsock" "tailscale.com/wgengine/magicsock"
) )
@ -578,7 +577,7 @@ func (h *Handler) serveUserMetrics(w http.ResponseWriter, r *http.Request) {
http.Error(w, "usermetrics debug flag not enabled", http.StatusForbidden) http.Error(w, "usermetrics debug flag not enabled", http.StatusForbidden)
return return
} }
usermetric.Handler(w, r) h.b.UserMetricsRegistry().Handler(w, r)
} }
func (h *Handler) serveDebug(w http.ResponseWriter, r *http.Request) { func (h *Handler) serveDebug(w http.ResponseWriter, r *http.Request) {

View File

@ -356,7 +356,7 @@ func newTestLocalBackend(t testing.TB) *ipnlocal.LocalBackend {
sys := new(tsd.System) sys := new(tsd.System)
store := new(mem.Store) store := new(mem.Store)
sys.Set(store) sys.Set(store)
eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker()) eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry())
if err != nil { if err != nil {
t.Fatalf("NewFakeUserspaceEngine: %v", err) t.Fatalf("NewFakeUserspaceEngine: %v", err)
} }

View File

@ -97,7 +97,12 @@ type KeyValue[T comparable] struct {
} }
func (v *MultiLabelMap[T]) String() string { func (v *MultiLabelMap[T]) String() string {
return `"MultiLabelMap"` var sb strings.Builder
sb.WriteString("MultiLabelMap:\n")
v.Do(func(kv KeyValue[T]) {
fmt.Fprintf(&sb, "\t%v: %v\n", kv.Key, kv.Value)
})
return sb.String()
} }
// WritePrometheus writes v to w in Prometheus exposition format. // WritePrometheus writes v to w in Prometheus exposition format.
@ -281,3 +286,16 @@ func (v *MultiLabelMap[T]) Do(f func(KeyValue[T])) {
f(KeyValue[T]{e.key, e.val}) f(KeyValue[T]{e.key, e.val})
} }
} }
// ResetAllForTest resets all values for metrics to zero.
// Should only be used in tests.
func (v *MultiLabelMap[T]) ResetAllForTest() {
v.Do(func(kv KeyValue[T]) {
switch v := kv.Value.(type) {
case *expvar.Int:
v.Set(0)
case *expvar.Float:
v.Set(0)
}
})
}

View File

@ -135,19 +135,19 @@ func (s *Statistics) updateVirtual(b []byte, receive bool) {
// The src is always a Tailscale IP address, representing some remote peer. // The src is always a Tailscale IP address, representing some remote peer.
// The dst is a remote IP address and port that corresponds // The dst is a remote IP address and port that corresponds
// with some physical peer backing the Tailscale IP address. // with some physical peer backing the Tailscale IP address.
func (s *Statistics) UpdateTxPhysical(src netip.Addr, dst netip.AddrPort, n int) { func (s *Statistics) UpdateTxPhysical(src netip.Addr, dst netip.AddrPort, packets, n int) {
s.updatePhysical(src, dst, n, false) s.updatePhysical(src, dst, packets, n, false)
} }
// UpdateRxPhysical updates the counters for a received wireguard packet. // UpdateRxPhysical updates the counters for a received wireguard packet.
// The src is always a Tailscale IP address, representing some remote peer. // The src is always a Tailscale IP address, representing some remote peer.
// The dst is a remote IP address and port that corresponds // The dst is a remote IP address and port that corresponds
// with some physical peer backing the Tailscale IP address. // with some physical peer backing the Tailscale IP address.
func (s *Statistics) UpdateRxPhysical(src netip.Addr, dst netip.AddrPort, n int) { func (s *Statistics) UpdateRxPhysical(src netip.Addr, dst netip.AddrPort, packets, n int) {
s.updatePhysical(src, dst, n, true) s.updatePhysical(src, dst, packets, n, true)
} }
func (s *Statistics) updatePhysical(src netip.Addr, dst netip.AddrPort, n int, receive bool) { func (s *Statistics) updatePhysical(src netip.Addr, dst netip.AddrPort, packets, n int, receive bool) {
conn := netlogtype.Connection{Src: netip.AddrPortFrom(src, 0), Dst: dst} conn := netlogtype.Connection{Src: netip.AddrPortFrom(src, 0), Dst: dst}
s.mu.Lock() s.mu.Lock()
@ -157,10 +157,10 @@ func (s *Statistics) updatePhysical(src netip.Addr, dst netip.AddrPort, n int, r
return return
} }
if receive { if receive {
cnts.RxPackets++ cnts.RxPackets += uint64(packets)
cnts.RxBytes += uint64(n) cnts.RxBytes += uint64(n)
} else { } else {
cnts.TxPackets++ cnts.TxPackets += uint64(packets)
cnts.TxBytes += uint64(n) cnts.TxBytes += uint64(n)
} }
s.physical[conn] = cnts s.physical[conn] = cnts

View File

@ -24,6 +24,7 @@ import (
"go4.org/mem" "go4.org/mem"
"gvisor.dev/gvisor/pkg/tcpip/stack" "gvisor.dev/gvisor/pkg/tcpip/stack"
"tailscale.com/disco" "tailscale.com/disco"
"tailscale.com/metrics"
"tailscale.com/net/connstats" "tailscale.com/net/connstats"
"tailscale.com/net/packet" "tailscale.com/net/packet"
"tailscale.com/net/packet/checksum" "tailscale.com/net/packet/checksum"
@ -209,6 +210,30 @@ type Wrapper struct {
stats atomic.Pointer[connstats.Statistics] stats atomic.Pointer[connstats.Statistics]
captureHook syncs.AtomicValue[capture.Callback] captureHook syncs.AtomicValue[capture.Callback]
metric *metricWrapper
}
type metricWrapper struct {
inboundDroppedPacketsTotal *metrics.MultiLabelMap[dropPacketLabel]
outboundDroppedPacketsTotal *metrics.MultiLabelMap[dropPacketLabel]
}
func registerMetrics(reg *usermetric.Registry) *metricWrapper {
return &metricWrapper{
inboundDroppedPacketsTotal: usermetric.NewMultiLabelMap[dropPacketLabel](
reg,
"tailscaled_inbound_dropped_packets_total",
"counter",
"Counts the number of dropped packets received by the node from other peers",
),
outboundDroppedPacketsTotal: usermetric.NewMultiLabelMap[dropPacketLabel](
reg,
"tailscaled_outbound_dropped_packets_total",
"counter",
"Counts the number of packets dropped while being sent to other peers",
),
}
} }
// tunInjectedRead is an injected packet pretending to be a tun.Read(). // tunInjectedRead is an injected packet pretending to be a tun.Read().
@ -248,15 +273,15 @@ func (w *Wrapper) Start() {
close(w.startCh) close(w.startCh)
} }
func WrapTAP(logf logger.Logf, tdev tun.Device) *Wrapper { func WrapTAP(logf logger.Logf, tdev tun.Device, m *usermetric.Registry) *Wrapper {
return wrap(logf, tdev, true) return wrap(logf, tdev, true, m)
} }
func Wrap(logf logger.Logf, tdev tun.Device) *Wrapper { func Wrap(logf logger.Logf, tdev tun.Device, m *usermetric.Registry) *Wrapper {
return wrap(logf, tdev, false) return wrap(logf, tdev, false, m)
} }
func wrap(logf logger.Logf, tdev tun.Device, isTAP bool) *Wrapper { func wrap(logf logger.Logf, tdev tun.Device, isTAP bool, m *usermetric.Registry) *Wrapper {
logf = logger.WithPrefix(logf, "tstun: ") logf = logger.WithPrefix(logf, "tstun: ")
w := &Wrapper{ w := &Wrapper{
logf: logf, logf: logf,
@ -274,6 +299,7 @@ func wrap(logf logger.Logf, tdev tun.Device, isTAP bool) *Wrapper {
// TODO(dmytro): (highly rate-limited) hexdumps should happen on unknown packets. // TODO(dmytro): (highly rate-limited) hexdumps should happen on unknown packets.
filterFlags: filter.LogAccepts | filter.LogDrops, filterFlags: filter.LogAccepts | filter.LogDrops,
startCh: make(chan struct{}), startCh: make(chan struct{}),
metric: registerMetrics(m),
} }
w.vectorBuffer = make([][]byte, tdev.BatchSize()) w.vectorBuffer = make([][]byte, tdev.BatchSize())
@ -872,7 +898,7 @@ func (t *Wrapper) filterPacketOutboundToWireGuard(p *packet.Parsed, pc *peerConf
if filt.RunOut(p, t.filterFlags) != filter.Accept { if filt.RunOut(p, t.filterFlags) != filter.Accept {
metricPacketOutDropFilter.Add(1) metricPacketOutDropFilter.Add(1)
metricOutboundDroppedPacketsTotal.Add(dropPacketLabel{ t.metric.outboundDroppedPacketsTotal.Add(dropPacketLabel{
Reason: DropReasonACL, Reason: DropReasonACL,
}, 1) }, 1)
return filter.Drop, gro return filter.Drop, gro
@ -1144,7 +1170,7 @@ func (t *Wrapper) filterPacketInboundFromWireGuard(p *packet.Parsed, captHook ca
if outcome != filter.Accept { if outcome != filter.Accept {
metricPacketInDropFilter.Add(1) metricPacketInDropFilter.Add(1)
metricInboundDroppedPacketsTotal.Add(dropPacketLabel{ t.metric.inboundDroppedPacketsTotal.Add(dropPacketLabel{
Reason: DropReasonACL, Reason: DropReasonACL,
}, 1) }, 1)
@ -1225,7 +1251,7 @@ func (t *Wrapper) Write(buffs [][]byte, offset int) (int, error) {
t.noteActivity() t.noteActivity()
_, err := t.tdevWrite(buffs, offset) _, err := t.tdevWrite(buffs, offset)
if err != nil { if err != nil {
metricInboundDroppedPacketsTotal.Add(dropPacketLabel{ t.metric.inboundDroppedPacketsTotal.Add(dropPacketLabel{
Reason: DropReasonError, Reason: DropReasonError,
}, int64(len(buffs))) }, int64(len(buffs)))
} }
@ -1482,19 +1508,6 @@ type dropPacketLabel struct {
Reason DropReason Reason DropReason
} }
var (
metricInboundDroppedPacketsTotal = usermetric.NewMultiLabelMap[dropPacketLabel](
"tailscaled_inbound_dropped_packets_total",
"counter",
"Counts the number of dropped packets received by the node from other peers",
)
metricOutboundDroppedPacketsTotal = usermetric.NewMultiLabelMap[dropPacketLabel](
"tailscaled_outbound_dropped_packets_total",
"counter",
"Counts the number of packets dropped while being sent to other peers",
)
)
func (t *Wrapper) InstallCaptureHook(cb capture.Callback) { func (t *Wrapper) InstallCaptureHook(cb capture.Callback) {
t.captureHook.Store(cb) t.captureHook.Store(cb)
} }

View File

@ -38,6 +38,7 @@ import (
"tailscale.com/types/ptr" "tailscale.com/types/ptr"
"tailscale.com/types/views" "tailscale.com/types/views"
"tailscale.com/util/must" "tailscale.com/util/must"
"tailscale.com/util/usermetric"
"tailscale.com/wgengine/capture" "tailscale.com/wgengine/capture"
"tailscale.com/wgengine/filter" "tailscale.com/wgengine/filter"
"tailscale.com/wgengine/wgcfg" "tailscale.com/wgengine/wgcfg"
@ -173,7 +174,8 @@ func setfilter(logf logger.Logf, tun *Wrapper) {
func newChannelTUN(logf logger.Logf, secure bool) (*tuntest.ChannelTUN, *Wrapper) { func newChannelTUN(logf logger.Logf, secure bool) (*tuntest.ChannelTUN, *Wrapper) {
chtun := tuntest.NewChannelTUN() chtun := tuntest.NewChannelTUN()
tun := Wrap(logf, chtun.TUN()) reg := new(usermetric.Registry)
tun := Wrap(logf, chtun.TUN(), reg)
if secure { if secure {
setfilter(logf, tun) setfilter(logf, tun)
} else { } else {
@ -185,7 +187,8 @@ func newChannelTUN(logf logger.Logf, secure bool) (*tuntest.ChannelTUN, *Wrapper
func newFakeTUN(logf logger.Logf, secure bool) (*fakeTUN, *Wrapper) { func newFakeTUN(logf logger.Logf, secure bool) (*fakeTUN, *Wrapper) {
ftun := NewFake() ftun := NewFake()
tun := Wrap(logf, ftun) reg := new(usermetric.Registry)
tun := Wrap(logf, ftun, reg)
if secure { if secure {
setfilter(logf, tun) setfilter(logf, tun)
} else { } else {
@ -315,12 +318,6 @@ func mustHexDecode(s string) []byte {
} }
func TestFilter(t *testing.T) { func TestFilter(t *testing.T) {
// Reset the metrics before test. These are global
// so the different tests might have affected them.
metricInboundDroppedPacketsTotal.SetInt(dropPacketLabel{Reason: DropReasonACL}, 0)
metricInboundDroppedPacketsTotal.SetInt(dropPacketLabel{Reason: DropReasonError}, 0)
metricOutboundDroppedPacketsTotal.SetInt(dropPacketLabel{Reason: DropReasonACL}, 0)
chtun, tun := newChannelTUN(t.Logf, true) chtun, tun := newChannelTUN(t.Logf, true)
defer tun.Close() defer tun.Close()
@ -435,22 +432,6 @@ func TestFilter(t *testing.T) {
} }
}) })
} }
inACL := metricInboundDroppedPacketsTotal.Get(dropPacketLabel{Reason: DropReasonACL})
inError := metricInboundDroppedPacketsTotal.Get(dropPacketLabel{Reason: DropReasonError})
outACL := metricOutboundDroppedPacketsTotal.Get(dropPacketLabel{Reason: DropReasonACL})
assertMetricPackets(t, "inACL", "3", inACL.String())
assertMetricPackets(t, "inError", "0", inError.String())
assertMetricPackets(t, "outACL", "1", outACL.String())
}
func assertMetricPackets(t *testing.T, metricName, want, got string) {
t.Helper()
if want != got {
t.Errorf("%s got unexpected value, got %s, want %s", metricName, got, want)
}
} }
func TestAllocs(t *testing.T) { func TestAllocs(t *testing.T) {
@ -512,6 +493,7 @@ func TestAtomic64Alignment(t *testing.T) {
} }
func TestPeerAPIBypass(t *testing.T) { func TestPeerAPIBypass(t *testing.T) {
reg := new(usermetric.Registry)
wrapperWithPeerAPI := &Wrapper{ wrapperWithPeerAPI := &Wrapper{
PeerAPIPort: func(ip netip.Addr) (port uint16, ok bool) { PeerAPIPort: func(ip netip.Addr) (port uint16, ok bool) {
if ip == netip.MustParseAddr("100.64.1.2") { if ip == netip.MustParseAddr("100.64.1.2") {
@ -519,6 +501,7 @@ func TestPeerAPIBypass(t *testing.T) {
} }
return return
}, },
metric: registerMetrics(reg),
} }
tests := []struct { tests := []struct {
@ -534,13 +517,16 @@ func TestPeerAPIBypass(t *testing.T) {
PeerAPIPort: func(netip.Addr) (port uint16, ok bool) { PeerAPIPort: func(netip.Addr) (port uint16, ok bool) {
return 60000, true return 60000, true
}, },
metric: registerMetrics(reg),
}, },
pkt: tcp4syn("1.2.3.4", "100.64.1.2", 1234, 60000), pkt: tcp4syn("1.2.3.4", "100.64.1.2", 1234, 60000),
want: filter.Drop, want: filter.Drop,
}, },
{ {
name: "reject_with_filter", name: "reject_with_filter",
w: &Wrapper{}, w: &Wrapper{
metric: registerMetrics(reg),
},
filter: filter.NewAllowNone(logger.Discard, new(netipx.IPSet)), filter: filter.NewAllowNone(logger.Discard, new(netipx.IPSet)),
pkt: tcp4syn("1.2.3.4", "100.64.1.2", 1234, 60000), pkt: tcp4syn("1.2.3.4", "100.64.1.2", 1234, 60000),
want: filter.Drop, want: filter.Drop,

View File

@ -826,7 +826,7 @@ func TestSSHAuthFlow(t *testing.T) {
func TestSSH(t *testing.T) { func TestSSH(t *testing.T) {
var logf logger.Logf = t.Logf var logf logger.Logf = t.Logf
sys := &tsd.System{} sys := &tsd.System{}
eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker()) eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry())
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -32,6 +32,7 @@ import (
"tailscale.com/net/tstun" "tailscale.com/net/tstun"
"tailscale.com/proxymap" "tailscale.com/proxymap"
"tailscale.com/types/netmap" "tailscale.com/types/netmap"
"tailscale.com/util/usermetric"
"tailscale.com/wgengine" "tailscale.com/wgengine"
"tailscale.com/wgengine/magicsock" "tailscale.com/wgengine/magicsock"
"tailscale.com/wgengine/router" "tailscale.com/wgengine/router"
@ -65,7 +66,8 @@ type System struct {
controlKnobs controlknobs.Knobs controlKnobs controlknobs.Knobs
proxyMap proxymap.Mapper proxyMap proxymap.Mapper
healthTracker health.Tracker userMetricsRegistry usermetric.Registry
healthTracker health.Tracker
} }
// NetstackImpl is the interface that *netstack.Impl implements. // NetstackImpl is the interface that *netstack.Impl implements.
@ -142,6 +144,11 @@ func (s *System) HealthTracker() *health.Tracker {
return &s.healthTracker return &s.healthTracker
} }
// UserMetricsRegistry returns the system usermetrics.
func (s *System) UserMetricsRegistry() *usermetric.Registry {
return &s.userMetricsRegistry
}
// SubSystem represents some subsystem of the Tailscale node daemon. // SubSystem represents some subsystem of the Tailscale node daemon.
// //
// A subsystem can be set to a value, and then later retrieved. A subsystem // A subsystem can be set to a value, and then later retrieved. A subsystem

View File

@ -532,18 +532,20 @@ func (s *Server) start() (reterr error) {
s.dialer = &tsdial.Dialer{Logf: tsLogf} // mutated below (before used) s.dialer = &tsdial.Dialer{Logf: tsLogf} // mutated below (before used)
eng, err := wgengine.NewUserspaceEngine(tsLogf, wgengine.Config{ eng, err := wgengine.NewUserspaceEngine(tsLogf, wgengine.Config{
ListenPort: s.Port, ListenPort: s.Port,
NetMon: s.netMon, NetMon: s.netMon,
Dialer: s.dialer, Dialer: s.dialer,
SetSubsystem: sys.Set, SetSubsystem: sys.Set,
ControlKnobs: sys.ControlKnobs(), ControlKnobs: sys.ControlKnobs(),
HealthTracker: sys.HealthTracker(), HealthTracker: sys.HealthTracker(),
UserMetricsRegistry: sys.UserMetricsRegistry(),
}) })
if err != nil { if err != nil {
return err return err
} }
closePool.add(s.dialer) closePool.add(s.dialer)
sys.Set(eng) sys.Set(eng)
sys.HealthTracker().SetMetricsRegistry(sys.UserMetricsRegistry())
// TODO(oxtoacart): do we need to support Taildrive on tsnet, and if so, how? // TODO(oxtoacart): do we need to support Taildrive on tsnet, and if so, how?
ns, err := netstack.Create(tsLogf, sys.Tun.Get(), eng, sys.MagicSock.Get(), s.dialer, sys.DNSManager.Get(), sys.ProxyMapper(), nil) ns, err := netstack.Create(tsLogf, sys.Tun.Get(), eng, sys.MagicSock.Get(), s.dialer, sys.DNSManager.Get(), sys.ProxyMapper(), nil)

View File

@ -5,6 +5,7 @@ package tsnet
import ( import (
"bufio" "bufio"
"bytes"
"context" "context"
"crypto/ecdsa" "crypto/ecdsa"
"crypto/elliptic" "crypto/elliptic"
@ -31,8 +32,10 @@ import (
"testing" "testing"
"time" "time"
"github.com/google/go-cmp/cmp" dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"golang.org/x/net/proxy" "golang.org/x/net/proxy"
"tailscale.com/client/tailscale"
"tailscale.com/cmd/testwrapper/flakytest" "tailscale.com/cmd/testwrapper/flakytest"
"tailscale.com/health" "tailscale.com/health"
"tailscale.com/ipn" "tailscale.com/ipn"
@ -757,6 +760,10 @@ func TestUDPConn(t *testing.T) {
s1, s1ip, _ := startServer(t, ctx, controlURL, "s1") s1, s1ip, _ := startServer(t, ctx, controlURL, "s1")
s2, s2ip, _ := startServer(t, ctx, controlURL, "s2") s2, s2ip, _ := startServer(t, ctx, controlURL, "s2")
assertUDPConn(t, ctx, s1, s2, s1ip, s2ip)
}
func assertUDPConn(t *testing.T, ctx context.Context, s1, s2 *Server, s1ip, s2ip netip.Addr) {
lc2, err := s2.LocalClient() lc2, err := s2.LocalClient()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -818,65 +825,373 @@ func TestUDPConn(t *testing.T) {
} }
} }
// sendData sends a given amount of bytes from s1 to s2.
//
// It listens on s1, dials it from s2, writes bytesCount bytes in a single
// Write call, and returns once the accepting side has read them all, either
// side has errored, or ctx is done.
func sendData(logf func(format string, args ...any), ctx context.Context, bytesCount int, s1, s2 *Server, s1ip, s2ip netip.Addr) error {
	l := must.Get(s1.Listen("tcp", fmt.Sprintf("%s:8081", s1ip)))
	defer l.Close()

	// Dial to s1 from s2.
	w, err := s2.Dial(ctx, "tcp", fmt.Sprintf("%s:8081", s1ip))
	if err != nil {
		return err
	}
	defer w.Close()

	// stopReceive tells the reader goroutine to bail out early; closed on
	// return so the goroutine can never block on it.
	stopReceive := make(chan struct{})
	defer close(stopReceive)

	// allReceived is buffered (and intentionally never closed) so the reader
	// goroutine's single send always succeeds even after this function has
	// returned. The previous version deferred close(allReceived), which could
	// panic with "send on closed channel" if the writer errored out while the
	// reader was still running.
	allReceived := make(chan error, 1)

	go func() {
		conn, err := l.Accept()
		if err != nil {
			allReceived <- err
			return
		}
		defer conn.Close()
		// This side only reads, so bound the reads. (The previous version
		// mistakenly set a write deadline here.)
		conn.SetReadDeadline(time.Now().Add(30 * time.Second))

		total := 0
		recvStart := time.Now()
		// Fixed-size scratch buffer: TCP reads are partial anyway, and
		// allocating bytesCount bytes per Read call would be wasteful.
		buf := make([]byte, 1<<20)
		for {
			n, err := conn.Read(buf)
			select {
			case <-stopReceive:
				return
			default:
			}
			if err != nil {
				allReceived <- fmt.Errorf("failed reading packet, %s", err)
				return
			}
			total += n
			logf("received %d/%d bytes, %.2f %%", total, bytesCount, (float64(total) / (float64(bytesCount)) * 100))
			if total == bytesCount {
				break
			}
		}
		logf("all received, took: %s", time.Since(recvStart).String())
		allReceived <- nil
	}()

	sendStart := time.Now()
	w.SetWriteDeadline(time.Now().Add(30 * time.Second))
	if _, err := w.Write(bytes.Repeat([]byte("A"), bytesCount)); err != nil {
		return err
	}

	logf("all sent (%s), waiting for all packets (%d) to be received", time.Since(sendStart).String(), bytesCount)

	// Wait for the reader to finish, but respect context cancellation
	// (the previous version blocked unconditionally on the channel).
	select {
	case err := <-allReceived:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}
// testWarnable is a Warnable that is used within this package for testing purposes only.
var testWarnable = health.Register(&health.Warnable{
	Code:     "test-warnable-tsnet",
	Title:    "Test warnable",
	Severity: health.SeverityLow,
	// Text echoes back whatever error string was supplied via
	// health.Args when the warnable is marked unhealthy.
	Text: func(args health.Args) string {
		return args[health.ArgError]
	},
})
// parseMetrics parses a Prometheus text-format metrics dump into a map.
// Each entry is keyed by "<metric name>,<label1>=<value1>,..." (with a
// trailing comma produced by promMetricLabelsStr) and holds the counter or
// gauge value; other metric types are recorded as zero.
func parseMetrics(m []byte) (map[string]float64, error) {
	var parser expfmt.TextParser
	families, err := parser.TextToMetricFamilies(bytes.NewReader(m))
	if err != nil {
		return nil, err
	}

	out := make(map[string]float64)
	for _, fam := range families {
		for _, sample := range fam.Metric {
			var v float64
			switch fam.GetType() {
			case dto.MetricType_COUNTER:
				v = sample.GetCounter().GetValue()
			case dto.MetricType_GAUGE:
				v = sample.GetGauge().GetValue()
			}
			out[fam.GetName()+","+promMetricLabelsStr(sample.GetLabel())] = v
		}
	}
	return out, nil
}
// promMetricLabelsStr renders labels as a "name=value," sequence, including
// a trailing comma, matching the key suffix format used by parseMetrics.
func promMetricLabelsStr(labels []*dto.LabelPair) string {
	var sb strings.Builder
	for _, label := range labels {
		sb.WriteString(label.GetName())
		sb.WriteByte('=')
		sb.WriteString(label.GetValue())
		sb.WriteByte(',')
	}
	return sb.String()
}
// TestUserMetrics tests the user-facing metrics exposed by tailscaled.
func TestUserMetrics(t *testing.T) { func TestUserMetrics(t *testing.T) {
flakytest.Mark(t, "https://github.com/tailscale/tailscale/issues/13420")
tstest.ResourceCheck(t) tstest.ResourceCheck(t)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel() defer cancel()
// testWarnable is a Warnable that is used within this package for testing purposes only.
var testWarnable = health.Register(&health.Warnable{
Code: "test-warnable-tsnet",
Title: "Test warnable",
Severity: health.SeverityLow,
Text: func(args health.Args) string {
return args[health.ArgError]
},
})
controlURL, c := startControl(t) controlURL, c := startControl(t)
s1, _, s1PubKey := startServer(t, ctx, controlURL, "s1") s1, s1ip, s1PubKey := startServer(t, ctx, controlURL, "s1")
s2, s2ip, _ := startServer(t, ctx, controlURL, "s2")
s1.lb.EditPrefs(&ipn.MaskedPrefs{ s1.lb.EditPrefs(&ipn.MaskedPrefs{
Prefs: ipn.Prefs{ Prefs: ipn.Prefs{
AdvertiseRoutes: []netip.Prefix{ AdvertiseRoutes: []netip.Prefix{
netip.MustParsePrefix("192.0.2.0/24"), netip.MustParsePrefix("192.0.2.0/24"),
netip.MustParsePrefix("192.0.3.0/24"), netip.MustParsePrefix("192.0.3.0/24"),
netip.MustParsePrefix("192.0.5.1/32"),
netip.MustParsePrefix("0.0.0.0/0"),
}, },
}, },
AdvertiseRoutesSet: true, AdvertiseRoutesSet: true,
}) })
c.SetSubnetRoutes(s1PubKey, []netip.Prefix{netip.MustParsePrefix("192.0.2.0/24")}) c.SetSubnetRoutes(s1PubKey, []netip.Prefix{
netip.MustParsePrefix("192.0.2.0/24"),
netip.MustParsePrefix("192.0.5.1/32"),
netip.MustParsePrefix("0.0.0.0/0"),
})
lc1, err := s1.LocalClient() lc1, err := s1.LocalClient()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
lc2, err := s2.LocalClient()
if err != nil {
t.Fatal(err)
}
// ping to make sure the connection is up.
res, err := lc2.Ping(ctx, s1ip, tailcfg.PingICMP)
if err != nil {
t.Fatalf("pinging: %s", err)
}
t.Logf("ping success: %#+v", res)
ht := s1.lb.HealthTracker() ht := s1.lb.HealthTracker()
ht.SetUnhealthy(testWarnable, health.Args{"Text": "Hello world 1"}) ht.SetUnhealthy(testWarnable, health.Args{"Text": "Hello world 1"})
// Force an update to the netmap to ensure that the metrics are up-to-date.
s1.lb.DebugForceNetmapUpdate()
s2.lb.DebugForceNetmapUpdate()
mustDirect(t, t.Logf, lc1, lc2)
// Wait for the routes to be propagated to node 1 to ensure
// that the metrics are up-to-date.
waitForCondition(t, 30*time.Second, func() bool {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
status1, err := lc1.Status(ctx)
if err != nil {
t.Logf("getting status: %s", err)
return false
}
return status1.Self.PrimaryRoutes != nil && status1.Self.PrimaryRoutes.Len() == 3
})
// 50 megabytes 50 * 1024 kilobyte packets
bytesToSend := 50 * 1024 * 1024
// This asserts generates some traffic, it is factored out
// of TestUDPConn.
start := time.Now()
err = sendData(t.Logf, ctx, bytesToSend, s1, s2, s1ip, s2ip)
if err != nil {
t.Fatalf("Failed to send packets: %v", err)
}
t.Logf("Sent %d bytes from s1 to s2 in %s", bytesToSend, time.Since(start).String())
time.Sleep(5 * time.Second)
metrics1, err := lc1.UserMetrics(ctx) metrics1, err := lc1.UserMetrics(ctx)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
// Note that this test will check for two warnings because the health status1, err := lc1.Status(ctx)
// tracker will have two warnings: one from the testWarnable, added in if err != nil {
// this test, and one because we are running the dev/unstable version t.Fatal(err)
// of tailscale.
want := `# TYPE tailscaled_advertised_routes gauge
# HELP tailscaled_advertised_routes Number of advertised network routes (e.g. by a subnet router)
tailscaled_advertised_routes 2
# TYPE tailscaled_health_messages gauge
# HELP tailscaled_health_messages Number of health messages broken down by type.
tailscaled_health_messages{type="warning"} 2
# TYPE tailscaled_inbound_dropped_packets_total counter
# HELP tailscaled_inbound_dropped_packets_total Counts the number of dropped packets received by the node from other peers
# TYPE tailscaled_outbound_dropped_packets_total counter
# HELP tailscaled_outbound_dropped_packets_total Counts the number of packets dropped while being sent to other peers
`
if diff := cmp.Diff(want, string(metrics1)); diff != "" {
t.Fatalf("unexpected metrics (-want +got):\n%s", diff)
} }
parsedMetrics1, err := parseMetrics(metrics1)
if err != nil {
t.Fatal(err)
}
t.Logf("Metrics1:\n%s\n", metrics1)
// The node is advertising 4 routes:
// - 192.0.2.0/24
// - 192.0.3.0/24
// - 192.0.5.1/32
// - 0.0.0.0/0
if got, want := parsedMetrics1["tailscaled_advertised_routes,"], 4.0; got != want {
t.Errorf("metrics1, tailscaled_advertised_routes: got %v, want %v", got, want)
}
// The control has approved 3 routes:
// - 192.0.2.0/24
// - 192.0.5.1/32
// - 0.0.0.0/0
if got, want := parsedMetrics1["tailscaled_approved_routes,"], 3.0; got != want {
t.Errorf("metrics1, tailscaled_approved_routes: got %v, want %v", got, want)
}
// Validate the health counter metric against the status of the node
if got, want := parsedMetrics1["tailscaled_health_messages,type=warning,"], float64(len(status1.Health)); got != want {
t.Errorf("metrics1, tailscaled_health_messages: got %v, want %v", got, want)
}
// The node is the primary subnet router for 3 routes:
// - 192.0.2.0/24
// - 192.0.5.1/32
// - 0.0.0.0/0
if got, want := parsedMetrics1["tailscaled_primary_routes,"], 3.0; got != want {
t.Errorf("metrics1, tailscaled_primary_routes: got %v, want %v", got, want)
}
// Verify that the amount of data recorded in bytes is higher than the
// 50 megabytes sent.
inboundBytes1 := parsedMetrics1["tailscaled_inbound_bytes_total,path=direct_ipv4,"]
if inboundBytes1 < float64(bytesToSend) {
t.Errorf("metrics1, tailscaled_inbound_bytes_total,path=direct_ipv4: expected higher than %d, got: %f", bytesToSend, inboundBytes1)
}
// But ensure that it is not too much higher than the 50 megabytes sent, given 20% wiggle room.
if inboundBytes1 > float64(bytesToSend)*1.2 {
t.Errorf("metrics1, tailscaled_inbound_bytes_total,path=direct_ipv4: expected lower than %f, got: %f", float64(bytesToSend)*1.2, inboundBytes1)
}
// // Verify that the packet count recorded is higher than the
// // 50 000 1KB packages we sent.
// inboundPackets1 := parsedMetrics1["tailscaled_inbound_packets_total,path=direct_ipv4,"]
// if inboundPackets1 < float64(packets) {
// t.Errorf("metrics1, tailscaled_inbound_bytes_total,path=direct_ipv4: expected higher than %d, got: %f", packets, inboundPackets1)
// }
// // But ensure that it is not too much higher than the 50 megabytes sent, given 20% wiggle room.
// if inboundPackets1 > float64(packets)*1.2 {
// t.Errorf("metrics1, tailscaled_inbound_bytes_total,path=direct_ipv4: expected lower than %f, got: %f", float64(packets)*1.1, inboundPackets1)
// }
metrics2, err := lc2.UserMetrics(ctx)
if err != nil {
t.Fatal(err)
}
status2, err := lc2.Status(ctx)
if err != nil {
t.Fatal(err)
}
parsedMetrics2, err := parseMetrics(metrics2)
if err != nil {
t.Fatal(err)
}
t.Logf("Metrics2:\n%s\n", metrics2)
// The node is advertising 0 routes
if got, want := parsedMetrics2["tailscaled_advertised_routes,"], 0.0; got != want {
t.Errorf("metrics2, tailscaled_advertised_routes: got %v, want %v", got, want)
}
// The control has approved 0 routes
if got, want := parsedMetrics2["tailscaled_approved_routes,"], 0.0; got != want {
t.Errorf("metrics2, tailscaled_approved_routes: got %v, want %v", got, want)
}
// Validate the health counter metric against the status of the node
if got, want := parsedMetrics2["tailscaled_health_messages,type=warning,"], float64(len(status2.Health)); got != want {
t.Errorf("metrics2, tailscaled_health_messages: got %v, want %v", got, want)
}
// The node is the primary subnet router for 0 routes
if got, want := parsedMetrics2["tailscaled_primary_routes,"], 0.0; got != want {
t.Errorf("metrics2, tailscaled_primary_routes: got %v, want %v", got, want)
}
// Verify that the amount of data recorded in bytes is higher than the
// 50 megabytes sent.
outboundBytes2 := parsedMetrics2["tailscaled_outbound_bytes_total,path=direct_ipv4,"]
if outboundBytes2 < float64(bytesToSend) {
t.Errorf("metrics2, tailscaled_outbound_bytes_total,path=direct_ipv4: expected higher than %d, got: %f", bytesToSend, outboundBytes2)
}
// But ensure that it is not too much higher than the 50 megabytes sent, given 20% wiggle room.
if outboundBytes2 > float64(bytesToSend)*1.2 {
t.Errorf("metrics2, tailscaled_outbound_bytes_total,path=direct_ipv4: expected lower than %f, got: %f", float64(bytesToSend)*1.2, outboundBytes2)
}
// // Verify that the packet count recorded is higher than the
// // 50 000 1KB packages we sent.
// outboundPackets2 := parsedMetrics2["tailscaled_outbound_packets_total,path=direct_ipv4,"]
// if outboundPackets2 < float64(packets) {
// t.Errorf("metrics2, tailscaled_outbound_bytes_total,path=direct_ipv4: expected higher than %d, got: %f", packets, outboundPackets2)
// }
// // But ensure that it is not too much higher than the 50 megabytes sent, given 20% wiggle room.
// if outboundPackets2 > float64(packets)*1.2 {
// t.Errorf("metrics2, tailscaled_outbound_bytes_total,path=direct_ipv4: expected lower than %f, got: %f", float64(packets)*1.1, outboundPackets2)
// }
}
func waitForCondition(t *testing.T, waitTime time.Duration, f func() bool) {
t.Helper()
for deadline := time.Now().Add(waitTime); time.Now().Before(deadline); time.Sleep(1 * time.Second) {
if f() {
return
}
}
t.Fatalf("waiting for condition")
}
// mustDirect ensures there is a direct connection between LocalClient 1 and 2
func mustDirect(t *testing.T, logf logger.Logf, lc1, lc2 *tailscale.LocalClient) {
t.Helper()
lastLog := time.Now().Add(-time.Minute)
// See https://github.com/tailscale/tailscale/issues/654
// and https://github.com/tailscale/tailscale/issues/3247 for discussions of this deadline.
for deadline := time.Now().Add(30 * time.Second); time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
status1, err := lc1.Status(ctx)
if err != nil {
continue
}
status2, err := lc2.Status(ctx)
if err != nil {
continue
}
pst := status1.Peer[status2.Self.PublicKey]
if pst.CurAddr != "" {
logf("direct link %s->%s found with addr %s", status1.Self.HostName, status2.Self.HostName, pst.CurAddr)
return
}
if now := time.Now(); now.Sub(lastLog) > time.Second {
logf("no direct path %s->%s yet, addrs %v", status1.Self.HostName, status2.Self.HostName, pst.Addrs)
lastLog = now
}
}
t.Error("magicsock did not find a direct path from lc1 to lc2")
} }

View File

@ -1018,6 +1018,7 @@ func (s *Server) MapResponse(req *tailcfg.MapRequest) (res *tailcfg.MapResponse,
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
res.Node.PrimaryRoutes = s.nodeSubnetRoutes[nk]
res.Node.AllowedIPs = append(res.Node.Addresses, s.nodeSubnetRoutes[nk]...) res.Node.AllowedIPs = append(res.Node.Addresses, s.nodeSubnetRoutes[nk]...)
// Consume a PingRequest while protected by mutex if it exists // Consume a PingRequest while protected by mutex if it exists

View File

@ -10,12 +10,16 @@ import (
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
"strings"
"tailscale.com/metrics" "tailscale.com/metrics"
"tailscale.com/tsweb/varz" "tailscale.com/tsweb/varz"
) )
var vars expvar.Map // Registry tracks user-facing metrics of various Tailscale subsystems.
type Registry struct {
vars expvar.Map
}
// NewMultiLabelMap creates and register a new // NewMultiLabelMap creates and register a new
// MultiLabelMap[T] variable with the given name and returns it. // MultiLabelMap[T] variable with the given name and returns it.
@ -24,15 +28,18 @@ var vars expvar.Map
// Note that usermetric are not protected against duplicate // Note that usermetric are not protected against duplicate
// metrics name. It is the caller's responsibility to ensure that // metrics name. It is the caller's responsibility to ensure that
// the name is unique. // the name is unique.
func NewMultiLabelMap[T comparable](name string, promType, helpText string) *metrics.MultiLabelMap[T] { func NewMultiLabelMap[T comparable](m *Registry, name string, promType, helpText string) *metrics.MultiLabelMap[T] {
m := &metrics.MultiLabelMap[T]{ if m == nil {
return nil
}
ml := &metrics.MultiLabelMap[T]{
Type: promType, Type: promType,
Help: helpText, Help: helpText,
} }
var zero T var zero T
_ = metrics.LabelString(zero) // panic early if T is invalid _ = metrics.LabelString(zero) // panic early if T is invalid
vars.Set(name, m) m.vars.Set(name, ml)
return m return ml
} }
// Gauge is a gauge metric with no labels. // Gauge is a gauge metric with no labels.
@ -42,20 +49,29 @@ type Gauge struct {
} }
// NewGauge creates and register a new gauge metric with the given name and help text. // NewGauge creates and register a new gauge metric with the given name and help text.
func NewGauge(name, help string) *Gauge { func (m *Registry) NewGauge(name, help string) *Gauge {
if m == nil {
return nil
}
g := &Gauge{&expvar.Float{}, help} g := &Gauge{&expvar.Float{}, help}
vars.Set(name, g) m.vars.Set(name, g)
return g return g
} }
// Set sets the gauge to the given value. // Set sets the gauge to the given value.
func (g *Gauge) Set(v float64) { func (g *Gauge) Set(v float64) {
if g == nil {
return
}
g.m.Set(v) g.m.Set(v)
} }
// String returns the string of the underlying expvar.Float. // String returns the string of the underlying expvar.Float.
// This satisfies the expvar.Var interface. // This satisfies the expvar.Var interface.
func (g *Gauge) String() string { func (g *Gauge) String() string {
if g == nil {
return ""
}
return g.m.String() return g.m.String()
} }
@ -79,6 +95,15 @@ func (g *Gauge) WritePrometheus(w io.Writer, name string) {
// Handler returns a varz.Handler that serves the userfacing expvar contained // Handler returns a varz.Handler that serves the userfacing expvar contained
// in this package. // in this package.
func Handler(w http.ResponseWriter, r *http.Request) { func (m *Registry) Handler(w http.ResponseWriter, r *http.Request) {
varz.ExpvarDoHandler(vars.Do)(w, r) varz.ExpvarDoHandler(m.vars.Do)(w, r)
}
func (m *Registry) String() string {
var sb strings.Builder
m.vars.Do(func(kv expvar.KeyValue) {
fmt.Fprintf(&sb, "%s: %v\n", kv.Key, kv.Value)
})
return sb.String()
} }

View File

@ -9,7 +9,8 @@ import (
) )
func TestGauge(t *testing.T) { func TestGauge(t *testing.T) {
g := NewGauge("test_gauge", "This is a test gauge") var reg Registry
g := reg.NewGauge("test_gauge", "This is a test gauge")
g.Set(15) g.Set(15)
var buf bytes.Buffer var buf bytes.Buffer

View File

@ -690,7 +690,9 @@ func (c *connBind) receiveDERP(buffs [][]byte, sizes []int, eps []conn.Endpoint)
// No data read occurred. Wait for another packet. // No data read occurred. Wait for another packet.
continue continue
} }
metricRecvDataDERP.Add(1) metricRecvDataPacketsDERP.Add(1)
c.metricInboundPacketsTotal.Add(pathLabel{Path: PathDERP}, 1)
c.metricInboundBytesTotal.Add(pathLabel{Path: PathDERP}, int64(n))
sizes[0] = n sizes[0] = n
eps[0] = ep eps[0] = ep
return 1, nil return 1, nil
@ -728,7 +730,7 @@ func (c *Conn) processDERPReadResult(dm derpReadResult, b []byte) (n int, ep *en
ep.noteRecvActivity(ipp, mono.Now()) ep.noteRecvActivity(ipp, mono.Now())
if stats := c.stats.Load(); stats != nil { if stats := c.stats.Load(); stats != nil {
stats.UpdateRxPhysical(ep.nodeAddr, ipp, dm.n) stats.UpdateRxPhysical(ep.nodeAddr, ipp, 1, dm.n)
} }
return n, ep return n, ep
} }

View File

@ -950,6 +950,8 @@ func (de *endpoint) send(buffs [][]byte) error {
return errNoUDPOrDERP return errNoUDPOrDERP
} }
var err error var err error
// TODO(kradalby): for paring, why is this not an if-else? Do we send to
// both DERP and UDP at the same time if we have both?
if udpAddr.IsValid() { if udpAddr.IsValid() {
_, err = de.c.sendUDPBatch(udpAddr, buffs) _, err = de.c.sendUDPBatch(udpAddr, buffs)
@ -960,13 +962,23 @@ func (de *endpoint) send(buffs [][]byte) error {
de.noteBadEndpoint(udpAddr) de.noteBadEndpoint(udpAddr)
} }
var txBytes int
for _, b := range buffs {
txBytes += len(b)
}
switch {
case udpAddr.Addr().Is4():
de.c.metricOutboundPacketsTotal.Add(pathLabel{Path: PathDirectIPv4}, int64(len(buffs)))
de.c.metricOutboundBytesTotal.Add(pathLabel{Path: PathDirectIPv4}, int64(txBytes))
case udpAddr.Addr().Is6():
de.c.metricOutboundPacketsTotal.Add(pathLabel{Path: PathDirectIPv6}, int64(len(buffs)))
de.c.metricOutboundBytesTotal.Add(pathLabel{Path: PathDirectIPv6}, int64(txBytes))
}
// TODO(raggi): needs updating for accuracy, as in error conditions we may have partial sends. // TODO(raggi): needs updating for accuracy, as in error conditions we may have partial sends.
if stats := de.c.stats.Load(); err == nil && stats != nil { if stats := de.c.stats.Load(); err == nil && stats != nil {
var txBytes int stats.UpdateTxPhysical(de.nodeAddr, udpAddr, len(buffs), txBytes)
for _, b := range buffs {
txBytes += len(b)
}
stats.UpdateTxPhysical(de.nodeAddr, udpAddr, txBytes)
} }
} }
if derpAddr.IsValid() { if derpAddr.IsValid() {
@ -974,8 +986,11 @@ func (de *endpoint) send(buffs [][]byte) error {
for _, buff := range buffs { for _, buff := range buffs {
ok, _ := de.c.sendAddr(derpAddr, de.publicKey, buff) ok, _ := de.c.sendAddr(derpAddr, de.publicKey, buff)
if stats := de.c.stats.Load(); stats != nil { if stats := de.c.stats.Load(); stats != nil {
stats.UpdateTxPhysical(de.nodeAddr, derpAddr, len(buff)) stats.UpdateTxPhysical(de.nodeAddr, derpAddr, 1, len(buff))
} }
// TODO(kradalby): Is this the correct place for this? Do we need an Error version?
de.c.metricOutboundPacketsTotal.Add(pathLabel{Path: PathDERP}, 1)
de.c.metricOutboundBytesTotal.Add(pathLabel{Path: PathDERP}, int64(len(buff)))
if !ok { if !ok {
allOk = false allOk = false
} }

View File

@ -33,6 +33,7 @@ import (
"tailscale.com/health" "tailscale.com/health"
"tailscale.com/hostinfo" "tailscale.com/hostinfo"
"tailscale.com/ipn/ipnstate" "tailscale.com/ipn/ipnstate"
"tailscale.com/metrics"
"tailscale.com/net/connstats" "tailscale.com/net/connstats"
"tailscale.com/net/netcheck" "tailscale.com/net/netcheck"
"tailscale.com/net/neterror" "tailscale.com/net/neterror"
@ -60,6 +61,7 @@ import (
"tailscale.com/util/set" "tailscale.com/util/set"
"tailscale.com/util/testenv" "tailscale.com/util/testenv"
"tailscale.com/util/uniq" "tailscale.com/util/uniq"
"tailscale.com/util/usermetric"
"tailscale.com/wgengine/capture" "tailscale.com/wgengine/capture"
"tailscale.com/wgengine/wgint" "tailscale.com/wgengine/wgint"
) )
@ -320,6 +322,11 @@ type Conn struct {
// responsibility to ensure that traffic from these endpoints is routed // responsibility to ensure that traffic from these endpoints is routed
// to the node. // to the node.
staticEndpoints views.Slice[netip.AddrPort] staticEndpoints views.Slice[netip.AddrPort]
metricInboundPacketsTotal *metrics.MultiLabelMap[pathLabel]
metricOutboundPacketsTotal *metrics.MultiLabelMap[pathLabel]
metricInboundBytesTotal *metrics.MultiLabelMap[pathLabel]
metricOutboundBytesTotal *metrics.MultiLabelMap[pathLabel]
} }
// SetDebugLoggingEnabled controls whether spammy debug logging is enabled. // SetDebugLoggingEnabled controls whether spammy debug logging is enabled.
@ -386,6 +393,9 @@ type Options struct {
// report errors and warnings to. // report errors and warnings to.
HealthTracker *health.Tracker HealthTracker *health.Tracker
// UserMetricsRegistry specifies the metrics registry to record metrics to.
UserMetricsRegistry *usermetric.Registry
// ControlKnobs are the set of control knobs to use. // ControlKnobs are the set of control knobs to use.
// If nil, they're ignored and not updated. // If nil, they're ignored and not updated.
ControlKnobs *controlknobs.Knobs ControlKnobs *controlknobs.Knobs
@ -466,6 +476,10 @@ func NewConn(opts Options) (*Conn, error) {
return nil, errors.New("magicsock.Options.NetMon must be non-nil") return nil, errors.New("magicsock.Options.NetMon must be non-nil")
} }
if opts.UserMetricsRegistry == nil {
return nil, errors.New("magicsock.Options.UserMetrics must be non-nil")
}
c := newConn(opts.logf()) c := newConn(opts.logf())
c.port.Store(uint32(opts.Port)) c.port.Store(uint32(opts.Port))
c.controlKnobs = opts.ControlKnobs c.controlKnobs = opts.ControlKnobs
@ -505,6 +519,32 @@ func NewConn(opts Options) (*Conn, error) {
UseDNSCache: true, UseDNSCache: true,
} }
// TODO(kradalby): factor out to a func
c.metricInboundBytesTotal = usermetric.NewMultiLabelMap[pathLabel](
opts.UserMetricsRegistry,
"tailscaled_inbound_bytes_total",
"counter",
"Counts the number of bytes received from other peers",
)
c.metricInboundPacketsTotal = usermetric.NewMultiLabelMap[pathLabel](
opts.UserMetricsRegistry,
"tailscaled_inbound_packets_total",
"counter",
"Counts the number of packets received from other peers",
)
c.metricOutboundBytesTotal = usermetric.NewMultiLabelMap[pathLabel](
opts.UserMetricsRegistry,
"tailscaled_outbound_bytes_total",
"counter",
"Counts the number of bytes sent to other peers",
)
c.metricOutboundPacketsTotal = usermetric.NewMultiLabelMap[pathLabel](
opts.UserMetricsRegistry,
"tailscaled_outbound_packets_total",
"counter",
"Counts the number of packets sent to other peers",
)
if d4, err := c.listenRawDisco("ip4"); err == nil { if d4, err := c.listenRawDisco("ip4"); err == nil {
c.logf("[v1] using BPF disco receiver for IPv4") c.logf("[v1] using BPF disco receiver for IPv4")
c.closeDisco4 = d4 c.closeDisco4 = d4
@ -1145,6 +1185,25 @@ func (c *Conn) sendUDP(ipp netip.AddrPort, b []byte) (sent bool, err error) {
} else { } else {
if sent { if sent {
metricSendUDP.Add(1) metricSendUDP.Add(1)
// TODO(kradalby): Do we need error variants of these?
switch {
case ipp.Addr().Is4():
c.metricOutboundPacketsTotal.Add(pathLabel{Path: PathDirectIPv4}, 1)
c.metricOutboundBytesTotal.Add(pathLabel{Path: PathDirectIPv4}, int64(len(b)))
case ipp.Addr().Is6():
c.metricOutboundPacketsTotal.Add(pathLabel{Path: PathDirectIPv6}, 1)
c.metricOutboundBytesTotal.Add(pathLabel{Path: PathDirectIPv6}, int64(len(b)))
}
if stats := c.stats.Load(); stats != nil {
c.mu.Lock()
ep, ok := c.peerMap.endpointForIPPort(ipp)
c.mu.Unlock()
if ok {
stats.UpdateTxPhysical(ep.nodeAddr, ipp, 1, len(b))
}
}
} }
} }
return return
@ -1266,17 +1325,29 @@ func (c *Conn) putReceiveBatch(batch *receiveBatch) {
// receiveIPv4 creates an IPv4 ReceiveFunc reading from c.pconn4. // receiveIPv4 creates an IPv4 ReceiveFunc reading from c.pconn4.
func (c *Conn) receiveIPv4() conn.ReceiveFunc { func (c *Conn) receiveIPv4() conn.ReceiveFunc {
return c.mkReceiveFunc(&c.pconn4, c.health.ReceiveFuncStats(health.ReceiveIPv4), metricRecvDataIPv4) return c.mkReceiveFunc(&c.pconn4, c.health.ReceiveFuncStats(health.ReceiveIPv4),
func(i int64) {
metricRecvDataPacketsIPv4.Add(i)
c.metricInboundPacketsTotal.Add(pathLabel{Path: PathDirectIPv4}, i)
}, func(i int64) {
c.metricInboundBytesTotal.Add(pathLabel{Path: PathDirectIPv4}, i)
})
} }
// receiveIPv6 creates an IPv6 ReceiveFunc reading from c.pconn6. // receiveIPv6 creates an IPv6 ReceiveFunc reading from c.pconn6.
func (c *Conn) receiveIPv6() conn.ReceiveFunc { func (c *Conn) receiveIPv6() conn.ReceiveFunc {
return c.mkReceiveFunc(&c.pconn6, c.health.ReceiveFuncStats(health.ReceiveIPv6), metricRecvDataIPv6) return c.mkReceiveFunc(&c.pconn6, c.health.ReceiveFuncStats(health.ReceiveIPv6),
func(i int64) {
metricRecvDataPacketsIPv6.Add(i)
c.metricInboundPacketsTotal.Add(pathLabel{Path: PathDirectIPv6}, i)
}, func(i int64) {
c.metricInboundBytesTotal.Add(pathLabel{Path: PathDirectIPv6}, i)
})
} }
// mkReceiveFunc creates a ReceiveFunc reading from ruc. // mkReceiveFunc creates a ReceiveFunc reading from ruc.
// The provided healthItem and metric are updated if non-nil. // The provided healthItem and metrics are updated if non-nil.
func (c *Conn) mkReceiveFunc(ruc *RebindingUDPConn, healthItem *health.ReceiveFuncStats, metric *clientmetric.Metric) conn.ReceiveFunc { func (c *Conn) mkReceiveFunc(ruc *RebindingUDPConn, healthItem *health.ReceiveFuncStats, packetMetricFunc, bytesMetricFunc func(int64)) conn.ReceiveFunc {
// epCache caches an IPPort->endpoint for hot flows. // epCache caches an IPPort->endpoint for hot flows.
var epCache ippEndpointCache var epCache ippEndpointCache
@ -1313,8 +1384,11 @@ func (c *Conn) mkReceiveFunc(ruc *RebindingUDPConn, healthItem *health.ReceiveFu
} }
ipp := msg.Addr.(*net.UDPAddr).AddrPort() ipp := msg.Addr.(*net.UDPAddr).AddrPort()
if ep, ok := c.receiveIP(msg.Buffers[0][:msg.N], ipp, &epCache); ok { if ep, ok := c.receiveIP(msg.Buffers[0][:msg.N], ipp, &epCache); ok {
if metric != nil { if packetMetricFunc != nil {
metric.Add(1) packetMetricFunc(1)
}
if bytesMetricFunc != nil {
bytesMetricFunc(int64(msg.N))
} }
eps[i] = ep eps[i] = ep
sizes[i] = msg.N sizes[i] = msg.N
@ -1370,7 +1444,7 @@ func (c *Conn) receiveIP(b []byte, ipp netip.AddrPort, cache *ippEndpointCache)
ep.lastRecvUDPAny.StoreAtomic(now) ep.lastRecvUDPAny.StoreAtomic(now)
ep.noteRecvActivity(ipp, now) ep.noteRecvActivity(ipp, now)
if stats := c.stats.Load(); stats != nil { if stats := c.stats.Load(); stats != nil {
stats.UpdateRxPhysical(ep.nodeAddr, ipp, len(b)) stats.UpdateRxPhysical(ep.nodeAddr, ipp, 1, len(b))
} }
return ep, true return ep, true
} }
@ -2924,9 +2998,9 @@ var (
// Data packets (non-disco) // Data packets (non-disco)
metricSendData = clientmetric.NewCounter("magicsock_send_data") metricSendData = clientmetric.NewCounter("magicsock_send_data")
metricSendDataNetworkDown = clientmetric.NewCounter("magicsock_send_data_network_down") metricSendDataNetworkDown = clientmetric.NewCounter("magicsock_send_data_network_down")
metricRecvDataDERP = clientmetric.NewCounter("magicsock_recv_data_derp") metricRecvDataPacketsDERP = clientmetric.NewCounter("magicsock_recv_data_derp")
metricRecvDataIPv4 = clientmetric.NewCounter("magicsock_recv_data_ipv4") metricRecvDataPacketsIPv4 = clientmetric.NewCounter("magicsock_recv_data_ipv4")
metricRecvDataIPv6 = clientmetric.NewCounter("magicsock_recv_data_ipv6") metricRecvDataPacketsIPv6 = clientmetric.NewCounter("magicsock_recv_data_ipv6")
// Disco packets // Disco packets
metricSendDiscoUDP = clientmetric.NewCounter("magicsock_disco_send_udp") metricSendDiscoUDP = clientmetric.NewCounter("magicsock_disco_send_udp")
@ -3064,3 +3138,19 @@ func (le *lazyEndpoint) GetPeerEndpoint(peerPublicKey [32]byte) conn.Endpoint {
le.c.logf("magicsock: lazyEndpoint.GetPeerEndpoint(%v) found: %v", pubKey.ShortString(), ep.nodeAddr) le.c.logf("magicsock: lazyEndpoint.GetPeerEndpoint(%v) found: %v", pubKey.ShortString(), ep.nodeAddr)
return ep return ep
} }
// Path is the label value used to attribute traffic metrics to the
// network path a packet took.
type Path string

const (
	// PathDirectIPv4 labels traffic sent or received over a direct
	// IPv4 UDP connection to a peer.
	PathDirectIPv4 Path = "direct_ipv4"
	// PathDirectIPv6 labels traffic sent or received over a direct
	// IPv6 UDP connection to a peer.
	PathDirectIPv6 Path = "direct_ipv6"
	// PathDERP labels traffic relayed through a DERP server.
	PathDERP Path = "derp"
)

// pathLabel is the label set for the per-path inbound/outbound
// packet and byte counters registered on Conn.
type pathLabel struct {
	// Path indicates the path that the packet took:
	// - direct_ipv4
	// - direct_ipv6
	// - derp
	Path Path
}

View File

@ -10,6 +10,7 @@ import (
"crypto/tls" "crypto/tls"
"encoding/binary" "encoding/binary"
"errors" "errors"
"expvar"
"fmt" "fmt"
"io" "io"
"math/rand" "math/rand"
@ -28,6 +29,7 @@ import (
"time" "time"
"unsafe" "unsafe"
"github.com/google/go-cmp/cmp"
wgconn "github.com/tailscale/wireguard-go/conn" wgconn "github.com/tailscale/wireguard-go/conn"
"github.com/tailscale/wireguard-go/device" "github.com/tailscale/wireguard-go/device"
"github.com/tailscale/wireguard-go/tun/tuntest" "github.com/tailscale/wireguard-go/tun/tuntest"
@ -64,6 +66,7 @@ import (
"tailscale.com/util/cibuild" "tailscale.com/util/cibuild"
"tailscale.com/util/racebuild" "tailscale.com/util/racebuild"
"tailscale.com/util/set" "tailscale.com/util/set"
"tailscale.com/util/usermetric"
"tailscale.com/wgengine/filter" "tailscale.com/wgengine/filter"
"tailscale.com/wgengine/wgcfg" "tailscale.com/wgengine/wgcfg"
"tailscale.com/wgengine/wgcfg/nmcfg" "tailscale.com/wgengine/wgcfg/nmcfg"
@ -156,6 +159,7 @@ type magicStack struct {
dev *device.Device // the wireguard-go Device that connects the previous things dev *device.Device // the wireguard-go Device that connects the previous things
wgLogger *wglog.Logger // wireguard-go log wrapper wgLogger *wglog.Logger // wireguard-go log wrapper
netMon *netmon.Monitor // always non-nil netMon *netmon.Monitor // always non-nil
metrics *usermetric.Registry
} }
// newMagicStack builds and initializes an idle magicsock and // newMagicStack builds and initializes an idle magicsock and
@ -174,6 +178,8 @@ func newMagicStackWithKey(t testing.TB, logf logger.Logf, l nettype.PacketListen
t.Fatalf("netmon.New: %v", err) t.Fatalf("netmon.New: %v", err)
} }
var reg usermetric.Registry
epCh := make(chan []tailcfg.Endpoint, 100) // arbitrary epCh := make(chan []tailcfg.Endpoint, 100) // arbitrary
conn, err := NewConn(Options{ conn, err := NewConn(Options{
NetMon: netMon, NetMon: netMon,
@ -183,6 +189,7 @@ func newMagicStackWithKey(t testing.TB, logf logger.Logf, l nettype.PacketListen
EndpointsFunc: func(eps []tailcfg.Endpoint) { EndpointsFunc: func(eps []tailcfg.Endpoint) {
epCh <- eps epCh <- eps
}, },
UserMetricsRegistry: &reg,
}) })
if err != nil { if err != nil {
t.Fatalf("constructing magicsock: %v", err) t.Fatalf("constructing magicsock: %v", err)
@ -193,7 +200,7 @@ func newMagicStackWithKey(t testing.TB, logf logger.Logf, l nettype.PacketListen
} }
tun := tuntest.NewChannelTUN() tun := tuntest.NewChannelTUN()
tsTun := tstun.Wrap(logf, tun.TUN()) tsTun := tstun.Wrap(logf, tun.TUN(), nil)
tsTun.SetFilter(filter.NewAllowAllForTest(logf)) tsTun.SetFilter(filter.NewAllowAllForTest(logf))
tsTun.Start() tsTun.Start()
@ -219,6 +226,7 @@ func newMagicStackWithKey(t testing.TB, logf logger.Logf, l nettype.PacketListen
dev: dev, dev: dev,
wgLogger: wgLogger, wgLogger: wgLogger,
netMon: netMon, netMon: netMon,
metrics: &reg,
} }
} }
@ -392,11 +400,12 @@ func TestNewConn(t *testing.T) {
port := pickPort(t) port := pickPort(t)
conn, err := NewConn(Options{ conn, err := NewConn(Options{
Port: port, Port: port,
DisablePortMapper: true, DisablePortMapper: true,
EndpointsFunc: epFunc, EndpointsFunc: epFunc,
Logf: t.Logf, Logf: t.Logf,
NetMon: netMon, NetMon: netMon,
UserMetricsRegistry: new(usermetric.Registry),
}) })
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -519,10 +528,12 @@ func TestDeviceStartStop(t *testing.T) {
} }
defer netMon.Close() defer netMon.Close()
reg := new(usermetric.Registry)
conn, err := NewConn(Options{ conn, err := NewConn(Options{
EndpointsFunc: func(eps []tailcfg.Endpoint) {}, EndpointsFunc: func(eps []tailcfg.Endpoint) {},
Logf: t.Logf, Logf: t.Logf,
NetMon: netMon, NetMon: netMon,
UserMetricsRegistry: reg,
}) })
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -1181,6 +1192,100 @@ func testTwoDevicePing(t *testing.T, d *devices) {
checkStats(t, m1, m1Conns) checkStats(t, m1, m1Conns)
checkStats(t, m2, m2Conns) checkStats(t, m2, m2Conns)
}) })
t.Run("compare-metrics-stats", func(t *testing.T) {
setT(t)
defer setT(outerT)
m1.conn.resetMetricsForTest()
m1.stats.TestExtract()
m2.conn.resetMetricsForTest()
m2.stats.TestExtract()
t.Logf("Metrics before: %s\n", m1.metrics.String())
ping1(t)
ping2(t)
assertConnStatsAndUserMetricsEqual(t, m1)
assertConnStatsAndUserMetricsEqual(t, m2)
t.Logf("Metrics after: %s\n", m1.metrics.String())
})
}
func (c *Conn) resetMetricsForTest() {
c.metricInboundBytesTotal.ResetAllForTest()
c.metricInboundPacketsTotal.ResetAllForTest()
c.metricOutboundBytesTotal.ResetAllForTest()
c.metricOutboundPacketsTotal.ResetAllForTest()
}
func assertConnStatsAndUserMetricsEqual(t *testing.T, ms *magicStack) {
_, phys := ms.stats.TestExtract()
physIPv4RxBytes := int64(0)
physIPv4TxBytes := int64(0)
physDERPRxBytes := int64(0)
physDERPTxBytes := int64(0)
physIPv4RxPackets := int64(0)
physIPv4TxPackets := int64(0)
physDERPRxPackets := int64(0)
physDERPTxPackets := int64(0)
for conn, count := range phys {
t.Logf("physconn src: %s, dst: %s", conn.Src.String(), conn.Dst.String())
if conn.Dst.String() == "127.3.3.40:1" {
physDERPRxBytes += int64(count.RxBytes)
physDERPTxBytes += int64(count.TxBytes)
physDERPRxPackets += int64(count.RxPackets)
physDERPTxPackets += int64(count.TxPackets)
} else {
physIPv4RxBytes += int64(count.RxBytes)
physIPv4TxBytes += int64(count.TxBytes)
physIPv4RxPackets += int64(count.RxPackets)
physIPv4TxPackets += int64(count.TxPackets)
}
}
var metricIPv4RxBytes, metricIPv4TxBytes, metricDERPRxBytes, metricDERPTxBytes int64
var metricIPv4RxPackets, metricIPv4TxPackets, metricDERPRxPackets, metricDERPTxPackets int64
if m, ok := ms.conn.metricInboundBytesTotal.Get(pathLabel{Path: PathDirectIPv4}).(*expvar.Int); ok {
metricIPv4RxBytes = m.Value()
}
if m, ok := ms.conn.metricOutboundBytesTotal.Get(pathLabel{Path: PathDirectIPv4}).(*expvar.Int); ok {
metricIPv4TxBytes = m.Value()
}
if m, ok := ms.conn.metricInboundBytesTotal.Get(pathLabel{Path: PathDERP}).(*expvar.Int); ok {
metricDERPRxBytes = m.Value()
}
if m, ok := ms.conn.metricOutboundBytesTotal.Get(pathLabel{Path: PathDERP}).(*expvar.Int); ok {
metricDERPTxBytes = m.Value()
}
if m, ok := ms.conn.metricInboundPacketsTotal.Get(pathLabel{Path: PathDirectIPv4}).(*expvar.Int); ok {
metricIPv4RxPackets = m.Value()
}
if m, ok := ms.conn.metricOutboundPacketsTotal.Get(pathLabel{Path: PathDirectIPv4}).(*expvar.Int); ok {
metricIPv4TxPackets = m.Value()
}
if m, ok := ms.conn.metricInboundPacketsTotal.Get(pathLabel{Path: PathDERP}).(*expvar.Int); ok {
metricDERPRxPackets = m.Value()
}
if m, ok := ms.conn.metricOutboundPacketsTotal.Get(pathLabel{Path: PathDERP}).(*expvar.Int); ok {
metricDERPTxPackets = m.Value()
}
assertEqual(t, "derp bytes inbound", physDERPRxBytes, metricDERPRxBytes)
assertEqual(t, "derp bytes outbound", physDERPTxBytes, metricDERPTxBytes)
assertEqual(t, "ipv4 bytes inbound", physIPv4RxBytes, metricIPv4RxBytes)
assertEqual(t, "ipv4 bytes outbound", physIPv4TxBytes, metricIPv4TxBytes)
assertEqual(t, "derp packets inbound", physDERPRxPackets, metricDERPRxPackets)
assertEqual(t, "derp packets outbound", physDERPTxPackets, metricDERPTxPackets)
assertEqual(t, "ipv4 packets inbound", physIPv4RxPackets, metricIPv4RxPackets)
assertEqual(t, "ipv4 packets outbound", physIPv4TxPackets, metricIPv4TxPackets)
}
func assertEqual(t *testing.T, name string, a, b any) {
t.Helper()
t.Logf("assertEqual %s: %v == %v", name, a, b)
if diff := cmp.Diff(a, b); diff != "" {
t.Errorf("%s mismatch (-want +got):\n%s", name, diff)
}
} }
func TestDiscoMessage(t *testing.T) { func TestDiscoMessage(t *testing.T) {
@ -1275,6 +1380,7 @@ func newTestConn(t testing.TB) *Conn {
conn, err := NewConn(Options{ conn, err := NewConn(Options{
NetMon: netMon, NetMon: netMon,
HealthTracker: new(health.Tracker), HealthTracker: new(health.Tracker),
UserMetricsRegistry: new(usermetric.Registry),
DisablePortMapper: true, DisablePortMapper: true,
Logf: t.Logf, Logf: t.Logf,
Port: port, Port: port,

View File

@ -46,10 +46,11 @@ func TestInjectInboundLeak(t *testing.T) {
} }
sys := new(tsd.System) sys := new(tsd.System)
eng, err := wgengine.NewUserspaceEngine(logf, wgengine.Config{ eng, err := wgengine.NewUserspaceEngine(logf, wgengine.Config{
Tun: tunDev, Tun: tunDev,
Dialer: dialer, Dialer: dialer,
SetSubsystem: sys.Set, SetSubsystem: sys.Set,
HealthTracker: sys.HealthTracker(), HealthTracker: sys.HealthTracker(),
UserMetricsRegistry: sys.UserMetricsRegistry(),
}) })
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -103,10 +104,11 @@ func makeNetstack(tb testing.TB, config func(*Impl)) *Impl {
dialer := new(tsdial.Dialer) dialer := new(tsdial.Dialer)
logf := tstest.WhileTestRunningLogger(tb) logf := tstest.WhileTestRunningLogger(tb)
eng, err := wgengine.NewUserspaceEngine(logf, wgengine.Config{ eng, err := wgengine.NewUserspaceEngine(logf, wgengine.Config{
Tun: tunDev, Tun: tunDev,
Dialer: dialer, Dialer: dialer,
SetSubsystem: sys.Set, SetSubsystem: sys.Set,
HealthTracker: sys.HealthTracker(), HealthTracker: sys.HealthTracker(),
UserMetricsRegistry: sys.UserMetricsRegistry(),
}) })
if err != nil { if err != nil {
tb.Fatal(err) tb.Fatal(err)

View File

@ -49,6 +49,7 @@ import (
"tailscale.com/util/mak" "tailscale.com/util/mak"
"tailscale.com/util/set" "tailscale.com/util/set"
"tailscale.com/util/testenv" "tailscale.com/util/testenv"
"tailscale.com/util/usermetric"
"tailscale.com/version" "tailscale.com/version"
"tailscale.com/wgengine/capture" "tailscale.com/wgengine/capture"
"tailscale.com/wgengine/filter" "tailscale.com/wgengine/filter"
@ -195,6 +196,9 @@ type Config struct {
// HealthTracker, if non-nil, is the health tracker to use. // HealthTracker, if non-nil, is the health tracker to use.
HealthTracker *health.Tracker HealthTracker *health.Tracker
// UserMetricsRegistry, if non-nil, is the usermetrics registry to use.
UserMetricsRegistry *usermetric.Registry
// Dialer is the dialer to use for outbound connections. // Dialer is the dialer to use for outbound connections.
// If nil, a new Dialer is created. // If nil, a new Dialer is created.
Dialer *tsdial.Dialer Dialer *tsdial.Dialer
@ -249,6 +253,8 @@ func NewFakeUserspaceEngine(logf logger.Logf, opts ...any) (Engine, error) {
conf.ControlKnobs = v conf.ControlKnobs = v
case *health.Tracker: case *health.Tracker:
conf.HealthTracker = v conf.HealthTracker = v
case *usermetric.Registry:
conf.UserMetricsRegistry = v
default: default:
return nil, fmt.Errorf("unknown option type %T", v) return nil, fmt.Errorf("unknown option type %T", v)
} }
@ -289,9 +295,9 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error)
var tsTUNDev *tstun.Wrapper var tsTUNDev *tstun.Wrapper
if conf.IsTAP { if conf.IsTAP {
tsTUNDev = tstun.WrapTAP(logf, conf.Tun) tsTUNDev = tstun.WrapTAP(logf, conf.Tun, conf.UserMetricsRegistry)
} else { } else {
tsTUNDev = tstun.Wrap(logf, conf.Tun) tsTUNDev = tstun.Wrap(logf, conf.Tun, conf.UserMetricsRegistry)
} }
closePool.add(tsTUNDev) closePool.add(tsTUNDev)
@ -379,17 +385,18 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error)
} }
} }
magicsockOpts := magicsock.Options{ magicsockOpts := magicsock.Options{
Logf: logf, Logf: logf,
Port: conf.ListenPort, Port: conf.ListenPort,
EndpointsFunc: endpointsFn, EndpointsFunc: endpointsFn,
DERPActiveFunc: e.RequestStatus, DERPActiveFunc: e.RequestStatus,
IdleFunc: e.tundev.IdleDuration, IdleFunc: e.tundev.IdleDuration,
NoteRecvActivity: e.noteRecvActivity, NoteRecvActivity: e.noteRecvActivity,
NetMon: e.netMon, NetMon: e.netMon,
HealthTracker: e.health, HealthTracker: e.health,
ControlKnobs: conf.ControlKnobs, UserMetricsRegistry: conf.UserMetricsRegistry,
OnPortUpdate: onPortUpdate, ControlKnobs: conf.ControlKnobs,
PeerByKeyFunc: e.PeerByKey, OnPortUpdate: onPortUpdate,
PeerByKeyFunc: e.PeerByKey,
} }
var err error var err error

View File

@ -20,8 +20,9 @@ func TestIsNetstack(t *testing.T) {
e, err := wgengine.NewUserspaceEngine( e, err := wgengine.NewUserspaceEngine(
tstest.WhileTestRunningLogger(t), tstest.WhileTestRunningLogger(t),
wgengine.Config{ wgengine.Config{
SetSubsystem: sys.Set, SetSubsystem: sys.Set,
HealthTracker: sys.HealthTracker(), HealthTracker: sys.HealthTracker(),
UserMetricsRegistry: sys.UserMetricsRegistry(),
}, },
) )
if err != nil { if err != nil {
@ -72,6 +73,7 @@ func TestIsNetstackRouter(t *testing.T) {
conf := tt.conf conf := tt.conf
conf.SetSubsystem = sys.Set conf.SetSubsystem = sys.Set
conf.HealthTracker = sys.HealthTracker() conf.HealthTracker = sys.HealthTracker()
conf.UserMetricsRegistry = sys.UserMetricsRegistry()
e, err := wgengine.NewUserspaceEngine(logger.Discard, conf) e, err := wgengine.NewUserspaceEngine(logger.Discard, conf)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)

View File

@ -25,6 +25,7 @@ import (
"tailscale.com/types/key" "tailscale.com/types/key"
"tailscale.com/types/netmap" "tailscale.com/types/netmap"
"tailscale.com/types/opt" "tailscale.com/types/opt"
"tailscale.com/util/usermetric"
"tailscale.com/wgengine/router" "tailscale.com/wgengine/router"
"tailscale.com/wgengine/wgcfg" "tailscale.com/wgengine/wgcfg"
) )
@ -100,7 +101,8 @@ func nodeViews(v []*tailcfg.Node) []tailcfg.NodeView {
func TestUserspaceEngineReconfig(t *testing.T) { func TestUserspaceEngineReconfig(t *testing.T) {
ht := new(health.Tracker) ht := new(health.Tracker)
e, err := NewFakeUserspaceEngine(t.Logf, 0, ht) reg := new(usermetric.Registry)
e, err := NewFakeUserspaceEngine(t.Logf, 0, ht, reg)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -167,9 +169,10 @@ func TestUserspaceEnginePortReconfig(t *testing.T) {
// Keep making a wgengine until we find an unused port // Keep making a wgengine until we find an unused port
var ue *userspaceEngine var ue *userspaceEngine
ht := new(health.Tracker) ht := new(health.Tracker)
reg := new(usermetric.Registry)
for i := range 100 { for i := range 100 {
attempt := uint16(defaultPort + i) attempt := uint16(defaultPort + i)
e, err := NewFakeUserspaceEngine(t.Logf, attempt, &knobs, ht) e, err := NewFakeUserspaceEngine(t.Logf, attempt, &knobs, ht, reg)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -249,7 +252,8 @@ func TestUserspaceEnginePeerMTUReconfig(t *testing.T) {
var knobs controlknobs.Knobs var knobs controlknobs.Knobs
ht := new(health.Tracker) ht := new(health.Tracker)
e, err := NewFakeUserspaceEngine(t.Logf, 0, &knobs, ht) reg := new(usermetric.Registry)
e, err := NewFakeUserspaceEngine(t.Logf, 0, &knobs, ht, reg)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -9,6 +9,7 @@ import (
"time" "time"
"tailscale.com/health" "tailscale.com/health"
"tailscale.com/util/usermetric"
) )
func TestWatchdog(t *testing.T) { func TestWatchdog(t *testing.T) {
@ -24,7 +25,8 @@ func TestWatchdog(t *testing.T) {
t.Run("default watchdog does not fire", func(t *testing.T) { t.Run("default watchdog does not fire", func(t *testing.T) {
t.Parallel() t.Parallel()
ht := new(health.Tracker) ht := new(health.Tracker)
e, err := NewFakeUserspaceEngine(t.Logf, 0, ht) reg := new(usermetric.Registry)
e, err := NewFakeUserspaceEngine(t.Logf, 0, ht, reg)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }