mirror of
https://github.com/tailscale/tailscale.git
synced 2025-08-22 02:50:42 +00:00
wgengine: instrument with usermetrics
Updates tailscale/corp#22075 Signed-off-by: Kristoffer Dalby <kristoffer@tailscale.com>
This commit is contained in:

committed by
Kristoffer Dalby

parent
adc8368964
commit
40c991f6b8
@@ -36,6 +36,7 @@ import (
|
||||
dto "github.com/prometheus/client_model/go"
|
||||
"github.com/prometheus/common/expfmt"
|
||||
"golang.org/x/net/proxy"
|
||||
"tailscale.com/client/tailscale"
|
||||
"tailscale.com/cmd/testwrapper/flakytest"
|
||||
"tailscale.com/health"
|
||||
"tailscale.com/ipn"
|
||||
@@ -874,6 +875,78 @@ func promMetricLabelsStr(labels []*dto.LabelPair) string {
|
||||
return b.String()
|
||||
}
|
||||
|
||||
// sendData sends a given amount of bytes from s1 to s2.
|
||||
func sendData(logf func(format string, args ...any), ctx context.Context, bytesCount int, s1, s2 *Server, s1ip, s2ip netip.Addr) error {
|
||||
l := must.Get(s1.Listen("tcp", fmt.Sprintf("%s:8081", s1ip)))
|
||||
defer l.Close()
|
||||
|
||||
// Dial to s1 from s2
|
||||
w, err := s2.Dial(ctx, "tcp", fmt.Sprintf("%s:8081", s1ip))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer w.Close()
|
||||
|
||||
stopReceive := make(chan struct{})
|
||||
defer close(stopReceive)
|
||||
allReceived := make(chan error)
|
||||
defer close(allReceived)
|
||||
|
||||
go func() {
|
||||
conn, err := l.Accept()
|
||||
if err != nil {
|
||||
allReceived <- err
|
||||
return
|
||||
}
|
||||
conn.SetWriteDeadline(time.Now().Add(30 * time.Second))
|
||||
|
||||
total := 0
|
||||
recvStart := time.Now()
|
||||
for {
|
||||
got := make([]byte, bytesCount)
|
||||
n, err := conn.Read(got)
|
||||
if n != bytesCount {
|
||||
logf("read %d bytes, want %d", n, bytesCount)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-stopReceive:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
allReceived <- fmt.Errorf("failed reading packet, %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
total += n
|
||||
logf("received %d/%d bytes, %.2f %%", total, bytesCount, (float64(total) / (float64(bytesCount)) * 100))
|
||||
if total == bytesCount {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
logf("all received, took: %s", time.Since(recvStart).String())
|
||||
allReceived <- nil
|
||||
}()
|
||||
|
||||
sendStart := time.Now()
|
||||
w.SetWriteDeadline(time.Now().Add(30 * time.Second))
|
||||
if _, err := w.Write(bytes.Repeat([]byte("A"), bytesCount)); err != nil {
|
||||
stopReceive <- struct{}{}
|
||||
return err
|
||||
}
|
||||
|
||||
logf("all sent (%s), waiting for all packets (%d) to be received", time.Since(sendStart).String(), bytesCount)
|
||||
err, _ = <-allReceived
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestUserMetrics(t *testing.T) {
|
||||
flakytest.Mark(t, "https://github.com/tailscale/tailscale/issues/13420")
|
||||
tstest.ResourceCheck(t)
|
||||
@@ -882,7 +955,7 @@ func TestUserMetrics(t *testing.T) {
|
||||
|
||||
controlURL, c := startControl(t)
|
||||
s1, s1ip, s1PubKey := startServer(t, ctx, controlURL, "s1")
|
||||
s2, _, _ := startServer(t, ctx, controlURL, "s2")
|
||||
s2, s2ip, _ := startServer(t, ctx, controlURL, "s2")
|
||||
|
||||
s1.lb.EditPrefs(&ipn.MaskedPrefs{
|
||||
Prefs: ipn.Prefs{
|
||||
@@ -951,6 +1024,20 @@ func TestUserMetrics(t *testing.T) {
|
||||
return status1.Self.PrimaryRoutes != nil && status1.Self.PrimaryRoutes.Len() == int(wantRoutes)+1
|
||||
})
|
||||
|
||||
mustDirect(t, t.Logf, lc1, lc2)
|
||||
|
||||
// 10 megabytes
|
||||
bytesToSend := 10 * 1024 * 1024
|
||||
|
||||
// This asserts generates some traffic, it is factored out
|
||||
// of TestUDPConn.
|
||||
start := time.Now()
|
||||
err = sendData(t.Logf, ctx, bytesToSend, s1, s2, s1ip, s2ip)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to send packets: %v", err)
|
||||
}
|
||||
t.Logf("Sent %d bytes from s1 to s2 in %s", bytesToSend, time.Since(start).String())
|
||||
|
||||
ctxLc, cancelLc := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancelLc()
|
||||
metrics1, err := lc1.UserMetrics(ctxLc)
|
||||
@@ -968,6 +1055,9 @@ func TestUserMetrics(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Allow the metrics for the bytes sent to be off by 15%.
|
||||
bytesSentTolerance := 1.15
|
||||
|
||||
t.Logf("Metrics1:\n%s\n", metrics1)
|
||||
|
||||
// The node is advertising 4 routes:
|
||||
@@ -997,6 +1087,18 @@ func TestUserMetrics(t *testing.T) {
|
||||
t.Errorf("metrics1, tailscaled_primary_routes: got %v, want %v", got, want)
|
||||
}
|
||||
|
||||
// Verify that the amount of data recorded in bytes is higher or equal to the
|
||||
// 10 megabytes sent.
|
||||
inboundBytes1 := parsedMetrics1[`tailscaled_inbound_bytes_total{path="direct_ipv4"}`]
|
||||
if inboundBytes1 < float64(bytesToSend) {
|
||||
t.Errorf(`metrics1, tailscaled_inbound_bytes_total{path="direct_ipv4"}: expected higher (or equal) than %d, got: %f`, bytesToSend, inboundBytes1)
|
||||
}
|
||||
|
||||
// But ensure that it is not too much higher than the 10 megabytes sent.
|
||||
if inboundBytes1 > float64(bytesToSend)*bytesSentTolerance {
|
||||
t.Errorf(`metrics1, tailscaled_inbound_bytes_total{path="direct_ipv4"}: expected lower than %f, got: %f`, float64(bytesToSend)*bytesSentTolerance, inboundBytes1)
|
||||
}
|
||||
|
||||
metrics2, err := lc2.UserMetrics(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@@ -1033,6 +1135,18 @@ func TestUserMetrics(t *testing.T) {
|
||||
if got, want := parsedMetrics2["tailscaled_primary_routes"], 0.0; got != want {
|
||||
t.Errorf("metrics2, tailscaled_primary_routes: got %v, want %v", got, want)
|
||||
}
|
||||
|
||||
// Verify that the amount of data recorded in bytes is higher or equal than the
|
||||
// 10 megabytes sent.
|
||||
outboundBytes2 := parsedMetrics2[`tailscaled_outbound_bytes_total{path="direct_ipv4"}`]
|
||||
if outboundBytes2 < float64(bytesToSend) {
|
||||
t.Errorf(`metrics2, tailscaled_outbound_bytes_total{path="direct_ipv4"}: expected higher (or equal) than %d, got: %f`, bytesToSend, outboundBytes2)
|
||||
}
|
||||
|
||||
// But ensure that it is not too much higher than the 10 megabytes sent.
|
||||
if outboundBytes2 > float64(bytesToSend)*bytesSentTolerance {
|
||||
t.Errorf(`metrics2, tailscaled_outbound_bytes_total{path="direct_ipv4"}: expected lower than %f, got: %f`, float64(bytesToSend)*bytesSentTolerance, outboundBytes2)
|
||||
}
|
||||
}
|
||||
|
||||
func waitForCondition(t *testing.T, msg string, waitTime time.Duration, f func() bool) {
|
||||
@@ -1044,3 +1158,33 @@ func waitForCondition(t *testing.T, msg string, waitTime time.Duration, f func()
|
||||
}
|
||||
t.Fatalf("waiting for condition: %s", msg)
|
||||
}
|
||||
|
||||
// mustDirect ensures there is a direct connection between LocalClient 1 and 2
|
||||
func mustDirect(t *testing.T, logf logger.Logf, lc1, lc2 *tailscale.LocalClient) {
|
||||
t.Helper()
|
||||
lastLog := time.Now().Add(-time.Minute)
|
||||
// See https://github.com/tailscale/tailscale/issues/654
|
||||
// and https://github.com/tailscale/tailscale/issues/3247 for discussions of this deadline.
|
||||
for deadline := time.Now().Add(30 * time.Second); time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
status1, err := lc1.Status(ctx)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
status2, err := lc2.Status(ctx)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
pst := status1.Peer[status2.Self.PublicKey]
|
||||
if pst.CurAddr != "" {
|
||||
logf("direct link %s->%s found with addr %s", status1.Self.HostName, status2.Self.HostName, pst.CurAddr)
|
||||
return
|
||||
}
|
||||
if now := time.Now(); now.Sub(lastLog) > time.Second {
|
||||
logf("no direct path %s->%s yet, addrs %v", status1.Self.HostName, status2.Self.HostName, pst.Addrs)
|
||||
lastLog = now
|
||||
}
|
||||
}
|
||||
t.Error("magicsock did not find a direct path from lc1 to lc2")
|
||||
}
|
||||
|
Reference in New Issue
Block a user