cmd/derpprobe,prober: add ability to continuously probe bandwidth and track queuing delay

Setting the bw-interval to a negative value enables continuous bandwidth probing.
When probing bandwidth continuously, the prober also tracks packet transit times,
which can be used as a measure of queuing delay.

Updates tailscale/corp#24522

Signed-off-by: Percy Wegmann <percy@tailscale.com>
This commit is contained in:
Percy Wegmann 2024-11-15 17:48:49 -06:00
parent 1355f622be
commit feaf4743da
No known key found for this signature in database
GPG Key ID: 29D8CDEB4C13D48B
8 changed files with 389 additions and 67 deletions

View File

@ -27,7 +27,7 @@ var (
meshInterval = flag.Duration("mesh-interval", 15*time.Second, "mesh probe interval") meshInterval = flag.Duration("mesh-interval", 15*time.Second, "mesh probe interval")
stunInterval = flag.Duration("stun-interval", 15*time.Second, "STUN probe interval") stunInterval = flag.Duration("stun-interval", 15*time.Second, "STUN probe interval")
tlsInterval = flag.Duration("tls-interval", 15*time.Second, "TLS probe interval") tlsInterval = flag.Duration("tls-interval", 15*time.Second, "TLS probe interval")
bwInterval = flag.Duration("bw-interval", 0, "bandwidth probe interval (0 = no bandwidth probing)") bwInterval = flag.Duration("bw-interval", 0, "bandwidth probe interval (0 = no bandwidth probing, > 0 = periodic bandwidth probing, < 0 = continuous bandwidth probing)")
bwSize = flag.Int64("bw-probe-size-bytes", 1_000_000, "bandwidth probe size") bwSize = flag.Int64("bw-probe-size-bytes", 1_000_000, "bandwidth probe size")
regionCode = flag.String("region-code", "", "probe only this region (e.g. 'lax'); if left blank, all regions will be probed") regionCode = flag.String("region-code", "", "probe only this region (e.g. 'lax'); if left blank, all regions will be probed")
) )
@ -45,7 +45,7 @@ func main() {
prober.WithSTUNProbing(*stunInterval), prober.WithSTUNProbing(*stunInterval),
prober.WithTLSProbing(*tlsInterval), prober.WithTLSProbing(*tlsInterval),
} }
if *bwInterval > 0 { if *bwInterval != 0 {
opts = append(opts, prober.WithBandwidthProbing(*bwInterval, *bwSize)) opts = append(opts, prober.WithBandwidthProbing(*bwInterval, *bwSize))
} }
if *regionCode != "" { if *regionCode != "" {
@ -106,7 +106,7 @@ func getOverallStatus(p *prober.Prober) (o overallStatus) {
// Do not show probes that have not finished yet. // Do not show probes that have not finished yet.
continue continue
} }
if i.Result { if i.Status == prober.ProbeStatusSucceeded {
o.addGoodf("%s: %s", p, i.Latency) o.addGoodf("%s: %s", p, i.Latency)
} else { } else {
o.addBadf("%s: %s", p, i.Error) o.addBadf("%s: %s", p, i.Error)

View File

@ -748,7 +748,7 @@ func (c *Client) dialNode(ctx context.Context, n *tailcfg.DERPNode) (net.Conn, e
select { select {
case <-ctx.Done(): case <-ctx.Done():
// Either user canceled original context, // Either user canceled original context,
// it timed out, or the v6 dial succeeded. // if timed out, or the v6 dial succeeded.
t.Stop() t.Stop()
return return
case <-tChannel: case <-tChannel:
@ -757,6 +757,9 @@ func (c *Client) dialNode(ctx context.Context, n *tailcfg.DERPNode) (net.Conn, e
} }
dst := cmp.Or(dstPrimary, n.HostName) dst := cmp.Or(dstPrimary, n.HostName)
port := "443" port := "443"
if !c.useHTTPS() {
port = "3340"
}
if n.DERPPort != 0 { if n.DERPPort != 0 {
port = fmt.Sprint(n.DERPPort) port = fmt.Sprint(n.DERPPort)
} }

View File

@ -8,6 +8,7 @@ import (
"cmp" "cmp"
"context" "context"
crand "crypto/rand" crand "crypto/rand"
"encoding/binary"
"encoding/json" "encoding/json"
"errors" "errors"
"expvar" "expvar"
@ -15,10 +16,12 @@ import (
"log" "log"
"net" "net"
"net/http" "net/http"
"reflect"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
"unsafe"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"tailscale.com/client/tailscale" "tailscale.com/client/tailscale"
@ -55,7 +58,7 @@ type derpProber struct {
tlsProbeFn func(string) ProbeClass tlsProbeFn func(string) ProbeClass
udpProbeFn func(string, int) ProbeClass udpProbeFn func(string, int) ProbeClass
meshProbeFn func(string, string) ProbeClass meshProbeFn func(string, string) ProbeClass
bwProbeFn func(string, string, int64) ProbeClass bwProbeFn func(string, string, int64, bool) ProbeClass
sync.Mutex sync.Mutex
lastDERPMap *tailcfg.DERPMap lastDERPMap *tailcfg.DERPMap
@ -69,6 +72,11 @@ type DERPOpt func(*derpProber)
// WithBandwidthProbing enables bandwidth probing. When enabled, a payload of // WithBandwidthProbing enables bandwidth probing. When enabled, a payload of
// `size` bytes will be regularly transferred through each DERP server, and each // `size` bytes will be regularly transferred through each DERP server, and each
// pair of DERP servers in every region. // pair of DERP servers in every region.
//
// If `size` is less than 0, this will trigger the probe to continuously send
// data as fast as possible. Continuous mode can consume huge amounts of
// network bandwidth, so it is best used when probing DERP servers on the same
// LAN/datacenter.
func WithBandwidthProbing(interval time.Duration, size int64) DERPOpt { func WithBandwidthProbing(interval time.Duration, size int64) DERPOpt {
return func(d *derpProber) { return func(d *derpProber) {
d.bwInterval = interval d.bwInterval = interval
@ -94,7 +102,7 @@ func WithSTUNProbing(interval time.Duration) DERPOpt {
// WithTLSProbing enables TLS probing that will check TLS certificate on port // WithTLSProbing enables TLS probing that will check TLS certificate on port
// 443 of each DERP server every `interval`. // 443 of each DERP server every `interval`.
func WithTLSProbing(interval time.Duration) DERPOpt { func WithTLSProbing(interval time.Duration, regions ...string) DERPOpt {
return func(d *derpProber) { return func(d *derpProber) {
d.tlsInterval = interval d.tlsInterval = interval
} }
@ -196,12 +204,12 @@ func (d *derpProber) probeMapFn(ctx context.Context) error {
} }
} }
if d.bwInterval > 0 && d.bwProbeSize > 0 { if d.bwInterval != 0 && d.bwProbeSize > 0 {
n := fmt.Sprintf("derp/%s/%s/%s/bw", region.RegionCode, server.Name, to.Name) n := fmt.Sprintf("derp/%s/%s/%s/bw", region.RegionCode, server.Name, to.Name)
wantProbes[n] = true wantProbes[n] = true
if d.probes[n] == nil { if d.probes[n] == nil {
log.Printf("adding DERP bandwidth probe for %s->%s (%s) %v bytes every %v", server.Name, to.Name, region.RegionName, d.bwProbeSize, d.bwInterval) log.Printf("adding DERP bandwidth probe for %s->%s (%s) %v bytes every %v", server.Name, to.Name, region.RegionName, d.bwProbeSize, d.bwInterval)
d.probes[n] = d.p.Run(n, d.bwInterval, labels, d.bwProbeFn(server.Name, to.Name, d.bwProbeSize)) d.probes[n] = d.p.Run(n, d.bwInterval, labels, d.bwProbeFn(server.Name, to.Name, d.bwProbeSize, d.bwInterval < 0))
} }
} }
} }
@ -246,11 +254,19 @@ func (d *derpProber) probeMesh(from, to string) ProbeClass {
// through a pair of DERP servers (or just one server, if 'from' and 'to' are // through a pair of DERP servers (or just one server, if 'from' and 'to' are
// the same). 'from' and 'to' are expected to be names (DERPNode.Name) of two // the same). 'from' and 'to' are expected to be names (DERPNode.Name) of two
// DERP servers in the same region. // DERP servers in the same region.
func (d *derpProber) probeBandwidth(from, to string, size int64) ProbeClass { func (d *derpProber) probeBandwidth(from, to string, size int64, continuous bool) ProbeClass {
derpPath := "mesh" derpPath := "mesh"
if from == to { if from == to {
derpPath = "single" derpPath = "single"
} }
if continuous {
return d.probeBandwidthContinuous(from, to, derpPath, size)
} else {
return d.probeBandwidthOnce(from, to, derpPath, size)
}
}
func (d *derpProber) probeBandwidthOnce(from, to, derpPath string, size int64) ProbeClass {
var transferTime expvar.Float var transferTime expvar.Float
return ProbeClass{ return ProbeClass{
Probe: func(ctx context.Context) error { Probe: func(ctx context.Context) error {
@ -258,7 +274,7 @@ func (d *derpProber) probeBandwidth(from, to string, size int64) ProbeClass {
if err != nil { if err != nil {
return err return err
} }
return derpProbeBandwidth(ctx, d.lastDERPMap, fromN, toN, size, &transferTime) return derpProbeBandwidthOnce(ctx, d.lastDERPMap, fromN, toN, size, &transferTime)
}, },
Class: "derp_bw", Class: "derp_bw",
Labels: Labels{"derp_path": derpPath}, Labels: Labels{"derp_path": derpPath},
@ -271,6 +287,76 @@ func (d *derpProber) probeBandwidth(from, to string, size int64) ProbeClass {
} }
} }
func (d *derpProber) probeBandwidthContinuous(from, to, derpPath string, size int64) ProbeClass {
var bytesTransferred expvar.Float
var transferTime expvar.Float
var qdh queuingDelayHistogram
qdh.reset()
return ProbeClass{
Probe: func(ctx context.Context) error {
fromN, toN, err := d.getNodePair(from, to)
if err != nil {
return err
}
return derpProbeBandwidthContinuous(ctx, d.lastDERPMap, fromN, toN, size, &bytesTransferred, &transferTime, &qdh)
},
Class: "derp_bw",
Labels: Labels{"derp_path": derpPath},
Metrics: func(l prometheus.Labels) []prometheus.Metric {
qdh.mx.Lock()
result := []prometheus.Metric{
prometheus.MustNewConstMetric(prometheus.NewDesc("derp_bw_probe_continuous_size_bytes", "Payload size of the bandwidth prober", nil, l), prometheus.GaugeValue, float64(size)),
prometheus.MustNewConstMetric(prometheus.NewDesc("derp_bw_probe_continuous_bytes_total", "Total data transferred", nil, l), prometheus.CounterValue, float64(bytesTransferred.Value())),
prometheus.MustNewConstMetric(prometheus.NewDesc("derp_bw_probe_continuous_transfer_time_seconds_total", "Time it took to transfer data", nil, l), prometheus.CounterValue, transferTime.Value()),
prometheus.MustNewConstHistogram(prometheus.NewDesc("derp_bw_probe_continuous_queuing_delays_seconds", "Distribution of queuing delays", nil, l), qdh.count, qdh.sum, qdh.buckets),
}
qdh.resetLocked()
qdh.mx.Unlock()
return result
},
}
}
// queuingDelayHistogram allows tracking a histogram of queuing delays
type queuingDelayHistogram struct {
count uint64
sum float64
buckets map[float64]uint64
mx sync.Mutex
}
// qdhBuckets defines the buckets (in seconds) for the queuingDelayHistogram.
var qdhBuckets = []float64{0.0001, 0.0002, 0.0005}
func (qdh *queuingDelayHistogram) reset() {
qdh.mx.Lock()
defer qdh.mx.Unlock()
qdh.resetLocked()
}
func (qdh *queuingDelayHistogram) resetLocked() {
qdh.count = 0
qdh.sum = 0
qdh.buckets = make(map[float64]uint64, len(qdhBuckets))
}
func (qdh *queuingDelayHistogram) add(d time.Duration) {
qdh.mx.Lock()
defer qdh.mx.Unlock()
seconds := float64(d.Seconds())
qdh.count++
qdh.sum += seconds
for _, b := range qdhBuckets {
if seconds > b {
continue
}
qdh.buckets[b] += 1
break
}
}
// getNodePair returns DERPNode objects for two DERP servers based on their // getNodePair returns DERPNode objects for two DERP servers based on their
// short names. // short names.
func (d *derpProber) getNodePair(n1, n2 string) (ret1, ret2 *tailcfg.DERPNode, _ error) { func (d *derpProber) getNodePair(n1, n2 string) (ret1, ret2 *tailcfg.DERPNode, _ error) {
@ -411,9 +497,9 @@ func derpProbeUDP(ctx context.Context, ipStr string, port int) error {
return nil return nil
} }
// derpProbeBandwidth sends a payload of a given size between two local // derpProbeBandwidthOnce sends a payload of a given size between two local
// DERP clients connected to two DERP servers. // DERP clients connected to two DERP servers.
func derpProbeBandwidth(ctx context.Context, dm *tailcfg.DERPMap, from, to *tailcfg.DERPNode, size int64, transferTime *expvar.Float) (err error) { func derpProbeBandwidthOnce(ctx context.Context, dm *tailcfg.DERPMap, from, to *tailcfg.DERPNode, size int64, transferTime *expvar.Float) (err error) {
// This probe uses clients with isProber=false to avoid spamming the derper logs with every packet // This probe uses clients with isProber=false to avoid spamming the derper logs with every packet
// sent by the bandwidth probe. // sent by the bandwidth probe.
fromc, err := newConn(ctx, dm, from, false) fromc, err := newConn(ctx, dm, from, false)
@ -437,7 +523,39 @@ func derpProbeBandwidth(ctx context.Context, dm *tailcfg.DERPMap, from, to *tail
start := time.Now() start := time.Now()
defer func() { transferTime.Add(time.Since(start).Seconds()) }() defer func() { transferTime.Add(time.Since(start).Seconds()) }()
if err := runDerpProbeNodePair(ctx, from, to, fromc, toc, size); err != nil { if err := runDerpProbeNodePairOnce(ctx, from, to, fromc, toc, size); err != nil {
// Record pubkeys on failed probes to aid investigation.
return fmt.Errorf("%s -> %s: %w",
fromc.SelfPublicKey().ShortString(),
toc.SelfPublicKey().ShortString(), err)
}
return nil
}
// derpProbeBandwidthContinuous continuously sends data between two local
// DERP clients connected to two DERP servers.
func derpProbeBandwidthContinuous(ctx context.Context, dm *tailcfg.DERPMap, from, to *tailcfg.DERPNode, size int64, bytesTransferred, transferTime *expvar.Float, qdh *queuingDelayHistogram) (err error) {
// This probe uses clients with isProber=false to avoid spamming the derper logs with every packet
// sent by the bandwidth probe.
fromc, err := newConn(ctx, dm, from, false)
if err != nil {
return err
}
defer fromc.Close()
toc, err := newConn(ctx, dm, to, false)
if err != nil {
return err
}
defer toc.Close()
// Wait a bit for from's node to hear about to existing on the
// other node in the region, in the case where the two nodes
// are different.
if from.Name != to.Name {
time.Sleep(100 * time.Millisecond) // pretty arbitrary
}
if err := runDerpProbeNodePairContinuously(ctx, from, to, fromc, toc, size, bytesTransferred, transferTime, qdh); err != nil {
// Record pubkeys on failed probes to aid investigation. // Record pubkeys on failed probes to aid investigation.
return fmt.Errorf("%s -> %s: %w", return fmt.Errorf("%s -> %s: %w",
fromc.SelfPublicKey().ShortString(), fromc.SelfPublicKey().ShortString(),
@ -468,7 +586,7 @@ func derpProbeNodePair(ctx context.Context, dm *tailcfg.DERPMap, from, to *tailc
} }
const meshProbePacketSize = 8 const meshProbePacketSize = 8
if err := runDerpProbeNodePair(ctx, from, to, fromc, toc, meshProbePacketSize); err != nil { if err := runDerpProbeNodePairOnce(ctx, from, to, fromc, toc, meshProbePacketSize); err != nil {
// Record pubkeys on failed probes to aid investigation. // Record pubkeys on failed probes to aid investigation.
return fmt.Errorf("%s -> %s: %w", return fmt.Errorf("%s -> %s: %w",
fromc.SelfPublicKey().ShortString(), fromc.SelfPublicKey().ShortString(),
@ -504,10 +622,10 @@ func packetsForSize(size int64) [][]byte {
return pkts return pkts
} }
// runDerpProbeNodePair takes two DERP clients (fromc and toc) connected to two // runDerpProbeNodePairOnce takes two DERP clients (fromc and toc) connected to two
// DERP servers (from and to) and sends a test payload of a given size from one // DERP servers (from and to) and sends a test payload of a given size from one
// to another. // to another.
func runDerpProbeNodePair(ctx context.Context, from, to *tailcfg.DERPNode, fromc, toc *derphttp.Client, size int64) error { func runDerpProbeNodePairOnce(ctx context.Context, from, to *tailcfg.DERPNode, fromc, toc *derphttp.Client, size int64) error {
// To avoid derper dropping enqueued packets, limit the number of packets in flight. // To avoid derper dropping enqueued packets, limit the number of packets in flight.
// The value here is slightly smaller than perClientSendQueueDepth in derp_server.go // The value here is slightly smaller than perClientSendQueueDepth in derp_server.go
inFlight := syncs.NewSemaphore(30) inFlight := syncs.NewSemaphore(30)
@ -544,6 +662,8 @@ func runDerpProbeNodePair(ctx context.Context, from, to *tailcfg.DERPNode, fromc
recvc <- fmt.Errorf("got data packet %d from unexpected source, %v", idx, v.Source) recvc <- fmt.Errorf("got data packet %d from unexpected source, %v", idx, v.Source)
return return
} }
// This assumes that the packets are received reliably and in order.
// The DERP protocol does not guarantee this, but this probe assumes it.
if got, want := v.Data, pkts[idx]; !bytes.Equal(got, want) { if got, want := v.Data, pkts[idx]; !bytes.Equal(got, want) {
recvc <- fmt.Errorf("unexpected data packet %d (out of %d)", idx, len(pkts)) recvc <- fmt.Errorf("unexpected data packet %d (out of %d)", idx, len(pkts))
return return
@ -577,6 +697,91 @@ func runDerpProbeNodePair(ctx context.Context, from, to *tailcfg.DERPNode, fromc
return nil return nil
} }
func runDerpProbeNodePairContinuously(ctx context.Context, from, to *tailcfg.DERPNode, fromc, toc *derphttp.Client, pktSize int64, bytesTransferred, transferTime *expvar.Float, qdh *queuingDelayHistogram) error {
// To avoid derper dropping enqueued packets, limit the number of packets in flight.
// The value here is slightly smaller than perClientSendQueueDepth in derp_server.go
inFlight := syncs.NewSemaphore(30)
// Send the packets.
sendc := make(chan error, 1)
pkt := make([]byte, pktSize) // sized slightly smaller than MTU to avoid fragmentation
crand.Read(pkt)
go func() {
for {
inFlight.AcquireContext(ctx)
now := time.Now()
// Write the monotonic time into the first 16 bytes of the packet
wall, ext := wallAndExt(now)
binary.BigEndian.PutUint64(pkt, wall)
binary.BigEndian.PutUint64(pkt[8:], uint64(ext))
if err := fromc.Send(toc.SelfPublicKey(), pkt); err != nil {
sendc <- fmt.Errorf("sending packet %w", err)
return
}
}
}()
// Increment transfer time every 1 second.
ticker := time.NewTicker(1 * time.Second)
go func() {
for {
select {
case <-ticker.C:
transferTime.Add(1)
case <-ctx.Done():
return
}
}
}()
// Receive the packets.
recvc := make(chan error, 1)
go func() {
defer close(recvc) // to break out of 'select' below.
for {
m, err := toc.Recv()
if err != nil {
recvc <- err
return
}
switch v := m.(type) {
case derp.ReceivedPacket:
now := time.Now()
inFlight.Release()
if v.Source != fromc.SelfPublicKey() {
recvc <- fmt.Errorf("got data packet from unexpected source, %v", v.Source)
return
}
wall := binary.BigEndian.Uint64(v.Data)
ext := int64(binary.BigEndian.Uint64(v.Data[8:]))
sent := fromWallAndExt(wall, ext)
qdh.add(now.Sub(sent))
bytesTransferred.Add(float64(pktSize))
case derp.KeepAliveMessage:
// Silently ignore.
default:
log.Printf("%v: ignoring Recv frame type %T", to.Name, v)
// Loop.
}
}
}()
select {
case <-ctx.Done():
return fmt.Errorf("timeout: %w", ctx.Err())
case err := <-sendc:
if err != nil {
return fmt.Errorf("error sending via %q: %w", from.Name, err)
}
case err := <-recvc:
if err != nil {
return fmt.Errorf("error receiving from %q: %w", to.Name, err)
}
}
return nil
}
func newConn(ctx context.Context, dm *tailcfg.DERPMap, n *tailcfg.DERPNode, isProber bool) (*derphttp.Client, error) { func newConn(ctx context.Context, dm *tailcfg.DERPMap, n *tailcfg.DERPNode, isProber bool) (*derphttp.Client, error) {
// To avoid spamming the log with regular connection messages. // To avoid spamming the log with regular connection messages.
l := logger.Filtered(log.Printf, func(s string) bool { l := logger.Filtered(log.Printf, func(s string) bool {
@ -597,6 +802,9 @@ func newConn(ctx context.Context, dm *tailcfg.DERPMap, n *tailcfg.DERPNode, isPr
if err != nil { if err != nil {
return nil, err return nil, err
} }
// Only verify TLS state if this is a prober.
if isProber {
cs, ok := dc.TLSConnectionState() cs, ok := dc.TLSConnectionState()
if !ok { if !ok {
dc.Close() dc.Close()
@ -610,6 +818,7 @@ func newConn(ctx context.Context, dm *tailcfg.DERPMap, n *tailcfg.DERPNode, isPr
dc.Close() dc.Close()
return nil, fmt.Errorf("TLS server name %q != derp hostname %q", cs.ServerName, n.HostName) return nil, fmt.Errorf("TLS server name %q != derp hostname %q", cs.ServerName, n.HostName)
} }
}
errc := make(chan error, 1) errc := make(chan error, 1)
go func() { go func() {
@ -645,3 +854,27 @@ func httpOrFileTransport() http.RoundTripper {
tr.RegisterProtocol("file", http.NewFileTransport(http.Dir("/"))) tr.RegisterProtocol("file", http.NewFileTransport(http.Dir("/")))
return tr return tr
} }
// wallAndExt extracts the wall and ext fields from a time.Time,
// allowing us to marshal a monotonic clock reading.
func wallAndExt(t time.Time) (uint64, int64) {
v := reflect.ValueOf(&t).Elem()
return exposeField(v.Field(0)).Uint(), exposeField(v.Field(1)).Int()
}
// fromWallAndExt constructs a time.Time from wall and ext fields,
// allowing us to unmarshal a monotonic clock reading.
func fromWallAndExt(wall uint64, ext int64) time.Time {
var t time.Time
v := reflect.ValueOf(&t).Elem()
exposeField(v.Field(0)).SetUint(wall)
exposeField(v.Field(1)).SetInt(ext)
return t
}
func exposeField(v reflect.Value) reflect.Value {
if v.CanSet() {
return v
}
return reflect.NewAt(v.Type(), unsafe.Pointer(v.UnsafeAddr())).Elem()
}

View File

@ -192,7 +192,7 @@ func TestRunDerpProbeNodePair(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel() defer cancel()
err = runDerpProbeNodePair(ctx, &tailcfg.DERPNode{Name: "c1"}, &tailcfg.DERPNode{Name: "c2"}, c1, c2, 100_000_000) err = runDerpProbeNodePairOnce(ctx, &tailcfg.DERPNode{Name: "c1"}, &tailcfg.DERPNode{Name: "c2"}, c1, c2, 100_000_000)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
@ -237,3 +237,12 @@ func Test_packetsForSize(t *testing.T) {
}) })
} }
} }
func TestWallAndExt(t *testing.T) {
now := time.Now()
now2 := fromWallAndExt(wallAndExt(now))
diff := now2.Sub(now)
if now2.Sub(now) != 0 {
t.Fatalf("fromWallAndExt(wallAndExt(now)) = %v; want 0", diff)
}
}

View File

@ -256,6 +256,11 @@ type Probe struct {
latencyHist *ring.Ring latencyHist *ring.Ring
} }
// IsContinuous indicates that this is a continuous probe.
func (p *Probe) IsContinuous() bool {
return p.interval < 0
}
// Close shuts down the Probe and unregisters it from its Prober. // Close shuts down the Probe and unregisters it from its Prober.
// It is safe to Run a new probe of the same name after Close returns. // It is safe to Run a new probe of the same name after Close returns.
func (p *Probe) Close() error { func (p *Probe) Close() error {
@ -288,6 +293,22 @@ func (p *Probe) loop() {
return return
} }
if p.IsContinuous() {
// Probe function is going to run continuously.
for {
p.run()
// Wait and then retry if probe fails. We use the inverse of the
// configured negative interval as our sleep period.
// TODO(percy):implement exponential backoff.
select {
case <-time.After(-1 * p.interval):
p.run()
case <-p.ctx.Done():
return
}
}
}
p.tick = p.prober.newTicker(p.interval) p.tick = p.prober.newTicker(p.interval)
defer p.tick.Stop() defer p.tick.Stop()
for { for {
@ -323,9 +344,17 @@ func (p *Probe) run() (pi ProbeInfo, err error) {
p.recordEnd(err) p.recordEnd(err)
} }
}() }()
ctx := p.ctx
if p.IsContinuous() {
p.mu.Lock()
p.lastErr = nil
p.mu.Unlock()
} else {
timeout := time.Duration(float64(p.interval) * 0.8) timeout := time.Duration(float64(p.interval) * 0.8)
ctx, cancel := context.WithTimeout(p.ctx, timeout) var cancel func()
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel() defer cancel()
}
err = p.probeClass.Probe(ctx) err = p.probeClass.Probe(ctx)
p.recordEnd(err) p.recordEnd(err)
@ -365,6 +394,16 @@ func (p *Probe) recordEnd(err error) {
p.successHist = p.successHist.Next() p.successHist = p.successHist.Next()
} }
// ProbeStatus indicates the status of a probe.
type ProbeStatus string
const (
ProbeStatusUnknown = "unknown"
ProbeStatusRunning = "running"
ProbeStatusFailed = "failed"
ProbeStatusSucceeded = "succeeded"
)
// ProbeInfo is a snapshot of the configuration and state of a Probe. // ProbeInfo is a snapshot of the configuration and state of a Probe.
type ProbeInfo struct { type ProbeInfo struct {
Name string Name string
@ -374,7 +413,7 @@ type ProbeInfo struct {
Start time.Time Start time.Time
End time.Time End time.Time
Latency time.Duration Latency time.Duration
Result bool Status ProbeStatus
Error string Error string
RecentResults []bool RecentResults []bool
RecentLatencies []time.Duration RecentLatencies []time.Duration
@ -402,6 +441,10 @@ func (pb ProbeInfo) RecentMedianLatency() time.Duration {
return pb.RecentLatencies[len(pb.RecentLatencies)/2] return pb.RecentLatencies[len(pb.RecentLatencies)/2]
} }
func (pb ProbeInfo) Continuous() bool {
return pb.Interval < 0
}
// ProbeInfo returns the state of all probes. // ProbeInfo returns the state of all probes.
func (p *Prober) ProbeInfo() map[string]ProbeInfo { func (p *Prober) ProbeInfo() map[string]ProbeInfo {
out := map[string]ProbeInfo{} out := map[string]ProbeInfo{}
@ -429,9 +472,14 @@ func (probe *Probe) probeInfoLocked() ProbeInfo {
Labels: probe.metricLabels, Labels: probe.metricLabels,
Start: probe.start, Start: probe.start,
End: probe.end, End: probe.end,
Result: probe.succeeded,
} }
if probe.lastErr != nil { inf.Status = ProbeStatusUnknown
if probe.end.Before(probe.start) {
inf.Status = ProbeStatusRunning
} else if probe.succeeded {
inf.Status = ProbeStatusSucceeded
} else if probe.lastErr != nil {
inf.Status = ProbeStatusFailed
inf.Error = probe.lastErr.Error() inf.Error = probe.lastErr.Error()
} }
if probe.latency > 0 { if probe.latency > 0 {
@ -467,7 +515,7 @@ func (p *Prober) RunHandler(w http.ResponseWriter, r *http.Request) error {
p.mu.Lock() p.mu.Lock()
probe, ok := p.probes[name] probe, ok := p.probes[name]
p.mu.Unlock() p.mu.Unlock()
if !ok { if !ok || probe.IsContinuous() {
return tsweb.Error(http.StatusNotFound, fmt.Sprintf("unknown probe %q", name), nil) return tsweb.Error(http.StatusNotFound, fmt.Sprintf("unknown probe %q", name), nil)
} }
@ -531,7 +579,8 @@ func (p *Probe) Collect(ch chan<- prometheus.Metric) {
if !p.start.IsZero() { if !p.start.IsZero() {
ch <- prometheus.MustNewConstMetric(p.mStartTime, prometheus.GaugeValue, float64(p.start.Unix())) ch <- prometheus.MustNewConstMetric(p.mStartTime, prometheus.GaugeValue, float64(p.start.Unix()))
} }
if p.end.IsZero() { // For periodic probes that haven't ended, don't collect probe metrics yet.
if p.end.IsZero() && !p.IsContinuous() {
return return
} }
ch <- prometheus.MustNewConstMetric(p.mEndTime, prometheus.GaugeValue, float64(p.end.Unix())) ch <- prometheus.MustNewConstMetric(p.mEndTime, prometheus.GaugeValue, float64(p.end.Unix()))

View File

@ -316,7 +316,7 @@ func TestProberProbeInfo(t *testing.T) {
Interval: probeInterval, Interval: probeInterval,
Labels: map[string]string{"class": "", "name": "probe1"}, Labels: map[string]string{"class": "", "name": "probe1"},
Latency: 500 * time.Millisecond, Latency: 500 * time.Millisecond,
Result: true, Status: ProbeStatusSucceeded,
RecentResults: []bool{true}, RecentResults: []bool{true},
RecentLatencies: []time.Duration{500 * time.Millisecond}, RecentLatencies: []time.Duration{500 * time.Millisecond},
}, },
@ -324,6 +324,7 @@ func TestProberProbeInfo(t *testing.T) {
Name: "probe2", Name: "probe2",
Interval: probeInterval, Interval: probeInterval,
Labels: map[string]string{"class": "", "name": "probe2"}, Labels: map[string]string{"class": "", "name": "probe2"},
Status: ProbeStatusFailed,
Error: "error2", Error: "error2",
RecentResults: []bool{false}, RecentResults: []bool{false},
RecentLatencies: nil, // no latency for failed probes RecentLatencies: nil, // no latency for failed probes
@ -349,7 +350,7 @@ func TestProbeInfoRecent(t *testing.T) {
}{ }{
{ {
name: "no_runs", name: "no_runs",
wantProbeInfo: ProbeInfo{}, wantProbeInfo: ProbeInfo{Status: ProbeStatusUnknown},
wantRecentSuccessRatio: 0, wantRecentSuccessRatio: 0,
wantRecentMedianLatency: 0, wantRecentMedianLatency: 0,
}, },
@ -358,7 +359,7 @@ func TestProbeInfoRecent(t *testing.T) {
results: []probeResult{{latency: 100 * time.Millisecond, err: nil}}, results: []probeResult{{latency: 100 * time.Millisecond, err: nil}},
wantProbeInfo: ProbeInfo{ wantProbeInfo: ProbeInfo{
Latency: 100 * time.Millisecond, Latency: 100 * time.Millisecond,
Result: true, Status: ProbeStatusSucceeded,
RecentResults: []bool{true}, RecentResults: []bool{true},
RecentLatencies: []time.Duration{100 * time.Millisecond}, RecentLatencies: []time.Duration{100 * time.Millisecond},
}, },
@ -369,7 +370,7 @@ func TestProbeInfoRecent(t *testing.T) {
name: "single_failure", name: "single_failure",
results: []probeResult{{latency: 100 * time.Millisecond, err: errors.New("error123")}}, results: []probeResult{{latency: 100 * time.Millisecond, err: errors.New("error123")}},
wantProbeInfo: ProbeInfo{ wantProbeInfo: ProbeInfo{
Result: false, Status: ProbeStatusFailed,
RecentResults: []bool{false}, RecentResults: []bool{false},
RecentLatencies: nil, RecentLatencies: nil,
Error: "error123", Error: "error123",
@ -390,7 +391,7 @@ func TestProbeInfoRecent(t *testing.T) {
{latency: 80 * time.Millisecond, err: nil}, {latency: 80 * time.Millisecond, err: nil},
}, },
wantProbeInfo: ProbeInfo{ wantProbeInfo: ProbeInfo{
Result: true, Status: ProbeStatusSucceeded,
Latency: 80 * time.Millisecond, Latency: 80 * time.Millisecond,
RecentResults: []bool{false, true, true, false, true, true, false, true}, RecentResults: []bool{false, true, true, false, true, true, false, true},
RecentLatencies: []time.Duration{ RecentLatencies: []time.Duration{
@ -420,7 +421,7 @@ func TestProbeInfoRecent(t *testing.T) {
{latency: 110 * time.Millisecond, err: nil}, {latency: 110 * time.Millisecond, err: nil},
}, },
wantProbeInfo: ProbeInfo{ wantProbeInfo: ProbeInfo{
Result: true, Status: ProbeStatusSucceeded,
Latency: 110 * time.Millisecond, Latency: 110 * time.Millisecond,
RecentResults: []bool{true, true, true, true, true, true, true, true, true, true}, RecentResults: []bool{true, true, true, true, true, true, true, true, true, true},
RecentLatencies: []time.Duration{ RecentLatencies: []time.Duration{
@ -483,7 +484,7 @@ func TestProberRunHandler(t *testing.T) {
ProbeInfo: ProbeInfo{ ProbeInfo: ProbeInfo{
Name: "success", Name: "success",
Interval: probeInterval, Interval: probeInterval,
Result: true, Status: ProbeStatusSucceeded,
RecentResults: []bool{true, true}, RecentResults: []bool{true, true},
}, },
PreviousSuccessRatio: 1, PreviousSuccessRatio: 1,
@ -498,7 +499,7 @@ func TestProberRunHandler(t *testing.T) {
ProbeInfo: ProbeInfo{ ProbeInfo: ProbeInfo{
Name: "failure", Name: "failure",
Interval: probeInterval, Interval: probeInterval,
Result: false, Status: ProbeStatusFailed,
Error: "error123", Error: "error123",
RecentResults: []bool{false, false}, RecentResults: []bool{false, false},
}, },

View File

@ -62,7 +62,8 @@ func (p *Prober) StatusHandler(opts ...statusHandlerOpt) tsweb.ReturnHandlerFunc
return func(w http.ResponseWriter, r *http.Request) error { return func(w http.ResponseWriter, r *http.Request) error {
type probeStatus struct { type probeStatus struct {
ProbeInfo ProbeInfo
TimeSinceLast time.Duration TimeSinceLastStart time.Duration
TimeSinceLastEnd time.Duration
Links map[string]template.URL Links map[string]template.URL
} }
vars := struct { vars := struct {
@ -81,12 +82,15 @@ func (p *Prober) StatusHandler(opts ...statusHandlerOpt) tsweb.ReturnHandlerFunc
for name, info := range p.ProbeInfo() { for name, info := range p.ProbeInfo() {
vars.TotalProbes++ vars.TotalProbes++
if !info.Result { if info.Error != "" {
vars.UnhealthyProbes++ vars.UnhealthyProbes++
} }
s := probeStatus{ProbeInfo: info} s := probeStatus{ProbeInfo: info}
if !info.Start.IsZero() {
s.TimeSinceLastStart = time.Since(info.Start).Truncate(time.Second)
}
if !info.End.IsZero() { if !info.End.IsZero() {
s.TimeSinceLast = time.Since(info.End).Truncate(time.Second) s.TimeSinceLastEnd = time.Since(info.End).Truncate(time.Second)
} }
for textTpl, urlTpl := range params.probeLinks { for textTpl, urlTpl := range params.probeLinks {
text, err := renderTemplate(textTpl, info) text, err := renderTemplate(textTpl, info)

View File

@ -73,8 +73,9 @@
<th>Name</th> <th>Name</th>
<th>Probe Class & Labels</th> <th>Probe Class & Labels</th>
<th>Interval</th> <th>Interval</th>
<th>Last Attempt</th> <th>Last Finished</th>
<th>Success</th> <th>Last Started</th>
<th>Status</th>
<th>Latency</th> <th>Latency</th>
<th>Last Error</th> <th>Last Error</th>
</tr></thead> </tr></thead>
@ -85,10 +86,12 @@
{{$name}} {{$name}}
{{range $text, $url := $probeInfo.Links}} {{range $text, $url := $probeInfo.Links}}
<br/> <br/>
{{if not $probeInfo.Continuous}}
<button onclick="location.href='{{$url}}';" type="button"> <button onclick="location.href='{{$url}}';" type="button">
{{$text}} {{$text}}
</button> </button>
{{end}} {{end}}
{{end}}
</td> </td>
<td>{{$probeInfo.Class}}<br/> <td>{{$probeInfo.Class}}<br/>
<div class="small"> <div class="small">
@ -97,28 +100,48 @@
{{end}} {{end}}
</div> </div>
</td> </td>
<td>{{$probeInfo.Interval}}</td> <td>
<td data-sort="{{$probeInfo.TimeSinceLast.Milliseconds}}"> {{if $probeInfo.Continuous}}
{{if $probeInfo.TimeSinceLast}} Continuous
{{$probeInfo.TimeSinceLast.String}} ago<br/> {{else}}
{{$probeInfo.Interval}}
{{end}}
</td>
<td data-sort="{{$probeInfo.TimeSinceLastEnd.Milliseconds}}">
{{if $probeInfo.TimeSinceLastEnd}}
{{$probeInfo.TimeSinceLastEnd.String}} ago<br/>
<span class="small">{{$probeInfo.End.Format "2006-01-02T15:04:05Z07:00"}}</span> <span class="small">{{$probeInfo.End.Format "2006-01-02T15:04:05Z07:00"}}</span>
{{else}} {{else}}
Never Never
{{end}} {{end}}
</td> </td>
<td> <td data-sort="{{$probeInfo.TimeSinceLastStart.Milliseconds}}">
{{if $probeInfo.Result}} {{if $probeInfo.TimeSinceLastStart}}
{{$probeInfo.Result}} {{$probeInfo.TimeSinceLastStart.String}} ago<br/>
<span class="small">{{$probeInfo.Start.Format "2006-01-02T15:04:05Z07:00"}}</span>
{{else}} {{else}}
<span class="error">{{$probeInfo.Result}}</span> Never
{{end}}
</td>
<td>
{{if $probeInfo.Error}}
<span class="error">{{$probeInfo.Status}}</span>
{{else}}
{{$probeInfo.Status}}
{{end}}<br/> {{end}}<br/>
{{if not $probeInfo.Continuous}}
<div class="small">Recent: {{$probeInfo.RecentResults}}</div> <div class="small">Recent: {{$probeInfo.RecentResults}}</div>
<div class="small">Mean: {{$probeInfo.RecentSuccessRatio}}</div> <div class="small">Mean: {{$probeInfo.RecentSuccessRatio}}</div>
{{end}}
</td> </td>
<td data-sort="{{$probeInfo.Latency.Milliseconds}}"> <td data-sort="{{$probeInfo.Latency.Milliseconds}}">
{{if $probeInfo.Continuous}}
n/a
{{else}}
{{$probeInfo.Latency.String}} {{$probeInfo.Latency.String}}
<div class="small">Recent: {{$probeInfo.RecentLatencies}}</div> <div class="small">Recent: {{$probeInfo.RecentLatencies}}</div>
<div class="small">Median: {{$probeInfo.RecentMedianLatency}}</div> <div class="small">Median: {{$probeInfo.RecentMedianLatency}}</div>
{{end}}
</td> </td>
<td class="small">{{$probeInfo.Error}}</td> <td class="small">{{$probeInfo.Error}}</td>
</tr> </tr>