mirror of
https://github.com/tailscale/tailscale.git
synced 2025-01-07 08:07:42 +00:00
derp: start adding flow tracking stats
This starts adding flow tracking stats, without exposing them anywhere yet. Flow structs are created as needed and metrics are bumped, and benchmarks show no change in performance. Updates #3560 Change-Id: I376187a8452ec92d49effcbf48a6fb4f4d787b8a Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
This commit is contained in:
parent
6e552f66a0
commit
ef68b4c004
@ -48,6 +48,7 @@
|
||||
"tailscale.com/types/key"
|
||||
"tailscale.com/types/logger"
|
||||
"tailscale.com/util/ctxkey"
|
||||
"tailscale.com/util/lru"
|
||||
"tailscale.com/util/mak"
|
||||
"tailscale.com/util/set"
|
||||
"tailscale.com/util/slicesx"
|
||||
@ -178,11 +179,14 @@ type Server struct {
|
||||
verifyClientsURL string
|
||||
verifyClientsURLFailOpen bool
|
||||
|
||||
mu sync.Mutex
|
||||
closed bool
|
||||
netConns map[Conn]chan struct{} // chan is closed when conn closes
|
||||
clients map[key.NodePublic]*clientSet
|
||||
watchers set.Set[*sclient] // mesh peers
|
||||
mu sync.Mutex
|
||||
closed bool
|
||||
flow map[flowKey]*flow
|
||||
flows []*flow // slice of values of flow map
|
||||
flowCleanIndex int
|
||||
netConns map[Conn]chan struct{} // chan is closed when conn closes
|
||||
clients map[key.NodePublic]*clientSet
|
||||
watchers set.Set[*sclient] // mesh peers
|
||||
// clientsMesh tracks all clients in the cluster, both locally
|
||||
// and to mesh peers. If the value is nil, that means the
|
||||
// peer is only local (and thus in the clients Map, but not
|
||||
@ -368,6 +372,7 @@ func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server {
|
||||
packetsDroppedType: metrics.LabelMap{Label: "type"},
|
||||
clients: map[key.NodePublic]*clientSet{},
|
||||
clientsMesh: map[key.NodePublic]PacketForwarder{},
|
||||
flow: map[flowKey]*flow{},
|
||||
netConns: map[Conn]chan struct{}{},
|
||||
memSys0: ms.Sys,
|
||||
watchers: set.Set[*sclient]{},
|
||||
@ -901,9 +906,20 @@ func (s *Server) debugLogf(format string, v ...any) {
|
||||
}
|
||||
}
|
||||
|
||||
// onRunLoopDone is called when the run loop is done
|
||||
// to clean up.
|
||||
//
|
||||
// It must only be called from the [slient.run] goroutine.
|
||||
func (c *sclient) onRunLoopDone() {
|
||||
c.flows.ForEach(func(k key.NodePublic, peer flowAndClientSet) {
|
||||
peer.f.ref.Add(-1)
|
||||
})
|
||||
}
|
||||
|
||||
// run serves the client until there's an error.
|
||||
// If the client hangs up or the server is closed, run returns nil, otherwise run returns an error.
|
||||
func (c *sclient) run(ctx context.Context) error {
|
||||
defer c.onRunLoopDone()
|
||||
// Launch sender, but don't return from run until sender goroutine is done.
|
||||
var grp errgroup.Group
|
||||
sendCtx, cancelSender := context.WithCancel(ctx)
|
||||
@ -1066,6 +1082,7 @@ func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error {
|
||||
var dst *sclient
|
||||
|
||||
s.mu.Lock()
|
||||
flo := s.getMakeFlowLocked(srcKey, dstKey)
|
||||
if set, ok := s.clients[dstKey]; ok {
|
||||
dstLen = set.Len()
|
||||
dst = set.activeClient.Load()
|
||||
@ -1088,7 +1105,7 @@ func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error {
|
||||
return c.sendPkt(dst, pkt{
|
||||
bs: contents,
|
||||
enqueuedAt: c.s.clock.Now(),
|
||||
src: srcKey,
|
||||
flow: flo,
|
||||
})
|
||||
}
|
||||
|
||||
@ -1101,22 +1118,13 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
|
||||
return fmt.Errorf("client %v: recvPacket: %v", c.key, err)
|
||||
}
|
||||
|
||||
var fwd PacketForwarder
|
||||
var dstLen int
|
||||
var dst *sclient
|
||||
|
||||
s.mu.Lock()
|
||||
if set, ok := s.clients[dstKey]; ok {
|
||||
dstLen = set.Len()
|
||||
dst = set.activeClient.Load()
|
||||
}
|
||||
if dst == nil && dstLen < 1 {
|
||||
fwd = s.clientsMesh[dstKey]
|
||||
}
|
||||
s.mu.Unlock()
|
||||
flo, dst, fwd := c.lookupDest(dstKey)
|
||||
flo.noteActivity()
|
||||
|
||||
if dst == nil {
|
||||
if fwd != nil {
|
||||
flo.pktSendRegion.Add(1)
|
||||
flo.byteSendRegion.Add(1)
|
||||
s.packetsForwardedOut.Add(1)
|
||||
err := fwd.ForwardPacket(c.key, dstKey, contents)
|
||||
c.debugLogf("SendPacket for %s, forwarding via %s: %v", dstKey.ShortString(), fwd, err)
|
||||
@ -1126,22 +1134,22 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
flo.dropUnknownDest.Add(1)
|
||||
reason := dropReasonUnknownDest
|
||||
if dstLen > 1 {
|
||||
reason = dropReasonDupClient
|
||||
} else {
|
||||
c.requestPeerGoneWriteLimited(dstKey, contents, PeerGoneReasonNotHere)
|
||||
}
|
||||
c.requestPeerGoneWriteLimited(dstKey, contents, PeerGoneReasonNotHere)
|
||||
s.recordDrop(contents, c.key, dstKey, reason)
|
||||
c.debugLogf("SendPacket for %s, dropping with reason=%s", dstKey.ShortString(), reason)
|
||||
return nil
|
||||
}
|
||||
c.debugLogf("SendPacket for %s, sending directly", dstKey.ShortString())
|
||||
|
||||
flo.pktSendLocal.Add(1)
|
||||
flo.byteSendLocal.Add(1)
|
||||
|
||||
p := pkt{
|
||||
bs: contents,
|
||||
enqueuedAt: c.s.clock.Now(),
|
||||
src: c.key,
|
||||
flow: flo,
|
||||
}
|
||||
return c.sendPkt(dst, p)
|
||||
}
|
||||
@ -1189,6 +1197,7 @@ func (s *Server) recordDrop(packetBytes []byte, srcKey, dstKey key.NodePublic, r
|
||||
}
|
||||
|
||||
func (c *sclient) sendPkt(dst *sclient, p pkt) error {
|
||||
// TODO(bradfitz): bump metrics on p.flow
|
||||
s := c.s
|
||||
dstKey := dst.key
|
||||
|
||||
@ -1550,6 +1559,7 @@ type sclient struct {
|
||||
br *bufio.Reader
|
||||
connectedAt time.Time
|
||||
preferred bool
|
||||
flows lru.Cache[key.NodePublic, flowAndClientSet] // keyed by dest
|
||||
|
||||
// Owned by sendLoop, not thread-safe.
|
||||
sawSrc map[key.NodePublic]set.Handle
|
||||
@ -1605,8 +1615,12 @@ type pkt struct {
|
||||
// The memory is owned by pkt.
|
||||
bs []byte
|
||||
|
||||
// src is the who's the sender of the packet.
|
||||
src key.NodePublic
|
||||
// flow is the flow stats from the src to the dest.
|
||||
flow *flow
|
||||
}
|
||||
|
||||
func (p pkt) src() key.NodePublic {
|
||||
return p.flow.flowKey.Value().src
|
||||
}
|
||||
|
||||
// peerGoneMsg is a request to write a peerGone frame to an sclient
|
||||
@ -1677,14 +1691,13 @@ func (c *sclient) onSendLoopDone() {
|
||||
for {
|
||||
select {
|
||||
case pkt := <-c.sendQueue:
|
||||
c.s.recordDrop(pkt.bs, pkt.src, c.key, dropReasonGoneDisconnected)
|
||||
c.s.recordDrop(pkt.bs, pkt.src(), c.key, dropReasonGoneDisconnected)
|
||||
case pkt := <-c.discoSendQueue:
|
||||
c.s.recordDrop(pkt.bs, pkt.src, c.key, dropReasonGoneDisconnected)
|
||||
c.s.recordDrop(pkt.bs, pkt.src(), c.key, dropReasonGoneDisconnected)
|
||||
default:
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (c *sclient) sendLoop(ctx context.Context) error {
|
||||
@ -1713,11 +1726,11 @@ func (c *sclient) sendLoop(ctx context.Context) error {
|
||||
werr = c.sendMeshUpdates()
|
||||
continue
|
||||
case msg := <-c.sendQueue:
|
||||
werr = c.sendPacket(msg.src, msg.bs)
|
||||
werr = c.sendPacket(msg.src(), msg.bs)
|
||||
c.recordQueueTime(msg.enqueuedAt)
|
||||
continue
|
||||
case msg := <-c.discoSendQueue:
|
||||
werr = c.sendPacket(msg.src, msg.bs)
|
||||
werr = c.sendPacket(msg.src(), msg.bs)
|
||||
c.recordQueueTime(msg.enqueuedAt)
|
||||
continue
|
||||
case msg := <-c.sendPongCh:
|
||||
@ -1747,10 +1760,10 @@ func (c *sclient) sendLoop(ctx context.Context) error {
|
||||
case <-c.meshUpdate:
|
||||
werr = c.sendMeshUpdates()
|
||||
case msg := <-c.sendQueue:
|
||||
werr = c.sendPacket(msg.src, msg.bs)
|
||||
werr = c.sendPacket(msg.src(), msg.bs)
|
||||
c.recordQueueTime(msg.enqueuedAt)
|
||||
case msg := <-c.discoSendQueue:
|
||||
werr = c.sendPacket(msg.src, msg.bs)
|
||||
werr = c.sendPacket(msg.src(), msg.bs)
|
||||
c.recordQueueTime(msg.enqueuedAt)
|
||||
case msg := <-c.sendPongCh:
|
||||
werr = c.sendPong(msg)
|
||||
|
182
derp/flow.go
Normal file
182
derp/flow.go
Normal file
@ -0,0 +1,182 @@
|
||||
// Copyright (c) Tailscale Inc & AUTHORS
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
package derp
|
||||
|
||||
import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
"unique"
|
||||
|
||||
"tailscale.com/types/key"
|
||||
)
|
||||
|
||||
type flowKey struct {
|
||||
src, dst key.NodePublic
|
||||
}
|
||||
|
||||
// flow tracks metadata about a directional flow of packets from a source
|
||||
// node to a destination node. The public keys of the src is known
|
||||
// by the caller.
|
||||
type flow struct {
|
||||
createdUnixNano int64 // int64 instead of time.Time to keep flow smaller
|
||||
index int // index in Server.flows slice or -1 if not; guarded by Server.mu
|
||||
flowKey unique.Handle[flowKey] // TODO: make this a unique handle of two unique handles for each NodePublic?
|
||||
|
||||
roughActivityUnixTime atomic.Int64 // unix sec of recent activity, updated at most once a minute
|
||||
pktSendRegion atomic.Int64
|
||||
byteSendRegion atomic.Int64
|
||||
pktSendLocal atomic.Int64
|
||||
byteSendLocal atomic.Int64
|
||||
dropUnknownDest atomic.Int64 // no local or region client for dest
|
||||
dropGone atomic.Int64
|
||||
|
||||
// ref is the reference count of things (*Server, *sclient) holding on
|
||||
// to this flow. As of 2024-09-18 it is currently only informational
|
||||
// and not used for anything. The Server adds/removes a ref count when
|
||||
// it's remove from its map and each 0, 1 or more sclients for a given
|
||||
// recently active flow also add/remove a ref count.
|
||||
//
|
||||
// This might be used in the future as an alternate Server.flow eviction
|
||||
// strategy but for now it's just a debug tool. We do want to keep flow
|
||||
// stats surviving a brief client disconnections, so we do want Server
|
||||
// to keep at least a momentary ref count alive.
|
||||
ref atomic.Int64
|
||||
}
|
||||
|
||||
// noteActivity updates f.recentActivityUnixTime if it's been
|
||||
// more than a minute.
|
||||
func (f *flow) noteActivity() {
|
||||
now := time.Now().Unix()
|
||||
if now-f.roughActivityUnixTime.Load() > 60 {
|
||||
f.roughActivityUnixTime.Store(now)
|
||||
}
|
||||
}
|
||||
|
||||
// getMakeFlow either gets or makes a new flow for the given source and
|
||||
// destination nodes.
|
||||
func (s *Server) getMakeFlow(src, dst key.NodePublic) *flow {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.getMakeFlowLocked(src, dst)
|
||||
}
|
||||
|
||||
func (s *Server) getMakeFlowLocked(src, dst key.NodePublic) *flow {
|
||||
k := flowKey{src, dst}
|
||||
f, ok := s.flow[k]
|
||||
if ok {
|
||||
return f
|
||||
}
|
||||
now := time.Now()
|
||||
f = &flow{
|
||||
createdUnixNano: now.UnixNano(),
|
||||
index: len(s.flows),
|
||||
flowKey: unique.Make(k),
|
||||
}
|
||||
f.roughActivityUnixTime.Store(now.Unix())
|
||||
f.ref.Add(1) // for Server's ref in the s.flows map itself
|
||||
|
||||
// As penance for the one flow we're about to add to the map and slice
|
||||
// above, check two old flows for removal. We roll around and around the
|
||||
// flows slice, so this is a simple way to eventually check everything for
|
||||
// removal before we double in size.
|
||||
for range 2 {
|
||||
s.maybeCleanOldFlowLocked()
|
||||
}
|
||||
|
||||
s.flow[k] = f
|
||||
s.flows = append(s.flows, f)
|
||||
|
||||
return f
|
||||
}
|
||||
|
||||
func (s *Server) maybeCleanOldFlowLocked() {
|
||||
if len(s.flows) == 0 {
|
||||
return
|
||||
}
|
||||
s.flowCleanIndex++
|
||||
if s.flowCleanIndex >= len(s.flows) {
|
||||
s.flowCleanIndex = 0
|
||||
}
|
||||
f := s.flows[s.flowCleanIndex]
|
||||
|
||||
now := time.Now().Unix()
|
||||
ageSec := now - f.roughActivityUnixTime.Load()
|
||||
if ageSec > 3600 {
|
||||
// No activity in an hour. Remove it.
|
||||
delete(s.flow, f.flowKey.Value())
|
||||
holeIdx := f.index
|
||||
s.flows[holeIdx] = s.flows[len(s.flows)-1]
|
||||
s.flows[holeIdx].index = holeIdx
|
||||
s.flows = s.flows[:len(s.flows)-1]
|
||||
f.ref.Add(-1)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
type flowAndClientSet struct {
|
||||
f *flow // always non-nil
|
||||
cs *clientSet // may be nil if peer not connected/known
|
||||
}
|
||||
|
||||
// lookupDest returns the flow (always non-nil) and sclient and/or
|
||||
// PacketForwarder (at least one of which will be nil, possibly both) for the
|
||||
// given destination node.
|
||||
|
||||
// It must only be called from the [sclient.run] goroutine.
|
||||
func (c *sclient) lookupDest(dst key.NodePublic) (_ *flow, _ *sclient, fwd PacketForwarder) {
|
||||
peer, ok := c.flows.GetOk(dst)
|
||||
if ok && peer.cs != nil {
|
||||
if c := peer.cs.activeClient.Load(); c != nil {
|
||||
// Common case for hot flows within the same node: we know the
|
||||
// clientSet and no mutex is needed.
|
||||
return peer.f, c, nil
|
||||
}
|
||||
}
|
||||
|
||||
if peer.f == nil {
|
||||
peer.f = c.s.getMakeFlow(c.key, dst)
|
||||
peer.f.ref.Add(1)
|
||||
// At least store the flow in the map, even if we don't find the
|
||||
// clientSet later. In theory we could coallesce this map write with a
|
||||
// possible one later, but they should be rare and uncontended so we
|
||||
// don't care as of 2024-09-18.
|
||||
c.flows.Set(dst, peer)
|
||||
c.maybeCleanFlows()
|
||||
}
|
||||
|
||||
srv := c.s
|
||||
srv.mu.Lock()
|
||||
set, ok := srv.clients[dst]
|
||||
if ok {
|
||||
if c := set.activeClient.Load(); c != nil {
|
||||
srv.mu.Unlock()
|
||||
peer.cs = set
|
||||
c.flows.Set(dst, peer)
|
||||
c.maybeCleanFlows()
|
||||
return peer.f, c, nil
|
||||
}
|
||||
fwd = srv.clientsMesh[dst]
|
||||
}
|
||||
srv.mu.Unlock()
|
||||
return peer.f, nil, fwd // fwd may be nil too
|
||||
}
|
||||
|
||||
// maybeCleanFlows cleans the oldest element from the client flows cache if
|
||||
// it's too big.
|
||||
//
|
||||
// It must only be called from the [sclient.run] goroutine.
|
||||
func (c *sclient) maybeCleanFlows() {
|
||||
const maxClientFlowTrack = 100
|
||||
if c.flows.Len() <= maxClientFlowTrack {
|
||||
return
|
||||
}
|
||||
|
||||
oldest, _ := c.flows.OldestKey()
|
||||
facs, ok := c.flows.PeekOk(oldest)
|
||||
if !ok {
|
||||
panic("lookupDest: OldestKey lied")
|
||||
}
|
||||
facs.f.ref.Add(-1)
|
||||
c.flows.Delete(oldest)
|
||||
}
|
52
derp/flow_test.go
Normal file
52
derp/flow_test.go
Normal file
@ -0,0 +1,52 @@
|
||||
package derp
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"unique"
|
||||
|
||||
"go4.org/mem"
|
||||
"tailscale.com/types/key"
|
||||
)
|
||||
|
||||
func BenchmarkUnique(b *testing.B) {
|
||||
var keys [100]key.NodePublic
|
||||
for i := range keys {
|
||||
keys[i] = key.NodePublicFromRaw32(mem.B([]byte{31: byte(i)}))
|
||||
}
|
||||
b.Run("raw", func(b *testing.B) {
|
||||
m := map[flowKey]bool{}
|
||||
for range b.N {
|
||||
for _, k := range keys {
|
||||
key := flowKey{k, k}
|
||||
if _, ok := m[key]; !ok {
|
||||
m[key] = true
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
b.Run("unique-tightmake", func(b *testing.B) {
|
||||
m := map[unique.Handle[flowKey]]bool{}
|
||||
for range b.N {
|
||||
for _, k := range keys {
|
||||
key := unique.Make(flowKey{k, k})
|
||||
if _, ok := m[key]; !ok {
|
||||
m[key] = true
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
b.Run("unique-makeonce", func(b *testing.B) {
|
||||
m := map[unique.Handle[flowKey]]bool{}
|
||||
ukeys := make([]unique.Handle[flowKey], len(keys))
|
||||
for i, k := range keys {
|
||||
ukeys[i] = unique.Make(flowKey{k, k})
|
||||
}
|
||||
for range b.N {
|
||||
for _, key := range ukeys {
|
||||
if _, ok := m[key]; !ok {
|
||||
m[key] = true
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
@ -133,6 +133,15 @@ func (c *Cache[K, V]) DeleteOldest() {
|
||||
}
|
||||
}
|
||||
|
||||
// OldestKey returns the oldest key, without bumping it to the head.
|
||||
// If the cache is empty, it returns ok false.
|
||||
func (c *Cache[K, V]) OldestKey() (key K, ok bool) {
|
||||
if c.head == nil {
|
||||
return key, false
|
||||
}
|
||||
return c.head.prev.key, true
|
||||
}
|
||||
|
||||
// Len returns the number of items in the cache.
|
||||
func (c *Cache[K, V]) Len() int { return len(c.lookup) }
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user