mirror of
https://github.com/tailscale/tailscale.git
synced 2025-04-21 06:01:42 +00:00
prober: support adding key/value labels to probes. (#4250)
prober: add labels to Probe instances. This allows especially dynamically-registered probes to have a bunch more dimensions along which they can be sliced in Prometheus. Signed-off-by: David Anderson <danderson@tailscale.com>
This commit is contained in:
parent
f3b13604b3
commit
0968b2d55a
198
prober/prober.go
198
prober/prober.go
@ -9,13 +9,16 @@ package prober
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
|
"expvar"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"log"
|
"log"
|
||||||
|
"sort"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"tailscale.com/metrics"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// ProbeFunc is a function that probes something and reports whether
|
// ProbeFunc is a function that probes something and reports whether
|
||||||
@ -29,33 +32,6 @@ type Prober struct {
|
|||||||
now func() time.Time
|
now func() time.Time
|
||||||
newTicker func(time.Duration) ticker
|
newTicker func(time.Duration) ticker
|
||||||
|
|
||||||
// lastStart is the time, in seconds since epoch, of the last time
|
|
||||||
// each probe started a probe cycle.
|
|
||||||
lastStart metrics.LabelMap
|
|
||||||
// lastEnd is the time, in seconds since epoch, of the last time
|
|
||||||
// each probe finished a probe cycle.
|
|
||||||
lastEnd metrics.LabelMap
|
|
||||||
// lastResult records whether probes succeeded. A successful probe
|
|
||||||
// is recorded as 1, a failure as 0.
|
|
||||||
lastResult metrics.LabelMap
|
|
||||||
// lastLatency records how long the last probe cycle took for each
|
|
||||||
// probe, in milliseconds.
|
|
||||||
lastLatency metrics.LabelMap
|
|
||||||
// probeInterval records the time in seconds between successive
|
|
||||||
// runs of each probe.
|
|
||||||
//
|
|
||||||
// This is to help Prometheus figure out how long a probe should
|
|
||||||
// be failing before it fires an alert for it. To avoid random
|
|
||||||
// background noise, you want it to wait for more than 1
|
|
||||||
// datapoint, but you also can't use a fixed interval because some
|
|
||||||
// probes might run every few seconds, while e.g. TLS certificate
|
|
||||||
// expiry might only run once a day.
|
|
||||||
//
|
|
||||||
// So, for each probe, the prober tells Prometheus how often it
|
|
||||||
// runs, so that the alert can autotune itself to eliminate noise
|
|
||||||
// without being excessively delayed.
|
|
||||||
probeInterval metrics.LabelMap
|
|
||||||
|
|
||||||
mu sync.Mutex // protects all following fields
|
mu sync.Mutex // protects all following fields
|
||||||
probes map[string]*Probe
|
probes map[string]*Probe
|
||||||
}
|
}
|
||||||
@ -67,32 +43,21 @@ func New() *Prober {
|
|||||||
|
|
||||||
func newForTest(now func() time.Time, newTicker func(time.Duration) ticker) *Prober {
|
func newForTest(now func() time.Time, newTicker func(time.Duration) ticker) *Prober {
|
||||||
return &Prober{
|
return &Prober{
|
||||||
now: now,
|
now: now,
|
||||||
newTicker: newTicker,
|
newTicker: newTicker,
|
||||||
lastStart: metrics.LabelMap{Label: "probe"},
|
probes: map[string]*Probe{},
|
||||||
lastEnd: metrics.LabelMap{Label: "probe"},
|
|
||||||
lastResult: metrics.LabelMap{Label: "probe"},
|
|
||||||
lastLatency: metrics.LabelMap{Label: "probe"},
|
|
||||||
probeInterval: metrics.LabelMap{Label: "probe"},
|
|
||||||
probes: map[string]*Probe{},
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Expvar returns the metrics for running probes.
|
// Expvar returns the metrics for running probes.
|
||||||
func (p *Prober) Expvar() *metrics.Set {
|
func (p *Prober) Expvar() expvar.Var {
|
||||||
ret := new(metrics.Set)
|
return varExporter{p}
|
||||||
ret.Set("start_secs", &p.lastStart)
|
|
||||||
ret.Set("end_secs", &p.lastEnd)
|
|
||||||
ret.Set("result", &p.lastResult)
|
|
||||||
ret.Set("latency_millis", &p.lastLatency)
|
|
||||||
ret.Set("interval_secs", &p.probeInterval)
|
|
||||||
return ret
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run executes fun every interval, and exports probe results under probeName.
|
// Run executes fun every interval, and exports probe results under probeName.
|
||||||
//
|
//
|
||||||
// Registering a probe under an already-registered name panics.
|
// Registering a probe under an already-registered name panics.
|
||||||
func (p *Prober) Run(name string, interval time.Duration, fun ProbeFunc) *Probe {
|
func (p *Prober) Run(name string, interval time.Duration, labels map[string]string, fun ProbeFunc) *Probe {
|
||||||
p.mu.Lock()
|
p.mu.Lock()
|
||||||
defer p.mu.Unlock()
|
defer p.mu.Unlock()
|
||||||
if _, ok := p.probes[name]; ok {
|
if _, ok := p.probes[name]; ok {
|
||||||
@ -111,9 +76,9 @@ func (p *Prober) Run(name string, interval time.Duration, fun ProbeFunc) *Probe
|
|||||||
doProbe: fun,
|
doProbe: fun,
|
||||||
interval: interval,
|
interval: interval,
|
||||||
tick: ticker,
|
tick: ticker,
|
||||||
|
labels: labels,
|
||||||
}
|
}
|
||||||
p.probes[name] = probe
|
p.probes[name] = probe
|
||||||
p.probeInterval.Get(name).Set(int64(interval.Seconds()))
|
|
||||||
go probe.loop()
|
go probe.loop()
|
||||||
return probe
|
return probe
|
||||||
}
|
}
|
||||||
@ -123,11 +88,6 @@ func (p *Prober) unregister(probe *Probe) {
|
|||||||
defer p.mu.Unlock()
|
defer p.mu.Unlock()
|
||||||
name := probe.name
|
name := probe.name
|
||||||
delete(p.probes, name)
|
delete(p.probes, name)
|
||||||
p.lastStart.Delete(name)
|
|
||||||
p.lastEnd.Delete(name)
|
|
||||||
p.lastResult.Delete(name)
|
|
||||||
p.lastLatency.Delete(name)
|
|
||||||
p.probeInterval.Delete(name)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reports the number of registered probes. For tests only.
|
// Reports the number of registered probes. For tests only.
|
||||||
@ -149,6 +109,12 @@ type Probe struct {
|
|||||||
doProbe ProbeFunc
|
doProbe ProbeFunc
|
||||||
interval time.Duration
|
interval time.Duration
|
||||||
tick ticker
|
tick ticker
|
||||||
|
labels map[string]string
|
||||||
|
|
||||||
|
mu sync.Mutex
|
||||||
|
start time.Time // last time doProbe started
|
||||||
|
end time.Time // last time doProbe returned
|
||||||
|
result bool // whether the last doProbe call succeeded
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close shuts down the Probe and unregisters it from its Prober.
|
// Close shuts down the Probe and unregisters it from its Prober.
|
||||||
@ -183,7 +149,7 @@ func (p *Probe) loop() {
|
|||||||
// the probe either succeeds or fails before the next cycle is
|
// the probe either succeeds or fails before the next cycle is
|
||||||
// scheduled to start.
|
// scheduled to start.
|
||||||
func (p *Probe) run() {
|
func (p *Probe) run() {
|
||||||
start := p.start()
|
start := p.recordStart()
|
||||||
defer func() {
|
defer func() {
|
||||||
// Prevent a panic within one probe function from killing the
|
// Prevent a panic within one probe function from killing the
|
||||||
// entire prober, so that a single buggy probe doesn't destroy
|
// entire prober, so that a single buggy probe doesn't destroy
|
||||||
@ -192,7 +158,7 @@ func (p *Probe) run() {
|
|||||||
// alert for debugging.
|
// alert for debugging.
|
||||||
if r := recover(); r != nil {
|
if r := recover(); r != nil {
|
||||||
log.Printf("probe %s panicked: %v", p.name, r)
|
log.Printf("probe %s panicked: %v", p.name, r)
|
||||||
p.end(start, errors.New("panic"))
|
p.recordEnd(start, errors.New("panic"))
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
timeout := time.Duration(float64(p.interval) * 0.8)
|
timeout := time.Duration(float64(p.interval) * 0.8)
|
||||||
@ -200,27 +166,131 @@ func (p *Probe) run() {
|
|||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
err := p.doProbe(ctx)
|
err := p.doProbe(ctx)
|
||||||
p.end(start, err)
|
p.recordEnd(start, err)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("probe %s: %v", p.name, err)
|
log.Printf("probe %s: %v", p.name, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Probe) start() time.Time {
|
func (p *Probe) recordStart() time.Time {
|
||||||
st := p.prober.now()
|
st := p.prober.now()
|
||||||
p.prober.lastStart.Get(p.name).Set(st.Unix())
|
p.mu.Lock()
|
||||||
|
defer p.mu.Unlock()
|
||||||
|
p.start = st
|
||||||
return st
|
return st
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Probe) end(start time.Time, err error) {
|
func (p *Probe) recordEnd(start time.Time, err error) {
|
||||||
end := p.prober.now()
|
end := p.prober.now()
|
||||||
p.prober.lastEnd.Get(p.name).Set(end.Unix())
|
p.mu.Lock()
|
||||||
p.prober.lastLatency.Get(p.name).Set(end.Sub(start).Milliseconds())
|
defer p.mu.Unlock()
|
||||||
v := int64(1)
|
p.end = end
|
||||||
if err != nil {
|
p.result = err == nil
|
||||||
v = 0
|
}
|
||||||
|
|
||||||
|
type varExporter struct {
|
||||||
|
p *Prober
|
||||||
|
}
|
||||||
|
|
||||||
|
// probeInfo is the state of a Probe. Used in expvar-format debug
|
||||||
|
// data.
|
||||||
|
type probeInfo struct {
|
||||||
|
Labels map[string]string
|
||||||
|
Start time.Time
|
||||||
|
End time.Time
|
||||||
|
Latency string // as a string because time.Duration doesn't encode readably to JSON
|
||||||
|
Result bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// String implements expvar.Var, returning the prober's state as an
|
||||||
|
// encoded JSON map of probe name to its probeInfo.
|
||||||
|
func (v varExporter) String() string {
|
||||||
|
out := map[string]probeInfo{}
|
||||||
|
|
||||||
|
v.p.mu.Lock()
|
||||||
|
probes := make([]*Probe, 0, len(v.p.probes))
|
||||||
|
for _, probe := range v.p.probes {
|
||||||
|
probes = append(probes, probe)
|
||||||
|
}
|
||||||
|
v.p.mu.Unlock()
|
||||||
|
|
||||||
|
for _, probe := range probes {
|
||||||
|
probe.mu.Lock()
|
||||||
|
inf := probeInfo{
|
||||||
|
Labels: probe.labels,
|
||||||
|
Start: probe.start,
|
||||||
|
End: probe.end,
|
||||||
|
Result: probe.result,
|
||||||
|
}
|
||||||
|
if probe.end.After(probe.start) {
|
||||||
|
inf.Latency = probe.end.Sub(probe.start).String()
|
||||||
|
}
|
||||||
|
out[probe.name] = inf
|
||||||
|
probe.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
bs, err := json.Marshal(out)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Sprintf(`{"error": %q}`, err)
|
||||||
|
}
|
||||||
|
return string(bs)
|
||||||
|
}
|
||||||
|
|
||||||
|
// WritePrometheus writes the the state of all probes to w.
|
||||||
|
//
|
||||||
|
// For each probe, WritePrometheus exports 5 variables:
|
||||||
|
// - <prefix>_interval_secs, how frequently the probe runs.
|
||||||
|
// - <prefix>_start_secs, when the probe last started running, in seconds since epoch.
|
||||||
|
// - <prefix>_end_secs, when the probe last finished running, in seconds since epoch.
|
||||||
|
// - <prefix>_latency_millis, how long the last probe cycle took, in
|
||||||
|
// milliseconds. This is just (end_secs-start_secs) in an easier to
|
||||||
|
// graph form.
|
||||||
|
// - <prefix>_result, 1 if the last probe succeeded, 0 if it failed.
|
||||||
|
//
|
||||||
|
// Each probe has a set of static key/value labels (defined once at
|
||||||
|
// probe creation), which are added as Prometheus metric labels to
|
||||||
|
// that probe's variables.
|
||||||
|
func (v varExporter) WritePrometheus(w io.Writer, prefix string) {
|
||||||
|
v.p.mu.Lock()
|
||||||
|
probes := make([]*Probe, 0, len(v.p.probes))
|
||||||
|
for _, probe := range v.p.probes {
|
||||||
|
probes = append(probes, probe)
|
||||||
|
}
|
||||||
|
v.p.mu.Unlock()
|
||||||
|
|
||||||
|
sort.Slice(probes, func(i, j int) bool {
|
||||||
|
return probes[i].name < probes[j].name
|
||||||
|
})
|
||||||
|
for _, probe := range probes {
|
||||||
|
probe.mu.Lock()
|
||||||
|
keys := make([]string, 0, len(probe.labels))
|
||||||
|
for k := range probe.labels {
|
||||||
|
keys = append(keys, k)
|
||||||
|
}
|
||||||
|
sort.Strings(keys)
|
||||||
|
var sb strings.Builder
|
||||||
|
fmt.Fprintf(&sb, "name=%q", probe.name)
|
||||||
|
for _, k := range keys {
|
||||||
|
fmt.Fprintf(&sb, ",%s=%q", k, probe.labels[k])
|
||||||
|
}
|
||||||
|
labels := sb.String()
|
||||||
|
|
||||||
|
fmt.Fprintf(w, "%s_interval_secs{%s} %f\n", prefix, labels, probe.interval.Seconds())
|
||||||
|
if !probe.start.IsZero() {
|
||||||
|
fmt.Fprintf(w, "%s_start_secs{%s} %d\n", prefix, labels, probe.start.Unix())
|
||||||
|
}
|
||||||
|
if !probe.end.IsZero() {
|
||||||
|
fmt.Fprintf(w, "%s_end_secs{%s} %d\n", prefix, labels, probe.end.Unix())
|
||||||
|
// Start is always present if end is.
|
||||||
|
fmt.Fprintf(w, "%s_latency_millis{%s} %d\n", prefix, labels, probe.end.Sub(probe.start).Milliseconds())
|
||||||
|
if probe.result {
|
||||||
|
fmt.Fprintf(w, "%s_result{%s} 1\n", prefix, labels)
|
||||||
|
} else {
|
||||||
|
fmt.Fprintf(w, "%s_result{%s} 0\n", prefix, labels)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
probe.mu.Unlock()
|
||||||
}
|
}
|
||||||
p.prober.lastResult.Get(p.name).Set(v)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ticker wraps a time.Ticker in a way that can be faked for tests.
|
// ticker wraps a time.Ticker in a way that can be faked for tests.
|
||||||
|
@ -5,6 +5,7 @@
|
|||||||
package prober
|
package prober
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
@ -14,8 +15,10 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/google/go-cmp/cmp"
|
||||||
"tailscale.com/syncs"
|
"tailscale.com/syncs"
|
||||||
"tailscale.com/tstest"
|
"tailscale.com/tstest"
|
||||||
|
"tailscale.com/tsweb"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -24,6 +27,7 @@ const (
|
|||||||
quarterProbeInterval = probeInterval / 4
|
quarterProbeInterval = probeInterval / 4
|
||||||
convergenceTimeout = time.Second
|
convergenceTimeout = time.Second
|
||||||
convergenceSleep = time.Millisecond
|
convergenceSleep = time.Millisecond
|
||||||
|
aFewMillis = 20 * time.Millisecond
|
||||||
)
|
)
|
||||||
|
|
||||||
var epoch = time.Unix(0, 0)
|
var epoch = time.Unix(0, 0)
|
||||||
@ -51,7 +55,7 @@ func TestProberTiming(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
p.Run("test-probe", probeInterval, func(context.Context) error {
|
p.Run("test-probe", probeInterval, nil, func(context.Context) error {
|
||||||
invoked <- struct{}{}
|
invoked <- struct{}{}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
@ -83,7 +87,7 @@ func TestProberRun(t *testing.T) {
|
|||||||
var probes []*Probe
|
var probes []*Probe
|
||||||
|
|
||||||
for i := 0; i < startingProbes; i++ {
|
for i := 0; i < startingProbes; i++ {
|
||||||
probes = append(probes, p.Run(fmt.Sprintf("probe%d", i), probeInterval, func(context.Context) error {
|
probes = append(probes, p.Run(fmt.Sprintf("probe%d", i), probeInterval, nil, func(context.Context) error {
|
||||||
mu.Lock()
|
mu.Lock()
|
||||||
defer mu.Unlock()
|
defer mu.Unlock()
|
||||||
cnt++
|
cnt++
|
||||||
@ -92,6 +96,7 @@ func TestProberRun(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
checkCnt := func(want int) {
|
checkCnt := func(want int) {
|
||||||
|
t.Helper()
|
||||||
err := tstest.WaitFor(convergenceTimeout, func() error {
|
err := tstest.WaitFor(convergenceTimeout, func() error {
|
||||||
mu.Lock()
|
mu.Lock()
|
||||||
defer mu.Unlock()
|
defer mu.Unlock()
|
||||||
@ -126,9 +131,8 @@ func TestExpvar(t *testing.T) {
|
|||||||
clk := newFakeTime()
|
clk := newFakeTime()
|
||||||
p := newForTest(clk.Now, clk.NewTicker)
|
p := newForTest(clk.Now, clk.NewTicker)
|
||||||
|
|
||||||
const aFewMillis = 20 * time.Millisecond
|
|
||||||
var succeed syncs.AtomicBool
|
var succeed syncs.AtomicBool
|
||||||
p.Run("probe", probeInterval, func(context.Context) error {
|
p.Run("probe", probeInterval, map[string]string{"label": "value"}, func(context.Context) error {
|
||||||
clk.Advance(aFewMillis)
|
clk.Advance(aFewMillis)
|
||||||
if succeed.Get() {
|
if succeed.Get() {
|
||||||
return nil
|
return nil
|
||||||
@ -138,20 +142,106 @@ func TestExpvar(t *testing.T) {
|
|||||||
|
|
||||||
waitActiveProbes(t, p, 1)
|
waitActiveProbes(t, p, 1)
|
||||||
|
|
||||||
waitExpInt(t, p, "start_secs/probe", 0)
|
check := func(name string, want probeInfo) {
|
||||||
waitExpInt(t, p, "end_secs/probe", 0)
|
t.Helper()
|
||||||
waitExpInt(t, p, "interval_secs/probe", int(probeInterval.Seconds()))
|
err := tstest.WaitFor(convergenceTimeout, func() error {
|
||||||
waitExpInt(t, p, "latency_millis/probe", int(aFewMillis.Milliseconds()))
|
vars := probeExpvar(t, p)
|
||||||
waitExpInt(t, p, "result/probe", 0)
|
if got, want := len(vars), 1; got != want {
|
||||||
|
return fmt.Errorf("wrong probe count in expvar, got %d want %d", got, want)
|
||||||
|
}
|
||||||
|
for k, v := range vars {
|
||||||
|
if k != name {
|
||||||
|
return fmt.Errorf("wrong probe name in expvar, got %q want %q", k, name)
|
||||||
|
}
|
||||||
|
if diff := cmp.Diff(v, &want); diff != "" {
|
||||||
|
return fmt.Errorf("wrong probe stats (-got+want):\n%s", diff)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
check("probe", probeInfo{
|
||||||
|
Labels: map[string]string{"label": "value"},
|
||||||
|
Start: epoch,
|
||||||
|
End: epoch.Add(aFewMillis),
|
||||||
|
Latency: aFewMillis.String(),
|
||||||
|
Result: false,
|
||||||
|
})
|
||||||
|
|
||||||
succeed.Set(true)
|
succeed.Set(true)
|
||||||
clk.Advance(probeInterval + halfProbeInterval)
|
clk.Advance(probeInterval + halfProbeInterval)
|
||||||
|
|
||||||
waitExpInt(t, p, "start_secs/probe", int((probeInterval + halfProbeInterval).Seconds()))
|
st := epoch.Add(probeInterval + halfProbeInterval + aFewMillis)
|
||||||
waitExpInt(t, p, "end_secs/probe", int((probeInterval + halfProbeInterval).Seconds()))
|
check("probe", probeInfo{
|
||||||
waitExpInt(t, p, "interval_secs/probe", int(probeInterval.Seconds()))
|
Labels: map[string]string{"label": "value"},
|
||||||
waitExpInt(t, p, "latency_millis/probe", int(aFewMillis.Milliseconds()))
|
Start: st,
|
||||||
waitExpInt(t, p, "result/probe", 1)
|
End: st.Add(aFewMillis),
|
||||||
|
Latency: aFewMillis.String(),
|
||||||
|
Result: true,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPrometheus(t *testing.T) {
|
||||||
|
clk := newFakeTime()
|
||||||
|
p := newForTest(clk.Now, clk.NewTicker)
|
||||||
|
|
||||||
|
var succeed syncs.AtomicBool
|
||||||
|
p.Run("testprobe", probeInterval, map[string]string{"label": "value"}, func(context.Context) error {
|
||||||
|
clk.Advance(aFewMillis)
|
||||||
|
if succeed.Get() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return errors.New("failing, as instructed by test")
|
||||||
|
})
|
||||||
|
|
||||||
|
waitActiveProbes(t, p, 1)
|
||||||
|
|
||||||
|
err := tstest.WaitFor(convergenceTimeout, func() error {
|
||||||
|
var b bytes.Buffer
|
||||||
|
p.Expvar().(tsweb.PrometheusVar).WritePrometheus(&b, "probe")
|
||||||
|
want := strings.TrimSpace(fmt.Sprintf(`
|
||||||
|
probe_interval_secs{name="testprobe",label="value"} %f
|
||||||
|
probe_start_secs{name="testprobe",label="value"} %d
|
||||||
|
probe_end_secs{name="testprobe",label="value"} %d
|
||||||
|
probe_latency_millis{name="testprobe",label="value"} %d
|
||||||
|
probe_result{name="testprobe",label="value"} 0
|
||||||
|
`, probeInterval.Seconds(), epoch.Unix(), epoch.Add(aFewMillis).Unix(), aFewMillis.Milliseconds()))
|
||||||
|
if diff := cmp.Diff(strings.TrimSpace(b.String()), want); diff != "" {
|
||||||
|
return fmt.Errorf("wrong probe stats (-got+want):\n%s", diff)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
succeed.Set(true)
|
||||||
|
clk.Advance(probeInterval + halfProbeInterval)
|
||||||
|
|
||||||
|
err = tstest.WaitFor(convergenceTimeout, func() error {
|
||||||
|
var b bytes.Buffer
|
||||||
|
p.Expvar().(tsweb.PrometheusVar).WritePrometheus(&b, "probe")
|
||||||
|
start := epoch.Add(probeInterval + halfProbeInterval)
|
||||||
|
end := start.Add(aFewMillis)
|
||||||
|
want := strings.TrimSpace(fmt.Sprintf(`
|
||||||
|
probe_interval_secs{name="testprobe",label="value"} %f
|
||||||
|
probe_start_secs{name="testprobe",label="value"} %d
|
||||||
|
probe_end_secs{name="testprobe",label="value"} %d
|
||||||
|
probe_latency_millis{name="testprobe",label="value"} %d
|
||||||
|
probe_result{name="testprobe",label="value"} 1
|
||||||
|
`, probeInterval.Seconds(), start.Unix(), end.Unix(), aFewMillis.Milliseconds()))
|
||||||
|
if diff := cmp.Diff(strings.TrimSpace(b.String()), want); diff != "" {
|
||||||
|
return fmt.Errorf("wrong probe stats (-got+want):\n%s", diff)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type fakeTicker struct {
|
type fakeTicker struct {
|
||||||
@ -185,7 +275,9 @@ func (t *fakeTicker) fire(now time.Time) {
|
|||||||
case t.ch <- now:
|
case t.ch <- now:
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
t.next = now.Add(t.interval)
|
for now.After(t.next) {
|
||||||
|
t.next = t.next.Add(t.interval)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type fakeTime struct {
|
type fakeTime struct {
|
||||||
@ -200,7 +292,6 @@ func newFakeTime() *fakeTime {
|
|||||||
curTime: epoch,
|
curTime: epoch,
|
||||||
}
|
}
|
||||||
ret.Cond = &sync.Cond{L: &ret.Mutex}
|
ret.Cond = &sync.Cond{L: &ret.Mutex}
|
||||||
ret.Advance(time.Duration(1)) // so that Now never IsZero
|
|
||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -208,8 +299,6 @@ func (t *fakeTime) Now() time.Time {
|
|||||||
t.Lock()
|
t.Lock()
|
||||||
defer t.Unlock()
|
defer t.Unlock()
|
||||||
ret := t.curTime
|
ret := t.curTime
|
||||||
// so that time always seems to advance for the program under test
|
|
||||||
t.curTime = t.curTime.Add(time.Microsecond)
|
|
||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -237,47 +326,14 @@ func (t *fakeTime) Advance(d time.Duration) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func waitExpInt(t *testing.T, p *Prober, path string, want int) {
|
func probeExpvar(t *testing.T, p *Prober) map[string]*probeInfo {
|
||||||
t.Helper()
|
|
||||||
err := tstest.WaitFor(convergenceTimeout, func() error {
|
|
||||||
got, ok := getExpInt(t, p, path)
|
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("expvar %q did not get set", path)
|
|
||||||
}
|
|
||||||
if got != want {
|
|
||||||
return fmt.Errorf("expvar %q is %d, want %d", path, got, want)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func getExpInt(t *testing.T, p *Prober, path string) (ret int, ok bool) {
|
|
||||||
t.Helper()
|
t.Helper()
|
||||||
s := p.Expvar().String()
|
s := p.Expvar().String()
|
||||||
dec := map[string]interface{}{}
|
ret := map[string]*probeInfo{}
|
||||||
if err := json.Unmarshal([]byte(s), &dec); err != nil {
|
if err := json.Unmarshal([]byte(s), &ret); err != nil {
|
||||||
t.Fatalf("couldn't unmarshal expvar data: %v", err)
|
t.Fatalf("expvar json decode failed: %v", err)
|
||||||
}
|
}
|
||||||
var v interface{} = dec
|
return ret
|
||||||
for _, d := range strings.Split(path, "/") {
|
|
||||||
m, ok := v.(map[string]interface{})
|
|
||||||
if !ok {
|
|
||||||
t.Fatalf("expvar path %q ended early with a leaf value", path)
|
|
||||||
}
|
|
||||||
child, ok := m[d]
|
|
||||||
if !ok {
|
|
||||||
return 0, false
|
|
||||||
}
|
|
||||||
v = child
|
|
||||||
}
|
|
||||||
f, ok := v.(float64)
|
|
||||||
if !ok {
|
|
||||||
return 0, false
|
|
||||||
}
|
|
||||||
return int(f), true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func waitActiveProbes(t *testing.T, p *Prober, want int) {
|
func waitActiveProbes(t *testing.T, p *Prober, want int) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user