wgengine: instrument with usermetrics

Updates tailscale/corp#22075

Signed-off-by: Kristoffer Dalby <kristoffer@tailscale.com>
This commit is contained in:
Kristoffer Dalby 2024-09-25 17:20:56 +02:00
parent 0e0e53d3b3
commit 5bee939fde
No known key found for this signature in database
5 changed files with 363 additions and 19 deletions

View File

@ -35,6 +35,7 @@ import (
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"golang.org/x/net/proxy"
"tailscale.com/client/tailscale"
"tailscale.com/cmd/testwrapper/flakytest"
"tailscale.com/health"
"tailscale.com/ipn"
@ -873,6 +874,78 @@ func promMetricLabelsStr(labels []*dto.LabelPair) string {
return b.String()
}
// sendData sends a given amount of bytes from s1 to s2.
func sendData(logf func(format string, args ...any), ctx context.Context, bytesCount int, s1, s2 *Server, s1ip, s2ip netip.Addr) error {
l := must.Get(s1.Listen("tcp", fmt.Sprintf("%s:8081", s1ip)))
defer l.Close()
// Dial to s1 from s2
w, err := s2.Dial(ctx, "tcp", fmt.Sprintf("%s:8081", s1ip))
if err != nil {
return err
}
defer w.Close()
stopReceive := make(chan struct{})
defer close(stopReceive)
allReceived := make(chan error)
defer close(allReceived)
go func() {
conn, err := l.Accept()
if err != nil {
allReceived <- err
return
}
conn.SetWriteDeadline(time.Now().Add(30 * time.Second))
total := 0
recvStart := time.Now()
for {
got := make([]byte, bytesCount)
n, err := conn.Read(got)
if n != bytesCount {
logf("read %d bytes, want %d", n, bytesCount)
}
select {
case <-stopReceive:
return
default:
}
if err != nil {
allReceived <- fmt.Errorf("failed reading packet, %s", err)
return
}
total += n
logf("received %d/%d bytes, %.2f %%", total, bytesCount, (float64(total) / (float64(bytesCount)) * 100))
if total == bytesCount {
break
}
}
logf("all received, took: %s", time.Since(recvStart).String())
allReceived <- nil
}()
sendStart := time.Now()
w.SetWriteDeadline(time.Now().Add(30 * time.Second))
if _, err := w.Write(bytes.Repeat([]byte("A"), bytesCount)); err != nil {
stopReceive <- struct{}{}
return err
}
logf("all sent (%s), waiting for all packets (%d) to be received", time.Since(sendStart).String(), bytesCount)
err, _ = <-allReceived
if err != nil {
return err
}
return nil
}
func TestUserMetrics(t *testing.T) {
flakytest.Mark(t, "https://github.com/tailscale/tailscale/issues/13420")
tstest.ResourceCheck(t)
@ -881,7 +954,7 @@ func TestUserMetrics(t *testing.T) {
controlURL, c := startControl(t)
s1, s1ip, s1PubKey := startServer(t, ctx, controlURL, "s1")
s2, _, _ := startServer(t, ctx, controlURL, "s2")
s2, s2ip, _ := startServer(t, ctx, controlURL, "s2")
s1.lb.EditPrefs(&ipn.MaskedPrefs{
Prefs: ipn.Prefs{
@ -924,6 +997,20 @@ func TestUserMetrics(t *testing.T) {
s1.lb.DebugForceNetmapUpdate()
s2.lb.DebugForceNetmapUpdate()
mustDirect(t, t.Logf, lc1, lc2)
// 10 megabytes
bytesToSend := 10 * 1024 * 1024
// This asserts generates some traffic, it is factored out
// of TestUDPConn.
start := time.Now()
err = sendData(t.Logf, ctx, bytesToSend, s1, s2, s1ip, s2ip)
if err != nil {
t.Fatalf("Failed to send packets: %v", err)
}
t.Logf("Sent %d bytes from s1 to s2 in %s", bytesToSend, time.Since(start).String())
ctxLc, cancelLc := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelLc()
metrics1, err := lc1.UserMetrics(ctxLc)
@ -956,6 +1043,18 @@ func TestUserMetrics(t *testing.T) {
t.Errorf("metrics1, tailscaled_health_messages: got %v, want %v", got, want)
}
// Verify that the amount of data recorded in bytes is higher than the
// 10 megabytes sent.
inboundBytes1 := parsedMetrics1[`tailscaled_inbound_bytes_total{path="direct_ipv4"}`]
if inboundBytes1 < float64(bytesToSend) {
t.Errorf(`metrics1, tailscaled_inbound_bytes_total{path="direct_ipv4"}: expected higher than %d, got: %f`, bytesToSend, inboundBytes1)
}
// But ensure that it is not too much higher than the 10 megabytes sent, given 10% wiggle room.
if inboundBytes1 > float64(bytesToSend)*1.1 {
t.Errorf(`metrics1, tailscaled_inbound_bytes_total{path="direct_ipv4"}: expected lower than %f, got: %f`, float64(bytesToSend)*1.1, inboundBytes1)
}
metrics2, err := lc2.UserMetrics(ctx)
if err != nil {
t.Fatal(err)
@ -982,4 +1081,46 @@ func TestUserMetrics(t *testing.T) {
if got, want := parsedMetrics2[`tailscaled_health_messages{type="warning"}`], float64(len(status2.Health)); got != want {
t.Errorf("metrics2, tailscaled_health_messages: got %v, want %v", got, want)
}
// Verify that the amount of data recorded in bytes is higher than the
// 10 megabytes sent.
outboundBytes2 := parsedMetrics2[`tailscaled_outbound_bytes_total{path="direct_ipv4"}`]
if outboundBytes2 < float64(bytesToSend) {
t.Errorf(`metrics2, tailscaled_outbound_bytes_total{path="direct_ipv4"}: expected higher than %d, got: %f`, bytesToSend, outboundBytes2)
}
// But ensure that it is not too much higher than the 10 megabytes sent, given 10% wiggle room.
if outboundBytes2 > float64(bytesToSend)*1.1 {
t.Errorf(`metrics2, tailscaled_outbound_bytes_total{path="direct_ipv4"}: expected lower than %f, got: %f`, float64(bytesToSend)*1.1, outboundBytes2)
}
}
// mustDirect ensures there is a direct connection between LocalClient 1 and 2
func mustDirect(t *testing.T, logf logger.Logf, lc1, lc2 *tailscale.LocalClient) {
t.Helper()
lastLog := time.Now().Add(-time.Minute)
// See https://github.com/tailscale/tailscale/issues/654
// and https://github.com/tailscale/tailscale/issues/3247 for discussions of this deadline.
for deadline := time.Now().Add(30 * time.Second); time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
status1, err := lc1.Status(ctx)
if err != nil {
continue
}
status2, err := lc2.Status(ctx)
if err != nil {
continue
}
pst := status1.Peer[status2.Self.PublicKey]
if pst.CurAddr != "" {
logf("direct link %s->%s found with addr %s", status1.Self.HostName, status2.Self.HostName, pst.CurAddr)
return
}
if now := time.Now(); now.Sub(lastLog) > time.Second {
logf("no direct path %s->%s yet, addrs %v", status1.Self.HostName, status2.Self.HostName, pst.Addrs)
lastLog = now
}
}
t.Error("magicsock did not find a direct path from lc1 to lc2")
}

View File

@ -690,7 +690,9 @@ func (c *connBind) receiveDERP(buffs [][]byte, sizes []int, eps []conn.Endpoint)
// No data read occurred. Wait for another packet.
continue
}
metricRecvDataDERP.Add(1)
metricRecvDataPacketsDERP.Add(1)
c.metrics.inboundPacketsTotal.Add(pathLabel{Path: PathDERP}, 1)
c.metrics.inboundBytesTotal.Add(pathLabel{Path: PathDERP}, int64(n))
sizes[0] = n
eps[0] = ep
return 1, nil

View File

@ -960,26 +960,43 @@ func (de *endpoint) send(buffs [][]byte) error {
de.noteBadEndpoint(udpAddr)
}
var txBytes int
for _, b := range buffs {
txBytes += len(b)
}
var label pathLabel
switch {
case udpAddr.Addr().Is4():
label = pathLabel{Path: PathDirectIPv4}
case udpAddr.Addr().Is6():
label = pathLabel{Path: PathDirectIPv6}
}
de.c.metrics.outboundPacketsTotal.Add(label, int64(len(buffs)))
de.c.metrics.outboundBytesTotal.Add(label, int64(txBytes))
// TODO(raggi): needs updating for accuracy, as in error conditions we may have partial sends.
if stats := de.c.stats.Load(); err == nil && stats != nil {
var txBytes int
for _, b := range buffs {
txBytes += len(b)
}
stats.UpdateTxPhysical(de.nodeAddr, udpAddr, txBytes)
}
}
if derpAddr.IsValid() {
allOk := true
var txBytes int
for _, buff := range buffs {
ok, _ := de.c.sendAddr(derpAddr, de.publicKey, buff)
if stats := de.c.stats.Load(); stats != nil {
stats.UpdateTxPhysical(de.nodeAddr, derpAddr, len(buff))
}
txBytes += len(buff)
if !ok {
allOk = false
}
}
if stats := de.c.stats.Load(); stats != nil {
stats.UpdateTxPhysical(de.nodeAddr, derpAddr, txBytes)
}
de.c.metrics.outboundPacketsTotal.Add(pathLabel{Path: PathDERP}, int64(len(buffs)))
de.c.metrics.outboundBytesTotal.Add(pathLabel{Path: PathDERP}, int64(txBytes))
if allOk {
return nil
}

View File

@ -33,6 +33,7 @@ import (
"tailscale.com/health"
"tailscale.com/hostinfo"
"tailscale.com/ipn/ipnstate"
tsmetrics "tailscale.com/metrics"
"tailscale.com/net/connstats"
"tailscale.com/net/netcheck"
"tailscale.com/net/neterror"
@ -80,6 +81,41 @@ const (
socketBufferSize = 7 << 20
)
// Path is a label indicating the type of path a packet took.
type Path string
const (
PathDirectIPv4 Path = "direct_ipv4"
PathDirectIPv6 Path = "direct_ipv6"
PathDERP Path = "derp"
)
type pathLabel struct {
// Path indicates the path that the packet took:
// - direct_ipv4
// - direct_ipv6
// - derp
Path Path
}
type metrics struct {
// inboundPacketsTotal is the total number of inbound packets received,
// labeled by the path the packet took.
inboundPacketsTotal *tsmetrics.MultiLabelMap[pathLabel]
// outboundPacketsTotal is the total number of outbound packets sent,
// labeled by the path the packet took.
outboundPacketsTotal *tsmetrics.MultiLabelMap[pathLabel]
// inboundBytesTotal is the total number of inbound bytes received,
// labeled by the path the packet took.
inboundBytesTotal *tsmetrics.MultiLabelMap[pathLabel]
// outboundBytesTotal is the total number of outbound bytes sent,
// labeled by the path the packet took.
outboundBytesTotal *tsmetrics.MultiLabelMap[pathLabel]
}
// A Conn routes UDP packets and actively manages a list of its endpoints.
type Conn struct {
// This block mirrors the contents and field order of the Options
@ -321,6 +357,9 @@ type Conn struct {
// responsibility to ensure that traffic from these endpoints is routed
// to the node.
staticEndpoints views.Slice[netip.AddrPort]
// metrics contains the metrics for the magicsock instance.
metrics *metrics
}
// SetDebugLoggingEnabled controls whether spammy debug logging is enabled.
@ -503,6 +542,33 @@ func NewConn(opts Options) (*Conn, error) {
UseDNSCache: true,
}
c.metrics = &metrics{
inboundBytesTotal: usermetric.NewMultiLabelMapWithRegistry[pathLabel](
opts.Metrics,
"tailscaled_inbound_bytes_total",
"counter",
"Counts the number of bytes received from other peers",
),
inboundPacketsTotal: usermetric.NewMultiLabelMapWithRegistry[pathLabel](
opts.Metrics,
"tailscaled_inbound_packets_total",
"counter",
"Counts the number of packets received from other peers",
),
outboundBytesTotal: usermetric.NewMultiLabelMapWithRegistry[pathLabel](
opts.Metrics,
"tailscaled_outbound_bytes_total",
"counter",
"Counts the number of bytes sent to other peers",
),
outboundPacketsTotal: usermetric.NewMultiLabelMapWithRegistry[pathLabel](
opts.Metrics,
"tailscaled_outbound_packets_total",
"counter",
"Counts the number of packets sent to other peers",
),
}
if d4, err := c.listenRawDisco("ip4"); err == nil {
c.logf("[v1] using BPF disco receiver for IPv4")
c.closeDisco4 = d4
@ -1141,6 +1207,15 @@ func (c *Conn) sendUDP(ipp netip.AddrPort, b []byte) (sent bool, err error) {
} else {
if sent {
metricSendUDP.Add(1)
switch {
case ipp.Addr().Is4():
c.metrics.outboundPacketsTotal.Add(pathLabel{Path: PathDirectIPv4}, 1)
c.metrics.outboundBytesTotal.Add(pathLabel{Path: PathDirectIPv4}, int64(len(b)))
case ipp.Addr().Is6():
c.metrics.outboundPacketsTotal.Add(pathLabel{Path: PathDirectIPv6}, 1)
c.metrics.outboundBytesTotal.Add(pathLabel{Path: PathDirectIPv6}, int64(len(b)))
}
}
}
return
@ -1278,19 +1353,30 @@ func (c *Conn) putReceiveBatch(batch *receiveBatch) {
c.receiveBatchPool.Put(batch)
}
// receiveIPv4 creates an IPv4 ReceiveFunc reading from c.pconn4.
func (c *Conn) receiveIPv4() conn.ReceiveFunc {
return c.mkReceiveFunc(&c.pconn4, c.health.ReceiveFuncStats(health.ReceiveIPv4), metricRecvDataIPv4)
return c.mkReceiveFunc(&c.pconn4, c.health.ReceiveFuncStats(health.ReceiveIPv4),
func(i int64) {
metricRecvDataPacketsIPv4.Add(i)
c.metrics.inboundPacketsTotal.Add(pathLabel{Path: PathDirectIPv4}, i)
}, func(i int64) {
c.metrics.inboundBytesTotal.Add(pathLabel{Path: PathDirectIPv4}, i)
})
}
// receiveIPv6 creates an IPv6 ReceiveFunc reading from c.pconn6.
func (c *Conn) receiveIPv6() conn.ReceiveFunc {
return c.mkReceiveFunc(&c.pconn6, c.health.ReceiveFuncStats(health.ReceiveIPv6), metricRecvDataIPv6)
return c.mkReceiveFunc(&c.pconn6, c.health.ReceiveFuncStats(health.ReceiveIPv6),
func(i int64) {
metricRecvDataPacketsIPv6.Add(i)
c.metrics.inboundPacketsTotal.Add(pathLabel{Path: PathDirectIPv6}, i)
}, func(i int64) {
c.metrics.inboundBytesTotal.Add(pathLabel{Path: PathDirectIPv6}, i)
})
}
// mkReceiveFunc creates a ReceiveFunc reading from ruc.
// The provided healthItem and metric are updated if non-nil.
func (c *Conn) mkReceiveFunc(ruc *RebindingUDPConn, healthItem *health.ReceiveFuncStats, metric *clientmetric.Metric) conn.ReceiveFunc {
// The provided healthItem and metrics are updated if non-nil.
func (c *Conn) mkReceiveFunc(ruc *RebindingUDPConn, healthItem *health.ReceiveFuncStats, packetMetricFunc, bytesMetricFunc func(int64)) conn.ReceiveFunc {
// epCache caches an IPPort->endpoint for hot flows.
var epCache ippEndpointCache
@ -1327,8 +1413,11 @@ func (c *Conn) mkReceiveFunc(ruc *RebindingUDPConn, healthItem *health.ReceiveFu
}
ipp := msg.Addr.(*net.UDPAddr).AddrPort()
if ep, ok := c.receiveIP(msg.Buffers[0][:msg.N], ipp, &epCache); ok {
if metric != nil {
metric.Add(1)
if packetMetricFunc != nil {
packetMetricFunc(1)
}
if bytesMetricFunc != nil {
bytesMetricFunc(int64(msg.N))
}
eps[i] = ep
sizes[i] = msg.N
@ -2938,9 +3027,9 @@ var (
// Data packets (non-disco)
metricSendData = clientmetric.NewCounter("magicsock_send_data")
metricSendDataNetworkDown = clientmetric.NewCounter("magicsock_send_data_network_down")
metricRecvDataDERP = clientmetric.NewCounter("magicsock_recv_data_derp")
metricRecvDataIPv4 = clientmetric.NewCounter("magicsock_recv_data_ipv4")
metricRecvDataIPv6 = clientmetric.NewCounter("magicsock_recv_data_ipv6")
metricRecvDataPacketsDERP = clientmetric.NewCounter("magicsock_recv_data_derp")
metricRecvDataPacketsIPv4 = clientmetric.NewCounter("magicsock_recv_data_ipv4")
metricRecvDataPacketsIPv6 = clientmetric.NewCounter("magicsock_recv_data_ipv6")
// Disco packets
metricSendDiscoUDP = clientmetric.NewCounter("magicsock_disco_send_udp")

View File

@ -10,6 +10,7 @@ import (
"crypto/tls"
"encoding/binary"
"errors"
"expvar"
"fmt"
"io"
"math/rand"
@ -28,6 +29,7 @@ import (
"time"
"unsafe"
"github.com/google/go-cmp/cmp"
wgconn "github.com/tailscale/wireguard-go/conn"
"github.com/tailscale/wireguard-go/device"
"github.com/tailscale/wireguard-go/tun/tuntest"
@ -1188,6 +1190,99 @@ func testTwoDevicePing(t *testing.T, d *devices) {
checkStats(t, m1, m1Conns)
checkStats(t, m2, m2Conns)
})
t.Run("compare-metrics-stats", func(t *testing.T) {
setT(t)
defer setT(outerT)
m1.conn.resetMetricsForTest()
m1.stats.TestExtract()
m2.conn.resetMetricsForTest()
m2.stats.TestExtract()
t.Logf("Metrics before: %s\n", m1.metrics.String())
ping1(t)
ping2(t)
assertConnStatsAndUserMetricsEqual(t, m1)
assertConnStatsAndUserMetricsEqual(t, m2)
t.Logf("Metrics after: %s\n", m1.metrics.String())
})
}
func (c *Conn) resetMetricsForTest() {
c.metrics.inboundBytesTotal.ResetAllForTest()
c.metrics.inboundPacketsTotal.ResetAllForTest()
c.metrics.outboundBytesTotal.ResetAllForTest()
c.metrics.outboundPacketsTotal.ResetAllForTest()
}
func assertConnStatsAndUserMetricsEqual(t *testing.T, ms *magicStack) {
_, phys := ms.stats.TestExtract()
physIPv4RxBytes := int64(0)
physIPv4TxBytes := int64(0)
physDERPRxBytes := int64(0)
physDERPTxBytes := int64(0)
physIPv4RxPackets := int64(0)
physIPv4TxPackets := int64(0)
physDERPRxPackets := int64(0)
physDERPTxPackets := int64(0)
for conn, count := range phys {
t.Logf("physconn src: %s, dst: %s", conn.Src.String(), conn.Dst.String())
if conn.Dst.String() == "127.3.3.40:1" {
physDERPRxBytes += int64(count.RxBytes)
physDERPTxBytes += int64(count.TxBytes)
physDERPRxPackets += int64(count.RxPackets)
physDERPTxPackets += int64(count.TxPackets)
} else {
physIPv4RxBytes += int64(count.RxBytes)
physIPv4TxBytes += int64(count.TxBytes)
physIPv4RxPackets += int64(count.RxPackets)
physIPv4TxPackets += int64(count.TxPackets)
}
}
var metricIPv4RxBytes, metricIPv4TxBytes, metricDERPRxBytes, metricDERPTxBytes int64
var metricIPv4RxPackets, metricIPv4TxPackets, metricDERPRxPackets, metricDERPTxPackets int64
if m, ok := ms.conn.metrics.inboundBytesTotal.Get(pathLabel{Path: PathDirectIPv4}).(*expvar.Int); ok {
metricIPv4RxBytes = m.Value()
}
if m, ok := ms.conn.metrics.outboundBytesTotal.Get(pathLabel{Path: PathDirectIPv4}).(*expvar.Int); ok {
metricIPv4TxBytes = m.Value()
}
if m, ok := ms.conn.metrics.inboundBytesTotal.Get(pathLabel{Path: PathDERP}).(*expvar.Int); ok {
metricDERPRxBytes = m.Value()
}
if m, ok := ms.conn.metrics.outboundBytesTotal.Get(pathLabel{Path: PathDERP}).(*expvar.Int); ok {
metricDERPTxBytes = m.Value()
}
if m, ok := ms.conn.metrics.inboundPacketsTotal.Get(pathLabel{Path: PathDirectIPv4}).(*expvar.Int); ok {
metricIPv4RxPackets = m.Value()
}
if m, ok := ms.conn.metrics.outboundPacketsTotal.Get(pathLabel{Path: PathDirectIPv4}).(*expvar.Int); ok {
metricIPv4TxPackets = m.Value()
}
if m, ok := ms.conn.metrics.inboundPacketsTotal.Get(pathLabel{Path: PathDERP}).(*expvar.Int); ok {
metricDERPRxPackets = m.Value()
}
if m, ok := ms.conn.metrics.outboundPacketsTotal.Get(pathLabel{Path: PathDERP}).(*expvar.Int); ok {
metricDERPTxPackets = m.Value()
}
assertEqual(t, "derp bytes inbound", physDERPRxBytes, metricDERPRxBytes)
assertEqual(t, "derp bytes outbound", physDERPTxBytes, metricDERPTxBytes)
assertEqual(t, "ipv4 bytes inbound", physIPv4RxBytes, metricIPv4RxBytes)
assertEqual(t, "ipv4 bytes outbound", physIPv4TxBytes, metricIPv4TxBytes)
assertEqual(t, "derp packets inbound", physDERPRxPackets, metricDERPRxPackets)
assertEqual(t, "derp packets outbound", physDERPTxPackets, metricDERPTxPackets)
assertEqual(t, "ipv4 packets inbound", physIPv4RxPackets, metricIPv4RxPackets)
assertEqual(t, "ipv4 packets outbound", physIPv4TxPackets, metricIPv4TxPackets)
}
func assertEqual(t *testing.T, name string, a, b any) {
t.Helper()
t.Logf("assertEqual %s: %v == %v", name, a, b)
if diff := cmp.Diff(a, b); diff != "" {
t.Errorf("%s mismatch (-want +got):\n%s", name, diff)
}
}
func TestDiscoMessage(t *testing.T) {