derp/: Switch approach for metrics

Instead of explicitly encoding 5 and 10 seconds and checking for then, use a timer to keep track
of how much total time has elapsed since a flow started, where a flow is defined for any period
of time where every contiguous 3 minute window has at least 1 packet.

Signed-off-by: julianknodt <julianknodt@gmail.com>
This commit is contained in:
julianknodt 2021-06-08 12:57:33 -07:00
parent 7423c72f84
commit 152707960e
3 changed files with 78 additions and 28 deletions

1
.gitignore vendored
View File

@ -5,6 +5,7 @@
*.dll
*.so
*.dylib
*.swp
cmd/tailscale/tailscale
cmd/tailscaled/tailscaled

View File

@ -20,6 +20,7 @@ import (
"io"
"io/ioutil"
"log"
"math"
"math/big"
"math/rand"
"os"
@ -41,6 +42,9 @@ import (
var debug, _ = strconv.ParseBool(os.Getenv("DERP_DEBUG_LOGS"))
// How long a flow is considered to be active for.
var DerpFlowLogTime = 3 * time.Minute
// verboseDropKeys is the set of destination public keys that should
// verbosely log whenever DERP drops a packet.
var verboseDropKeys = map[key.Public]bool{}
@ -120,8 +124,10 @@ type Server struct {
multiForwarderCreated expvar.Int
multiForwarderDeleted expvar.Int
removePktForwardOther expvar.Int
clientsInUse5Sec expvar.Int // Number of clients using Derp after 5 seconds.
clientsInUse10Sec expvar.Int
flow_mu sync.Mutex
activeFlows map[*sclient]flow
flowLogs metrics.LabelMap
mu sync.Mutex
closed bool
@ -142,6 +148,17 @@ type Server struct {
sentTo map[key.Public]map[key.Public]int64 // src => dst => dst's latest sclient.connNum
}
// Flow retains metrics about packets sent in serial.
//
type flow struct {
packetKinds struct {
disco expvar.Int
other expvar.Int
}
createdAt time.Time
timer *time.Timer
}
// PacketForwarder is something that can forward packets.
//
// It's mostly an inteface for circular dependency reasons; the
@ -184,6 +201,9 @@ func NewServer(privateKey key.Private, logf logger.Logf) *Server {
memSys0: ms.Sys,
watchers: map[*sclient]bool{},
sentTo: map[key.Public]map[key.Public]int64{},
activeFlows: map[*sclient]flow{},
flowLogs: metrics.LabelMap{Label: "minutes"},
}
s.initMetacert()
s.packetsRecvDisco = s.packetsRecvByKind.Get("disco")
@ -459,7 +479,6 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string, connN
done: ctx.Done(),
remoteAddr: remoteAddr,
connectedAt: time.Now(),
lastPktAt: time.Now(),
sendQueue: make(chan pkt, perClientSendQueueDepth),
peerGone: make(chan key.Public),
canMesh: clientInfo.MeshKey != "" && clientInfo.MeshKey == s.meshKey,
@ -639,6 +658,8 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
if err != nil {
return fmt.Errorf("client %x: recvPacket: %v", c.key, err)
}
/// Do not need to block on updating metrics
go s.updateFlow(c, disco.LooksLikeDiscoWrapper(contents), nil)
var fwd PacketForwarder
s.mu.Lock()
@ -666,7 +687,6 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
}
return nil
}
c.markLastPktAt()
p := pkt{
bs: contents,
@ -858,6 +878,46 @@ func (s *Server) recvPacket(br *bufio.Reader, frameLen uint32) (dstKey key.Publi
return dstKey, contents, nil
}
// Updates an active flow for a given client given that we
// saw a packet from them
func (s *Server) updateFlow(c *sclient, isDiscoPacket bool, done chan<- bool) {
s.flow_mu.Lock()
defer s.flow_mu.Unlock()
flow, exists := s.activeFlows[c]
if isDiscoPacket {
flow.packetKinds.disco.Add(1)
} else {
flow.packetKinds.other.Add(1)
}
if !exists {
flow.createdAt = time.Now()
s.activeFlows[c] = flow
}
if flow.timer == nil {
flow.timer = time.AfterFunc(DerpFlowLogTime, func() {
s.flow_mu.Lock()
defer s.flow_mu.Unlock()
running_time := time.Since(flow.createdAt)
// report how many flows were alive for how many minutes
s.flowLogs.Get(fmt.Sprint(math.Ceil(running_time.Minutes()))).Add(1)
delete(s.activeFlows, c)
if done != nil {
done <- true
}
})
} else {
if !flow.timer.Reset(DerpFlowLogTime) {
// If the previous timer already ran, just stop timer and exit since it either removed it
// from the map or is waiting on the lock.
flow.timer.Stop()
}
}
}
// zpub is the key.Public zero value.
var zpub key.Public
@ -905,7 +965,6 @@ type sclient struct {
// Owned by run, not thread-safe.
br *bufio.Reader
connectedAt time.Time
lastPktAt time.Time
preferred bool
// Owned by sender, not thread-safe.
@ -1067,21 +1126,6 @@ func (c *sclient) sendPeerPresent(peer key.Public) error {
return err
}
func (c *sclient) markLastPktAt() {
old := c.lastPktAt
curr := time.Now()
c.lastPktAt = curr
// If we've been connected for over 5 seconds and haven't previously
since_old := old.Sub(c.connectedAt)
since_now := curr.Sub(c.connectedAt)
if since_old <= 5*time.Second && since_now > 5*time.Second {
c.s.clientsInUse5Sec.Add(1)
}
if since_old <= 10*time.Second && since_now > 10*time.Second {
c.s.clientsInUse10Sec.Add(1)
}
}
// sendMeshUpdates drains as many mesh peerStateChange entries as
// possible into the write buffer WITHOUT flushing or otherwise
// blocking (as it holds c.s.mu while working). If it can't drain them
@ -1310,8 +1354,7 @@ func (s *Server) ExpVar() expvar.Var {
m.Set("multiforwarder_created", &s.multiForwarderCreated)
m.Set("multiforwarder_deleted", &s.multiForwarderDeleted)
m.Set("packet_forwarder_delete_other_value", &s.removePktForwardOther)
m.Set("clients_inuse_after_5_sec", &s.clientsInUse5Sec)
m.Set("clients_inuse_after_10_sec", &s.clientsInUse10Sec)
m.Set("live_flow_durations", &s.flowLogs)
var expvarVersion expvar.String
expvarVersion.Set(version.Long)
m.Set("version", &expvarVersion)

View File

@ -28,6 +28,10 @@ import (
"tailscale.com/types/logger"
)
func init() {
DerpFlowLogTime = time.Nanosecond
}
func newPrivateKey(tb testing.TB) (k key.Private) {
tb.Helper()
if _, err := crand.Read(k[:]); err != nil {
@ -774,7 +778,7 @@ func TestForwarderRegistration(t *testing.T) {
})
}
func TestLastAliveCounter(t *testing.T) {
func TestDerpFlowLogging(t *testing.T) {
ts := newTestServer(t)
defer ts.close(t)
wantCounter := func(c *expvar.Int, want int) {
@ -783,14 +787,16 @@ func TestLastAliveCounter(t *testing.T) {
t.Errorf("counter = %v; want %v", got, want)
}
}
wantCounter(&ts.s.clientsInUse5Sec, 0)
wantCounter(ts.s.flowLogs.Get("1"), 0)
tc0 := newRegularClient(t, ts, "c0")
time.Sleep(6 * time.Second)
defer tc0.close(t)
time.Sleep(10 * time.Microsecond)
for _, sc := range ts.s.clients {
sc.markLastPktAt()
done := make(chan bool, 1)
ts.s.updateFlow(sc, false, done)
<-done
}
wantCounter(&ts.s.clientsInUse5Sec, 1)
tc0.close(t)
wantCounter(ts.s.flowLogs.Get("1"), 1)
}
func TestMetaCert(t *testing.T) {