// File provenance: mirror of https://github.com/tailscale/tailscale.git
// (synced 2024-11-26, commit 2e5d08ec4f).
//
// Commit message:
// Previously, tstun.Wrapper and magicsock.Conn managed their own statistics
// data structure and relied on an external call to Extract to extract (and
// reset) the statistics. This makes it difficult to ensure a maximum size on
// the statistics as the caller has no introspection into whether the number
// of unique connections is getting too large. Invert the control flow such
// that a *connstats.Statistics is registered with tstun.Wrapper and
// magicsock.Conn. Methods on non-nil *connstats.Statistics are called for
// every packet. This allows the implementation of connstats.Statistics (in
// the future) to better control when it needs to flush to ensure bounds on
// maximum sizes. The value registered into tstun.Wrapper and magicsock.Conn
// could be an interface, but that has two performance detriments:
//  1. Method calls on interface values are more expensive since they must go
//     through a virtual method dispatch.
//  2. The implementation would need a sync.Mutex to protect the statistics
//     value instead of using an atomic.Pointer.
// Given that methods on connstats.Statistics are called for every packet, we
// want to reduce the CPU cost on this hot path.
//
// Signed-off-by: Joe Tsai <joetsai@digital-static.net>
// Copyright (c) 2022 Tailscale Inc & AUTHORS All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package connstats
import (
	"encoding/binary"
	"fmt"
	"math/rand"
	"net/netip"
	"runtime"
	"sync"
	"testing"
	"time"

	qt "github.com/frankban/quicktest"
	"tailscale.com/types/ipproto"
	"tailscale.com/types/netlogtype"
)
func testPacketV4(proto ipproto.Proto, srcAddr, dstAddr [4]byte, srcPort, dstPort, size uint16) (out []byte) {
|
|
var ipHdr [20]byte
|
|
ipHdr[0] = 4<<4 | 5
|
|
binary.BigEndian.PutUint16(ipHdr[2:], size)
|
|
ipHdr[9] = byte(proto)
|
|
*(*[4]byte)(ipHdr[12:]) = srcAddr
|
|
*(*[4]byte)(ipHdr[16:]) = dstAddr
|
|
out = append(out, ipHdr[:]...)
|
|
switch proto {
|
|
case ipproto.TCP:
|
|
var tcpHdr [20]byte
|
|
binary.BigEndian.PutUint16(tcpHdr[0:], srcPort)
|
|
binary.BigEndian.PutUint16(tcpHdr[2:], dstPort)
|
|
out = append(out, tcpHdr[:]...)
|
|
case ipproto.UDP:
|
|
var udpHdr [8]byte
|
|
binary.BigEndian.PutUint16(udpHdr[0:], srcPort)
|
|
binary.BigEndian.PutUint16(udpHdr[2:], dstPort)
|
|
out = append(out, udpHdr[:]...)
|
|
default:
|
|
panic(fmt.Sprintf("unknown proto: %d", proto))
|
|
}
|
|
return append(out, make([]byte, int(size)-len(out))...)
|
|
}
func TestConcurrent(t *testing.T) {
|
|
c := qt.New(t)
|
|
|
|
var stats Statistics
|
|
var wants []map[netlogtype.Connection]netlogtype.Counts
|
|
gots := make([]map[netlogtype.Connection]netlogtype.Counts, runtime.NumCPU())
|
|
var group sync.WaitGroup
|
|
for i := range gots {
|
|
group.Add(1)
|
|
go func(i int) {
|
|
defer group.Done()
|
|
gots[i] = make(map[netlogtype.Connection]netlogtype.Counts)
|
|
rn := rand.New(rand.NewSource(time.Now().UnixNano()))
|
|
var p []byte
|
|
var t netlogtype.Connection
|
|
for j := 0; j < 1000; j++ {
|
|
delay := rn.Intn(10000)
|
|
if p == nil || rn.Intn(64) == 0 {
|
|
proto := ipproto.TCP
|
|
if rn.Intn(2) == 0 {
|
|
proto = ipproto.UDP
|
|
}
|
|
srcAddr := netip.AddrFrom4([4]byte{192, 168, 0, byte(rand.Intn(16))})
|
|
dstAddr := netip.AddrFrom4([4]byte{192, 168, 0, byte(rand.Intn(16))})
|
|
srcPort := uint16(rand.Intn(16))
|
|
dstPort := uint16(rand.Intn(16))
|
|
size := uint16(64 + rand.Intn(1024))
|
|
p = testPacketV4(proto, srcAddr.As4(), dstAddr.As4(), srcPort, dstPort, size)
|
|
t = netlogtype.Connection{Proto: proto, Src: netip.AddrPortFrom(srcAddr, srcPort), Dst: netip.AddrPortFrom(dstAddr, dstPort)}
|
|
}
|
|
t2 := t
|
|
receive := rn.Intn(2) == 0
|
|
if receive {
|
|
t2.Src, t2.Dst = t2.Dst, t2.Src
|
|
}
|
|
|
|
cnts := gots[i][t2]
|
|
if receive {
|
|
stats.UpdateRxVirtual(p)
|
|
cnts.RxPackets++
|
|
cnts.RxBytes += uint64(len(p))
|
|
} else {
|
|
cnts.TxPackets++
|
|
cnts.TxBytes += uint64(len(p))
|
|
stats.UpdateTxVirtual(p)
|
|
}
|
|
gots[i][t2] = cnts
|
|
time.Sleep(time.Duration(rn.Intn(1 + delay)))
|
|
}
|
|
}(i)
|
|
}
|
|
for range gots {
|
|
virtual, _ := stats.Extract()
|
|
wants = append(wants, virtual)
|
|
time.Sleep(time.Millisecond)
|
|
}
|
|
group.Wait()
|
|
virtual, _ := stats.Extract()
|
|
wants = append(wants, virtual)
|
|
|
|
got := make(map[netlogtype.Connection]netlogtype.Counts)
|
|
want := make(map[netlogtype.Connection]netlogtype.Counts)
|
|
mergeMaps(got, gots...)
|
|
mergeMaps(want, wants...)
|
|
c.Assert(got, qt.DeepEquals, want)
|
|
}
func mergeMaps(dst map[netlogtype.Connection]netlogtype.Counts, srcs ...map[netlogtype.Connection]netlogtype.Counts) {
|
|
for _, src := range srcs {
|
|
for conn, cnts := range src {
|
|
dst[conn] = dst[conn].Add(cnts)
|
|
}
|
|
}
|
|
}
func Benchmark(b *testing.B) {
|
|
// TODO: Test IPv6 packets?
|
|
b.Run("SingleRoutine/SameConn", func(b *testing.B) {
|
|
p := testPacketV4(ipproto.UDP, [4]byte{192, 168, 0, 1}, [4]byte{192, 168, 0, 2}, 123, 456, 789)
|
|
b.ResetTimer()
|
|
b.ReportAllocs()
|
|
for i := 0; i < b.N; i++ {
|
|
var s Statistics
|
|
for j := 0; j < 1e3; j++ {
|
|
s.UpdateTxVirtual(p)
|
|
}
|
|
}
|
|
})
|
|
b.Run("SingleRoutine/UniqueConns", func(b *testing.B) {
|
|
p := testPacketV4(ipproto.UDP, [4]byte{}, [4]byte{}, 0, 0, 789)
|
|
b.ResetTimer()
|
|
b.ReportAllocs()
|
|
for i := 0; i < b.N; i++ {
|
|
var s Statistics
|
|
for j := 0; j < 1e3; j++ {
|
|
binary.BigEndian.PutUint32(p[20:], uint32(j)) // unique port combination
|
|
s.UpdateTxVirtual(p)
|
|
}
|
|
}
|
|
})
|
|
b.Run("MultiRoutine/SameConn", func(b *testing.B) {
|
|
p := testPacketV4(ipproto.UDP, [4]byte{192, 168, 0, 1}, [4]byte{192, 168, 0, 2}, 123, 456, 789)
|
|
b.ResetTimer()
|
|
b.ReportAllocs()
|
|
for i := 0; i < b.N; i++ {
|
|
var s Statistics
|
|
var group sync.WaitGroup
|
|
for j := 0; j < runtime.NumCPU(); j++ {
|
|
group.Add(1)
|
|
go func() {
|
|
defer group.Done()
|
|
for k := 0; k < 1e3; k++ {
|
|
s.UpdateTxVirtual(p)
|
|
}
|
|
}()
|
|
}
|
|
group.Wait()
|
|
}
|
|
})
|
|
b.Run("MultiRoutine/UniqueConns", func(b *testing.B) {
|
|
ps := make([][]byte, runtime.NumCPU())
|
|
for i := range ps {
|
|
ps[i] = testPacketV4(ipproto.UDP, [4]byte{192, 168, 0, 1}, [4]byte{192, 168, 0, 2}, 0, 0, 789)
|
|
}
|
|
b.ResetTimer()
|
|
b.ReportAllocs()
|
|
for i := 0; i < b.N; i++ {
|
|
var s Statistics
|
|
var group sync.WaitGroup
|
|
for j := 0; j < runtime.NumCPU(); j++ {
|
|
group.Add(1)
|
|
go func(j int) {
|
|
defer group.Done()
|
|
p := ps[j]
|
|
j *= 1e3
|
|
for k := 0; k < 1e3; k++ {
|
|
binary.BigEndian.PutUint32(p[20:], uint32(j+k)) // unique port combination
|
|
s.UpdateTxVirtual(p)
|
|
}
|
|
}(j)
|
|
}
|
|
group.Wait()
|
|
}
|
|
})
|
|
}