tailscale/syncs/shardedint.go
James Tucker e8f1721147 syncs: add ShardedInt expvar.Var type
ShardedInt provides an int type expvar.Var that supports more efficient
writes at high frequencies (one order of magnitude on an M1 Max, much
more on NUMA systems).

There are two implementations of ShardValue: one that abuses sync.Pool
and works on current public Go versions, and one that takes a
dependency on a runtime.TailscaleP function exposed in Tailscale's Go
fork. The sync.Pool variant has about 10x the throughput of a single
atomic integer on an M1 Max, and the runtime.TailscaleP variant is about
10x faster than the sync.Pool variant.

Neither variant has perfect distribution, nor does either always avoid
cross-CPU sharing, as there is no locking or affinity to ensure that the
time of yield is on the same core as the time of core biasing, but in
the average case the distributions are good enough to provide
substantially better performance.

See golang/go#18802 for a related upstream proposal.

Updates tailscale/go#109
Updates tailscale/corp#25450

Signed-off-by: James Tucker <james@tailscale.com>
2024-12-19 14:58:28 -08:00

// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause

package syncs

import (
	"encoding/json"
	"strconv"
	"sync/atomic"

	"golang.org/x/sys/cpu"
)

// ShardedInt provides a sharded atomic int64 value that optimizes high
// frequency (MHz range and above) writes in highly parallel workloads.
// The zero value is not safe for use; use [NewShardedInt].
// ShardedInt implements the expvar.Var interface.
type ShardedInt struct {
	sv *ShardValue[intShard]
}

// NewShardedInt returns a new [ShardedInt].
func NewShardedInt() *ShardedInt {
	return &ShardedInt{
		sv: NewShardValue[intShard](),
	}
}

// Add adds delta to the value.
func (m *ShardedInt) Add(delta int64) {
	m.sv.One(func(v *intShard) {
		v.Add(delta)
	})
}

type intShard struct {
	atomic.Int64
	_ cpu.CacheLinePad // avoid false sharing of neighboring shards
}

// Value returns the current value.
func (m *ShardedInt) Value() int64 {
	var v int64
	for s := range m.sv.All {
		v += s.Load()
	}
	return v
}

// GetDistribution returns the current value in each shard.
// This is intended for observability/debugging only.
func (m *ShardedInt) GetDistribution() []int64 {
	v := make([]int64, 0, m.sv.Len())
	for s := range m.sv.All {
		v = append(v, s.Load())
	}
	return v
}

// String implements the expvar.Var interface.
func (m *ShardedInt) String() string {
	v, _ := json.Marshal(m.Value())
	return string(v)
}

// AppendText implements the encoding.TextAppender interface.
func (m *ShardedInt) AppendText(b []byte) ([]byte, error) {
	return strconv.AppendInt(b, m.Value(), 10), nil
}