mirror of
https://github.com/tailscale/tailscale.git
synced 2025-12-02 10:01:57 +00:00
util/usermetrics: make usermetrics non-global
this commit changes usermetrics to be non-global, this is a building block for correct metrics if a go process runs multiple tsnets or in tests. Updates #13420 Updates tailscale/corp#22075 Signed-off-by: Kristoffer Dalby <kristoffer@tailscale.com>
This commit is contained in:
committed by
Kristoffer Dalby
parent
e1bbe1bf45
commit
0e0e53d3b3
@@ -24,6 +24,7 @@ import (
|
||||
"go4.org/mem"
|
||||
"gvisor.dev/gvisor/pkg/tcpip/stack"
|
||||
"tailscale.com/disco"
|
||||
tsmetrics "tailscale.com/metrics"
|
||||
"tailscale.com/net/connstats"
|
||||
"tailscale.com/net/packet"
|
||||
"tailscale.com/net/packet/checksum"
|
||||
@@ -209,6 +210,30 @@ type Wrapper struct {
|
||||
stats atomic.Pointer[connstats.Statistics]
|
||||
|
||||
captureHook syncs.AtomicValue[capture.Callback]
|
||||
|
||||
metrics *metrics
|
||||
}
|
||||
|
||||
type metrics struct {
|
||||
inboundDroppedPacketsTotal *tsmetrics.MultiLabelMap[dropPacketLabel]
|
||||
outboundDroppedPacketsTotal *tsmetrics.MultiLabelMap[dropPacketLabel]
|
||||
}
|
||||
|
||||
func registerMetrics(reg *usermetric.Registry) *metrics {
|
||||
return &metrics{
|
||||
inboundDroppedPacketsTotal: usermetric.NewMultiLabelMapWithRegistry[dropPacketLabel](
|
||||
reg,
|
||||
"tailscaled_inbound_dropped_packets_total",
|
||||
"counter",
|
||||
"Counts the number of dropped packets received by the node from other peers",
|
||||
),
|
||||
outboundDroppedPacketsTotal: usermetric.NewMultiLabelMapWithRegistry[dropPacketLabel](
|
||||
reg,
|
||||
"tailscaled_outbound_dropped_packets_total",
|
||||
"counter",
|
||||
"Counts the number of packets dropped while being sent to other peers",
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
// tunInjectedRead is an injected packet pretending to be a tun.Read().
|
||||
@@ -248,15 +273,15 @@ func (w *Wrapper) Start() {
|
||||
close(w.startCh)
|
||||
}
|
||||
|
||||
func WrapTAP(logf logger.Logf, tdev tun.Device) *Wrapper {
|
||||
return wrap(logf, tdev, true)
|
||||
func WrapTAP(logf logger.Logf, tdev tun.Device, m *usermetric.Registry) *Wrapper {
|
||||
return wrap(logf, tdev, true, m)
|
||||
}
|
||||
|
||||
func Wrap(logf logger.Logf, tdev tun.Device) *Wrapper {
|
||||
return wrap(logf, tdev, false)
|
||||
func Wrap(logf logger.Logf, tdev tun.Device, m *usermetric.Registry) *Wrapper {
|
||||
return wrap(logf, tdev, false, m)
|
||||
}
|
||||
|
||||
func wrap(logf logger.Logf, tdev tun.Device, isTAP bool) *Wrapper {
|
||||
func wrap(logf logger.Logf, tdev tun.Device, isTAP bool, m *usermetric.Registry) *Wrapper {
|
||||
logf = logger.WithPrefix(logf, "tstun: ")
|
||||
w := &Wrapper{
|
||||
logf: logf,
|
||||
@@ -274,6 +299,7 @@ func wrap(logf logger.Logf, tdev tun.Device, isTAP bool) *Wrapper {
|
||||
// TODO(dmytro): (highly rate-limited) hexdumps should happen on unknown packets.
|
||||
filterFlags: filter.LogAccepts | filter.LogDrops,
|
||||
startCh: make(chan struct{}),
|
||||
metrics: registerMetrics(m),
|
||||
}
|
||||
|
||||
w.vectorBuffer = make([][]byte, tdev.BatchSize())
|
||||
@@ -872,7 +898,7 @@ func (t *Wrapper) filterPacketOutboundToWireGuard(p *packet.Parsed, pc *peerConf
|
||||
|
||||
if filt.RunOut(p, t.filterFlags) != filter.Accept {
|
||||
metricPacketOutDropFilter.Add(1)
|
||||
metricOutboundDroppedPacketsTotal.Add(dropPacketLabel{
|
||||
t.metrics.outboundDroppedPacketsTotal.Add(dropPacketLabel{
|
||||
Reason: DropReasonACL,
|
||||
}, 1)
|
||||
return filter.Drop, gro
|
||||
@@ -1144,7 +1170,7 @@ func (t *Wrapper) filterPacketInboundFromWireGuard(p *packet.Parsed, captHook ca
|
||||
|
||||
if outcome != filter.Accept {
|
||||
metricPacketInDropFilter.Add(1)
|
||||
metricInboundDroppedPacketsTotal.Add(dropPacketLabel{
|
||||
t.metrics.inboundDroppedPacketsTotal.Add(dropPacketLabel{
|
||||
Reason: DropReasonACL,
|
||||
}, 1)
|
||||
|
||||
@@ -1225,7 +1251,7 @@ func (t *Wrapper) Write(buffs [][]byte, offset int) (int, error) {
|
||||
t.noteActivity()
|
||||
_, err := t.tdevWrite(buffs, offset)
|
||||
if err != nil {
|
||||
metricInboundDroppedPacketsTotal.Add(dropPacketLabel{
|
||||
t.metrics.inboundDroppedPacketsTotal.Add(dropPacketLabel{
|
||||
Reason: DropReasonError,
|
||||
}, int64(len(buffs)))
|
||||
}
|
||||
@@ -1482,19 +1508,6 @@ type dropPacketLabel struct {
|
||||
Reason DropReason
|
||||
}
|
||||
|
||||
var (
|
||||
metricInboundDroppedPacketsTotal = usermetric.NewMultiLabelMap[dropPacketLabel](
|
||||
"tailscaled_inbound_dropped_packets_total",
|
||||
"counter",
|
||||
"Counts the number of dropped packets received by the node from other peers",
|
||||
)
|
||||
metricOutboundDroppedPacketsTotal = usermetric.NewMultiLabelMap[dropPacketLabel](
|
||||
"tailscaled_outbound_dropped_packets_total",
|
||||
"counter",
|
||||
"Counts the number of packets dropped while being sent to other peers",
|
||||
)
|
||||
)
|
||||
|
||||
func (t *Wrapper) InstallCaptureHook(cb capture.Callback) {
|
||||
t.captureHook.Store(cb)
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"expvar"
|
||||
"fmt"
|
||||
"net/netip"
|
||||
"reflect"
|
||||
@@ -38,6 +39,7 @@ import (
|
||||
"tailscale.com/types/ptr"
|
||||
"tailscale.com/types/views"
|
||||
"tailscale.com/util/must"
|
||||
"tailscale.com/util/usermetric"
|
||||
"tailscale.com/wgengine/capture"
|
||||
"tailscale.com/wgengine/filter"
|
||||
"tailscale.com/wgengine/wgcfg"
|
||||
@@ -173,7 +175,8 @@ func setfilter(logf logger.Logf, tun *Wrapper) {
|
||||
|
||||
func newChannelTUN(logf logger.Logf, secure bool) (*tuntest.ChannelTUN, *Wrapper) {
|
||||
chtun := tuntest.NewChannelTUN()
|
||||
tun := Wrap(logf, chtun.TUN())
|
||||
reg := new(usermetric.Registry)
|
||||
tun := Wrap(logf, chtun.TUN(), reg)
|
||||
if secure {
|
||||
setfilter(logf, tun)
|
||||
} else {
|
||||
@@ -185,7 +188,8 @@ func newChannelTUN(logf logger.Logf, secure bool) (*tuntest.ChannelTUN, *Wrapper
|
||||
|
||||
func newFakeTUN(logf logger.Logf, secure bool) (*fakeTUN, *Wrapper) {
|
||||
ftun := NewFake()
|
||||
tun := Wrap(logf, ftun)
|
||||
reg := new(usermetric.Registry)
|
||||
tun := Wrap(logf, ftun, reg)
|
||||
if secure {
|
||||
setfilter(logf, tun)
|
||||
} else {
|
||||
@@ -315,15 +319,15 @@ func mustHexDecode(s string) []byte {
|
||||
}
|
||||
|
||||
func TestFilter(t *testing.T) {
|
||||
// Reset the metrics before test. These are global
|
||||
// so the different tests might have affected them.
|
||||
metricInboundDroppedPacketsTotal.SetInt(dropPacketLabel{Reason: DropReasonACL}, 0)
|
||||
metricInboundDroppedPacketsTotal.SetInt(dropPacketLabel{Reason: DropReasonError}, 0)
|
||||
metricOutboundDroppedPacketsTotal.SetInt(dropPacketLabel{Reason: DropReasonACL}, 0)
|
||||
|
||||
chtun, tun := newChannelTUN(t.Logf, true)
|
||||
defer tun.Close()
|
||||
|
||||
// Reset the metrics before test. These are global
|
||||
// so the different tests might have affected them.
|
||||
tun.metrics.inboundDroppedPacketsTotal.ResetAllForTest()
|
||||
tun.metrics.outboundDroppedPacketsTotal.ResetAllForTest()
|
||||
|
||||
type direction int
|
||||
|
||||
const (
|
||||
@@ -436,20 +440,26 @@ func TestFilter(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
inACL := metricInboundDroppedPacketsTotal.Get(dropPacketLabel{Reason: DropReasonACL})
|
||||
inError := metricInboundDroppedPacketsTotal.Get(dropPacketLabel{Reason: DropReasonError})
|
||||
outACL := metricOutboundDroppedPacketsTotal.Get(dropPacketLabel{Reason: DropReasonACL})
|
||||
|
||||
assertMetricPackets(t, "inACL", "3", inACL.String())
|
||||
assertMetricPackets(t, "inError", "0", inError.String())
|
||||
assertMetricPackets(t, "outACL", "1", outACL.String())
|
||||
var metricInboundDroppedPacketsACL, metricInboundDroppedPacketsErr, metricOutboundDroppedPacketsACL int64
|
||||
if m, ok := tun.metrics.inboundDroppedPacketsTotal.Get(dropPacketLabel{Reason: DropReasonACL}).(*expvar.Int); ok {
|
||||
metricInboundDroppedPacketsACL = m.Value()
|
||||
}
|
||||
if m, ok := tun.metrics.inboundDroppedPacketsTotal.Get(dropPacketLabel{Reason: DropReasonError}).(*expvar.Int); ok {
|
||||
metricInboundDroppedPacketsErr = m.Value()
|
||||
}
|
||||
if m, ok := tun.metrics.outboundDroppedPacketsTotal.Get(dropPacketLabel{Reason: DropReasonACL}).(*expvar.Int); ok {
|
||||
metricOutboundDroppedPacketsACL = m.Value()
|
||||
}
|
||||
|
||||
assertMetricPackets(t, "inACL", 3, metricInboundDroppedPacketsACL)
|
||||
assertMetricPackets(t, "inError", 0, metricInboundDroppedPacketsErr)
|
||||
assertMetricPackets(t, "outACL", 1, metricOutboundDroppedPacketsACL)
|
||||
}
|
||||
|
||||
func assertMetricPackets(t *testing.T, metricName, want, got string) {
|
||||
func assertMetricPackets(t *testing.T, metricName string, want, got int64) {
|
||||
t.Helper()
|
||||
if want != got {
|
||||
t.Errorf("%s got unexpected value, got %s, want %s", metricName, got, want)
|
||||
t.Errorf("%s got unexpected value, got %d, want %d", metricName, got, want)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -512,6 +522,7 @@ func TestAtomic64Alignment(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestPeerAPIBypass(t *testing.T) {
|
||||
reg := new(usermetric.Registry)
|
||||
wrapperWithPeerAPI := &Wrapper{
|
||||
PeerAPIPort: func(ip netip.Addr) (port uint16, ok bool) {
|
||||
if ip == netip.MustParseAddr("100.64.1.2") {
|
||||
@@ -519,6 +530,7 @@ func TestPeerAPIBypass(t *testing.T) {
|
||||
}
|
||||
return
|
||||
},
|
||||
metrics: registerMetrics(reg),
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
@@ -534,13 +546,16 @@ func TestPeerAPIBypass(t *testing.T) {
|
||||
PeerAPIPort: func(netip.Addr) (port uint16, ok bool) {
|
||||
return 60000, true
|
||||
},
|
||||
metrics: registerMetrics(reg),
|
||||
},
|
||||
pkt: tcp4syn("1.2.3.4", "100.64.1.2", 1234, 60000),
|
||||
want: filter.Drop,
|
||||
},
|
||||
{
|
||||
name: "reject_with_filter",
|
||||
w: &Wrapper{},
|
||||
name: "reject_with_filter",
|
||||
w: &Wrapper{
|
||||
metrics: registerMetrics(reg),
|
||||
},
|
||||
filter: filter.NewAllowNone(logger.Discard, new(netipx.IPSet)),
|
||||
pkt: tcp4syn("1.2.3.4", "100.64.1.2", 1234, 60000),
|
||||
want: filter.Drop,
|
||||
|
||||
Reference in New Issue
Block a user