From 5bee939fdee66d02a641ff41374eb18e2f0559d3 Mon Sep 17 00:00:00 2001 From: Kristoffer Dalby Date: Wed, 25 Sep 2024 17:20:56 +0200 Subject: [PATCH] wgengine: instrument with usermetrics Updates tailscale/corp#22075 Signed-off-by: Kristoffer Dalby --- tsnet/tsnet_test.go | 143 ++++++++++++++++++++++++++- wgengine/magicsock/derp.go | 4 +- wgengine/magicsock/endpoint.go | 31 ++++-- wgengine/magicsock/magicsock.go | 109 ++++++++++++++++++-- wgengine/magicsock/magicsock_test.go | 95 ++++++++++++++++++ 5 files changed, 363 insertions(+), 19 deletions(-) diff --git a/tsnet/tsnet_test.go b/tsnet/tsnet_test.go index 96c60de47..950fadad7 100644 --- a/tsnet/tsnet_test.go +++ b/tsnet/tsnet_test.go @@ -35,6 +35,7 @@ import ( dto "github.com/prometheus/client_model/go" "github.com/prometheus/common/expfmt" "golang.org/x/net/proxy" + "tailscale.com/client/tailscale" "tailscale.com/cmd/testwrapper/flakytest" "tailscale.com/health" "tailscale.com/ipn" @@ -873,6 +874,78 @@ func promMetricLabelsStr(labels []*dto.LabelPair) string { return b.String() } +// sendData sends a given amount of bytes from s1 to s2. +func sendData(logf func(format string, args ...any), ctx context.Context, bytesCount int, s1, s2 *Server, s1ip, s2ip netip.Addr) error { + l := must.Get(s1.Listen("tcp", fmt.Sprintf("%s:8081", s1ip))) + defer l.Close() + + // Dial to s1 from s2 + w, err := s2.Dial(ctx, "tcp", fmt.Sprintf("%s:8081", s1ip)) + if err != nil { + return err + } + defer w.Close() + + stopReceive := make(chan struct{}) + defer close(stopReceive) + allReceived := make(chan error) + defer close(allReceived) + + go func() { + conn, err := l.Accept() + if err != nil { + allReceived <- err + return + } + conn.SetWriteDeadline(time.Now().Add(30 * time.Second)) + + total := 0 + recvStart := time.Now() + for { + got := make([]byte, bytesCount) + n, err := conn.Read(got) + if n != bytesCount { + logf("read %d bytes, want %d", n, bytesCount) + } + + select { + case <-stopReceive: + return + default: + } + + if err != nil { + allReceived <- fmt.Errorf("failed reading packet, %s", err) + return + } + + total += n + logf("received %d/%d bytes, %.2f %%", total, bytesCount, (float64(total) / (float64(bytesCount)) * 100)) + if total == bytesCount { + break + } + } + + logf("all received, took: %s", time.Since(recvStart).String()) + allReceived <- nil + }() + + sendStart := time.Now() + w.SetWriteDeadline(time.Now().Add(30 * time.Second)) + if _, err := w.Write(bytes.Repeat([]byte("A"), bytesCount)); err != nil { + stopReceive <- struct{}{} + return err + } + + logf("all sent (%s), waiting for all packets (%d) to be received", time.Since(sendStart).String(), bytesCount) + err, _ = <-allReceived + if err != nil { + return err + } + + return nil +} + func TestUserMetrics(t *testing.T) { flakytest.Mark(t, "https://github.com/tailscale/tailscale/issues/13420") tstest.ResourceCheck(t) @@ -881,7 +954,7 @@ func TestUserMetrics(t *testing.T) { controlURL, c := startControl(t) s1, s1ip, s1PubKey := startServer(t, ctx, controlURL, "s1") - s2, _, _ := startServer(t, ctx, controlURL, "s2") + s2, s2ip, _ := startServer(t, ctx, controlURL, "s2") s1.lb.EditPrefs(&ipn.MaskedPrefs{ Prefs: ipn.Prefs{ @@ -924,6 +997,20 @@ func TestUserMetrics(t *testing.T) { s1.lb.DebugForceNetmapUpdate() s2.lb.DebugForceNetmapUpdate() + mustDirect(t, t.Logf, lc1, lc2) + + // 10 megabytes + bytesToSend := 10 * 1024 * 1024 + + // This asserts generates some traffic, it is factored out + // of TestUDPConn. + start := time.Now() + err = sendData(t.Logf, ctx, bytesToSend, s1, s2, s1ip, s2ip) + if err != nil { + t.Fatalf("Failed to send packets: %v", err) + } + t.Logf("Sent %d bytes from s1 to s2 in %s", bytesToSend, time.Since(start).String()) + ctxLc, cancelLc := context.WithTimeout(context.Background(), 5*time.Second) defer cancelLc() metrics1, err := lc1.UserMetrics(ctxLc) @@ -956,6 +1043,18 @@ func TestUserMetrics(t *testing.T) { t.Errorf("metrics1, tailscaled_health_messages: got %v, want %v", got, want) } + // Verify that the amount of data recorded in bytes is higher than the + // 10 megabytes sent. + inboundBytes1 := parsedMetrics1[`tailscaled_inbound_bytes_total{path="direct_ipv4"}`] + if inboundBytes1 < float64(bytesToSend) { + t.Errorf(`metrics1, tailscaled_inbound_bytes_total{path="direct_ipv4"}: expected higher than %d, got: %f`, bytesToSend, inboundBytes1) + } + + // But ensure that it is not too much higher than the 10 megabytes sent, given 10% wiggle room. + if inboundBytes1 > float64(bytesToSend)*1.1 { + t.Errorf(`metrics1, tailscaled_inbound_bytes_total{path="direct_ipv4"}: expected lower than %f, got: %f`, float64(bytesToSend)*1.1, inboundBytes1) + } + metrics2, err := lc2.UserMetrics(ctx) if err != nil { t.Fatal(err) @@ -982,4 +1081,46 @@ func TestUserMetrics(t *testing.T) { if got, want := parsedMetrics2[`tailscaled_health_messages{type="warning"}`], float64(len(status2.Health)); got != want { t.Errorf("metrics2, tailscaled_health_messages: got %v, want %v", got, want) } + + // Verify that the amount of data recorded in bytes is higher than the + // 10 megabytes sent. + outboundBytes2 := parsedMetrics2[`tailscaled_outbound_bytes_total{path="direct_ipv4"}`] + if outboundBytes2 < float64(bytesToSend) { + t.Errorf(`metrics2, tailscaled_outbound_bytes_total{path="direct_ipv4"}: expected higher than %d, got: %f`, bytesToSend, outboundBytes2) + } + + // But ensure that it is not too much higher than the 10 megabytes sent, given 10% wiggle room. + if outboundBytes2 > float64(bytesToSend)*1.1 { + t.Errorf(`metrics2, tailscaled_outbound_bytes_total{path="direct_ipv4"}: expected lower than %f, got: %f`, float64(bytesToSend)*1.1, outboundBytes2) + } +} + +// mustDirect ensures there is a direct connection between LocalClient 1 and 2 +func mustDirect(t *testing.T, logf logger.Logf, lc1, lc2 *tailscale.LocalClient) { + t.Helper() + lastLog := time.Now().Add(-time.Minute) + // See https://github.com/tailscale/tailscale/issues/654 + // and https://github.com/tailscale/tailscale/issues/3247 for discussions of this deadline. + for deadline := time.Now().Add(30 * time.Second); time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + status1, err := lc1.Status(ctx) + if err != nil { + continue + } + status2, err := lc2.Status(ctx) + if err != nil { + continue + } + pst := status1.Peer[status2.Self.PublicKey] + if pst.CurAddr != "" { + logf("direct link %s->%s found with addr %s", status1.Self.HostName, status2.Self.HostName, pst.CurAddr) + return + } + if now := time.Now(); now.Sub(lastLog) > time.Second { + logf("no direct path %s->%s yet, addrs %v", status1.Self.HostName, status2.Self.HostName, pst.Addrs) + lastLog = now + } + } + t.Error("magicsock did not find a direct path from lc1 to lc2") } diff --git a/wgengine/magicsock/derp.go b/wgengine/magicsock/derp.go index 69c5cbc90..931934e76 100644 --- a/wgengine/magicsock/derp.go +++ b/wgengine/magicsock/derp.go @@ -690,7 +690,9 @@ func (c *connBind) receiveDERP(buffs [][]byte, sizes []int, eps []conn.Endpoint) // No data read occurred. Wait for another packet. continue } - metricRecvDataDERP.Add(1) + metricRecvDataPacketsDERP.Add(1) + c.metrics.inboundPacketsTotal.Add(pathLabel{Path: PathDERP}, 1) + c.metrics.inboundBytesTotal.Add(pathLabel{Path: PathDERP}, int64(n)) sizes[0] = n eps[0] = ep return 1, nil diff --git a/wgengine/magicsock/endpoint.go b/wgengine/magicsock/endpoint.go index 53ecb84de..ec926a14f 100644 --- a/wgengine/magicsock/endpoint.go +++ b/wgengine/magicsock/endpoint.go @@ -960,26 +960,43 @@ func (de *endpoint) send(buffs [][]byte) error { de.noteBadEndpoint(udpAddr) } + var txBytes int + for _, b := range buffs { + txBytes += len(b) + } + + var label pathLabel + switch { + case udpAddr.Addr().Is4(): + label = pathLabel{Path: PathDirectIPv4} + case udpAddr.Addr().Is6(): + label = pathLabel{Path: PathDirectIPv6} + } + + de.c.metrics.outboundPacketsTotal.Add(label, int64(len(buffs))) + de.c.metrics.outboundBytesTotal.Add(label, int64(txBytes)) + // TODO(raggi): needs updating for accuracy, as in error conditions we may have partial sends. if stats := de.c.stats.Load(); err == nil && stats != nil { - var txBytes int - for _, b := range buffs { - txBytes += len(b) - } stats.UpdateTxPhysical(de.nodeAddr, udpAddr, txBytes) } } if derpAddr.IsValid() { allOk := true + var txBytes int for _, buff := range buffs { ok, _ := de.c.sendAddr(derpAddr, de.publicKey, buff) - if stats := de.c.stats.Load(); stats != nil { - stats.UpdateTxPhysical(de.nodeAddr, derpAddr, len(buff)) - } + txBytes += len(buff) if !ok { allOk = false } } + + if stats := de.c.stats.Load(); stats != nil { + stats.UpdateTxPhysical(de.nodeAddr, derpAddr, txBytes) + } + de.c.metrics.outboundPacketsTotal.Add(pathLabel{Path: PathDERP}, int64(len(buffs))) + de.c.metrics.outboundBytesTotal.Add(pathLabel{Path: PathDERP}, int64(txBytes)) if allOk { return nil } diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go index ff3d02336..9d262f8da 100644 --- a/wgengine/magicsock/magicsock.go +++ b/wgengine/magicsock/magicsock.go @@ -33,6 +33,7 @@ import ( "tailscale.com/health" "tailscale.com/hostinfo" "tailscale.com/ipn/ipnstate" + tsmetrics "tailscale.com/metrics" "tailscale.com/net/connstats" "tailscale.com/net/netcheck" "tailscale.com/net/neterror" @@ -80,6 +81,41 @@ const ( socketBufferSize = 7 << 20 ) +// Path is a label indicating the type of path a packet took. +type Path string + +const ( + PathDirectIPv4 Path = "direct_ipv4" + PathDirectIPv6 Path = "direct_ipv6" + PathDERP Path = "derp" +) + +type pathLabel struct { + // Path indicates the path that the packet took: + // - direct_ipv4 + // - direct_ipv6 + // - derp + Path Path +} + +type metrics struct { + // inboundPacketsTotal is the total number of inbound packets received, + // labeled by the path the packet took. + inboundPacketsTotal *tsmetrics.MultiLabelMap[pathLabel] + + // outboundPacketsTotal is the total number of outbound packets sent, + // labeled by the path the packet took. + outboundPacketsTotal *tsmetrics.MultiLabelMap[pathLabel] + + // inboundBytesTotal is the total number of inbound bytes received, + // labeled by the path the packet took. + inboundBytesTotal *tsmetrics.MultiLabelMap[pathLabel] + + // outboundBytesTotal is the total number of outbound bytes sent, + // labeled by the path the packet took. + outboundBytesTotal *tsmetrics.MultiLabelMap[pathLabel] +} + // A Conn routes UDP packets and actively manages a list of its endpoints. type Conn struct { // This block mirrors the contents and field order of the Options @@ -321,6 +357,9 @@ type Conn struct { // responsibility to ensure that traffic from these endpoints is routed // to the node. staticEndpoints views.Slice[netip.AddrPort] + + // metrics contains the metrics for the magicsock instance. + metrics *metrics } // SetDebugLoggingEnabled controls whether spammy debug logging is enabled. @@ -503,6 +542,33 @@ func NewConn(opts Options) (*Conn, error) { UseDNSCache: true, } + c.metrics = &metrics{ + inboundBytesTotal: usermetric.NewMultiLabelMapWithRegistry[pathLabel]( + opts.Metrics, + "tailscaled_inbound_bytes_total", + "counter", + "Counts the number of bytes received from other peers", + ), + inboundPacketsTotal: usermetric.NewMultiLabelMapWithRegistry[pathLabel]( + opts.Metrics, + "tailscaled_inbound_packets_total", + "counter", + "Counts the number of packets received from other peers", + ), + outboundBytesTotal: usermetric.NewMultiLabelMapWithRegistry[pathLabel]( + opts.Metrics, + "tailscaled_outbound_bytes_total", + "counter", + "Counts the number of bytes sent to other peers", + ), + outboundPacketsTotal: usermetric.NewMultiLabelMapWithRegistry[pathLabel]( + opts.Metrics, + "tailscaled_outbound_packets_total", + "counter", + "Counts the number of packets sent to other peers", + ), + } + if d4, err := c.listenRawDisco("ip4"); err == nil { c.logf("[v1] using BPF disco receiver for IPv4") c.closeDisco4 = d4 @@ -1141,6 +1207,15 @@ func (c *Conn) sendUDP(ipp netip.AddrPort, b []byte) (sent bool, err error) { } else { if sent { metricSendUDP.Add(1) + + switch { + case ipp.Addr().Is4(): + c.metrics.outboundPacketsTotal.Add(pathLabel{Path: PathDirectIPv4}, 1) + c.metrics.outboundBytesTotal.Add(pathLabel{Path: PathDirectIPv4}, int64(len(b))) + case ipp.Addr().Is6(): + c.metrics.outboundPacketsTotal.Add(pathLabel{Path: PathDirectIPv6}, 1) + c.metrics.outboundBytesTotal.Add(pathLabel{Path: PathDirectIPv6}, int64(len(b))) + } } } return @@ -1278,19 +1353,30 @@ func (c *Conn) putReceiveBatch(batch *receiveBatch) { c.receiveBatchPool.Put(batch) } -// receiveIPv4 creates an IPv4 ReceiveFunc reading from c.pconn4. func (c *Conn) receiveIPv4() conn.ReceiveFunc { - return c.mkReceiveFunc(&c.pconn4, c.health.ReceiveFuncStats(health.ReceiveIPv4), metricRecvDataIPv4) + return c.mkReceiveFunc(&c.pconn4, c.health.ReceiveFuncStats(health.ReceiveIPv4), + func(i int64) { + metricRecvDataPacketsIPv4.Add(i) + c.metrics.inboundPacketsTotal.Add(pathLabel{Path: PathDirectIPv4}, i) + }, func(i int64) { + c.metrics.inboundBytesTotal.Add(pathLabel{Path: PathDirectIPv4}, i) + }) } // receiveIPv6 creates an IPv6 ReceiveFunc reading from c.pconn6. func (c *Conn) receiveIPv6() conn.ReceiveFunc { - return c.mkReceiveFunc(&c.pconn6, c.health.ReceiveFuncStats(health.ReceiveIPv6), metricRecvDataIPv6) + return c.mkReceiveFunc(&c.pconn6, c.health.ReceiveFuncStats(health.ReceiveIPv6), + func(i int64) { + metricRecvDataPacketsIPv6.Add(i) + c.metrics.inboundPacketsTotal.Add(pathLabel{Path: PathDirectIPv6}, i) + }, func(i int64) { + c.metrics.inboundBytesTotal.Add(pathLabel{Path: PathDirectIPv6}, i) + }) } // mkReceiveFunc creates a ReceiveFunc reading from ruc. -// The provided healthItem and metric are updated if non-nil. -func (c *Conn) mkReceiveFunc(ruc *RebindingUDPConn, healthItem *health.ReceiveFuncStats, metric *clientmetric.Metric) conn.ReceiveFunc { +// The provided healthItem and metrics are updated if non-nil. +func (c *Conn) mkReceiveFunc(ruc *RebindingUDPConn, healthItem *health.ReceiveFuncStats, packetMetricFunc, bytesMetricFunc func(int64)) conn.ReceiveFunc { // epCache caches an IPPort->endpoint for hot flows. var epCache ippEndpointCache @@ -1327,8 +1413,11 @@ func (c *Conn) mkReceiveFunc(ruc *RebindingUDPConn, healthItem *health.ReceiveFu } ipp := msg.Addr.(*net.UDPAddr).AddrPort() if ep, ok := c.receiveIP(msg.Buffers[0][:msg.N], ipp, &epCache); ok { - if metric != nil { - metric.Add(1) + if packetMetricFunc != nil { + packetMetricFunc(1) + } + if bytesMetricFunc != nil { + bytesMetricFunc(int64(msg.N)) } eps[i] = ep sizes[i] = msg.N @@ -2938,9 +3027,9 @@ var ( // Data packets (non-disco) metricSendData = clientmetric.NewCounter("magicsock_send_data") metricSendDataNetworkDown = clientmetric.NewCounter("magicsock_send_data_network_down") - metricRecvDataDERP = clientmetric.NewCounter("magicsock_recv_data_derp") - metricRecvDataIPv4 = clientmetric.NewCounter("magicsock_recv_data_ipv4") - metricRecvDataIPv6 = clientmetric.NewCounter("magicsock_recv_data_ipv6") + metricRecvDataPacketsDERP = clientmetric.NewCounter("magicsock_recv_data_derp") + metricRecvDataPacketsIPv4 = clientmetric.NewCounter("magicsock_recv_data_ipv4") + metricRecvDataPacketsIPv6 = clientmetric.NewCounter("magicsock_recv_data_ipv6") // Disco packets metricSendDiscoUDP = clientmetric.NewCounter("magicsock_disco_send_udp") diff --git a/wgengine/magicsock/magicsock_test.go b/wgengine/magicsock/magicsock_test.go index f979834c9..993132008 100644 --- a/wgengine/magicsock/magicsock_test.go +++ b/wgengine/magicsock/magicsock_test.go @@ -10,6 +10,7 @@ import ( "crypto/tls" "encoding/binary" "errors" + "expvar" "fmt" "io" "math/rand" @@ -28,6 +29,7 @@ import ( "time" "unsafe" + "github.com/google/go-cmp/cmp" wgconn "github.com/tailscale/wireguard-go/conn" "github.com/tailscale/wireguard-go/device" "github.com/tailscale/wireguard-go/tun/tuntest" @@ -1188,6 +1190,99 @@ func testTwoDevicePing(t *testing.T, d *devices) { checkStats(t, m1, m1Conns) checkStats(t, m2, m2Conns) }) + t.Run("compare-metrics-stats", func(t *testing.T) { + setT(t) + defer setT(outerT) + m1.conn.resetMetricsForTest() + m1.stats.TestExtract() + m2.conn.resetMetricsForTest() + m2.stats.TestExtract() + t.Logf("Metrics before: %s\n", m1.metrics.String()) + ping1(t) + ping2(t) + assertConnStatsAndUserMetricsEqual(t, m1) + assertConnStatsAndUserMetricsEqual(t, m2) + t.Logf("Metrics after: %s\n", m1.metrics.String()) + }) +} + +func (c *Conn) resetMetricsForTest() { + c.metrics.inboundBytesTotal.ResetAllForTest() + c.metrics.inboundPacketsTotal.ResetAllForTest() + c.metrics.outboundBytesTotal.ResetAllForTest() + c.metrics.outboundPacketsTotal.ResetAllForTest() +} + +func assertConnStatsAndUserMetricsEqual(t *testing.T, ms *magicStack) { + _, phys := ms.stats.TestExtract() + + physIPv4RxBytes := int64(0) + physIPv4TxBytes := int64(0) + physDERPRxBytes := int64(0) + physDERPTxBytes := int64(0) + physIPv4RxPackets := int64(0) + physIPv4TxPackets := int64(0) + physDERPRxPackets := int64(0) + physDERPTxPackets := int64(0) + for conn, count := range phys { + t.Logf("physconn src: %s, dst: %s", conn.Src.String(), conn.Dst.String()) + if conn.Dst.String() == "127.3.3.40:1" { + physDERPRxBytes += int64(count.RxBytes) + physDERPTxBytes += int64(count.TxBytes) + physDERPRxPackets += int64(count.RxPackets) + physDERPTxPackets += int64(count.TxPackets) + } else { + physIPv4RxBytes += int64(count.RxBytes) + physIPv4TxBytes += int64(count.TxBytes) + physIPv4RxPackets += int64(count.RxPackets) + physIPv4TxPackets += int64(count.TxPackets) + } + } + + var metricIPv4RxBytes, metricIPv4TxBytes, metricDERPRxBytes, metricDERPTxBytes int64 + var metricIPv4RxPackets, metricIPv4TxPackets, metricDERPRxPackets, metricDERPTxPackets int64 + + if m, ok := ms.conn.metrics.inboundBytesTotal.Get(pathLabel{Path: PathDirectIPv4}).(*expvar.Int); ok { + metricIPv4RxBytes = m.Value() + } + if m, ok := ms.conn.metrics.outboundBytesTotal.Get(pathLabel{Path: PathDirectIPv4}).(*expvar.Int); ok { + metricIPv4TxBytes = m.Value() + } + if m, ok := ms.conn.metrics.inboundBytesTotal.Get(pathLabel{Path: PathDERP}).(*expvar.Int); ok { + metricDERPRxBytes = m.Value() + } + if m, ok := ms.conn.metrics.outboundBytesTotal.Get(pathLabel{Path: PathDERP}).(*expvar.Int); ok { + metricDERPTxBytes = m.Value() + } + if m, ok := ms.conn.metrics.inboundPacketsTotal.Get(pathLabel{Path: PathDirectIPv4}).(*expvar.Int); ok { + metricIPv4RxPackets = m.Value() + } + if m, ok := ms.conn.metrics.outboundPacketsTotal.Get(pathLabel{Path: PathDirectIPv4}).(*expvar.Int); ok { + metricIPv4TxPackets = m.Value() + } + if m, ok := ms.conn.metrics.inboundPacketsTotal.Get(pathLabel{Path: PathDERP}).(*expvar.Int); ok { + metricDERPRxPackets = m.Value() + } + if m, ok := ms.conn.metrics.outboundPacketsTotal.Get(pathLabel{Path: PathDERP}).(*expvar.Int); ok { + metricDERPTxPackets = m.Value() + } + + assertEqual(t, "derp bytes inbound", physDERPRxBytes, metricDERPRxBytes) + assertEqual(t, "derp bytes outbound", physDERPTxBytes, metricDERPTxBytes) + assertEqual(t, "ipv4 bytes inbound", physIPv4RxBytes, metricIPv4RxBytes) + assertEqual(t, "ipv4 bytes outbound", physIPv4TxBytes, metricIPv4TxBytes) + assertEqual(t, "derp packets inbound", physDERPRxPackets, metricDERPRxPackets) + assertEqual(t, "derp packets outbound", physDERPTxPackets, metricDERPTxPackets) + assertEqual(t, "ipv4 packets inbound", physIPv4RxPackets, metricIPv4RxPackets) + assertEqual(t, "ipv4 packets outbound", physIPv4TxPackets, metricIPv4TxPackets) +} + +func assertEqual(t *testing.T, name string, a, b any) { + t.Helper() + t.Logf("assertEqual %s: %v == %v", name, a, b) + if diff := cmp.Diff(a, b); diff != "" { + t.Errorf("%s mismatch (-want +got):\n%s", name, diff) + } } func TestDiscoMessage(t *testing.T) {