diff --git a/net/tunstats/stats.go b/net/tunstats/stats.go
new file mode 100644
index 000000000..9d1a8ae55
--- /dev/null
+++ b/net/tunstats/stats.go
@@ -0,0 +1,367 @@
+// Copyright (c) 2022 Tailscale Inc & AUTHORS All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package tunstats maintains statistics about connections
+// flowing through a TUN device (which operates at the IP layer).
+package tunstats
+
+import (
+	"encoding/binary"
+	"hash/maphash"
+	"math/bits"
+	"net/netip"
+	"sync"
+	"sync/atomic"
+
+	"tailscale.com/net/flowtrack"
+	"tailscale.com/types/ipproto"
+)
+
+// Statistics maintains counters for every connection.
+// All methods are safe for concurrent use.
+// The zero value is ready for use.
+type Statistics struct {
+	v4 hashTable[addrsPortsV4]
+	v6 hashTable[addrsPortsV6]
+}
+
+// Counts are statistics about a particular connection.
+type Counts struct {
+	TxPackets uint64 `json:"txPkts,omitempty"`
+	TxBytes   uint64 `json:"txBytes,omitempty"`
+	RxPackets uint64 `json:"rxPkts,omitempty"`
+	RxBytes   uint64 `json:"rxBytes,omitempty"`
+}
+
+const (
+	minTableLen = 8
+	maxProbeLen = 64
+)
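+
+// For illustration only (hypothetical addresses and sizes): a single outbound
+// DNS query and its reply would surface in the map returned by Extract as
+// roughly the following entry:
+//
+//	flowtrack.Tuple{Proto: ipproto.UDP, Src: 100.64.1.2:49152, Dst: 100.64.3.4:53}:
+//		Counts{TxPackets: 1, TxBytes: 64, RxPackets: 1, RxBytes: 128}
+//
+// Received packets are attributed to the same flow as transmitted ones:
+// update swaps source and destination before the lookup.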
+
+// hashTable is a hash table that uses open addressing with probing.
+// See https://en.wikipedia.org/wiki/Hash_table#Open_addressing.
+// The primary table is in the active field and can be retrieved atomically.
+// In the common case, this data structure is mostly lock free.
+//
+// If the current table is too small, a new table is allocated that
+// replaces the current active table. The contents of the older table are
+// NOT copied to the new table, but rather the older table is appended
+// to a list of outgrown tables. Re-growth happens under a lock,
+// but is expected to happen rarely as the table size grows exponentially.
+//
+// To reduce memory usage, the counters use 32-bit unsigned integers,
+// which carry the risk of overflowing. If an overflow is detected,
+// we add the amount overflowed to the overflow map. This is a naive Go map
+// protected by a sync.Mutex. Overflow is rare enough that contention is not a concern.
+//
+// To extract all counters, we replace the active table with a zeroed table,
+// and clear out the outgrown and overflow tables.
+// We take advantage of the fact that all the tables can be merged together
+// by simply adding up all the counters for each connection.
+type hashTable[AddrsPorts addrsPorts] struct {
+	// TODO: Get rid of this. It is just an atomic update in the common case,
+	// but contention updating the same word still incurs a 25% performance hit.
+	mu sync.RWMutex // RLock held while updating, Lock held while extracting
+
+	active  atomic.Pointer[countsTable[AddrsPorts]]
+	inserts atomic.Uint32 // heuristic for next active table to allocate
+
+	muGrow   sync.Mutex // muGrow.Lock implies that mu.RLock held
+	outgrown []countsTable[AddrsPorts]
+
+	muOverflow sync.Mutex // muOverflow.Lock implies that mu.RLock held
+	overflow   map[flowtrack.Tuple]Counts
+}
+
+type countsTable[AddrsPorts addrsPorts] []counts[AddrsPorts]
+
+func (t *countsTable[AddrsPorts]) len() int {
+	if t == nil {
+		return 0
+	}
+	return len(*t)
+}
+
+type counts[AddrsPorts addrsPorts] struct {
+	// initProto is both an initialization flag and the IP protocol.
+	// It is 0 if uninitialized, 1 if initializing, and
+	// 2+ipproto.Proto if initialized.
+	initProto atomic.Uint32
+
+	addrsPorts AddrsPorts // only valid if initProto is initialized
+
+	txPackets atomic.Uint32
+	txBytes   atomic.Uint32
+	rxPackets atomic.Uint32
+	rxBytes   atomic.Uint32
+}
+
+// NOTE: There is some degree of duplicated code.
+// For example, the functionality to swap the addrsPorts and compute the hash
+// should be performed by hashTable.update rather than Statistics.update.
+// However, Go generics cannot invoke pointer methods on addressable values.
+// See https://go.googlesource.com/proposal/+/refs/heads/master/design/43651-type-parameters.md#no-way-to-require-pointer-methods
+
+type addrsPorts interface {
+	comparable
+	asTuple(ipproto.Proto) flowtrack.Tuple
+}
+
+type addrsPortsV4 [4 + 4 + 2 + 2]byte
+
+func (x *addrsPortsV4) addrs() *[8]byte { return (*[8]byte)(x[:]) }
+func (x *addrsPortsV4) ports() *[4]byte { return (*[4]byte)(x[8:]) }
+func (x *addrsPortsV4) swap() {
+	*(*[4]byte)(x[0:]), *(*[4]byte)(x[4:]) = *(*[4]byte)(x[4:]), *(*[4]byte)(x[0:])
+	*(*[2]byte)(x[8:]), *(*[2]byte)(x[10:]) = *(*[2]byte)(x[10:]), *(*[2]byte)(x[8:])
+}
+func (x addrsPortsV4) asTuple(proto ipproto.Proto) flowtrack.Tuple {
+	return flowtrack.Tuple{Proto: proto,
+		Src: netip.AddrPortFrom(netip.AddrFrom4(*(*[4]byte)(x[0:])), binary.BigEndian.Uint16(x[8:])),
+		Dst: netip.AddrPortFrom(netip.AddrFrom4(*(*[4]byte)(x[4:])), binary.BigEndian.Uint16(x[10:])),
+	}
+}
+
+type addrsPortsV6 [16 + 16 + 2 + 2]byte
+
+func (x *addrsPortsV6) addrs() *[32]byte { return (*[32]byte)(x[:]) }
+func (x *addrsPortsV6) ports() *[4]byte  { return (*[4]byte)(x[32:]) }
+func (x *addrsPortsV6) swap() {
+	*(*[16]byte)(x[0:]), *(*[16]byte)(x[16:]) = *(*[16]byte)(x[16:]), *(*[16]byte)(x[0:])
+	*(*[2]byte)(x[32:]), *(*[2]byte)(x[34:]) = *(*[2]byte)(x[34:]), *(*[2]byte)(x[32:])
+}
+func (x addrsPortsV6) asTuple(proto ipproto.Proto) flowtrack.Tuple {
+	return flowtrack.Tuple{Proto: proto,
+		Src: netip.AddrPortFrom(netip.AddrFrom16(*(*[16]byte)(x[0:])), binary.BigEndian.Uint16(x[32:])),
+		Dst: netip.AddrPortFrom(netip.AddrFrom16(*(*[16]byte)(x[16:])), binary.BigEndian.Uint16(x[34:])),
+	}
+}
+
+// UpdateTx updates the statistics for a transmitted IP packet.
+func (s *Statistics) UpdateTx(b []byte) {
+	s.update(b, false)
+}
+
+// UpdateRx updates the statistics for a received IP packet.
+func (s *Statistics) UpdateRx(b []byte) {
+	s.update(b, true)
+}
+
+var seed = maphash.MakeSeed()
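+
+// The parsing below avoids allocation by copying fixed-offset header fields
+// straight into an addrsPorts array. For reference, the offsets it relies on
+// (IPv4 without options, IPv6 without extension headers) are:
+//
+//	IPv4: proto at b[9], src/dst addrs at b[12:20], ports at b[20:24]
+//	IPv6: proto at b[6], src/dst addrs at b[8:40],  ports at b[40:44]
+//
+// which is why, in the common case, the 12 bytes at b[12:] (IPv4) or the
+// 36 bytes at b[8:] (IPv6) can be copied contiguously in one step.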
+
+func (s *Statistics) update(b []byte, receive bool) {
+	switch {
+	case len(b) >= 20 && b[0]>>4 == 4: // IPv4
+		proto := ipproto.Proto(b[9])
+		hasPorts := proto == ipproto.TCP || proto == ipproto.UDP
+		var addrsPorts addrsPortsV4
+		if hdrLen := int(4 * (b[0] & 0xf)); hdrLen == 20 && len(b) >= 24 && hasPorts {
+			addrsPorts = *(*addrsPortsV4)(b[12:]) // addresses and ports are contiguous
+		} else {
+			*addrsPorts.addrs() = *(*[8]byte)(b[12:])
+			// May have IPv4 options in-between address and ports.
+			if len(b) >= hdrLen+4 && hasPorts {
+				*addrsPorts.ports() = *(*[4]byte)(b[hdrLen:])
+			}
+		}
+		if receive {
+			addrsPorts.swap()
+		}
+		hash := maphash.Bytes(seed, addrsPorts[:]) ^ uint64(proto) // TODO: Hash proto better?
+		s.v4.update(receive, proto, &addrsPorts, hash, uint32(len(b)))
+		return
+	case len(b) >= 40 && b[0]>>4 == 6: // IPv6
+		proto := ipproto.Proto(b[6])
+		hasPorts := proto == ipproto.TCP || proto == ipproto.UDP
+		var addrsPorts addrsPortsV6
+		if len(b) >= 44 && hasPorts {
+			addrsPorts = *(*addrsPortsV6)(b[8:]) // addresses and ports are contiguous
+		} else {
+			*addrsPorts.addrs() = *(*[32]byte)(b[8:])
+			// TODO: Support IPv6 extension headers?
+			if hdrLen := 40; len(b) > hdrLen+4 && hasPorts {
+				*addrsPorts.ports() = *(*[4]byte)(b[hdrLen:])
+			}
+		}
+		if receive {
+			addrsPorts.swap()
+		}
+		hash := maphash.Bytes(seed, addrsPorts[:]) ^ uint64(proto) // TODO: Hash proto better?
+		s.v6.update(receive, proto, &addrsPorts, hash, uint32(len(b)))
+		return
+	}
+	// TODO: Track malformed packets?
+}
+
+func (h *hashTable[AddrsPorts]) update(receive bool, proto ipproto.Proto, addrsPorts *AddrsPorts, hash uint64, size uint32) {
+	h.mu.RLock()
+	defer h.mu.RUnlock()
+
+	table := h.active.Load()
+	for {
+		// Start with an initialized table.
+		if table.len() == 0 {
+			table = h.grow(table)
+		}
+
+		// Try to update an entry in the currently active table.
+		for i := 0; i < len(*table) && i < maxProbeLen; i++ {
+			probe := uint64(i) // linear probing for small tables
+			if len(*table) > 2*maxProbeLen {
+				probe *= probe // quadratic probing for large tables
+			}
+			entry := &(*table)[(hash+probe)%uint64(len(*table))]
+
+			// Spin-lock waiting for the entry to be initialized,
+			// which should be quick as it only stores the AddrsPorts.
+		retry:
+			switch initProto := entry.initProto.Load(); initProto {
+			case 0: // uninitialized
+				if !entry.initProto.CompareAndSwap(0, 1) {
+					goto retry // raced with another initialization attempt
+				}
+				entry.addrsPorts = *addrsPorts
+				entry.initProto.Store(uint32(proto) + 2) // initialization done
+				h.inserts.Add(1)
+			case 1: // initializing
+				goto retry
+			default: // initialized
+				if ipproto.Proto(initProto-2) != proto || entry.addrsPorts != *addrsPorts {
+					continue // this entry is for a different connection; try next entry
+				}
+			}
+
+			// Atomically update the counters for the connection entry.
+			var overflowPackets, overflowBytes bool
+			if receive {
+				overflowPackets = entry.rxPackets.Add(1) < 1
+				overflowBytes = entry.rxBytes.Add(size) < size
+			} else {
+				overflowPackets = entry.txPackets.Add(1) < 1
+				overflowBytes = entry.txBytes.Add(size) < size
+			}
+			if overflowPackets || overflowBytes {
+				h.updateOverflow(receive, proto, addrsPorts, overflowPackets, overflowBytes)
+			}
+			return
+		}
+
+		// Unable to update, so grow the table and try again.
+		// TODO: Use overflow map instead if table utilization is too low.
+		table = h.grow(table)
+	}
+}
+
+// grow grows the table unless the active table is larger than oldTable.
+func (h *hashTable[AddrsPorts]) grow(oldTable *countsTable[AddrsPorts]) (newTable *countsTable[AddrsPorts]) {
+	h.muGrow.Lock()
+	defer h.muGrow.Unlock()
+
+	if newTable = h.active.Load(); newTable.len() > oldTable.len() {
+		return newTable // raced with another grow
+	}
+	newTable = new(countsTable[AddrsPorts])
+	if oldTable.len() == 0 {
+		*newTable = make(countsTable[AddrsPorts], minTableLen)
+	} else {
+		*newTable = make(countsTable[AddrsPorts], 2*len(*oldTable))
+		h.outgrown = append(h.outgrown, *oldTable)
+	}
+	h.active.Store(newTable)
+	return newTable
+}
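+
+// Overflow detection in update relies on unsigned wraparound: for a uint32
+// counter, Add(size) < size holds exactly when the addition wrapped past 2^32.
+// For example (made-up numbers), if rxBytes is 0xffff_fff0 and a 0x20-byte
+// packet arrives, Add returns 0x10, which is less than 0x20, so the missing
+// 2^32 bytes are credited to the overflow map by updateOverflow below.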
+
+// updateOverflow updates the overflow map for counters that overflowed.
+// Using 32-bit counters, this condition happens rarely as it only triggers
+// after every 4 GiB of unidirectional network traffic on the same connection.
+func (h *hashTable[AddrsPorts]) updateOverflow(receive bool, proto ipproto.Proto, addrsPorts *AddrsPorts, overflowPackets, overflowBytes bool) {
+	h.muOverflow.Lock()
+	defer h.muOverflow.Unlock()
+	if h.overflow == nil {
+		h.overflow = make(map[flowtrack.Tuple]Counts)
+	}
+	tuple := (*addrsPorts).asTuple(proto)
+	cnts := h.overflow[tuple]
+	if overflowPackets {
+		if receive {
+			cnts.RxPackets += 1 << 32
+		} else {
+			cnts.TxPackets += 1 << 32
+		}
+	}
+	if overflowBytes {
+		if receive {
+			cnts.RxBytes += 1 << 32
+		} else {
+			cnts.TxBytes += 1 << 32
+		}
+	}
+	h.overflow[tuple] = cnts
+}
+
+func (h *hashTable[AddrsPorts]) extractInto(out map[flowtrack.Tuple]Counts) {
+	// Allocate a new table based on previous usage.
+	var newTable *countsTable[AddrsPorts]
+	if numInserts := h.inserts.Load(); numInserts > 0 {
+		newLen := 1 << bits.Len(uint(4*numInserts/3)|uint(minTableLen-1))
+		newTable = new(countsTable[AddrsPorts])
+		*newTable = make(countsTable[AddrsPorts], newLen)
+	}
+
+	// Swap out the old tables for new tables.
+	// We do not need to lock h.muGrow or h.muOverflow since holding h.mu
+	// implies that nothing else could be holding those locks.
+	h.mu.Lock()
+	oldTable := h.active.Swap(newTable)
+	oldOutgrown := h.outgrown
+	oldOverflow := h.overflow
+	h.outgrown = nil
+	h.overflow = nil
+	h.inserts.Store(0)
+	h.mu.Unlock()
+
+	// Merge tables into output.
+	if oldTable != nil {
+		mergeTable(out, *oldTable)
+	}
+	for _, table := range oldOutgrown {
+		mergeTable(out, table)
+	}
+	mergeMap(out, oldOverflow)
+}
+
+// Extract extracts and resets the counters for all active connections.
+// It must be called periodically; otherwise the memory used is unbounded.
+func (s *Statistics) Extract() map[flowtrack.Tuple]Counts {
+	out := make(map[flowtrack.Tuple]Counts)
+	s.v4.extractInto(out)
+	s.v6.extractInto(out)
+	return out
+}
+
+func mergeTable[AddrsPorts addrsPorts](dst map[flowtrack.Tuple]Counts, src countsTable[AddrsPorts]) {
+	for i := range src {
+		entry := &src[i]
+		if initProto := entry.initProto.Load(); initProto > 0 {
+			tuple := entry.addrsPorts.asTuple(ipproto.Proto(initProto - 2))
+			cnts := dst[tuple]
+			cnts.TxPackets += uint64(entry.txPackets.Load())
+			cnts.TxBytes += uint64(entry.txBytes.Load())
+			cnts.RxPackets += uint64(entry.rxPackets.Load())
+			cnts.RxBytes += uint64(entry.rxBytes.Load())
+			dst[tuple] = cnts
+		}
+	}
+}
+
+func mergeMap(dst, src map[flowtrack.Tuple]Counts) {
+	for tuple, cntsSrc := range src {
+		cntsDst := dst[tuple]
+		cntsDst.TxPackets += cntsSrc.TxPackets
+		cntsDst.TxBytes += cntsSrc.TxBytes
+		cntsDst.RxPackets += cntsSrc.RxPackets
+		cntsDst.RxBytes += cntsSrc.RxBytes
+		dst[tuple] = cntsDst
+	}
+}
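+
+// A minimal usage sketch (hypothetical wiring; logf and the 10-second period
+// are illustrative, not part of this package): feed every packet crossing the
+// TUN device into UpdateTx/UpdateRx and drain the counters on a timer.
+//
+//	var stats tunstats.Statistics
+//	// in the TUN write path: stats.UpdateTx(pkt)
+//	// in the TUN read path:  stats.UpdateRx(pkt)
+//	go func() {
+//		for range time.Tick(10 * time.Second) {
+//			for tuple, counts := range stats.Extract() {
+//				logf("%v: %+v", tuple, counts)
+//			}
+//		}
+//	}()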
diff --git a/net/tunstats/stats_test.go b/net/tunstats/stats_test.go
new file mode 100644
index 000000000..6992d99e6
--- /dev/null
+++ b/net/tunstats/stats_test.go
@@ -0,0 +1,325 @@
+// Copyright (c) 2022 Tailscale Inc & AUTHORS All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package tunstats
+
+import (
+	"encoding/binary"
+	"fmt"
+	"hash/maphash"
+	"math"
+	"runtime"
+	"sync"
+	"testing"
+
+	qt "github.com/frankban/quicktest"
+	"tailscale.com/net/flowtrack"
+	"tailscale.com/types/ipproto"
+)
+
+type SimpleStatistics struct {
+	mu sync.Mutex
+	m  map[flowtrack.Tuple]Counts
+}
+
+func (s *SimpleStatistics) UpdateTx(b []byte) {
+	s.update(b, false)
+}
+func (s *SimpleStatistics) UpdateRx(b []byte) {
+	s.update(b, true)
+}
+func (s *SimpleStatistics) update(b []byte, receive bool) {
+	var tuple flowtrack.Tuple
+	var size uint64
+	if len(b) >= 1 {
+		// This logic is mostly copied from Statistics.update.
+		switch v := b[0] >> 4; {
+		case v == 4 && len(b) >= 20: // IPv4
+			proto := ipproto.Proto(b[9])
+			size = uint64(binary.BigEndian.Uint16(b[2:]))
+			var addrsPorts addrsPortsV4
+			*(*[8]byte)(addrsPorts[0:]) = *(*[8]byte)(b[12:])
+			if hdrLen := int(4 * (b[0] & 0xf)); len(b) >= hdrLen+4 && (proto == ipproto.TCP || proto == ipproto.UDP) {
+				*(*[4]byte)(addrsPorts[8:]) = *(*[4]byte)(b[hdrLen:])
+			}
+			if receive {
+				addrsPorts.swap()
+			}
+			tuple = addrsPorts.asTuple(proto)
+		case v == 6 && len(b) >= 40: // IPv6
+			proto := ipproto.Proto(b[6])
+			size = uint64(binary.BigEndian.Uint16(b[4:]))
+			var addrsPorts addrsPortsV6
+			*(*[32]byte)(addrsPorts[0:]) = *(*[32]byte)(b[8:])
+			if hdrLen := 40; len(b) > hdrLen+4 && (proto == ipproto.TCP || proto == ipproto.UDP) {
+				*(*[4]byte)(addrsPorts[32:]) = *(*[4]byte)(b[hdrLen:])
+			}
+			if receive {
+				addrsPorts.swap()
+			}
+			tuple = addrsPorts.asTuple(proto)
+		default:
+			return // non-IP packet
+		}
+	} else {
+		return // invalid packet
+	}
+
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if s.m == nil {
+		s.m = make(map[flowtrack.Tuple]Counts)
+	}
+	cnts := s.m[tuple]
+	if receive {
+		cnts.RxPackets++
+		cnts.RxBytes += size
+	} else {
+		cnts.TxPackets++
+		cnts.TxBytes += size
+	}
+	s.m[tuple] = cnts
+}
+
+func TestEmpty(t *testing.T) {
+	c := qt.New(t)
+	var s Statistics
+	c.Assert(s.Extract(), qt.DeepEquals, map[flowtrack.Tuple]Counts{})
+	c.Assert(s.Extract(), qt.DeepEquals, map[flowtrack.Tuple]Counts{})
+}
+
+func TestOverflow(t *testing.T) {
+	c := qt.New(t)
+	var s Statistics
+	var cnts Counts
+
+	a := &addrsPortsV4{192, 168, 0, 1, 192, 168, 0, 2, 12, 34, 56, 78}
+	h := maphash.Bytes(seed, a[:])
+
+	cnts.TxPackets++
+	cnts.TxBytes += math.MaxUint32
+	s.v4.update(false, ipproto.UDP, a, h, math.MaxUint32)
+	for i := 0; i < 1e6; i++ {
+		cnts.TxPackets++
+		cnts.TxBytes += uint64(i)
+		s.v4.update(false, ipproto.UDP, a, h, uint32(i))
+	}
+	c.Assert(s.Extract(), qt.DeepEquals, map[flowtrack.Tuple]Counts{a.asTuple(ipproto.UDP): cnts})
+	c.Assert(s.Extract(), qt.DeepEquals, map[flowtrack.Tuple]Counts{})
+}
+
+func FuzzParse(f *testing.F) {
+	f.Fuzz(func(t *testing.T, b []byte) {
+		var s Statistics
+		s.UpdateRx(b) // must not panic
+		s.UpdateTx(b) // must not panic
+		s.Extract()   // must not panic
+	})
+}
+
+var testV4 = func() (b [24]byte) {
+	b[0] = 4<<4 | 5                               // version and header length
+	binary.BigEndian.PutUint16(b[2:], 1234)       // size
+	b[9] = byte(ipproto.UDP)                      // protocol
+	*(*[4]byte)(b[12:]) = [4]byte{192, 168, 0, 1} // src addr
+	*(*[4]byte)(b[16:]) = [4]byte{192, 168, 0, 2} // dst addr
+	binary.BigEndian.PutUint16(b[20:], 456)       // src port
+	binary.BigEndian.PutUint16(b[22:], 789)       // dst port
+	return b
+}()
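+
+// A possible cross-check against the SimpleStatistics reference above, kept as
+// a commented-out sketch. It assumes packets whose Total Length field matches
+// len(b), since Statistics counts len(b) while SimpleStatistics counts the
+// header field; testV4 is patched accordingly before use.
+/*
+func TestAgainstSimple(t *testing.T) {
+	c := qt.New(t)
+	var fast Statistics
+	var slow SimpleStatistics
+	p := testV4
+	binary.BigEndian.PutUint16(p[2:], uint16(len(p))) // make Total Length match len(p)
+	for i := 0; i < 1000; i++ {
+		binary.BigEndian.PutUint16(p[20:], uint16(i%8)) // vary the source port
+		fast.UpdateTx(p[:])
+		slow.UpdateTx(p[:])
+		fast.UpdateRx(p[:])
+		slow.UpdateRx(p[:])
+	}
+	c.Assert(fast.Extract(), qt.DeepEquals, slow.m)
+}
+*/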
+
+/*
+func BenchmarkA(b *testing.B) {
+	b.ReportAllocs()
+	for i := 0; i < b.N; i++ {
+		var s Statistics
+		for j := 0; j < 1e3; j++ {
+			s.UpdateTx(testV4[:])
+		}
+	}
+}
+
+func BenchmarkB(b *testing.B) {
+	b.ReportAllocs()
+	for i := 0; i < b.N; i++ {
+		var s SimpleStatistics
+		for j := 0; j < 1e3; j++ {
+			s.UpdateTx(testV4[:])
+		}
+	}
+}
+
+func BenchmarkC(b *testing.B) {
+	b.ReportAllocs()
+	for i := 0; i < b.N; i++ {
+		var s Statistics
+		var group sync.WaitGroup
+		for k := 0; k < runtime.NumCPU(); k++ {
+			group.Add(1)
+			go func(k int) {
+				defer group.Done()
+				b := testV4
+				for j := 0; j < 1e3; j++ {
+					binary.LittleEndian.PutUint32(b[12:], uint32(k))
+					binary.LittleEndian.PutUint32(b[16:], uint32(j))
+					s.UpdateTx(b[:])
+				}
+			}(k)
+		}
+		group.Wait()
+	}
+}
+
+func BenchmarkD(b *testing.B) {
+	b.ReportAllocs()
+	for i := 0; i < b.N; i++ {
+		var s SimpleStatistics
+		var group sync.WaitGroup
+		for k := 0; k < runtime.NumCPU(); k++ {
+			group.Add(1)
+			go func(k int) {
+				defer group.Done()
+				b := testV4
+				for j := 0; j < 1e3; j++ {
+					binary.LittleEndian.PutUint32(b[12:], uint32(k))
+					binary.LittleEndian.PutUint32(b[16:], uint32(j))
+					s.UpdateTx(b[:])
+				}
+			}(k)
+		}
+		group.Wait()
+	}
+}
+*/
+
+// FUZZ
+// Benchmark:
+//	IPv4 vs IPv6
+//	single vs all cores
+//	same vs unique addresses
+
+/*
+linear probing
+
+	  1 => 115595714 ns/op	859003746 B/op
+	  2 =>   9355585 ns/op	 46454947 B/op
+	  4 =>   3301663 ns/op	  8706967 B/op
+	  8 =>   2775162 ns/op	  4176433 B/op
+	 16 =>   2517899 ns/op	  2099434 B/op
+	 32 =>   2397939 ns/op	  2098986 B/op
+	 64 =>   2118390 ns/op	  1197352 B/op
+	128 =>   2029255 ns/op	  1046729 B/op
+	256 =>   2069939 ns/op	  1042577 B/op
+
+quadratic probing
+
+	   1 => 111134367 ns/op	825962200 B/op
+	   2 =>   8061189 ns/op	 45106117 B/op
+	   4 =>   3216728 ns/op	  8079556 B/op
+	   8 =>   2576443 ns/op	  2355890 B/op
+	  16 =>   2471713 ns/op	  2097196 B/op
+	  32 =>   2108294 ns/op	  1050225 B/op
+	  64 =>   1964441 ns/op	  1048736 B/op
+	 128 =>   2118538 ns/op	  1046663 B/op
+	 256 =>   1968353 ns/op	  1042568 B/op
+	 512 =>   2049336 ns/op	  1034306 B/op
+	1024 =>   2001605 ns/op	  1017786 B/op
+	2048 =>   2046972 ns/op	   984988 B/op
+	4096 =>   2108753 ns/op	   919105 B/op
+*/
+
+func testPacketV4(proto ipproto.Proto, srcAddr, dstAddr [4]byte, srcPort, dstPort, size uint16) (out []byte) {
+	var ipHdr [20]byte
+	ipHdr[0] = 4<<4 | 5
+	binary.BigEndian.PutUint16(ipHdr[2:], size)
+	ipHdr[9] = byte(proto)
+	*(*[4]byte)(ipHdr[12:]) = srcAddr
+	*(*[4]byte)(ipHdr[16:]) = dstAddr
+	out = append(out, ipHdr[:]...)
+	switch proto {
+	case ipproto.TCP:
+		var tcpHdr [20]byte
+		binary.BigEndian.PutUint16(tcpHdr[0:], srcPort)
+		binary.BigEndian.PutUint16(tcpHdr[2:], dstPort)
+		out = append(out, tcpHdr[:]...)
+	case ipproto.UDP:
+		var udpHdr [8]byte
+		binary.BigEndian.PutUint16(udpHdr[0:], srcPort)
+		binary.BigEndian.PutUint16(udpHdr[2:], dstPort)
+		out = append(out, udpHdr[:]...)
+	default:
+		panic(fmt.Sprintf("unknown proto: %d", proto))
+	}
+	return append(out, make([]byte, int(size)-len(out))...)
+}
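+
+// A possible IPv6 counterpart to testPacketV4, kept as a commented-out sketch
+// since nothing below exercises it yet (per the "IPv4 vs IPv6" note above).
+// It assumes size counts the whole packet (so size must be at least 48) and
+// always appends an 8-byte L4 header; only the first four bytes (the ports)
+// matter to the parser in this package.
+/*
+func testPacketV6(proto ipproto.Proto, srcAddr, dstAddr [16]byte, srcPort, dstPort, size uint16) (out []byte) {
+	var ipHdr [40]byte
+	ipHdr[0] = 6 << 4
+	binary.BigEndian.PutUint16(ipHdr[4:], size-40) // payload length excludes the fixed header
+	ipHdr[6] = byte(proto)
+	*(*[16]byte)(ipHdr[8:]) = srcAddr
+	*(*[16]byte)(ipHdr[24:]) = dstAddr
+	out = append(out, ipHdr[:]...)
+	var l4Hdr [8]byte
+	binary.BigEndian.PutUint16(l4Hdr[0:], srcPort)
+	binary.BigEndian.PutUint16(l4Hdr[2:], dstPort)
+	out = append(out, l4Hdr[:]...)
+	return append(out, make([]byte, int(size)-len(out))...)
+}
+*/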
+
+func Benchmark(b *testing.B) {
+	b.Run("SingleRoutine/SameConn", func(b *testing.B) {
+		p := testPacketV4(ipproto.UDP, [4]byte{192, 168, 0, 1}, [4]byte{192, 168, 0, 2}, 123, 456, 789)
+		b.ResetTimer()
+		b.ReportAllocs()
+		for i := 0; i < b.N; i++ {
+			var s Statistics
+			for j := 0; j < 1e3; j++ {
+				s.UpdateTx(p)
+			}
+		}
+	})
+	b.Run("SingleRoutine/UniqueConns", func(b *testing.B) {
+		p := testPacketV4(ipproto.UDP, [4]byte{}, [4]byte{}, 0, 0, 789)
+		b.ResetTimer()
+		b.ReportAllocs()
+		for i := 0; i < b.N; i++ {
+			var s Statistics
+			for j := 0; j < 1e3; j++ {
+				binary.BigEndian.PutUint32(p[20:], uint32(j)) // unique port combination
+				s.UpdateTx(p)
+			}
+		}
+	})
+	b.Run("MultiRoutine/SameConn", func(b *testing.B) {
+		p := testPacketV4(ipproto.UDP, [4]byte{192, 168, 0, 1}, [4]byte{192, 168, 0, 2}, 123, 456, 789)
+		b.ResetTimer()
+		b.ReportAllocs()
+		for i := 0; i < b.N; i++ {
+			var s Statistics
+			var group sync.WaitGroup
+			for j := 0; j < runtime.NumCPU(); j++ {
+				group.Add(1)
+				go func() {
+					defer group.Done()
+					for k := 0; k < 1e3; k++ {
+						s.UpdateTx(p)
+					}
+				}()
+			}
+			group.Wait()
+		}
+	})
+	b.Run("MultiRoutine/UniqueConns", func(b *testing.B) {
+		ps := make([][]byte, runtime.NumCPU())
+		for i := range ps {
+			ps[i] = testPacketV4(ipproto.UDP, [4]byte{192, 168, 0, 1}, [4]byte{192, 168, 0, 2}, 0, 0, 789)
+		}
+		b.ResetTimer()
+		b.ReportAllocs()
+		for i := 0; i < b.N; i++ {
+			var s Statistics
+			var group sync.WaitGroup
+			for j := 0; j < runtime.NumCPU(); j++ {
+				group.Add(1)
+				go func(j int) {
+					defer group.Done()
+					p := ps[j]
+					j *= 1e3
+					for k := 0; k < 1e3; k++ {
+						binary.BigEndian.PutUint32(p[20:], uint32(j+k)) // unique port combination
+						s.UpdateTx(p)
+					}
+				}(j)
+			}
+			group.Wait()
+		}
+	})
+}