prober: qd example using slice

Signed-off-by: Jordan Whited <jordan@tailscale.com>
This commit is contained in:
Jordan Whited 2024-12-17 20:23:52 -08:00
parent 9d8c0c665e
commit 9b9d74ccf5
No known key found for this signature in database
GPG Key ID: 33DF352F65991EB8

View File

@ -38,7 +38,6 @@ import (
"tailscale.com/tailcfg"
"tailscale.com/types/key"
"tailscale.com/types/logger"
"tailscale.com/util/circularqueue"
)
// derpProber dynamically manages several probes for each DERP server
@ -387,17 +386,13 @@ func derpProbeQueuingDelay(ctx context.Context, dm *tailcfg.DERPMap, from, to *t
}
func runDerpProbeQueuingDelayContinously(ctx context.Context, from, to *tailcfg.DERPNode, fromc, toc *derphttp.Client, packetsDropped *expvar.Float, qdh *queuingDelayHistogram) error {
// Circular buffer to hold packet send times. It is sized to hold timings
// for up to 5 seconds when sending packets at a rate of 10 per second.
// It assumes that packets may be dropped, but that they will generally
// arrive in order. Packets arriving out of order will result in older
// packets being ignored, effectively overcounting the number of dropped
// packets.
sentTimes := circularqueue.NewFIFO(50, func(t time.Time) {
// If a sent time is evicted, that means we'll never record a timing
// for this packet, so we considered it dropped.
packetsDropped.Add(1)
})
type txRecord struct {
at time.Time
seq uint64
}
// TODO: slice cap should be result of arithmetic involving tx Hz and slowest bucket bounds
txRecords := make([]txRecord, 0, 50)
var txRecordsMu sync.Mutex
// Send the packets.
sendErrC := make(chan error, 1)
@ -408,14 +403,20 @@ func runDerpProbeQueuingDelayContinously(ctx context.Context, from, to *tailcfg.
t := time.NewTicker(time.Second / 10) // 10 packets per second
defer t.Stop()
seq := 0
seq := uint64(0)
for {
select {
case <-ctx.Done():
return
case <-t.C:
sentTimes.Push(time.Now())
binary.BigEndian.PutUint64(pkt, uint64(seq))
txRecordsMu.Lock()
if len(txRecords) == cap(txRecords) {
txRecords = append(txRecords[:0], txRecords[1:]...)
// TODO: increment drop counter
}
txRecords = append(txRecords, txRecord{time.Now(), seq})
txRecordsMu.Unlock()
binary.BigEndian.PutUint64(pkt, seq)
seq++
if err := fromc.Send(toc.SelfPublicKey(), pkt); err != nil {
sendErrC <- fmt.Errorf("sending packet %w", err)
@ -443,13 +444,21 @@ func runDerpProbeQueuingDelayContinously(ctx context.Context, from, to *tailcfg.
return
}
seq := binary.BigEndian.Uint64(v.Data)
sent := sentTimes.Pop(int(seq))
if sent == nil {
// No sent time found, ignore
continue
txRecordsMu.Lock()
for _, record := range txRecords {
switch {
case record.seq == seq:
rtt := now.Sub(record.at)
qdh.add(rtt) // TODO: use prometheus native histo if possible
case record.seq > seq:
// No sent time found, probably a late arrival already
// recorded as drop by sender when deleted
break
case record.seq < seq:
continue
}
}
qdh.add(now.Sub(*sent))
txRecordsMu.Unlock()
case derp.KeepAliveMessage:
// Silently ignore.
default: