From feaf4743da550d4497eda0fcd4936bbfe7f2be23 Mon Sep 17 00:00:00 2001
From: Percy Wegmann
Date: Fri, 15 Nov 2024 17:48:49 -0600
Subject: [PATCH] cmd/derpprobe,prober: add ability to continuously probe
 bandwidth and track queuing delay

Setting the bw-interval to a negative value enables continuous bandwidth
probing. When probing bandwidth continuously, the prober also tracks packet
transit times, which can be used as a measure of queuing delay.

Updates tailscale/corp#24522

Signed-off-by: Percy Wegmann
---
 cmd/derpprobe/derpprobe.go       |   6 +-
 derp/derphttp/derphttp_client.go |   5 +-
 prober/derp.go                   | 281 ++++++++++++++++++++++++++++---
 prober/derp_test.go              |  11 +-
 prober/prober.go                 |  65 ++++++-
 prober/prober_test.go            |  17 +-
 prober/status.go                 |  12 +-
 prober/status.html               |  59 +++++--
 8 files changed, 389 insertions(+), 67 deletions(-)

diff --git a/cmd/derpprobe/derpprobe.go b/cmd/derpprobe/derpprobe.go
index 8f04326b0..c6657d7b4 100644
--- a/cmd/derpprobe/derpprobe.go
+++ b/cmd/derpprobe/derpprobe.go
@@ -27,7 +27,7 @@ var (
 	meshInterval = flag.Duration("mesh-interval", 15*time.Second, "mesh probe interval")
 	stunInterval = flag.Duration("stun-interval", 15*time.Second, "STUN probe interval")
 	tlsInterval  = flag.Duration("tls-interval", 15*time.Second, "TLS probe interval")
-	bwInterval   = flag.Duration("bw-interval", 0, "bandwidth probe interval (0 = no bandwidth probing)")
+	bwInterval   = flag.Duration("bw-interval", 0, "bandwidth probe interval (0 = no bandwidth probing, > 0 = periodic bandwidth probing, < 0 = continuous bandwidth probing)")
 	bwSize       = flag.Int64("bw-probe-size-bytes", 1_000_000, "bandwidth probe size")
 	regionCode   = flag.String("region-code", "", "probe only this region (e.g. 'lax'); if left blank, all regions will be probed")
 )
@@ -45,7 +45,7 @@ func main() {
 		prober.WithSTUNProbing(*stunInterval),
 		prober.WithTLSProbing(*tlsInterval),
 	}
-	if *bwInterval > 0 {
+	if *bwInterval != 0 {
 		opts = append(opts, prober.WithBandwidthProbing(*bwInterval, *bwSize))
 	}
 	if *regionCode != "" {
@@ -106,7 +106,7 @@ func getOverallStatus(p *prober.Prober) (o overallStatus) {
 			// Do not show probes that have not finished yet.
 			continue
 		}
-		if i.Result {
+		if i.Status == prober.ProbeStatusSucceeded {
 			o.addGoodf("%s: %s", p, i.Latency)
 		} else {
 			o.addBadf("%s: %s", p, i.Error)
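A quick reference for the three bw-interval modes the flag change above introduces. The helper below is purely illustrative (probingMode is not part of the patch):

```go
package main

import (
	"fmt"
	"time"
)

// probingMode spells out what each sign of bw-interval now means to
// derpprobe. Illustrative only.
func probingMode(bwInterval time.Duration) string {
	switch {
	case bwInterval == 0:
		return "bandwidth probing disabled"
	case bwInterval > 0:
		return fmt.Sprintf("periodic probing every %v", bwInterval)
	default:
		// Negative: probe continuously; -bwInterval is the sleep
		// between retries after a probe failure.
		return fmt.Sprintf("continuous probing (retry delay %v)", -bwInterval)
	}
}

func main() {
	for _, d := range []time.Duration{0, 15 * time.Second, -time.Second} {
		fmt.Printf("bw-interval=%v: %s\n", d, probingMode(d))
	}
}
```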
diff --git a/derp/derphttp/derphttp_client.go b/derp/derphttp/derphttp_client.go
index c95d072b1..07dbe5bd9 100644
--- a/derp/derphttp/derphttp_client.go
+++ b/derp/derphttp/derphttp_client.go
@@ -748,7 +748,7 @@ func (c *Client) dialNode(ctx context.Context, n *tailcfg.DERPNode) (net.Conn, e
 		select {
 		case <-ctx.Done():
 			// Either user canceled original context,
-			// if timed out, or the v6 dial succeeded.
+			// it timed out, or the v6 dial succeeded.
 			t.Stop()
 			return
 		case <-tChannel:
@@ -757,6 +757,9 @@ func (c *Client) dialNode(ctx context.Context, n *tailcfg.DERPNode) (net.Conn, e
 	}
 	dst := cmp.Or(dstPrimary, n.HostName)
 	port := "443"
+	if !c.useHTTPS() {
+		port = "3340"
+	}
 	if n.DERPPort != 0 {
 		port = fmt.Sprint(n.DERPPort)
 	}
diff --git a/prober/derp.go b/prober/derp.go
index b1ebc590d..93964abb3 100644
--- a/prober/derp.go
+++ b/prober/derp.go
@@ -8,6 +8,7 @@ import (
 	"cmp"
 	"context"
 	crand "crypto/rand"
+	"encoding/binary"
 	"encoding/json"
 	"errors"
 	"expvar"
@@ -15,10 +16,12 @@ import (
 	"log"
 	"net"
 	"net/http"
+	"reflect"
 	"strconv"
 	"strings"
 	"sync"
 	"time"
+	"unsafe"

 	"github.com/prometheus/client_golang/prometheus"
 	"tailscale.com/client/tailscale"
@@ -55,7 +58,7 @@ type derpProber struct {
 	tlsProbeFn  func(string) ProbeClass
 	udpProbeFn  func(string, int) ProbeClass
 	meshProbeFn func(string, string) ProbeClass
-	bwProbeFn   func(string, string, int64) ProbeClass
+	bwProbeFn   func(string, string, int64, bool) ProbeClass

 	sync.Mutex
 	lastDERPMap *tailcfg.DERPMap
@@ -69,6 +72,11 @@ type DERPOpt func(*derpProber)
 // WithBandwidthProbing enables bandwidth probing. When enabled, a payload of
 // `size` bytes will be regularly transferred through each DERP server, and each
 // pair of DERP servers in every region.
+//
+// If `interval` is less than 0, this will trigger the probe to continuously send
+// data as fast as possible. Continuous mode can consume huge amounts of
+// network bandwidth, so it is best used when probing DERP servers on the same
+// LAN/datacenter.
 func WithBandwidthProbing(interval time.Duration, size int64) DERPOpt {
 	return func(d *derpProber) {
 		d.bwInterval = interval
@@ -196,12 +204,12 @@ func (d *derpProber) probeMapFn(ctx context.Context) error {
 				}
 			}

-			if d.bwInterval > 0 && d.bwProbeSize > 0 {
+			if d.bwInterval != 0 && d.bwProbeSize > 0 {
 				n := fmt.Sprintf("derp/%s/%s/%s/bw", region.RegionCode, server.Name, to.Name)
 				wantProbes[n] = true
 				if d.probes[n] == nil {
 					log.Printf("adding DERP bandwidth probe for %s->%s (%s) %v bytes every %v", server.Name, to.Name, region.RegionName, d.bwProbeSize, d.bwInterval)
					d.probes[n] = d.p.Run(n, d.bwInterval, labels, d.bwProbeFn(server.Name, to.Name, d.bwProbeSize, d.bwInterval < 0))
 				}
 			}
 		}
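For readers new to the prober package, this is roughly the shape that every probe constructor below returns. A minimal sketch assuming the package's exported surface as used in this patch (prober.New and the Run signature are assumptions from surrounding usage; the probe body is invented):

```go
package main

import (
	"context"
	"time"

	"tailscale.com/prober"
)

func main() {
	p := prober.New()

	// A ProbeClass bundles the probe body (Probe) with metric
	// metadata (Class, Labels, optional Metrics). This instance is
	// invented for illustration; it always succeeds.
	pc := prober.ProbeClass{
		Probe: func(ctx context.Context) error {
			// A real probe does network I/O here and honors ctx.
			return nil
		},
		Class: "example",
	}

	// A positive interval schedules the probe periodically; with this
	// patch, a negative interval runs it continuously.
	p.Run("example-probe", 15*time.Second, nil, pc)
}
```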
@@ -246,11 +254,19 @@ func (d *derpProber) probeMesh(from, to string) ProbeClass {
 // through a pair of DERP servers (or just one server, if 'from' and 'to' are
 // the same). 'from' and 'to' are expected to be names (DERPNode.Name) of two
 // DERP servers in the same region.
-func (d *derpProber) probeBandwidth(from, to string, size int64) ProbeClass {
+func (d *derpProber) probeBandwidth(from, to string, size int64, continuous bool) ProbeClass {
 	derpPath := "mesh"
 	if from == to {
 		derpPath = "single"
 	}
+	if continuous {
+		return d.probeBandwidthContinuous(from, to, derpPath, size)
+	}
+	return d.probeBandwidthOnce(from, to, derpPath, size)
+}
+
+func (d *derpProber) probeBandwidthOnce(from, to, derpPath string, size int64) ProbeClass {
 	var transferTime expvar.Float
 	return ProbeClass{
 		Probe: func(ctx context.Context) error {
@@ -258,7 +274,7 @@ func (d *derpProber) probeBandwidth(from, to string, size int64) ProbeClass {
 			if err != nil {
 				return err
 			}
-			return derpProbeBandwidth(ctx, d.lastDERPMap, fromN, toN, size, &transferTime)
+			return derpProbeBandwidthOnce(ctx, d.lastDERPMap, fromN, toN, size, &transferTime)
 		},
 		Class:  "derp_bw",
 		Labels: Labels{"derp_path": derpPath},
@@ -271,6 +287,76 @@ func (d *derpProber) probeBandwidth(from, to string, size int64) ProbeClass {
 	}
 }

+func (d *derpProber) probeBandwidthContinuous(from, to, derpPath string, size int64) ProbeClass {
+	var bytesTransferred expvar.Float
+	var transferTime expvar.Float
+	var qdh queuingDelayHistogram
+	qdh.reset()
+	return ProbeClass{
+		Probe: func(ctx context.Context) error {
+			fromN, toN, err := d.getNodePair(from, to)
+			if err != nil {
+				return err
+			}
+			return derpProbeBandwidthContinuous(ctx, d.lastDERPMap, fromN, toN, size, &bytesTransferred, &transferTime, &qdh)
+		},
+		Class:  "derp_bw",
+		Labels: Labels{"derp_path": derpPath},
+		Metrics: func(l prometheus.Labels) []prometheus.Metric {
+			qdh.mx.Lock()
+			result := []prometheus.Metric{
+				prometheus.MustNewConstMetric(prometheus.NewDesc("derp_bw_probe_continuous_size_bytes", "Payload size of the bandwidth prober", nil, l), prometheus.GaugeValue, float64(size)),
+				prometheus.MustNewConstMetric(prometheus.NewDesc("derp_bw_probe_continuous_bytes_total", "Total data transferred", nil, l), prometheus.CounterValue, bytesTransferred.Value()),
+				prometheus.MustNewConstMetric(prometheus.NewDesc("derp_bw_probe_continuous_transfer_time_seconds_total", "Time it took to transfer data", nil, l), prometheus.CounterValue, transferTime.Value()),
+				prometheus.MustNewConstHistogram(prometheus.NewDesc("derp_bw_probe_continuous_queuing_delays_seconds", "Distribution of queuing delays", nil, l), qdh.count, qdh.sum, qdh.buckets),
+			}
+			qdh.resetLocked()
+			qdh.mx.Unlock()
+			return result
+		},
+	}
+}
+
+// queuingDelayHistogram allows tracking a histogram of queuing delays.
+type queuingDelayHistogram struct {
+	count   uint64
+	sum     float64
+	buckets map[float64]uint64
+	mx      sync.Mutex
+}
+
+// qdhBuckets defines the buckets (in seconds) for the queuingDelayHistogram.
+var qdhBuckets = []float64{0.0001, 0.0002, 0.0005}
+
+func (qdh *queuingDelayHistogram) reset() {
+	qdh.mx.Lock()
+	defer qdh.mx.Unlock()
+	qdh.resetLocked()
+}
+
+func (qdh *queuingDelayHistogram) resetLocked() {
+	qdh.count = 0
+	qdh.sum = 0
+	qdh.buckets = make(map[float64]uint64, len(qdhBuckets))
+}
+
+func (qdh *queuingDelayHistogram) add(d time.Duration) {
+	qdh.mx.Lock()
+	defer qdh.mx.Unlock()
+
+	seconds := d.Seconds()
+	qdh.count++
+	qdh.sum += seconds
+
+	for _, b := range qdhBuckets {
+		if seconds > b {
+			continue
+		}
+		qdh.buckets[b]++
+		break
+	}
+}
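One thing worth double-checking before merging: prometheus.MustNewConstHistogram documents its buckets argument as *cumulative* counts per upper bound, while qdh.buckets above holds per-bucket counts. If that reading of the client_golang contract is correct, an accumulation pass is needed at export time. A standalone sketch (bucket values copied from qdhBuckets above):

```go
package main

import "fmt"

// Same upper bounds as qdhBuckets above (seconds), sorted ascending.
var buckets = []float64{0.0001, 0.0002, 0.0005}

// cumulative converts the per-bucket counts maintained by
// queuingDelayHistogram.add into the cumulative counts that
// prometheus.MustNewConstHistogram expects for its buckets argument.
func cumulative(perBucket map[float64]uint64) map[float64]uint64 {
	out := make(map[float64]uint64, len(buckets))
	var running uint64
	for _, b := range buckets {
		running += perBucket[b]
		out[b] = running
	}
	return out
}

func main() {
	fmt.Println(cumulative(map[float64]uint64{0.0001: 3, 0.0002: 2, 0.0005: 1}))
	// map[0.0001:3 0.0002:5 0.0005:6]
}
```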
 func (d *derpProber) getNodePair(n1, n2 string) (ret1, ret2 *tailcfg.DERPNode, _ error) {
@@ -411,9 +497,9 @@ func derpProbeUDP(ctx context.Context, ipStr string, port int) error {
 	return nil
 }

-// derpProbeBandwidth sends a payload of a given size between two local
+// derpProbeBandwidthOnce sends a payload of a given size between two local
 // DERP clients connected to two DERP servers.
-func derpProbeBandwidth(ctx context.Context, dm *tailcfg.DERPMap, from, to *tailcfg.DERPNode, size int64, transferTime *expvar.Float) (err error) {
+func derpProbeBandwidthOnce(ctx context.Context, dm *tailcfg.DERPMap, from, to *tailcfg.DERPNode, size int64, transferTime *expvar.Float) (err error) {
 	// This probe uses clients with isProber=false to avoid spamming the derper logs with every packet
 	// sent by the bandwidth probe.
 	fromc, err := newConn(ctx, dm, from, false)
@@ -437,7 +523,39 @@ func derpProbeBandwidth(ctx context.Context, dm *tailcfg.DERPMap, from, to *tail
 	start := time.Now()
 	defer func() { transferTime.Add(time.Since(start).Seconds()) }()

-	if err := runDerpProbeNodePair(ctx, from, to, fromc, toc, size); err != nil {
+	if err := runDerpProbeNodePairOnce(ctx, from, to, fromc, toc, size); err != nil {
 		// Record pubkeys on failed probes to aid investigation.
 		return fmt.Errorf("%s -> %s: %w",
 			fromc.SelfPublicKey().ShortString(),
 			toc.SelfPublicKey().ShortString(), err)
 	}
 	return nil
 }
+
+// derpProbeBandwidthContinuous continuously sends data between two local
+// DERP clients connected to two DERP servers.
+func derpProbeBandwidthContinuous(ctx context.Context, dm *tailcfg.DERPMap, from, to *tailcfg.DERPNode, size int64, bytesTransferred, transferTime *expvar.Float, qdh *queuingDelayHistogram) (err error) {
+	// This probe uses clients with isProber=false to avoid spamming the derper logs with every packet
+	// sent by the bandwidth probe.
+	fromc, err := newConn(ctx, dm, from, false)
+	if err != nil {
+		return err
+	}
+	defer fromc.Close()
+	toc, err := newConn(ctx, dm, to, false)
+	if err != nil {
+		return err
+	}
+	defer toc.Close()
+
+	// Wait a bit for from's node to hear about to existing on the
+	// other node in the region, in the case where the two nodes
+	// are different.
+	if from.Name != to.Name {
+		time.Sleep(100 * time.Millisecond) // pretty arbitrary
+	}
+
+	if err := runDerpProbeNodePairContinuously(ctx, from, to, fromc, toc, size, bytesTransferred, transferTime, qdh); err != nil {
+		// Record pubkeys on failed probes to aid investigation.
+		return fmt.Errorf("%s -> %s: %w",
+			fromc.SelfPublicKey().ShortString(),
+			toc.SelfPublicKey().ShortString(), err)
+	}
+	return nil
+}
@@ -468,7 +586,7 @@ func derpProbeNodePair(ctx context.Context, dm *tailcfg.DERPMap, from, to *tailc
 	}

 	const meshProbePacketSize = 8
-	if err := runDerpProbeNodePair(ctx, from, to, fromc, toc, meshProbePacketSize); err != nil {
+	if err := runDerpProbeNodePairOnce(ctx, from, to, fromc, toc, meshProbePacketSize); err != nil {
 		// Record pubkeys on failed probes to aid investigation.
 		return fmt.Errorf("%s -> %s: %w",
 			fromc.SelfPublicKey().ShortString(),
@@ -504,10 +622,10 @@ func packetsForSize(size int64) [][]byte {
 	return pkts
 }
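The continuous probe below measures queuing delay by stamping each payload with its send time and subtracting on receipt. A self-contained sketch of that idea using a plain UnixNano stamp; the patch instead serializes the wall/ext words of time.Time so the monotonic reading survives (see wallAndExt further down):

```go
package main

import (
	"encoding/binary"
	"fmt"
	"time"
)

func main() {
	pkt := make([]byte, 64)

	// Sender: stamp the packet with the send time.
	binary.BigEndian.PutUint64(pkt, uint64(time.Now().UnixNano()))

	// ...the packet traverses one or two DERP servers...
	time.Sleep(2 * time.Millisecond)

	// Receiver: recover the stamp; the difference is transit plus
	// queuing time. Note that UnixNano drops the monotonic reading,
	// which is why the patch serializes wall/ext instead.
	sent := time.Unix(0, int64(binary.BigEndian.Uint64(pkt)))
	fmt.Printf("measured delay: %v\n", time.Since(sent))
}
```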
-// runDerpProbeNodePair takes two DERP clients (fromc and toc) connected to two
+// runDerpProbeNodePairOnce takes two DERP clients (fromc and toc) connected to two
 // DERP servers (from and to) and sends a test payload of a given size from one
 // to another.
-func runDerpProbeNodePair(ctx context.Context, from, to *tailcfg.DERPNode, fromc, toc *derphttp.Client, size int64) error {
+func runDerpProbeNodePairOnce(ctx context.Context, from, to *tailcfg.DERPNode, fromc, toc *derphttp.Client, size int64) error {
 	// To avoid derper dropping enqueued packets, limit the number of packets in flight.
 	// The value here is slightly smaller than perClientSendQueueDepth in derp_server.go
 	inFlight := syncs.NewSemaphore(30)
@@ -544,6 +662,8 @@ func runDerpProbeNodePair(ctx context.Context, from, to *tailcfg.DERPNode, fromc
 				recvc <- fmt.Errorf("got data packet %d from unexpected source, %v", idx, v.Source)
 				return
 			}
+			// This assumes packets arrive reliably and in order, which the
+			// DERP protocol does not strictly guarantee.
 			if got, want := v.Data, pkts[idx]; !bytes.Equal(got, want) {
 				recvc <- fmt.Errorf("unexpected data packet %d (out of %d)", idx, len(pkts))
 				return
@@ -577,6 +697,91 @@ func runDerpProbeNodePair(ctx context.Context, from, to *tailcfg.DERPNode, fromc
 	return nil
 }

+func runDerpProbeNodePairContinuously(ctx context.Context, from, to *tailcfg.DERPNode, fromc, toc *derphttp.Client, pktSize int64, bytesTransferred, transferTime *expvar.Float, qdh *queuingDelayHistogram) error {
+	// To avoid derper dropping enqueued packets, limit the number of packets in flight.
+	// The value here is slightly smaller than perClientSendQueueDepth in derp_server.go
+	inFlight := syncs.NewSemaphore(30)
+
+	// Send the packets.
+	sendc := make(chan error, 1)
+	pkt := make([]byte, pktSize)
+	crand.Read(pkt)
+	go func() {
+		for {
+			inFlight.AcquireContext(ctx)
+			now := time.Now()
+			// Write the monotonic time into the first 16 bytes of the packet.
+			wall, ext := wallAndExt(now)
+			binary.BigEndian.PutUint64(pkt, wall)
+			binary.BigEndian.PutUint64(pkt[8:], uint64(ext))
+			if err := fromc.Send(toc.SelfPublicKey(), pkt); err != nil {
+				sendc <- fmt.Errorf("sending packet: %w", err)
+				return
+			}
+		}
+	}()
+
+	// Increment transfer time every second.
+	ticker := time.NewTicker(1 * time.Second)
+	defer ticker.Stop()
+	go func() {
+		for {
+			select {
+			case <-ticker.C:
+				transferTime.Add(1)
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+
+	// Receive the packets.
+	recvc := make(chan error, 1)
+	go func() {
+		defer close(recvc) // to break out of 'select' below.
+		for {
+			m, err := toc.Recv()
+			if err != nil {
+				recvc <- err
+				return
+			}
+			switch v := m.(type) {
+			case derp.ReceivedPacket:
+				now := time.Now()
+				inFlight.Release()
+				if v.Source != fromc.SelfPublicKey() {
+					recvc <- fmt.Errorf("got data packet from unexpected source, %v", v.Source)
+					return
+				}
+				wall := binary.BigEndian.Uint64(v.Data)
+				ext := int64(binary.BigEndian.Uint64(v.Data[8:]))
+				sent := fromWallAndExt(wall, ext)
+				qdh.add(now.Sub(sent))
+				bytesTransferred.Add(float64(pktSize))
+
+			case derp.KeepAliveMessage:
+				// Silently ignore.
+			default:
+				log.Printf("%v: ignoring Recv frame type %T", to.Name, v)
+				// Loop.
+			}
+		}
+	}()
+
+	select {
+	case <-ctx.Done():
+		return fmt.Errorf("timeout: %w", ctx.Err())
+	case err := <-sendc:
+		if err != nil {
+			return fmt.Errorf("error sending via %q: %w", from.Name, err)
+		}
+	case err := <-recvc:
+		if err != nil {
+			return fmt.Errorf("error receiving from %q: %w", to.Name, err)
+		}
+	}
+	return nil
+}
+
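Both node-pair runners bound the number of unacknowledged packets with a semaphore sized just under the derper's per-client send queue. The same backpressure pattern, sketched with a plain buffered channel instead of tailscale.com/syncs:

```go
package main

import "fmt"

func main() {
	// Capacity just under the derper's per-client send queue depth,
	// mirroring syncs.NewSemaphore(30) in the probe runners.
	sem := make(chan struct{}, 30)

	send := func(i int) {
		sem <- struct{}{} // acquire: blocks once 30 packets are unacknowledged
		fmt.Printf("packet %d in flight\n", i)
	}
	ack := func(i int) {
		<-sem // release: the receiving client observed a packet
		fmt.Printf("packet %d drained\n", i)
	}

	for i := 0; i < 3; i++ {
		send(i)
		ack(i)
	}
}
```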
 func newConn(ctx context.Context, dm *tailcfg.DERPMap, n *tailcfg.DERPNode, isProber bool) (*derphttp.Client, error) {
 	// To avoid spamming the log with regular connection messages.
 	l := logger.Filtered(log.Printf, func(s string) bool {
@@ -597,18 +802,22 @@ func newConn(ctx context.Context, dm *tailcfg.DERPMap, n *tailcfg.DERPNode, isPr
 	if err != nil {
 		return nil, err
 	}
-	cs, ok := dc.TLSConnectionState()
-	if !ok {
-		dc.Close()
-		return nil, errors.New("no TLS state")
-	}
-	if len(cs.PeerCertificates) == 0 {
-		dc.Close()
-		return nil, errors.New("no peer certificates")
-	}
-	if cs.ServerName != n.HostName {
-		dc.Close()
-		return nil, fmt.Errorf("TLS server name %q != derp hostname %q", cs.ServerName, n.HostName)
+
+	// Only verify TLS state if this is a prober.
+	if isProber {
+		cs, ok := dc.TLSConnectionState()
+		if !ok {
+			dc.Close()
+			return nil, errors.New("no TLS state")
+		}
+		if len(cs.PeerCertificates) == 0 {
+			dc.Close()
+			return nil, errors.New("no peer certificates")
+		}
+		if cs.ServerName != n.HostName {
+			dc.Close()
+			return nil, fmt.Errorf("TLS server name %q != derp hostname %q", cs.ServerName, n.HostName)
+		}
 	}

 	errc := make(chan error, 1)
@@ -645,3 +854,27 @@ func httpOrFileTransport() http.RoundTripper {
 	tr.RegisterProtocol("file", http.NewFileTransport(http.Dir("/")))
 	return tr
 }
+
+// wallAndExt extracts the wall and ext fields from a time.Time,
+// allowing us to marshal a monotonic clock reading.
+func wallAndExt(t time.Time) (uint64, int64) {
+	v := reflect.ValueOf(&t).Elem()
+	return exposeField(v.Field(0)).Uint(), exposeField(v.Field(1)).Int()
+}
+
+// fromWallAndExt constructs a time.Time from wall and ext fields,
+// allowing us to unmarshal a monotonic clock reading.
+func fromWallAndExt(wall uint64, ext int64) time.Time {
+	var t time.Time
+	v := reflect.ValueOf(&t).Elem()
+	exposeField(v.Field(0)).SetUint(wall)
+	exposeField(v.Field(1)).SetInt(ext)
+	return t
+}
+
+func exposeField(v reflect.Value) reflect.Value {
+	if v.CanSet() {
+		return v
+	}
+	return reflect.NewAt(v.Type(), unsafe.Pointer(v.UnsafeAddr())).Elem()
+}
diff --git a/prober/derp_test.go b/prober/derp_test.go
index c084803e9..12aba316d 100644
--- a/prober/derp_test.go
+++ b/prober/derp_test.go
@@ -192,7 +192,7 @@ func TestRunDerpProbeNodePair(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
 	defer cancel()

-	err = runDerpProbeNodePair(ctx, &tailcfg.DERPNode{Name: "c1"}, &tailcfg.DERPNode{Name: "c2"}, c1, c2, 100_000_000)
+	err = runDerpProbeNodePairOnce(ctx, &tailcfg.DERPNode{Name: "c1"}, &tailcfg.DERPNode{Name: "c2"}, c1, c2, 100_000_000)
 	if err != nil {
 		t.Error(err)
 	}
@@ -237,3 +237,12 @@ func Test_packetsForSize(t *testing.T) {
 		})
 	}
 }
+
+func TestWallAndExt(t *testing.T) {
+	now := time.Now()
+	now2 := fromWallAndExt(wallAndExt(now))
+	diff := now2.Sub(now)
+	if diff != 0 {
+		t.Fatalf("fromWallAndExt(wallAndExt(now)) = %v; want 0", diff)
+	}
+}
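wallAndExt and fromWallAndExt smuggle time.Time's unexported wall/ext words through reflect and unsafe so the receiver can rebuild a reading that still carries the monotonic component. This is only sound because sender and receiver live in the same process (both DERP clients are local to the prober), and it depends on time.Time's internal field layout, so it could break on a future Go release. A standalone demonstration of the round trip:

```go
package main

import (
	"fmt"
	"reflect"
	"time"
	"unsafe"
)

// writable returns a settable view of an unexported struct field,
// mirroring exposeField in the patch.
func writable(v reflect.Value) reflect.Value {
	return reflect.NewAt(v.Type(), unsafe.Pointer(v.UnsafeAddr())).Elem()
}

func main() {
	now := time.Now()
	v := reflect.ValueOf(&now).Elem()
	wall := writable(v.Field(0)).Uint() // time.Time.wall (uint64)
	ext := writable(v.Field(1)).Int()   // time.Time.ext (int64)

	// "Receive" side: rebuild a time.Time carrying the same monotonic
	// reading. Only meaningful within the same process.
	var back time.Time
	b := reflect.ValueOf(&back).Elem()
	writable(b.Field(0)).SetUint(wall)
	writable(b.Field(1)).SetInt(ext)

	fmt.Println(back.Sub(now)) // 0s: the monotonic component survived
}
```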
diff --git a/prober/prober.go b/prober/prober.go
index 2a43628bd..73cf201dc 100644
--- a/prober/prober.go
+++ b/prober/prober.go
@@ -256,6 +256,11 @@ type Probe struct {
 	latencyHist *ring.Ring
 }

+// IsContinuous indicates that this is a continuous probe.
+func (p *Probe) IsContinuous() bool {
+	return p.interval < 0
+}
+
 // Close shuts down the Probe and unregisters it from its Prober.
 // It is safe to Run a new probe of the same name after Close returns.
 func (p *Probe) Close() error {
@@ -288,6 +293,22 @@ func (p *Probe) loop() {
 		return
 	}

+	if p.IsContinuous() {
+		// Probe function is going to run continuously.
+		for {
+			p.run()
+			// Wait and then retry if the probe fails. We use the negation
+			// of the configured negative interval as our sleep period.
+			// TODO(percy): implement exponential backoff.
+			select {
+			case <-time.After(-1 * p.interval):
+			case <-p.ctx.Done():
+				return
+			}
+		}
+	}
+
 	p.tick = p.prober.newTicker(p.interval)
 	defer p.tick.Stop()
 	for {
@@ -323,9 +344,17 @@ func (p *Probe) run() (pi ProbeInfo, err error) {
 			p.recordEnd(err)
 		}
 	}()
-	timeout := time.Duration(float64(p.interval) * 0.8)
-	ctx, cancel := context.WithTimeout(p.ctx, timeout)
-	defer cancel()
+	ctx := p.ctx
+	if p.IsContinuous() {
+		p.mu.Lock()
+		p.lastErr = nil
+		p.mu.Unlock()
+	} else {
+		timeout := time.Duration(float64(p.interval) * 0.8)
+		var cancel func()
+		ctx, cancel = context.WithTimeout(ctx, timeout)
+		defer cancel()
+	}

 	err = p.probeClass.Probe(ctx)
 	p.recordEnd(err)
@@ -365,6 +394,16 @@ func (p *Probe) recordEnd(err error) {
 	p.successHist = p.successHist.Next()
 }

+// ProbeStatus indicates the status of a probe.
+type ProbeStatus string
+
+const (
+	ProbeStatusUnknown   ProbeStatus = "unknown"
+	ProbeStatusRunning   ProbeStatus = "running"
+	ProbeStatusFailed    ProbeStatus = "failed"
+	ProbeStatusSucceeded ProbeStatus = "succeeded"
+)
+
 // ProbeInfo is a snapshot of the configuration and state of a Probe.
 type ProbeInfo struct {
 	Name     string
@@ -374,7 +413,7 @@ type ProbeInfo struct {
 	Start    time.Time
 	End      time.Time
 	Latency  time.Duration
-	Result   bool
+	Status   ProbeStatus
 	Error    string
 	RecentResults   []bool
 	RecentLatencies []time.Duration
@@ -402,6 +441,10 @@ func (pb ProbeInfo) RecentMedianLatency() time.Duration {
 	return pb.RecentLatencies[len(pb.RecentLatencies)/2]
 }

+// Continuous reports whether this probe runs continuously (negative interval).
+func (pb ProbeInfo) Continuous() bool {
+	return pb.Interval < 0
+}
+
 // ProbeInfo returns the state of all probes.
 func (p *Prober) ProbeInfo() map[string]ProbeInfo {
 	out := map[string]ProbeInfo{}
@@ -429,9 +472,14 @@ func (probe *Probe) probeInfoLocked() ProbeInfo {
 		Labels:   probe.metricLabels,
 		Start:    probe.start,
 		End:      probe.end,
-		Result:   probe.succeeded,
 	}
-	if probe.lastErr != nil {
+	inf.Status = ProbeStatusUnknown
+	if probe.end.Before(probe.start) {
+		inf.Status = ProbeStatusRunning
+	} else if probe.succeeded {
+		inf.Status = ProbeStatusSucceeded
+	} else if probe.lastErr != nil {
+		inf.Status = ProbeStatusFailed
 		inf.Error = probe.lastErr.Error()
 	}
 	if probe.latency > 0 {
@@ -467,7 +515,7 @@ func (p *Prober) RunHandler(w http.ResponseWriter, r *http.Request) error {
 	p.mu.Lock()
 	probe, ok := p.probes[name]
 	p.mu.Unlock()
-	if !ok {
-		return tsweb.Error(http.StatusNotFound, fmt.Sprintf("unknown probe %q", name), nil)
+	if !ok || probe.IsContinuous() {
+		return tsweb.Error(http.StatusNotFound, fmt.Sprintf("unknown or continuous probe %q", name), nil)
 	}
@@ -531,7 +579,8 @@ func (p *Probe) Collect(ch chan<- prometheus.Metric) {
 	if !p.start.IsZero() {
 		ch <- prometheus.MustNewConstMetric(p.mStartTime, prometheus.GaugeValue, float64(p.start.Unix()))
 	}
-	if p.end.IsZero() {
+	// For periodic probes that haven't ended, don't collect probe metrics yet.
+ if p.end.IsZero() && !p.IsContinuous() { return } ch <- prometheus.MustNewConstMetric(p.mEndTime, prometheus.GaugeValue, float64(p.end.Unix())) diff --git a/prober/prober_test.go b/prober/prober_test.go index 742a914b2..3905bfbc9 100644 --- a/prober/prober_test.go +++ b/prober/prober_test.go @@ -316,7 +316,7 @@ func TestProberProbeInfo(t *testing.T) { Interval: probeInterval, Labels: map[string]string{"class": "", "name": "probe1"}, Latency: 500 * time.Millisecond, - Result: true, + Status: ProbeStatusSucceeded, RecentResults: []bool{true}, RecentLatencies: []time.Duration{500 * time.Millisecond}, }, @@ -324,6 +324,7 @@ func TestProberProbeInfo(t *testing.T) { Name: "probe2", Interval: probeInterval, Labels: map[string]string{"class": "", "name": "probe2"}, + Status: ProbeStatusFailed, Error: "error2", RecentResults: []bool{false}, RecentLatencies: nil, // no latency for failed probes @@ -349,7 +350,7 @@ func TestProbeInfoRecent(t *testing.T) { }{ { name: "no_runs", - wantProbeInfo: ProbeInfo{}, + wantProbeInfo: ProbeInfo{Status: ProbeStatusUnknown}, wantRecentSuccessRatio: 0, wantRecentMedianLatency: 0, }, @@ -358,7 +359,7 @@ func TestProbeInfoRecent(t *testing.T) { results: []probeResult{{latency: 100 * time.Millisecond, err: nil}}, wantProbeInfo: ProbeInfo{ Latency: 100 * time.Millisecond, - Result: true, + Status: ProbeStatusSucceeded, RecentResults: []bool{true}, RecentLatencies: []time.Duration{100 * time.Millisecond}, }, @@ -369,7 +370,7 @@ func TestProbeInfoRecent(t *testing.T) { name: "single_failure", results: []probeResult{{latency: 100 * time.Millisecond, err: errors.New("error123")}}, wantProbeInfo: ProbeInfo{ - Result: false, + Status: ProbeStatusFailed, RecentResults: []bool{false}, RecentLatencies: nil, Error: "error123", @@ -390,7 +391,7 @@ func TestProbeInfoRecent(t *testing.T) { {latency: 80 * time.Millisecond, err: nil}, }, wantProbeInfo: ProbeInfo{ - Result: true, + Status: ProbeStatusSucceeded, Latency: 80 * time.Millisecond, RecentResults: []bool{false, true, true, false, true, true, false, true}, RecentLatencies: []time.Duration{ @@ -420,7 +421,7 @@ func TestProbeInfoRecent(t *testing.T) { {latency: 110 * time.Millisecond, err: nil}, }, wantProbeInfo: ProbeInfo{ - Result: true, + Status: ProbeStatusSucceeded, Latency: 110 * time.Millisecond, RecentResults: []bool{true, true, true, true, true, true, true, true, true, true}, RecentLatencies: []time.Duration{ @@ -483,7 +484,7 @@ func TestProberRunHandler(t *testing.T) { ProbeInfo: ProbeInfo{ Name: "success", Interval: probeInterval, - Result: true, + Status: ProbeStatusSucceeded, RecentResults: []bool{true, true}, }, PreviousSuccessRatio: 1, @@ -498,7 +499,7 @@ func TestProberRunHandler(t *testing.T) { ProbeInfo: ProbeInfo{ Name: "failure", Interval: probeInterval, - Result: false, + Status: ProbeStatusFailed, Error: "error123", RecentResults: []bool{false, false}, }, diff --git a/prober/status.go b/prober/status.go index aa9ef99d0..20fbeec58 100644 --- a/prober/status.go +++ b/prober/status.go @@ -62,8 +62,9 @@ func (p *Prober) StatusHandler(opts ...statusHandlerOpt) tsweb.ReturnHandlerFunc return func(w http.ResponseWriter, r *http.Request) error { type probeStatus struct { ProbeInfo - TimeSinceLast time.Duration - Links map[string]template.URL + TimeSinceLastStart time.Duration + TimeSinceLastEnd time.Duration + Links map[string]template.URL } vars := struct { Title string @@ -81,12 +82,15 @@ func (p *Prober) StatusHandler(opts ...statusHandlerOpt) tsweb.ReturnHandlerFunc for name, info := range p.ProbeInfo() { 
 		vars.TotalProbes++
-		if !info.Result {
+		if info.Error != "" {
 			vars.UnhealthyProbes++
 		}
 		s := probeStatus{ProbeInfo: info}
+		if !info.Start.IsZero() {
+			s.TimeSinceLastStart = time.Since(info.Start).Truncate(time.Second)
+		}
 		if !info.End.IsZero() {
-			s.TimeSinceLast = time.Since(info.End).Truncate(time.Second)
+			s.TimeSinceLastEnd = time.Since(info.End).Truncate(time.Second)
 		}
 		for textTpl, urlTpl := range params.probeLinks {
 			text, err := renderTemplate(textTpl, info)
diff --git a/prober/status.html b/prober/status.html
index ff0f06c13..d26588da1 100644
--- a/prober/status.html
+++ b/prober/status.html
@@ -73,8 +73,9 @@
       <th>Name</th>
       <th>Probe Class & Labels</th>
       <th>Interval</th>
-      <th>Last Attempt</th>
-      <th>Success</th>
+      <th>Last Finished</th>
+      <th>Last Started</th>
+      <th>Status</th>
       <th>Latency</th>
       <th>Last Error</th>
@@ -85,9 +86,11 @@
     <td>{{$name}}</td>
     <td>
       {{range $text, $url := $probeInfo.Links}}
-      <div><a href="{{$url}}">{{$text}}</a></div>
+      {{if not $probeInfo.Continuous}}
+      <div><a href="{{$url}}">{{$text}}</a></div>
+      {{end}}
       {{end}}
       {{$probeInfo.Class}}<br/>
@@ -97,28 +100,48 @@
       {{end}}
     </td>
-    <td>{{$probeInfo.Interval}}</td>
-    <td>
-      {{if $probeInfo.TimeSinceLast}}
-      {{$probeInfo.TimeSinceLast.String}} ago<br/>
+    <td>
+      {{if $probeInfo.Continuous}}
+      Continuous
+      {{else}}
+      {{$probeInfo.Interval}}
+      {{end}}
+    </td>
+    <td>
+      {{if $probeInfo.TimeSinceLastEnd}}
+      {{$probeInfo.TimeSinceLastEnd.String}} ago<br/>
       {{$probeInfo.End.Format "2006-01-02T15:04:05Z07:00"}}
       {{else}}
       Never
       {{end}}
     </td>
-    <td>
-      {{if $probeInfo.Result}}
-      <span class="success">{{$probeInfo.Result}}</span>
+    <td>
+      {{if $probeInfo.TimeSinceLastStart}}
+      {{$probeInfo.TimeSinceLastStart.String}} ago<br/>
+      {{$probeInfo.Start.Format "2006-01-02T15:04:05Z07:00"}}
       {{else}}
-      <span class="failure">{{$probeInfo.Result}}</span>
+      Never
+      {{end}}
+    </td>
+    <td>
+      {{if $probeInfo.Error}}
+      <span class="failure">{{$probeInfo.Status}}</span>
+      {{else}}
+      <span class="success">{{$probeInfo.Status}}</span>
       {{end}}
-      <br/>Recent: {{$probeInfo.RecentResults}}
-      <br/>Mean: {{$probeInfo.RecentSuccessRatio}}
+      {{if not $probeInfo.Continuous}}
+      <br/>Recent: {{$probeInfo.RecentResults}}
+      <br/>Mean: {{$probeInfo.RecentSuccessRatio}}
+      {{end}}
     </td>
-    <td>{{$probeInfo.Latency.String}}
-      <br/>Recent: {{$probeInfo.RecentLatencies}}
-      <br/>Median: {{$probeInfo.RecentMedianLatency}}
+    <td>
+      {{if $probeInfo.Continuous}}
+      n/a
+      {{else}}
+      {{$probeInfo.Latency.String}}
+      <br/>Recent: {{$probeInfo.RecentLatencies}}
+      <br/>Median: {{$probeInfo.RecentMedianLatency}}
+      {{end}}
     </td>
     <td>{{$probeInfo.Error}}</td>
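Continuous mode deliberately reports no single latency or throughput figure on the status page; bandwidth has to be derived from the two exported counters (in PromQL, a rate()/rate() quotient). The same arithmetic on two hypothetical scrapes:

```go
package main

import "fmt"

func main() {
	// Two Prometheus scrapes of the continuous probe, 60s apart
	// (hypothetical values).
	bytes0, bytes1 := 9.0e9, 9.6e9 // derp_bw_probe_continuous_bytes_total
	secs0, secs1 := 3600.0, 3660.0 // derp_bw_probe_continuous_transfer_time_seconds_total

	// Equivalent to rate(bytes_total) / rate(transfer_time_seconds_total).
	bps := (bytes1 - bytes0) / (secs1 - secs0)
	fmt.Printf("average throughput: %.0f Mbit/s\n", bps*8/1e6) // 80 Mbit/s
}
```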