diff --git a/cmd/derpprobe/derpprobe.go b/cmd/derpprobe/derpprobe.go index 620b96609..a4fa65b42 100644 --- a/cmd/derpprobe/derpprobe.go +++ b/cmd/derpprobe/derpprobe.go @@ -29,7 +29,8 @@ var ( tlsInterval = flag.Duration("tls-interval", 15*time.Second, "TLS probe interval") bwInterval = flag.Duration("bw-interval", 0, "bandwidth probe interval (0 = no bandwidth probing)") bwSize = flag.Int64("bw-probe-size-bytes", 1_000_000, "bandwidth probe size") - bwTUNIPv4Address = flag.String("bw-tun-ipv4-addr", "", "if specified, bandwidth probes will be performed over a TUN device at this address in order to exercise TCP-in-TCP in similar fashion to TCP over Tailscale via DERP. We will use a /30 subnet including this IP address.") + bwTUNIPv4Address = flag.String("bw-tun-ipv4-addr", "", "if specified, bandwidth probes will be performed over a TUN device at this address in order to exercise TCP-in-TCP in similar fashion to TCP over Tailscale via DERP; we will use a /30 subnet including this IP address") + queuingDelay = flag.Bool("qd", false, "if specified, queuing delay will be measured continuously using 260 byte packets (approximate size of a CallMeMaybe packet) sent at a rate of 10 per second") regionCode = flag.String("region-code", "", "probe only this region (e.g. 'lax'); if left blank, all regions will be probed") ) @@ -45,6 +46,7 @@ func main() { prober.WithMeshProbing(*meshInterval), prober.WithSTUNProbing(*stunInterval), prober.WithTLSProbing(*tlsInterval), + prober.WithQueuingDelayProbing(*queuingDelay), } if *bwInterval > 0 { opts = append(opts, prober.WithBandwidthProbing(*bwInterval, *bwSize, *bwTUNIPv4Address)) @@ -107,7 +109,7 @@ func getOverallStatus(p *prober.Prober) (o overallStatus) { // Do not show probes that have not finished yet. continue } - if i.Result { + if i.Status == prober.ProbeStatusSucceeded { o.addGoodf("%s: %s", p, i.Latency) } else { o.addBadf("%s: %s", p, i.Error) diff --git a/prober/derp.go b/prober/derp.go index 742e8a5f4..b3e6b28f9 100644 --- a/prober/derp.go +++ b/prober/derp.go @@ -8,6 +8,7 @@ import ( "cmp" "context" crand "crypto/rand" + "encoding/binary" "encoding/json" "errors" "expvar" @@ -37,6 +38,7 @@ import ( "tailscale.com/tailcfg" "tailscale.com/types/key" "tailscale.com/types/logger" + "tailscale.com/util/circularqueue" ) // derpProber dynamically manages several probes for each DERP server @@ -53,6 +55,9 @@ type derpProber struct { bwProbeSize int64 bwTUNIPv4Prefix *netip.Prefix // or nil to not use TUN + // Optional queuing delay probing. + qdProbe bool + // Optionally restrict probes to a single regionCode. regionCode string @@ -64,6 +69,7 @@ type derpProber struct { udpProbeFn func(string, int) ProbeClass meshProbeFn func(string, string) ProbeClass bwProbeFn func(string, string, int64) ProbeClass + qdProbeFn func(string, string) ProbeClass sync.Mutex lastDERPMap *tailcfg.DERPMap @@ -93,6 +99,13 @@ func WithBandwidthProbing(interval time.Duration, size int64, tunAddress string) } } +// WithQueuingDelayProbing enables/disables queuing delay probing. +func WithQueuingDelayProbing(qdProbe bool) DERPOpt { + return func(d *derpProber) { + d.qdProbe = qdProbe + } +} + // WithMeshProbing enables mesh probing. When enabled, a small message will be // transferred through each DERP server and each pair of DERP servers. func WithMeshProbing(interval time.Duration) DERPOpt { @@ -147,6 +160,7 @@ func DERP(p *Prober, derpMapURL string, opts ...DERPOpt) (*derpProber, error) { d.udpProbeFn = d.ProbeUDP d.meshProbeFn = d.probeMesh d.bwProbeFn = d.probeBandwidth + d.qdProbeFn = d.probeQueuingDelay return d, nil } @@ -213,7 +227,7 @@ func (d *derpProber) probeMapFn(ctx context.Context) error { } } - if d.bwInterval > 0 && d.bwProbeSize > 0 { + if d.bwInterval != 0 && d.bwProbeSize > 0 { n := fmt.Sprintf("derp/%s/%s/%s/bw", region.RegionCode, server.Name, to.Name) wantProbes[n] = true if d.probes[n] == nil { @@ -225,6 +239,15 @@ func (d *derpProber) probeMapFn(ctx context.Context) error { d.probes[n] = d.p.Run(n, d.bwInterval, labels, d.bwProbeFn(server.Name, to.Name, d.bwProbeSize)) } } + + if d.qdProbe { + n := fmt.Sprintf("derp/%s/%s/%s/qd", region.RegionCode, server.Name, to.Name) + wantProbes[n] = true + if d.probes[n] == nil { + log.Printf("adding DERP queuing delay probe for %s->%s (%s)", server.Name, to.Name, region.RegionName) + d.probes[n] = d.p.Run(n, -10*time.Second, labels, d.qdProbeFn(server.Name, to.Name)) + } + } } } } @@ -240,7 +263,7 @@ func (d *derpProber) probeMapFn(ctx context.Context) error { return nil } -// probeMesh returs a probe class that sends a test packet through a pair of DERP +// probeMesh returns a probe class that sends a test packet through a pair of DERP // servers (or just one server, if 'from' and 'to' are the same). 'from' and 'to' // are expected to be names (DERPNode.Name) of two DERP servers in the same region. func (d *derpProber) probeMesh(from, to string) ProbeClass { @@ -263,7 +286,7 @@ func (d *derpProber) probeMesh(from, to string) ProbeClass { } } -// probeBandwidth returs a probe class that sends a payload of a given size +// probeBandwidth returns a probe class that sends a payload of a given size // through a pair of DERP servers (or just one server, if 'from' and 'to' are // the same). 'from' and 'to' are expected to be names (DERPNode.Name) of two // DERP servers in the same region. @@ -295,6 +318,188 @@ func (d *derpProber) probeBandwidth(from, to string, size int64) ProbeClass { } } +// probeQueuingDelay returns a probe class that continuously sends packets +// through a pair of DERP servers (or just one server, if 'from' and 'to' are +// the same) at a rate of 10 packets per second in order to measure queuing +// delays. 'from' and 'to' are expected to be names (DERPNode.Name) of two DERP +// servers in the same region. +func (d *derpProber) probeQueuingDelay(from, to string) ProbeClass { + derpPath := "mesh" + if from == to { + derpPath = "single" + } + var packetsDropped expvar.Float + qdh := queuingDelayHistogram{ + buckets: make(map[float64]uint64, len(qdhBuckets)), + } + return ProbeClass{ + Probe: func(ctx context.Context) error { + fromN, toN, err := d.getNodePair(from, to) + if err != nil { + return err + } + return derpProbeQueuingDelay(ctx, d.lastDERPMap, fromN, toN, &packetsDropped, &qdh) + }, + Class: "derp_qd", + Labels: Labels{"derp_path": derpPath}, + Metrics: func(l prometheus.Labels) []prometheus.Metric { + qdh.mx.Lock() + result := []prometheus.Metric{ + prometheus.MustNewConstMetric(prometheus.NewDesc("derp_qd_probe_dropped_packets", "Total packets dropped", nil, l), prometheus.CounterValue, float64(packetsDropped.Value())), + prometheus.MustNewConstHistogram(prometheus.NewDesc("derp_qd_probe_delays_seconds", "Distribution of queuing delays", nil, l), qdh.count, qdh.sum, qdh.buckets), + } + qdh.mx.Unlock() + return result + }, + } +} + +// derpProbeQueuingDelay continuously sends data between two local DERP clients +// connected to two DERP servers in order to measure queuing delays. +func derpProbeQueuingDelay(ctx context.Context, dm *tailcfg.DERPMap, from, to *tailcfg.DERPNode, packetsDropped *expvar.Float, qdh *queuingDelayHistogram) (err error) { + // This probe uses clients with isProber=false to avoid spamming the derper + // logs with every packet sent by the queuing delay probe. + fromc, err := newConn(ctx, dm, from, false) + if err != nil { + return err + } + defer fromc.Close() + toc, err := newConn(ctx, dm, to, false) + if err != nil { + return err + } + defer toc.Close() + + // Wait a bit for from's node to hear about to existing on the + // other node in the region, in the case where the two nodes + // are different. + if from.Name != to.Name { + time.Sleep(100 * time.Millisecond) // pretty arbitrary + } + + if err := runDerpProbeQueuingDelayContinously(ctx, from, to, fromc, toc, packetsDropped, qdh); err != nil { + // Record pubkeys on failed probes to aid investigation. + return fmt.Errorf("%s -> %s: %w", + fromc.SelfPublicKey().ShortString(), + toc.SelfPublicKey().ShortString(), err) + } + return nil +} + +func runDerpProbeQueuingDelayContinously(ctx context.Context, from, to *tailcfg.DERPNode, fromc, toc *derphttp.Client, packetsDropped *expvar.Float, qdh *queuingDelayHistogram) error { + // Circular buffer to hold packet send times. It is sized to hold timings + // for up to 5 seconds when sending packets at a rate of 10 per second. + // It assumes that packets may be dropped, but that they will generally + // arrive in order. Packets arriving out of order will result in older + // packets being ignored, effectively overcounting the number of dropped + // packets. + sentTimes := circularqueue.NewFIFO(50, func(t time.Time) { + // If a sent time is evicted, that means we'll never record a timing + // for this packet, so we considered it dropped. + packetsDropped.Add(1) + }) + + // Send the packets. + sendErrC := make(chan error, 1) + pkt := make([]byte, 260) // the same size as a CallMeMaybe packet observed on a Tailscale client. + crand.Read(pkt) + + go func() { + t := time.NewTicker(time.Second / 10) // 10 packets per second + defer t.Stop() + + seq := 0 + for { + select { + case <-ctx.Done(): + return + case <-t.C: + sentTimes.Push(time.Now()) + binary.BigEndian.PutUint64(pkt, uint64(seq)) + seq++ + if err := fromc.Send(toc.SelfPublicKey(), pkt); err != nil { + sendErrC <- fmt.Errorf("sending packet %w", err) + return + } + } + } + }() + + // Receive the packets. + recvFinishedC := make(chan error, 1) + go func() { + defer close(recvFinishedC) // to break out of 'select' below. + for { + m, err := toc.Recv() + if err != nil { + recvFinishedC <- err + return + } + switch v := m.(type) { + case derp.ReceivedPacket: + now := time.Now() + if v.Source != fromc.SelfPublicKey() { + recvFinishedC <- fmt.Errorf("got data packet from unexpected source, %v", v.Source) + return + } + seq := binary.BigEndian.Uint64(v.Data) + sent := sentTimes.Pop(int(seq)) + if sent == nil { + // No sent time found, ignore + continue + } + qdh.add(now.Sub(*sent)) + + case derp.KeepAliveMessage: + // Silently ignore. + default: + log.Printf("%v: ignoring Recv frame type %T", to.Name, v) + // Loop. + } + } + }() + + select { + case <-ctx.Done(): + return fmt.Errorf("timeout: %w", ctx.Err()) + case err := <-sendErrC: + return fmt.Errorf("error sending via %q: %w", from.Name, err) + case err := <-recvFinishedC: + if err != nil { + return fmt.Errorf("error receiving from %q: %w", to.Name, err) + } + } + return nil +} + +// queuingDelayHistogram allows tracking a histogram of queuing delays +type queuingDelayHistogram struct { + count uint64 + sum float64 + buckets map[float64]uint64 + mx sync.Mutex +} + +// qdhBuckets defines the buckets (in seconds) for the queuingDelayHistogram. +var qdhBuckets = []float64{.005, .01, .025, .05, .1, .25, .5, 1} + +func (qdh *queuingDelayHistogram) add(d time.Duration) { + qdh.mx.Lock() + defer qdh.mx.Unlock() + + seconds := float64(d.Seconds()) + qdh.count++ + qdh.sum += seconds + + for _, b := range qdhBuckets { + if seconds > b { + continue + } + qdh.buckets[b] += 1 + break + } +} + // getNodePair returns DERPNode objects for two DERP servers based on their // short names. func (d *derpProber) getNodePair(n1, n2 string) (ret1, ret2 *tailcfg.DERPNode, _ error) { @@ -573,6 +778,8 @@ func runDerpProbeNodePair(ctx context.Context, from, to *tailcfg.DERPNode, fromc recvc <- fmt.Errorf("got data packet %d from unexpected source, %v", idx, v.Source) return } + // This assumes that the packets are received reliably and in order. + // The DERP protocol does not guarantee this, but this probe assumes it. if got, want := v.Data, pkts[idx]; !bytes.Equal(got, want) { recvc <- fmt.Errorf("unexpected data packet %d (out of %d)", idx, len(pkts)) return diff --git a/prober/prober.go b/prober/prober.go index 2a43628bd..2e3b09b8c 100644 --- a/prober/prober.go +++ b/prober/prober.go @@ -256,6 +256,11 @@ type Probe struct { latencyHist *ring.Ring } +// IsContinuous indicates that this is a continuous probe. +func (p *Probe) IsContinuous() bool { + return p.interval < 0 +} + // Close shuts down the Probe and unregisters it from its Prober. // It is safe to Run a new probe of the same name after Close returns. func (p *Probe) Close() error { @@ -288,6 +293,22 @@ func (p *Probe) loop() { return } + if p.IsContinuous() { + // Probe function is going to run continuously. + for { + p.run() + // Wait and then retry if probe fails. We use the inverse of the + // configured negative interval as our sleep period. + // TODO(percy):implement exponential backoff, possibly using logtail/backoff. + select { + case <-time.After(-1 * p.interval): + p.run() + case <-p.ctx.Done(): + return + } + } + } + p.tick = p.prober.newTicker(p.interval) defer p.tick.Stop() for { @@ -323,9 +344,17 @@ func (p *Probe) run() (pi ProbeInfo, err error) { p.recordEnd(err) } }() - timeout := time.Duration(float64(p.interval) * 0.8) - ctx, cancel := context.WithTimeout(p.ctx, timeout) - defer cancel() + ctx := p.ctx + if p.IsContinuous() { + p.mu.Lock() + p.lastErr = nil + p.mu.Unlock() + } else { + timeout := time.Duration(float64(p.interval) * 0.8) + var cancel func() + ctx, cancel = context.WithTimeout(ctx, timeout) + defer cancel() + } err = p.probeClass.Probe(ctx) p.recordEnd(err) @@ -365,6 +394,16 @@ func (p *Probe) recordEnd(err error) { p.successHist = p.successHist.Next() } +// ProbeStatus indicates the status of a probe. +type ProbeStatus string + +const ( + ProbeStatusUnknown = "unknown" + ProbeStatusRunning = "running" + ProbeStatusFailed = "failed" + ProbeStatusSucceeded = "succeeded" +) + // ProbeInfo is a snapshot of the configuration and state of a Probe. type ProbeInfo struct { Name string @@ -374,7 +413,7 @@ type ProbeInfo struct { Start time.Time End time.Time Latency time.Duration - Result bool + Status ProbeStatus Error string RecentResults []bool RecentLatencies []time.Duration @@ -402,6 +441,10 @@ func (pb ProbeInfo) RecentMedianLatency() time.Duration { return pb.RecentLatencies[len(pb.RecentLatencies)/2] } +func (pb ProbeInfo) Continuous() bool { + return pb.Interval < 0 +} + // ProbeInfo returns the state of all probes. func (p *Prober) ProbeInfo() map[string]ProbeInfo { out := map[string]ProbeInfo{} @@ -429,9 +472,14 @@ func (probe *Probe) probeInfoLocked() ProbeInfo { Labels: probe.metricLabels, Start: probe.start, End: probe.end, - Result: probe.succeeded, } - if probe.lastErr != nil { + inf.Status = ProbeStatusUnknown + if probe.end.Before(probe.start) { + inf.Status = ProbeStatusRunning + } else if probe.succeeded { + inf.Status = ProbeStatusSucceeded + } else if probe.lastErr != nil { + inf.Status = ProbeStatusFailed inf.Error = probe.lastErr.Error() } if probe.latency > 0 { @@ -467,7 +515,7 @@ func (p *Prober) RunHandler(w http.ResponseWriter, r *http.Request) error { p.mu.Lock() probe, ok := p.probes[name] p.mu.Unlock() - if !ok { + if !ok || probe.IsContinuous() { return tsweb.Error(http.StatusNotFound, fmt.Sprintf("unknown probe %q", name), nil) } @@ -531,7 +579,8 @@ func (p *Probe) Collect(ch chan<- prometheus.Metric) { if !p.start.IsZero() { ch <- prometheus.MustNewConstMetric(p.mStartTime, prometheus.GaugeValue, float64(p.start.Unix())) } - if p.end.IsZero() { + // For periodic probes that haven't ended, don't collect probe metrics yet. + if p.end.IsZero() && !p.IsContinuous() { return } ch <- prometheus.MustNewConstMetric(p.mEndTime, prometheus.GaugeValue, float64(p.end.Unix())) diff --git a/prober/prober_test.go b/prober/prober_test.go index 742a914b2..3905bfbc9 100644 --- a/prober/prober_test.go +++ b/prober/prober_test.go @@ -316,7 +316,7 @@ func TestProberProbeInfo(t *testing.T) { Interval: probeInterval, Labels: map[string]string{"class": "", "name": "probe1"}, Latency: 500 * time.Millisecond, - Result: true, + Status: ProbeStatusSucceeded, RecentResults: []bool{true}, RecentLatencies: []time.Duration{500 * time.Millisecond}, }, @@ -324,6 +324,7 @@ func TestProberProbeInfo(t *testing.T) { Name: "probe2", Interval: probeInterval, Labels: map[string]string{"class": "", "name": "probe2"}, + Status: ProbeStatusFailed, Error: "error2", RecentResults: []bool{false}, RecentLatencies: nil, // no latency for failed probes @@ -349,7 +350,7 @@ func TestProbeInfoRecent(t *testing.T) { }{ { name: "no_runs", - wantProbeInfo: ProbeInfo{}, + wantProbeInfo: ProbeInfo{Status: ProbeStatusUnknown}, wantRecentSuccessRatio: 0, wantRecentMedianLatency: 0, }, @@ -358,7 +359,7 @@ func TestProbeInfoRecent(t *testing.T) { results: []probeResult{{latency: 100 * time.Millisecond, err: nil}}, wantProbeInfo: ProbeInfo{ Latency: 100 * time.Millisecond, - Result: true, + Status: ProbeStatusSucceeded, RecentResults: []bool{true}, RecentLatencies: []time.Duration{100 * time.Millisecond}, }, @@ -369,7 +370,7 @@ func TestProbeInfoRecent(t *testing.T) { name: "single_failure", results: []probeResult{{latency: 100 * time.Millisecond, err: errors.New("error123")}}, wantProbeInfo: ProbeInfo{ - Result: false, + Status: ProbeStatusFailed, RecentResults: []bool{false}, RecentLatencies: nil, Error: "error123", @@ -390,7 +391,7 @@ func TestProbeInfoRecent(t *testing.T) { {latency: 80 * time.Millisecond, err: nil}, }, wantProbeInfo: ProbeInfo{ - Result: true, + Status: ProbeStatusSucceeded, Latency: 80 * time.Millisecond, RecentResults: []bool{false, true, true, false, true, true, false, true}, RecentLatencies: []time.Duration{ @@ -420,7 +421,7 @@ func TestProbeInfoRecent(t *testing.T) { {latency: 110 * time.Millisecond, err: nil}, }, wantProbeInfo: ProbeInfo{ - Result: true, + Status: ProbeStatusSucceeded, Latency: 110 * time.Millisecond, RecentResults: []bool{true, true, true, true, true, true, true, true, true, true}, RecentLatencies: []time.Duration{ @@ -483,7 +484,7 @@ func TestProberRunHandler(t *testing.T) { ProbeInfo: ProbeInfo{ Name: "success", Interval: probeInterval, - Result: true, + Status: ProbeStatusSucceeded, RecentResults: []bool{true, true}, }, PreviousSuccessRatio: 1, @@ -498,7 +499,7 @@ func TestProberRunHandler(t *testing.T) { ProbeInfo: ProbeInfo{ Name: "failure", Interval: probeInterval, - Result: false, + Status: ProbeStatusFailed, Error: "error123", RecentResults: []bool{false, false}, }, diff --git a/prober/status.go b/prober/status.go index aa9ef99d0..20fbeec58 100644 --- a/prober/status.go +++ b/prober/status.go @@ -62,8 +62,9 @@ func (p *Prober) StatusHandler(opts ...statusHandlerOpt) tsweb.ReturnHandlerFunc return func(w http.ResponseWriter, r *http.Request) error { type probeStatus struct { ProbeInfo - TimeSinceLast time.Duration - Links map[string]template.URL + TimeSinceLastStart time.Duration + TimeSinceLastEnd time.Duration + Links map[string]template.URL } vars := struct { Title string @@ -81,12 +82,15 @@ func (p *Prober) StatusHandler(opts ...statusHandlerOpt) tsweb.ReturnHandlerFunc for name, info := range p.ProbeInfo() { vars.TotalProbes++ - if !info.Result { + if info.Error != "" { vars.UnhealthyProbes++ } s := probeStatus{ProbeInfo: info} + if !info.Start.IsZero() { + s.TimeSinceLastStart = time.Since(info.Start).Truncate(time.Second) + } if !info.End.IsZero() { - s.TimeSinceLast = time.Since(info.End).Truncate(time.Second) + s.TimeSinceLastEnd = time.Since(info.End).Truncate(time.Second) } for textTpl, urlTpl := range params.probeLinks { text, err := renderTemplate(textTpl, info) diff --git a/prober/status.html b/prober/status.html index ff0f06c13..d26588da1 100644 --- a/prober/status.html +++ b/prober/status.html @@ -73,8 +73,9 @@ Name Probe Class & Labels Interval - Last Attempt - Success + Last Finished + Last Started + Status Latency Last Error @@ -85,9 +86,11 @@ {{$name}} {{range $text, $url := $probeInfo.Links}}
- + {{if not $probeInfo.Continuous}} + + {{end}} {{end}} {{$probeInfo.Class}}
@@ -97,28 +100,48 @@ {{end}} - {{$probeInfo.Interval}} - - {{if $probeInfo.TimeSinceLast}} - {{$probeInfo.TimeSinceLast.String}} ago
+ + {{if $probeInfo.Continuous}} + Continuous + {{else}} + {{$probeInfo.Interval}} + {{end}} + + + {{if $probeInfo.TimeSinceLastEnd}} + {{$probeInfo.TimeSinceLastEnd.String}} ago
{{$probeInfo.End.Format "2006-01-02T15:04:05Z07:00"}} {{else}} Never {{end}} - - {{if $probeInfo.Result}} - {{$probeInfo.Result}} + + {{if $probeInfo.TimeSinceLastStart}} + {{$probeInfo.TimeSinceLastStart.String}} ago
+ {{$probeInfo.Start.Format "2006-01-02T15:04:05Z07:00"}} {{else}} - {{$probeInfo.Result}} + Never + {{end}} + + + {{if $probeInfo.Error}} + {{$probeInfo.Status}} + {{else}} + {{$probeInfo.Status}} {{end}}
-
Recent: {{$probeInfo.RecentResults}}
-
Mean: {{$probeInfo.RecentSuccessRatio}}
+ {{if not $probeInfo.Continuous}} +
Recent: {{$probeInfo.RecentResults}}
+
Mean: {{$probeInfo.RecentSuccessRatio}}
+ {{end}} - {{$probeInfo.Latency.String}} -
Recent: {{$probeInfo.RecentLatencies}}
-
Median: {{$probeInfo.RecentMedianLatency}}
+ {{if $probeInfo.Continuous}} + n/a + {{else}} + {{$probeInfo.Latency.String}} +
Recent: {{$probeInfo.RecentLatencies}}
+
Median: {{$probeInfo.RecentMedianLatency}}
+ {{end}} {{$probeInfo.Error}} diff --git a/util/circularqueue/circularqueue.go b/util/circularqueue/circularqueue.go new file mode 100644 index 000000000..97711f714 --- /dev/null +++ b/util/circularqueue/circularqueue.go @@ -0,0 +1,94 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +// package circularqueue provides circular queues. +package circularqueue + +import ( + "math" + "sync" +) + +const ( + // Head is the index of the head of a queue. + Head = -1 +) + +// FIFO is a bounded queue that acts as if it has infinite depth. When an item +// is added to an already full queue, the oldest item in the queue is evicted +// to make room. +// +// Items in the queue are indexed, such that one can pop specific items by +// index. If an item is popped that is not at the head of the queue, all items +// up to the popped item are immediately evicted. +type FIFO[T any] struct { + // mu protects all of the below fields + mu sync.Mutex + + capacity int + head int + tail int + onEvict func(T) + items []T +} + +// NewFIFO constructs a new [FIFO] queue with the given capacity and onEvict +// callback. +func NewFIFO[T any](capacity int, onEvict func(T)) *FIFO[T] { + return &FIFO[T]{ + capacity: capacity, + tail: -1, + onEvict: onEvict, + items: make([]T, capacity), + } +} + +// Push pushes a new item onto the queue, evicting the item at the head if the +// queue is at capacity. If the number of items pushed to the queue reaches +// [math.MaxInt], this will panic with "FIFO queue sequence number exhausted". +func (q *FIFO[T]) Push(item T) { + q.mu.Lock() + defer q.mu.Unlock() + + q.tail++ + if q.tail == math.MaxInt { + // We don't currently handle wrapping indexes + panic("FIFO queue sequence number exhausted") + } + + if q.tail-q.head >= q.capacity { + q.onEvict(q.itemAtLocked(q.head)) + q.head++ + } + q.items[q.tail%q.capacity] = item +} + +// Pop removes the item at idx. If idx is past the tail or before the head of +// this queue, Pop returns nil. If an item at idx is available, all items in +// the queue at indices less than idx are immediately evicted. If idx <= [Head], +// this pops the item at the head of the queue. +func (q *FIFO[T]) Pop(idx int) *T { + q.mu.Lock() + defer q.mu.Unlock() + + if idx < 0 { + idx = q.head + } else if idx < q.head { + return nil + } else if idx > q.tail { + return nil + } + + // Evict items if necessary + for i := q.head; i < idx; i++ { + q.onEvict(q.itemAtLocked(i)) + } + + q.head = idx + 1 + item := q.itemAtLocked(idx) + return &item +} + +func (q *FIFO[T]) itemAtLocked(idx int) T { + return q.items[idx%q.capacity] +} diff --git a/util/circularqueue/circularqueue_test.go b/util/circularqueue/circularqueue_test.go new file mode 100644 index 000000000..92ebafd09 --- /dev/null +++ b/util/circularqueue/circularqueue_test.go @@ -0,0 +1,47 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package circularqueue + +import ( + "testing" + + "github.com/google/go-cmp/cmp" +) + +func TestFIFO(t *testing.T) { + var evicted []int + q := NewFIFO(3, func(item int) { + evicted = append(evicted, item) + }) + + assertPop := func(idx int, want int) { + t.Helper() + got := q.Pop(idx) + var _want *int + if want >= 0 { + _want = &want + } + if diff := cmp.Diff(got, _want); diff != "" { + t.Fatalf("unexpected item (-got +want):\n%s", diff) + } + } + + q.Push(1) + q.Push(2) + q.Push(3) + assertPop(3, -1) + assertPop(Head, 1) + assertPop(2, 3) // Should evict 2 + assertPop(2, -1) + + q.Push(4) + q.Push(5) + q.Push(6) + assertPop(5, 6) // Should evict 4 and 5 + + if diff := cmp.Diff(evicted, []int{2, 4, 5}); diff != "" { + t.Fatalf("unexpected evicted (-got +want):\n%s", diff) + } + +}