mirror of
				https://github.com/tailscale/tailscale.git
				synced 2025-10-31 13:05:22 +00:00 
			
		
		
		
	cmd/derpprobe,prober: add ability to perform continuous queuing delay measurements against DERP servers
This new type of probe sends DERP packets sized similarly to CallMeMaybe packets at a rate of 10 packets per second. It records the round-trip times in a Prometheus histogram. It also keeps track of how many packets are dropped. Packets that fail to arrive within 5 seconds are considered dropped, as are packets that arrive out of order. Updates tailscale/corp#24522 Signed-off-by: Percy Wegmann <percy@tailscale.com>
This commit is contained in:
		| @@ -29,7 +29,8 @@ var ( | |||||||
| 	tlsInterval      = flag.Duration("tls-interval", 15*time.Second, "TLS probe interval") | 	tlsInterval      = flag.Duration("tls-interval", 15*time.Second, "TLS probe interval") | ||||||
| 	bwInterval       = flag.Duration("bw-interval", 0, "bandwidth probe interval (0 = no bandwidth probing)") | 	bwInterval       = flag.Duration("bw-interval", 0, "bandwidth probe interval (0 = no bandwidth probing)") | ||||||
| 	bwSize           = flag.Int64("bw-probe-size-bytes", 1_000_000, "bandwidth probe size") | 	bwSize           = flag.Int64("bw-probe-size-bytes", 1_000_000, "bandwidth probe size") | ||||||
| 	bwTUNIPv4Address = flag.String("bw-tun-ipv4-addr", "", "if specified, bandwidth probes will be performed over a TUN device at this address in order to exercise TCP-in-TCP in similar fashion to TCP over Tailscale via DERP. We will use a /30 subnet including this IP address.") | 	bwTUNIPv4Address = flag.String("bw-tun-ipv4-addr", "", "if specified, bandwidth probes will be performed over a TUN device at this address in order to exercise TCP-in-TCP in similar fashion to TCP over Tailscale via DERP; we will use a /30 subnet including this IP address") | ||||||
|  | 	queuingDelay     = flag.Bool("qd", false, "if specified, queuing delay will be measured continuously using 260 byte packets (approximate size of a CallMeMaybe packet) sent at a rate of 10 per second") | ||||||
| 	regionCode       = flag.String("region-code", "", "probe only this region (e.g. 'lax'); if left blank, all regions will be probed") | 	regionCode       = flag.String("region-code", "", "probe only this region (e.g. 'lax'); if left blank, all regions will be probed") | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| @@ -45,6 +46,7 @@ func main() { | |||||||
| 		prober.WithMeshProbing(*meshInterval), | 		prober.WithMeshProbing(*meshInterval), | ||||||
| 		prober.WithSTUNProbing(*stunInterval), | 		prober.WithSTUNProbing(*stunInterval), | ||||||
| 		prober.WithTLSProbing(*tlsInterval), | 		prober.WithTLSProbing(*tlsInterval), | ||||||
|  | 		prober.WithQueuingDelayProbing(*queuingDelay), | ||||||
| 	} | 	} | ||||||
| 	if *bwInterval > 0 { | 	if *bwInterval > 0 { | ||||||
| 		opts = append(opts, prober.WithBandwidthProbing(*bwInterval, *bwSize, *bwTUNIPv4Address)) | 		opts = append(opts, prober.WithBandwidthProbing(*bwInterval, *bwSize, *bwTUNIPv4Address)) | ||||||
| @@ -107,7 +109,7 @@ func getOverallStatus(p *prober.Prober) (o overallStatus) { | |||||||
| 			// Do not show probes that have not finished yet. | 			// Do not show probes that have not finished yet. | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
| 		if i.Result { | 		if i.Status == prober.ProbeStatusSucceeded { | ||||||
| 			o.addGoodf("%s: %s", p, i.Latency) | 			o.addGoodf("%s: %s", p, i.Latency) | ||||||
| 		} else { | 		} else { | ||||||
| 			o.addBadf("%s: %s", p, i.Error) | 			o.addBadf("%s: %s", p, i.Error) | ||||||
|   | |||||||
							
								
								
									
										213
									
								
								prober/derp.go
									
									
									
									
									
								
							
							
						
						
									
										213
									
								
								prober/derp.go
									
									
									
									
									
								
							| @@ -8,6 +8,7 @@ import ( | |||||||
| 	"cmp" | 	"cmp" | ||||||
| 	"context" | 	"context" | ||||||
| 	crand "crypto/rand" | 	crand "crypto/rand" | ||||||
|  | 	"encoding/binary" | ||||||
| 	"encoding/json" | 	"encoding/json" | ||||||
| 	"errors" | 	"errors" | ||||||
| 	"expvar" | 	"expvar" | ||||||
| @@ -37,6 +38,7 @@ import ( | |||||||
| 	"tailscale.com/tailcfg" | 	"tailscale.com/tailcfg" | ||||||
| 	"tailscale.com/types/key" | 	"tailscale.com/types/key" | ||||||
| 	"tailscale.com/types/logger" | 	"tailscale.com/types/logger" | ||||||
|  | 	"tailscale.com/util/circularqueue" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| // derpProber dynamically manages several probes for each DERP server | // derpProber dynamically manages several probes for each DERP server | ||||||
| @@ -53,6 +55,9 @@ type derpProber struct { | |||||||
| 	bwProbeSize     int64 | 	bwProbeSize     int64 | ||||||
| 	bwTUNIPv4Prefix *netip.Prefix // or nil to not use TUN | 	bwTUNIPv4Prefix *netip.Prefix // or nil to not use TUN | ||||||
| 
 | 
 | ||||||
|  | 	// Optional queuing delay probing. | ||||||
|  | 	qdProbe bool | ||||||
|  | 
 | ||||||
| 	// Optionally restrict probes to a single regionCode. | 	// Optionally restrict probes to a single regionCode. | ||||||
| 	regionCode string | 	regionCode string | ||||||
| 
 | 
 | ||||||
| @@ -64,6 +69,7 @@ type derpProber struct { | |||||||
| 	udpProbeFn  func(string, int) ProbeClass | 	udpProbeFn  func(string, int) ProbeClass | ||||||
| 	meshProbeFn func(string, string) ProbeClass | 	meshProbeFn func(string, string) ProbeClass | ||||||
| 	bwProbeFn   func(string, string, int64) ProbeClass | 	bwProbeFn   func(string, string, int64) ProbeClass | ||||||
|  | 	qdProbeFn   func(string, string) ProbeClass | ||||||
| 
 | 
 | ||||||
| 	sync.Mutex | 	sync.Mutex | ||||||
| 	lastDERPMap   *tailcfg.DERPMap | 	lastDERPMap   *tailcfg.DERPMap | ||||||
| @@ -93,6 +99,13 @@ func WithBandwidthProbing(interval time.Duration, size int64, tunAddress string) | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | // WithQueuingDelayProbing enables/disables queuing delay probing. | ||||||
|  | func WithQueuingDelayProbing(qdProbe bool) DERPOpt { | ||||||
|  | 	return func(d *derpProber) { | ||||||
|  | 		d.qdProbe = qdProbe | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
| // WithMeshProbing enables mesh probing. When enabled, a small message will be | // WithMeshProbing enables mesh probing. When enabled, a small message will be | ||||||
| // transferred through each DERP server and each pair of DERP servers. | // transferred through each DERP server and each pair of DERP servers. | ||||||
| func WithMeshProbing(interval time.Duration) DERPOpt { | func WithMeshProbing(interval time.Duration) DERPOpt { | ||||||
| @@ -147,6 +160,7 @@ func DERP(p *Prober, derpMapURL string, opts ...DERPOpt) (*derpProber, error) { | |||||||
| 	d.udpProbeFn = d.ProbeUDP | 	d.udpProbeFn = d.ProbeUDP | ||||||
| 	d.meshProbeFn = d.probeMesh | 	d.meshProbeFn = d.probeMesh | ||||||
| 	d.bwProbeFn = d.probeBandwidth | 	d.bwProbeFn = d.probeBandwidth | ||||||
|  | 	d.qdProbeFn = d.probeQueuingDelay | ||||||
| 	return d, nil | 	return d, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| @@ -213,7 +227,7 @@ func (d *derpProber) probeMapFn(ctx context.Context) error { | |||||||
| 					} | 					} | ||||||
| 				} | 				} | ||||||
| 
 | 
 | ||||||
| 				if d.bwInterval > 0 && d.bwProbeSize > 0 { | 				if d.bwInterval != 0 && d.bwProbeSize > 0 { | ||||||
| 					n := fmt.Sprintf("derp/%s/%s/%s/bw", region.RegionCode, server.Name, to.Name) | 					n := fmt.Sprintf("derp/%s/%s/%s/bw", region.RegionCode, server.Name, to.Name) | ||||||
| 					wantProbes[n] = true | 					wantProbes[n] = true | ||||||
| 					if d.probes[n] == nil { | 					if d.probes[n] == nil { | ||||||
| @@ -225,6 +239,15 @@ func (d *derpProber) probeMapFn(ctx context.Context) error { | |||||||
| 						d.probes[n] = d.p.Run(n, d.bwInterval, labels, d.bwProbeFn(server.Name, to.Name, d.bwProbeSize)) | 						d.probes[n] = d.p.Run(n, d.bwInterval, labels, d.bwProbeFn(server.Name, to.Name, d.bwProbeSize)) | ||||||
| 					} | 					} | ||||||
| 				} | 				} | ||||||
|  | 
 | ||||||
|  | 				if d.qdProbe { | ||||||
|  | 					n := fmt.Sprintf("derp/%s/%s/%s/qd", region.RegionCode, server.Name, to.Name) | ||||||
|  | 					wantProbes[n] = true | ||||||
|  | 					if d.probes[n] == nil { | ||||||
|  | 						log.Printf("adding DERP queuing delay probe for %s->%s (%s)", server.Name, to.Name, region.RegionName) | ||||||
|  | 						d.probes[n] = d.p.Run(n, -10*time.Second, labels, d.qdProbeFn(server.Name, to.Name)) | ||||||
|  | 					} | ||||||
|  | 				} | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| @@ -240,7 +263,7 @@ func (d *derpProber) probeMapFn(ctx context.Context) error { | |||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // probeMesh returs a probe class that sends a test packet through a pair of DERP | // probeMesh returns a probe class that sends a test packet through a pair of DERP | ||||||
| // servers (or just one server, if 'from' and 'to' are the same). 'from' and 'to' | // servers (or just one server, if 'from' and 'to' are the same). 'from' and 'to' | ||||||
| // are expected to be names (DERPNode.Name) of two DERP servers in the same region. | // are expected to be names (DERPNode.Name) of two DERP servers in the same region. | ||||||
| func (d *derpProber) probeMesh(from, to string) ProbeClass { | func (d *derpProber) probeMesh(from, to string) ProbeClass { | ||||||
| @@ -263,7 +286,7 @@ func (d *derpProber) probeMesh(from, to string) ProbeClass { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // probeBandwidth returs a probe class that sends a payload of a given size | // probeBandwidth returns a probe class that sends a payload of a given size | ||||||
| // through a pair of DERP servers (or just one server, if 'from' and 'to' are | // through a pair of DERP servers (or just one server, if 'from' and 'to' are | ||||||
| // the same). 'from' and 'to' are expected to be names (DERPNode.Name) of two | // the same). 'from' and 'to' are expected to be names (DERPNode.Name) of two | ||||||
| // DERP servers in the same region. | // DERP servers in the same region. | ||||||
| @@ -295,6 +318,188 @@ func (d *derpProber) probeBandwidth(from, to string, size int64) ProbeClass { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | // probeQueuingDelay returns a probe class that continuously sends packets | ||||||
|  | // through a pair of DERP servers (or just one server, if 'from' and 'to' are | ||||||
|  | // the same) at a rate of 10 packets per second in order to measure queuing | ||||||
|  | // delays. 'from' and 'to' are expected to be names (DERPNode.Name) of two DERP | ||||||
|  | // servers in the same region. | ||||||
|  | func (d *derpProber) probeQueuingDelay(from, to string) ProbeClass { | ||||||
|  | 	derpPath := "mesh" | ||||||
|  | 	if from == to { | ||||||
|  | 		derpPath = "single" | ||||||
|  | 	} | ||||||
|  | 	var packetsDropped expvar.Float | ||||||
|  | 	qdh := queuingDelayHistogram{ | ||||||
|  | 		buckets: make(map[float64]uint64, len(qdhBuckets)), | ||||||
|  | 	} | ||||||
|  | 	return ProbeClass{ | ||||||
|  | 		Probe: func(ctx context.Context) error { | ||||||
|  | 			fromN, toN, err := d.getNodePair(from, to) | ||||||
|  | 			if err != nil { | ||||||
|  | 				return err | ||||||
|  | 			} | ||||||
|  | 			return derpProbeQueuingDelay(ctx, d.lastDERPMap, fromN, toN, &packetsDropped, &qdh) | ||||||
|  | 		}, | ||||||
|  | 		Class:  "derp_qd", | ||||||
|  | 		Labels: Labels{"derp_path": derpPath}, | ||||||
|  | 		Metrics: func(l prometheus.Labels) []prometheus.Metric { | ||||||
|  | 			qdh.mx.Lock() | ||||||
|  | 			result := []prometheus.Metric{ | ||||||
|  | 				prometheus.MustNewConstMetric(prometheus.NewDesc("derp_qd_probe_dropped_packets", "Total packets dropped", nil, l), prometheus.CounterValue, float64(packetsDropped.Value())), | ||||||
|  | 				prometheus.MustNewConstHistogram(prometheus.NewDesc("derp_qd_probe_delays_seconds", "Distribution of queuing delays", nil, l), qdh.count, qdh.sum, qdh.buckets), | ||||||
|  | 			} | ||||||
|  | 			qdh.mx.Unlock() | ||||||
|  | 			return result | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // derpProbeQueuingDelay continuously sends data between two local DERP clients | ||||||
|  | // connected to two DERP servers in order to measure queuing delays. | ||||||
|  | func derpProbeQueuingDelay(ctx context.Context, dm *tailcfg.DERPMap, from, to *tailcfg.DERPNode, packetsDropped *expvar.Float, qdh *queuingDelayHistogram) (err error) { | ||||||
|  | 	// This probe uses clients with isProber=false to avoid spamming the derper | ||||||
|  | 	// logs with every packet sent by the queuing delay probe. | ||||||
|  | 	fromc, err := newConn(ctx, dm, from, false) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	defer fromc.Close() | ||||||
|  | 	toc, err := newConn(ctx, dm, to, false) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	defer toc.Close() | ||||||
|  | 
 | ||||||
|  | 	// Wait a bit for from's node to hear about to existing on the | ||||||
|  | 	// other node in the region, in the case where the two nodes | ||||||
|  | 	// are different. | ||||||
|  | 	if from.Name != to.Name { | ||||||
|  | 		time.Sleep(100 * time.Millisecond) // pretty arbitrary | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if err := runDerpProbeQueuingDelayContinously(ctx, from, to, fromc, toc, packetsDropped, qdh); err != nil { | ||||||
|  | 		// Record pubkeys on failed probes to aid investigation. | ||||||
|  | 		return fmt.Errorf("%s -> %s: %w", | ||||||
|  | 			fromc.SelfPublicKey().ShortString(), | ||||||
|  | 			toc.SelfPublicKey().ShortString(), err) | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func runDerpProbeQueuingDelayContinously(ctx context.Context, from, to *tailcfg.DERPNode, fromc, toc *derphttp.Client, packetsDropped *expvar.Float, qdh *queuingDelayHistogram) error { | ||||||
|  | 	// Circular buffer to hold packet send times. It is sized to hold timings | ||||||
|  | 	// for up to 5 seconds when sending packets at a rate of 10 per second. | ||||||
|  | 	// It assumes that packets may be dropped, but that they will generally | ||||||
|  | 	// arrive in order. Packets arriving out of order will result in older | ||||||
|  | 	// packets being ignored, effectively overcounting the number of dropped | ||||||
|  | 	// packets. | ||||||
|  | 	sentTimes := circularqueue.NewFIFO(50, func(t time.Time) { | ||||||
|  | 		// If a sent time is evicted, that means we'll never record a timing | ||||||
|  | 		// for this packet, so we considered it dropped. | ||||||
|  | 		packetsDropped.Add(1) | ||||||
|  | 	}) | ||||||
|  | 
 | ||||||
|  | 	// Send the packets. | ||||||
|  | 	sendErrC := make(chan error, 1) | ||||||
|  | 	pkt := make([]byte, 260) // the same size as a CallMeMaybe packet observed on a Tailscale client. | ||||||
|  | 	crand.Read(pkt) | ||||||
|  | 
 | ||||||
|  | 	go func() { | ||||||
|  | 		t := time.NewTicker(time.Second / 10) // 10 packets per second | ||||||
|  | 		defer t.Stop() | ||||||
|  | 
 | ||||||
|  | 		seq := 0 | ||||||
|  | 		for { | ||||||
|  | 			select { | ||||||
|  | 			case <-ctx.Done(): | ||||||
|  | 				return | ||||||
|  | 			case <-t.C: | ||||||
|  | 				sentTimes.Push(time.Now()) | ||||||
|  | 				binary.BigEndian.PutUint64(pkt, uint64(seq)) | ||||||
|  | 				seq++ | ||||||
|  | 				if err := fromc.Send(toc.SelfPublicKey(), pkt); err != nil { | ||||||
|  | 					sendErrC <- fmt.Errorf("sending packet %w", err) | ||||||
|  | 					return | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	}() | ||||||
|  | 
 | ||||||
|  | 	// Receive the packets. | ||||||
|  | 	recvFinishedC := make(chan error, 1) | ||||||
|  | 	go func() { | ||||||
|  | 		defer close(recvFinishedC) // to break out of 'select' below. | ||||||
|  | 		for { | ||||||
|  | 			m, err := toc.Recv() | ||||||
|  | 			if err != nil { | ||||||
|  | 				recvFinishedC <- err | ||||||
|  | 				return | ||||||
|  | 			} | ||||||
|  | 			switch v := m.(type) { | ||||||
|  | 			case derp.ReceivedPacket: | ||||||
|  | 				now := time.Now() | ||||||
|  | 				if v.Source != fromc.SelfPublicKey() { | ||||||
|  | 					recvFinishedC <- fmt.Errorf("got data packet from unexpected source, %v", v.Source) | ||||||
|  | 					return | ||||||
|  | 				} | ||||||
|  | 				seq := binary.BigEndian.Uint64(v.Data) | ||||||
|  | 				sent := sentTimes.Pop(int(seq)) | ||||||
|  | 				if sent == nil { | ||||||
|  | 					// No sent time found, ignore | ||||||
|  | 					continue | ||||||
|  | 				} | ||||||
|  | 				qdh.add(now.Sub(*sent)) | ||||||
|  | 
 | ||||||
|  | 			case derp.KeepAliveMessage: | ||||||
|  | 				// Silently ignore. | ||||||
|  | 			default: | ||||||
|  | 				log.Printf("%v: ignoring Recv frame type %T", to.Name, v) | ||||||
|  | 				// Loop. | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	}() | ||||||
|  | 
 | ||||||
|  | 	select { | ||||||
|  | 	case <-ctx.Done(): | ||||||
|  | 		return fmt.Errorf("timeout: %w", ctx.Err()) | ||||||
|  | 	case err := <-sendErrC: | ||||||
|  | 		return fmt.Errorf("error sending via %q: %w", from.Name, err) | ||||||
|  | 	case err := <-recvFinishedC: | ||||||
|  | 		if err != nil { | ||||||
|  | 			return fmt.Errorf("error receiving from %q: %w", to.Name, err) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // queuingDelayHistogram allows tracking a histogram of queuing delays | ||||||
|  | type queuingDelayHistogram struct { | ||||||
|  | 	count   uint64 | ||||||
|  | 	sum     float64 | ||||||
|  | 	buckets map[float64]uint64 | ||||||
|  | 	mx      sync.Mutex | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // qdhBuckets defines the buckets (in seconds) for the queuingDelayHistogram. | ||||||
|  | var qdhBuckets = []float64{.005, .01, .025, .05, .1, .25, .5, 1} | ||||||
|  | 
 | ||||||
|  | func (qdh *queuingDelayHistogram) add(d time.Duration) { | ||||||
|  | 	qdh.mx.Lock() | ||||||
|  | 	defer qdh.mx.Unlock() | ||||||
|  | 
 | ||||||
|  | 	seconds := float64(d.Seconds()) | ||||||
|  | 	qdh.count++ | ||||||
|  | 	qdh.sum += seconds | ||||||
|  | 
 | ||||||
|  | 	for _, b := range qdhBuckets { | ||||||
|  | 		if seconds > b { | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 		qdh.buckets[b] += 1 | ||||||
|  | 		break | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
| // getNodePair returns DERPNode objects for two DERP servers based on their | // getNodePair returns DERPNode objects for two DERP servers based on their | ||||||
| // short names. | // short names. | ||||||
| func (d *derpProber) getNodePair(n1, n2 string) (ret1, ret2 *tailcfg.DERPNode, _ error) { | func (d *derpProber) getNodePair(n1, n2 string) (ret1, ret2 *tailcfg.DERPNode, _ error) { | ||||||
| @@ -573,6 +778,8 @@ func runDerpProbeNodePair(ctx context.Context, from, to *tailcfg.DERPNode, fromc | |||||||
| 					recvc <- fmt.Errorf("got data packet %d from unexpected source, %v", idx, v.Source) | 					recvc <- fmt.Errorf("got data packet %d from unexpected source, %v", idx, v.Source) | ||||||
| 					return | 					return | ||||||
| 				} | 				} | ||||||
|  | 				// This assumes that the packets are received reliably and in order. | ||||||
|  | 				// The DERP protocol does not guarantee this, but this probe assumes it. | ||||||
| 				if got, want := v.Data, pkts[idx]; !bytes.Equal(got, want) { | 				if got, want := v.Data, pkts[idx]; !bytes.Equal(got, want) { | ||||||
| 					recvc <- fmt.Errorf("unexpected data packet %d (out of %d)", idx, len(pkts)) | 					recvc <- fmt.Errorf("unexpected data packet %d (out of %d)", idx, len(pkts)) | ||||||
| 					return | 					return | ||||||
|   | |||||||
| @@ -256,6 +256,11 @@ type Probe struct { | |||||||
| 	latencyHist *ring.Ring | 	latencyHist *ring.Ring | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | // IsContinuous indicates that this is a continuous probe. | ||||||
|  | func (p *Probe) IsContinuous() bool { | ||||||
|  | 	return p.interval < 0 | ||||||
|  | } | ||||||
|  | 
 | ||||||
| // Close shuts down the Probe and unregisters it from its Prober. | // Close shuts down the Probe and unregisters it from its Prober. | ||||||
| // It is safe to Run a new probe of the same name after Close returns. | // It is safe to Run a new probe of the same name after Close returns. | ||||||
| func (p *Probe) Close() error { | func (p *Probe) Close() error { | ||||||
| @@ -288,6 +293,22 @@ func (p *Probe) loop() { | |||||||
| 		return | 		return | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | 	if p.IsContinuous() { | ||||||
|  | 		// Probe function is going to run continuously. | ||||||
|  | 		for { | ||||||
|  | 			p.run() | ||||||
|  | 			// Wait and then retry if probe fails. We use the inverse of the | ||||||
|  | 			// configured negative interval as our sleep period. | ||||||
|  | 			// TODO(percy):implement exponential backoff, possibly using logtail/backoff. | ||||||
|  | 			select { | ||||||
|  | 			case <-time.After(-1 * p.interval): | ||||||
|  | 				p.run() | ||||||
|  | 			case <-p.ctx.Done(): | ||||||
|  | 				return | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	p.tick = p.prober.newTicker(p.interval) | 	p.tick = p.prober.newTicker(p.interval) | ||||||
| 	defer p.tick.Stop() | 	defer p.tick.Stop() | ||||||
| 	for { | 	for { | ||||||
| @@ -323,9 +344,17 @@ func (p *Probe) run() (pi ProbeInfo, err error) { | |||||||
| 			p.recordEnd(err) | 			p.recordEnd(err) | ||||||
| 		} | 		} | ||||||
| 	}() | 	}() | ||||||
|  | 	ctx := p.ctx | ||||||
|  | 	if p.IsContinuous() { | ||||||
|  | 		p.mu.Lock() | ||||||
|  | 		p.lastErr = nil | ||||||
|  | 		p.mu.Unlock() | ||||||
|  | 	} else { | ||||||
| 		timeout := time.Duration(float64(p.interval) * 0.8) | 		timeout := time.Duration(float64(p.interval) * 0.8) | ||||||
| 	ctx, cancel := context.WithTimeout(p.ctx, timeout) | 		var cancel func() | ||||||
|  | 		ctx, cancel = context.WithTimeout(ctx, timeout) | ||||||
| 		defer cancel() | 		defer cancel() | ||||||
|  | 	} | ||||||
| 
 | 
 | ||||||
| 	err = p.probeClass.Probe(ctx) | 	err = p.probeClass.Probe(ctx) | ||||||
| 	p.recordEnd(err) | 	p.recordEnd(err) | ||||||
| @@ -365,6 +394,16 @@ func (p *Probe) recordEnd(err error) { | |||||||
| 	p.successHist = p.successHist.Next() | 	p.successHist = p.successHist.Next() | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | // ProbeStatus indicates the status of a probe. | ||||||
|  | type ProbeStatus string | ||||||
|  | 
 | ||||||
|  | const ( | ||||||
|  | 	ProbeStatusUnknown   = "unknown" | ||||||
|  | 	ProbeStatusRunning   = "running" | ||||||
|  | 	ProbeStatusFailed    = "failed" | ||||||
|  | 	ProbeStatusSucceeded = "succeeded" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
| // ProbeInfo is a snapshot of the configuration and state of a Probe. | // ProbeInfo is a snapshot of the configuration and state of a Probe. | ||||||
| type ProbeInfo struct { | type ProbeInfo struct { | ||||||
| 	Name            string | 	Name            string | ||||||
| @@ -374,7 +413,7 @@ type ProbeInfo struct { | |||||||
| 	Start           time.Time | 	Start           time.Time | ||||||
| 	End             time.Time | 	End             time.Time | ||||||
| 	Latency         time.Duration | 	Latency         time.Duration | ||||||
| 	Result          bool | 	Status          ProbeStatus | ||||||
| 	Error           string | 	Error           string | ||||||
| 	RecentResults   []bool | 	RecentResults   []bool | ||||||
| 	RecentLatencies []time.Duration | 	RecentLatencies []time.Duration | ||||||
| @@ -402,6 +441,10 @@ func (pb ProbeInfo) RecentMedianLatency() time.Duration { | |||||||
| 	return pb.RecentLatencies[len(pb.RecentLatencies)/2] | 	return pb.RecentLatencies[len(pb.RecentLatencies)/2] | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | func (pb ProbeInfo) Continuous() bool { | ||||||
|  | 	return pb.Interval < 0 | ||||||
|  | } | ||||||
|  | 
 | ||||||
| // ProbeInfo returns the state of all probes. | // ProbeInfo returns the state of all probes. | ||||||
| func (p *Prober) ProbeInfo() map[string]ProbeInfo { | func (p *Prober) ProbeInfo() map[string]ProbeInfo { | ||||||
| 	out := map[string]ProbeInfo{} | 	out := map[string]ProbeInfo{} | ||||||
| @@ -429,9 +472,14 @@ func (probe *Probe) probeInfoLocked() ProbeInfo { | |||||||
| 		Labels:   probe.metricLabels, | 		Labels:   probe.metricLabels, | ||||||
| 		Start:    probe.start, | 		Start:    probe.start, | ||||||
| 		End:      probe.end, | 		End:      probe.end, | ||||||
| 		Result:   probe.succeeded, |  | ||||||
| 	} | 	} | ||||||
| 	if probe.lastErr != nil { | 	inf.Status = ProbeStatusUnknown | ||||||
|  | 	if probe.end.Before(probe.start) { | ||||||
|  | 		inf.Status = ProbeStatusRunning | ||||||
|  | 	} else if probe.succeeded { | ||||||
|  | 		inf.Status = ProbeStatusSucceeded | ||||||
|  | 	} else if probe.lastErr != nil { | ||||||
|  | 		inf.Status = ProbeStatusFailed | ||||||
| 		inf.Error = probe.lastErr.Error() | 		inf.Error = probe.lastErr.Error() | ||||||
| 	} | 	} | ||||||
| 	if probe.latency > 0 { | 	if probe.latency > 0 { | ||||||
| @@ -467,7 +515,7 @@ func (p *Prober) RunHandler(w http.ResponseWriter, r *http.Request) error { | |||||||
| 	p.mu.Lock() | 	p.mu.Lock() | ||||||
| 	probe, ok := p.probes[name] | 	probe, ok := p.probes[name] | ||||||
| 	p.mu.Unlock() | 	p.mu.Unlock() | ||||||
| 	if !ok { | 	if !ok || probe.IsContinuous() { | ||||||
| 		return tsweb.Error(http.StatusNotFound, fmt.Sprintf("unknown probe %q", name), nil) | 		return tsweb.Error(http.StatusNotFound, fmt.Sprintf("unknown probe %q", name), nil) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| @@ -531,7 +579,8 @@ func (p *Probe) Collect(ch chan<- prometheus.Metric) { | |||||||
| 	if !p.start.IsZero() { | 	if !p.start.IsZero() { | ||||||
| 		ch <- prometheus.MustNewConstMetric(p.mStartTime, prometheus.GaugeValue, float64(p.start.Unix())) | 		ch <- prometheus.MustNewConstMetric(p.mStartTime, prometheus.GaugeValue, float64(p.start.Unix())) | ||||||
| 	} | 	} | ||||||
| 	if p.end.IsZero() { | 	// For periodic probes that haven't ended, don't collect probe metrics yet. | ||||||
|  | 	if p.end.IsZero() && !p.IsContinuous() { | ||||||
| 		return | 		return | ||||||
| 	} | 	} | ||||||
| 	ch <- prometheus.MustNewConstMetric(p.mEndTime, prometheus.GaugeValue, float64(p.end.Unix())) | 	ch <- prometheus.MustNewConstMetric(p.mEndTime, prometheus.GaugeValue, float64(p.end.Unix())) | ||||||
|   | |||||||
| @@ -316,7 +316,7 @@ func TestProberProbeInfo(t *testing.T) { | |||||||
| 			Interval:        probeInterval, | 			Interval:        probeInterval, | ||||||
| 			Labels:          map[string]string{"class": "", "name": "probe1"}, | 			Labels:          map[string]string{"class": "", "name": "probe1"}, | ||||||
| 			Latency:         500 * time.Millisecond, | 			Latency:         500 * time.Millisecond, | ||||||
| 			Result:          true, | 			Status:          ProbeStatusSucceeded, | ||||||
| 			RecentResults:   []bool{true}, | 			RecentResults:   []bool{true}, | ||||||
| 			RecentLatencies: []time.Duration{500 * time.Millisecond}, | 			RecentLatencies: []time.Duration{500 * time.Millisecond}, | ||||||
| 		}, | 		}, | ||||||
| @@ -324,6 +324,7 @@ func TestProberProbeInfo(t *testing.T) { | |||||||
| 			Name:            "probe2", | 			Name:            "probe2", | ||||||
| 			Interval:        probeInterval, | 			Interval:        probeInterval, | ||||||
| 			Labels:          map[string]string{"class": "", "name": "probe2"}, | 			Labels:          map[string]string{"class": "", "name": "probe2"}, | ||||||
|  | 			Status:          ProbeStatusFailed, | ||||||
| 			Error:           "error2", | 			Error:           "error2", | ||||||
| 			RecentResults:   []bool{false}, | 			RecentResults:   []bool{false}, | ||||||
| 			RecentLatencies: nil, // no latency for failed probes | 			RecentLatencies: nil, // no latency for failed probes | ||||||
| @@ -349,7 +350,7 @@ func TestProbeInfoRecent(t *testing.T) { | |||||||
| 	}{ | 	}{ | ||||||
| 		{ | 		{ | ||||||
| 			name:                    "no_runs", | 			name:                    "no_runs", | ||||||
| 			wantProbeInfo:           ProbeInfo{}, | 			wantProbeInfo:           ProbeInfo{Status: ProbeStatusUnknown}, | ||||||
| 			wantRecentSuccessRatio:  0, | 			wantRecentSuccessRatio:  0, | ||||||
| 			wantRecentMedianLatency: 0, | 			wantRecentMedianLatency: 0, | ||||||
| 		}, | 		}, | ||||||
| @@ -358,7 +359,7 @@ func TestProbeInfoRecent(t *testing.T) { | |||||||
| 			results: []probeResult{{latency: 100 * time.Millisecond, err: nil}}, | 			results: []probeResult{{latency: 100 * time.Millisecond, err: nil}}, | ||||||
| 			wantProbeInfo: ProbeInfo{ | 			wantProbeInfo: ProbeInfo{ | ||||||
| 				Latency:         100 * time.Millisecond, | 				Latency:         100 * time.Millisecond, | ||||||
| 				Result:          true, | 				Status:          ProbeStatusSucceeded, | ||||||
| 				RecentResults:   []bool{true}, | 				RecentResults:   []bool{true}, | ||||||
| 				RecentLatencies: []time.Duration{100 * time.Millisecond}, | 				RecentLatencies: []time.Duration{100 * time.Millisecond}, | ||||||
| 			}, | 			}, | ||||||
| @@ -369,7 +370,7 @@ func TestProbeInfoRecent(t *testing.T) { | |||||||
| 			name:    "single_failure", | 			name:    "single_failure", | ||||||
| 			results: []probeResult{{latency: 100 * time.Millisecond, err: errors.New("error123")}}, | 			results: []probeResult{{latency: 100 * time.Millisecond, err: errors.New("error123")}}, | ||||||
| 			wantProbeInfo: ProbeInfo{ | 			wantProbeInfo: ProbeInfo{ | ||||||
| 				Result:          false, | 				Status:          ProbeStatusFailed, | ||||||
| 				RecentResults:   []bool{false}, | 				RecentResults:   []bool{false}, | ||||||
| 				RecentLatencies: nil, | 				RecentLatencies: nil, | ||||||
| 				Error:           "error123", | 				Error:           "error123", | ||||||
| @@ -390,7 +391,7 @@ func TestProbeInfoRecent(t *testing.T) { | |||||||
| 				{latency: 80 * time.Millisecond, err: nil}, | 				{latency: 80 * time.Millisecond, err: nil}, | ||||||
| 			}, | 			}, | ||||||
| 			wantProbeInfo: ProbeInfo{ | 			wantProbeInfo: ProbeInfo{ | ||||||
| 				Result:        true, | 				Status:        ProbeStatusSucceeded, | ||||||
| 				Latency:       80 * time.Millisecond, | 				Latency:       80 * time.Millisecond, | ||||||
| 				RecentResults: []bool{false, true, true, false, true, true, false, true}, | 				RecentResults: []bool{false, true, true, false, true, true, false, true}, | ||||||
| 				RecentLatencies: []time.Duration{ | 				RecentLatencies: []time.Duration{ | ||||||
| @@ -420,7 +421,7 @@ func TestProbeInfoRecent(t *testing.T) { | |||||||
| 				{latency: 110 * time.Millisecond, err: nil}, | 				{latency: 110 * time.Millisecond, err: nil}, | ||||||
| 			}, | 			}, | ||||||
| 			wantProbeInfo: ProbeInfo{ | 			wantProbeInfo: ProbeInfo{ | ||||||
| 				Result:        true, | 				Status:        ProbeStatusSucceeded, | ||||||
| 				Latency:       110 * time.Millisecond, | 				Latency:       110 * time.Millisecond, | ||||||
| 				RecentResults: []bool{true, true, true, true, true, true, true, true, true, true}, | 				RecentResults: []bool{true, true, true, true, true, true, true, true, true, true}, | ||||||
| 				RecentLatencies: []time.Duration{ | 				RecentLatencies: []time.Duration{ | ||||||
| @@ -483,7 +484,7 @@ func TestProberRunHandler(t *testing.T) { | |||||||
| 				ProbeInfo: ProbeInfo{ | 				ProbeInfo: ProbeInfo{ | ||||||
| 					Name:          "success", | 					Name:          "success", | ||||||
| 					Interval:      probeInterval, | 					Interval:      probeInterval, | ||||||
| 					Result:        true, | 					Status:        ProbeStatusSucceeded, | ||||||
| 					RecentResults: []bool{true, true}, | 					RecentResults: []bool{true, true}, | ||||||
| 				}, | 				}, | ||||||
| 				PreviousSuccessRatio: 1, | 				PreviousSuccessRatio: 1, | ||||||
| @@ -498,7 +499,7 @@ func TestProberRunHandler(t *testing.T) { | |||||||
| 				ProbeInfo: ProbeInfo{ | 				ProbeInfo: ProbeInfo{ | ||||||
| 					Name:          "failure", | 					Name:          "failure", | ||||||
| 					Interval:      probeInterval, | 					Interval:      probeInterval, | ||||||
| 					Result:        false, | 					Status:        ProbeStatusFailed, | ||||||
| 					Error:         "error123", | 					Error:         "error123", | ||||||
| 					RecentResults: []bool{false, false}, | 					RecentResults: []bool{false, false}, | ||||||
| 				}, | 				}, | ||||||
|   | |||||||
| @@ -62,7 +62,8 @@ func (p *Prober) StatusHandler(opts ...statusHandlerOpt) tsweb.ReturnHandlerFunc | |||||||
| 	return func(w http.ResponseWriter, r *http.Request) error { | 	return func(w http.ResponseWriter, r *http.Request) error { | ||||||
| 		type probeStatus struct { | 		type probeStatus struct { | ||||||
| 			ProbeInfo | 			ProbeInfo | ||||||
| 			TimeSinceLast time.Duration | 			TimeSinceLastStart time.Duration | ||||||
|  | 			TimeSinceLastEnd   time.Duration | ||||||
| 			Links              map[string]template.URL | 			Links              map[string]template.URL | ||||||
| 		} | 		} | ||||||
| 		vars := struct { | 		vars := struct { | ||||||
| @@ -81,12 +82,15 @@ func (p *Prober) StatusHandler(opts ...statusHandlerOpt) tsweb.ReturnHandlerFunc | |||||||
| 
 | 
 | ||||||
| 		for name, info := range p.ProbeInfo() { | 		for name, info := range p.ProbeInfo() { | ||||||
| 			vars.TotalProbes++ | 			vars.TotalProbes++ | ||||||
| 			if !info.Result { | 			if info.Error != "" { | ||||||
| 				vars.UnhealthyProbes++ | 				vars.UnhealthyProbes++ | ||||||
| 			} | 			} | ||||||
| 			s := probeStatus{ProbeInfo: info} | 			s := probeStatus{ProbeInfo: info} | ||||||
|  | 			if !info.Start.IsZero() { | ||||||
|  | 				s.TimeSinceLastStart = time.Since(info.Start).Truncate(time.Second) | ||||||
|  | 			} | ||||||
| 			if !info.End.IsZero() { | 			if !info.End.IsZero() { | ||||||
| 				s.TimeSinceLast = time.Since(info.End).Truncate(time.Second) | 				s.TimeSinceLastEnd = time.Since(info.End).Truncate(time.Second) | ||||||
| 			} | 			} | ||||||
| 			for textTpl, urlTpl := range params.probeLinks { | 			for textTpl, urlTpl := range params.probeLinks { | ||||||
| 				text, err := renderTemplate(textTpl, info) | 				text, err := renderTemplate(textTpl, info) | ||||||
|   | |||||||
| @@ -73,8 +73,9 @@ | |||||||
|             <th>Name</th> |             <th>Name</th> | ||||||
|             <th>Probe Class & Labels</th> |             <th>Probe Class & Labels</th> | ||||||
|             <th>Interval</th> |             <th>Interval</th> | ||||||
|             <th>Last Attempt</th> |             <th>Last Finished</th> | ||||||
|             <th>Success</th> |             <th>Last Started</th> | ||||||
|  |             <th>Status</th> | ||||||
|             <th>Latency</th> |             <th>Latency</th> | ||||||
|             <th>Last Error</th> |             <th>Last Error</th> | ||||||
|         </tr></thead> |         </tr></thead> | ||||||
| @@ -85,10 +86,12 @@ | |||||||
|                 {{$name}} |                 {{$name}} | ||||||
|                 {{range $text, $url := $probeInfo.Links}} |                 {{range $text, $url := $probeInfo.Links}} | ||||||
|                 <br/> |                 <br/> | ||||||
|  |                 {{if not $probeInfo.Continuous}} | ||||||
|                     <button onclick="location.href='{{$url}}';" type="button"> |                     <button onclick="location.href='{{$url}}';" type="button"> | ||||||
|                         {{$text}} |                         {{$text}} | ||||||
|                     </button> |                     </button> | ||||||
|                 {{end}} |                 {{end}} | ||||||
|  |                 {{end}} | ||||||
|             </td> |             </td> | ||||||
|             <td>{{$probeInfo.Class}}<br/> |             <td>{{$probeInfo.Class}}<br/> | ||||||
|                 <div class="small"> |                 <div class="small"> | ||||||
| @@ -97,28 +100,48 @@ | |||||||
|                 {{end}} |                 {{end}} | ||||||
|                 </div> |                 </div> | ||||||
|             </td> |             </td> | ||||||
|             <td>{{$probeInfo.Interval}}</td> |             <td> | ||||||
|             <td data-sort="{{$probeInfo.TimeSinceLast.Milliseconds}}"> |                 {{if $probeInfo.Continuous}} | ||||||
|                 {{if $probeInfo.TimeSinceLast}} |                     Continuous | ||||||
|                     {{$probeInfo.TimeSinceLast.String}} ago<br/> |                 {{else}} | ||||||
|  |                     {{$probeInfo.Interval}} | ||||||
|  |                 {{end}} | ||||||
|  |             </td> | ||||||
|  |             <td data-sort="{{$probeInfo.TimeSinceLastEnd.Milliseconds}}"> | ||||||
|  |                 {{if $probeInfo.TimeSinceLastEnd}} | ||||||
|  |                     {{$probeInfo.TimeSinceLastEnd.String}} ago<br/> | ||||||
|                     <span class="small">{{$probeInfo.End.Format "2006-01-02T15:04:05Z07:00"}}</span> |                     <span class="small">{{$probeInfo.End.Format "2006-01-02T15:04:05Z07:00"}}</span> | ||||||
|                 {{else}} |                 {{else}} | ||||||
|                     Never |                     Never | ||||||
|                 {{end}} |                 {{end}} | ||||||
|             </td> |             </td> | ||||||
|             <td> |             <td data-sort="{{$probeInfo.TimeSinceLastStart.Milliseconds}}"> | ||||||
|                 {{if $probeInfo.Result}} |                 {{if $probeInfo.TimeSinceLastStart}} | ||||||
|                     {{$probeInfo.Result}} |                     {{$probeInfo.TimeSinceLastStart.String}} ago<br/> | ||||||
|  |                     <span class="small">{{$probeInfo.Start.Format "2006-01-02T15:04:05Z07:00"}}</span> | ||||||
|                 {{else}} |                 {{else}} | ||||||
|                     <span class="error">{{$probeInfo.Result}}</span> |                     Never | ||||||
|  |                 {{end}} | ||||||
|  |             </td> | ||||||
|  |             <td> | ||||||
|  |                 {{if $probeInfo.Error}} | ||||||
|  |                     <span class="error">{{$probeInfo.Status}}</span> | ||||||
|  |                 {{else}} | ||||||
|  |                     {{$probeInfo.Status}} | ||||||
|                 {{end}}<br/> |                 {{end}}<br/> | ||||||
|  |                 {{if not $probeInfo.Continuous}} | ||||||
|                     <div class="small">Recent: {{$probeInfo.RecentResults}}</div> |                     <div class="small">Recent: {{$probeInfo.RecentResults}}</div> | ||||||
|                     <div class="small">Mean: {{$probeInfo.RecentSuccessRatio}}</div> |                     <div class="small">Mean: {{$probeInfo.RecentSuccessRatio}}</div> | ||||||
|  |                 {{end}} | ||||||
|             </td> |             </td> | ||||||
|             <td data-sort="{{$probeInfo.Latency.Milliseconds}}"> |             <td data-sort="{{$probeInfo.Latency.Milliseconds}}"> | ||||||
|  |                 {{if $probeInfo.Continuous}} | ||||||
|  |                     n/a | ||||||
|  |                 {{else}} | ||||||
|                     {{$probeInfo.Latency.String}} |                     {{$probeInfo.Latency.String}} | ||||||
|                     <div class="small">Recent: {{$probeInfo.RecentLatencies}}</div> |                     <div class="small">Recent: {{$probeInfo.RecentLatencies}}</div> | ||||||
|                     <div class="small">Median: {{$probeInfo.RecentMedianLatency}}</div> |                     <div class="small">Median: {{$probeInfo.RecentMedianLatency}}</div> | ||||||
|  |                 {{end}} | ||||||
|             </td> |             </td> | ||||||
|             <td class="small">{{$probeInfo.Error}}</td> |             <td class="small">{{$probeInfo.Error}}</td> | ||||||
|         </tr> |         </tr> | ||||||
|   | |||||||
							
								
								
									
										94
									
								
								util/circularqueue/circularqueue.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										94
									
								
								util/circularqueue/circularqueue.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,94 @@ | |||||||
|  | // Copyright (c) Tailscale Inc & AUTHORS | ||||||
|  | // SPDX-License-Identifier: BSD-3-Clause | ||||||
|  | 
 | ||||||
|  | // package circularqueue provides circular queues. | ||||||
|  | package circularqueue | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"math" | ||||||
|  | 	"sync" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | const ( | ||||||
|  | 	// Head is the index of the head of a queue. | ||||||
|  | 	Head = -1 | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | // FIFO is a bounded queue that acts as if it has infinite depth. When an item | ||||||
|  | // is added to an already full queue, the oldest item in the queue is evicted | ||||||
|  | // to make room. | ||||||
|  | // | ||||||
|  | // Items in the queue are indexed, such that one can pop specific items by | ||||||
|  | // index. If an item is popped that is not at the head of the queue, all items | ||||||
|  | // up to the popped item are immediately evicted. | ||||||
|  | type FIFO[T any] struct { | ||||||
|  | 	// mu protects all of the below fields | ||||||
|  | 	mu sync.Mutex | ||||||
|  | 
 | ||||||
|  | 	capacity int | ||||||
|  | 	head     int | ||||||
|  | 	tail     int | ||||||
|  | 	onEvict  func(T) | ||||||
|  | 	items    []T | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // NewFIFO constructs a new [FIFO] queue with the given capacity and onEvict | ||||||
|  | // callback. | ||||||
|  | func NewFIFO[T any](capacity int, onEvict func(T)) *FIFO[T] { | ||||||
|  | 	return &FIFO[T]{ | ||||||
|  | 		capacity: capacity, | ||||||
|  | 		tail:     -1, | ||||||
|  | 		onEvict:  onEvict, | ||||||
|  | 		items:    make([]T, capacity), | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // Push pushes a new item onto the queue, evicting the item at the head if the | ||||||
|  | // queue is at capacity. If the number of items pushed to the queue reaches | ||||||
|  | // [math.MaxInt], this will panic with "FIFO queue sequence number exhausted". | ||||||
|  | func (q *FIFO[T]) Push(item T) { | ||||||
|  | 	q.mu.Lock() | ||||||
|  | 	defer q.mu.Unlock() | ||||||
|  | 
 | ||||||
|  | 	q.tail++ | ||||||
|  | 	if q.tail == math.MaxInt { | ||||||
|  | 		// We don't currently handle wrapping indexes | ||||||
|  | 		panic("FIFO queue sequence number exhausted") | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if q.tail-q.head >= q.capacity { | ||||||
|  | 		q.onEvict(q.itemAtLocked(q.head)) | ||||||
|  | 		q.head++ | ||||||
|  | 	} | ||||||
|  | 	q.items[q.tail%q.capacity] = item | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // Pop removes the item at idx. If idx is past the tail or before the head of | ||||||
|  | // this queue, Pop returns nil. If an item at idx is available, all items in | ||||||
|  | // the queue at indices less than idx are immediately evicted. If idx <= [Head], | ||||||
|  | // this pops the item at the head of the queue. | ||||||
|  | func (q *FIFO[T]) Pop(idx int) *T { | ||||||
|  | 	q.mu.Lock() | ||||||
|  | 	defer q.mu.Unlock() | ||||||
|  | 
 | ||||||
|  | 	if idx < 0 { | ||||||
|  | 		idx = q.head | ||||||
|  | 	} else if idx < q.head { | ||||||
|  | 		return nil | ||||||
|  | 	} else if idx > q.tail { | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	// Evict items if necessary | ||||||
|  | 	for i := q.head; i < idx; i++ { | ||||||
|  | 		q.onEvict(q.itemAtLocked(i)) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	q.head = idx + 1 | ||||||
|  | 	item := q.itemAtLocked(idx) | ||||||
|  | 	return &item | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (q *FIFO[T]) itemAtLocked(idx int) T { | ||||||
|  | 	return q.items[idx%q.capacity] | ||||||
|  | } | ||||||
							
								
								
									
										47
									
								
								util/circularqueue/circularqueue_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										47
									
								
								util/circularqueue/circularqueue_test.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,47 @@ | |||||||
|  | // Copyright (c) Tailscale Inc & AUTHORS | ||||||
|  | // SPDX-License-Identifier: BSD-3-Clause | ||||||
|  | 
 | ||||||
|  | package circularqueue | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"testing" | ||||||
|  | 
 | ||||||
|  | 	"github.com/google/go-cmp/cmp" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | func TestFIFO(t *testing.T) { | ||||||
|  | 	var evicted []int | ||||||
|  | 	q := NewFIFO(3, func(item int) { | ||||||
|  | 		evicted = append(evicted, item) | ||||||
|  | 	}) | ||||||
|  | 
 | ||||||
|  | 	assertPop := func(idx int, want int) { | ||||||
|  | 		t.Helper() | ||||||
|  | 		got := q.Pop(idx) | ||||||
|  | 		var _want *int | ||||||
|  | 		if want >= 0 { | ||||||
|  | 			_want = &want | ||||||
|  | 		} | ||||||
|  | 		if diff := cmp.Diff(got, _want); diff != "" { | ||||||
|  | 			t.Fatalf("unexpected item (-got +want):\n%s", diff) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	q.Push(1) | ||||||
|  | 	q.Push(2) | ||||||
|  | 	q.Push(3) | ||||||
|  | 	assertPop(3, -1) | ||||||
|  | 	assertPop(Head, 1) | ||||||
|  | 	assertPop(2, 3) // Should evict 2 | ||||||
|  | 	assertPop(2, -1) | ||||||
|  | 
 | ||||||
|  | 	q.Push(4) | ||||||
|  | 	q.Push(5) | ||||||
|  | 	q.Push(6) | ||||||
|  | 	assertPop(5, 6) // Should evict 4 and 5 | ||||||
|  | 
 | ||||||
|  | 	if diff := cmp.Diff(evicted, []int{2, 4, 5}); diff != "" { | ||||||
|  | 		t.Fatalf("unexpected evicted (-got +want):\n%s", diff) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | } | ||||||
		Reference in New Issue
	
	Block a user
	 Percy Wegmann
					Percy Wegmann