diff --git a/prober/prober.go b/prober/prober.go index dd910089f..071e8d3cf 100644 --- a/prober/prober.go +++ b/prober/prober.go @@ -13,8 +13,10 @@ "errors" "expvar" "fmt" + "hash/fnv" "io" "log" + "math/rand" "sort" "strings" "sync" @@ -28,6 +30,10 @@ // a Prober manages a set of probes and keeps track of their results. type Prober struct { + // Whether to spread probe execution over time by introducing a + // random delay before the first probe run. + spread bool + // Time-related functions that get faked out during tests. now func() time.Time newTicker func(time.Duration) ticker @@ -65,18 +71,17 @@ func (p *Prober) Run(name string, interval time.Duration, labels map[string]stri } ctx, cancel := context.WithCancel(context.Background()) - ticker := p.newTicker(interval) probe := &Probe{ prober: p, ctx: ctx, cancel: cancel, stopped: make(chan struct{}), - name: name, - doProbe: fun, - interval: interval, - tick: ticker, - labels: labels, + name: name, + doProbe: fun, + interval: interval, + initialDelay: initialDelay(name, interval), + labels: labels, } p.probes[name] = probe go probe.loop() @@ -90,6 +95,13 @@ func (p *Prober) unregister(probe *Probe) { delete(p.probes, name) } +// WithSpread is used to enable random delay before the first run of +// each added probe. +func (p *Prober) WithSpread(s bool) *Prober { + p.spread = s + return p +} + // Reports the number of registered probes. For tests only. 
func (p *Prober) activeProbes() int { p.mu.Lock() @@ -105,11 +117,12 @@ type Probe struct { cancel context.CancelFunc // run to initiate shutdown stopped chan struct{} // closed when shutdown is complete - name string - doProbe ProbeFunc - interval time.Duration - tick ticker - labels map[string]string + name string + doProbe ProbeFunc + interval time.Duration + initialDelay time.Duration + tick ticker + labels map[string]string mu sync.Mutex start time.Time // last time doProbe started @@ -127,12 +140,26 @@ func (p *Probe) Close() error { } // probeLoop invokes runProbe on fun every interval. The first probe -// is run after interval. +// is run after a random delay (if spreading is enabled) or immediately. func (p *Probe) loop() { defer close(p.stopped) - // Do a first probe right away, so that the prober immediately exports results for everything. - p.run() + if p.prober.spread && p.initialDelay > 0 { + t := p.prober.newTicker(p.initialDelay) + select { + case <-t.Chan(): + p.run() + case <-p.ctx.Done(): + t.Stop() + return + } + t.Stop() + } else { + p.run() + } + + p.tick = p.prober.newTicker(p.interval) + defer p.tick.Stop() for { select { case <-p.tick.Chan(): @@ -310,3 +337,12 @@ func (t *realTicker) Chan() <-chan time.Time { func newRealTicker(d time.Duration) ticker { return &realTicker{time.NewTicker(d)} } + +// initialDelay returns a pseudorandom duration in [0, interval) that +// is based on the provided seed string. 
// initialDelay returns a pseudorandom duration in [0, interval) that is
// derived deterministically from the provided seed string: the same
// seed and interval always produce the same delay.
//
// A non-positive interval has no valid [0, interval) range, so 0 (no
// delay) is returned in that case.
func initialDelay(seed string, interval time.Duration) time.Duration {
	if interval <= 0 {
		return 0
	}

	h := fnv.New64()
	// hash.Hash.Write never returns an error. Writing the raw bytes
	// hashes exactly what fmt.Fprint emits for a lone string operand,
	// without the detour through fmt's formatting machinery.
	h.Write([]byte(seed))

	// The hash is only used to seed the generator, so the wrapping
	// uint64 -> int64 conversion is harmless.
	r := rand.New(rand.NewSource(int64(h.Sum64()))).Float64()
	return time.Duration(float64(interval) * r)
}
+ clk.Advance(halfProbeInterval) + called() + notCalled() + clk.Advance(quarterProbeInterval) + notCalled() + clk.Advance(probeInterval) + called() + notCalled() +} + func TestProberRun(t *testing.T) { clk := newFakeTime() p := newForTest(clk.Now, clk.NewTicker) @@ -111,7 +154,7 @@ func TestProberRun(t *testing.T) { } } - waitActiveProbes(t, p, startingProbes) + waitActiveProbes(t, p, clk, startingProbes) checkCnt(startingProbes) clk.Advance(probeInterval + halfProbeInterval) checkCnt(startingProbes) @@ -121,7 +164,7 @@ func TestProberRun(t *testing.T) { for i := keep; i < startingProbes; i++ { probes[i].Close() } - waitActiveProbes(t, p, keep) + waitActiveProbes(t, p, clk, keep) clk.Advance(probeInterval) checkCnt(keep) @@ -140,7 +183,7 @@ func TestExpvar(t *testing.T) { return errors.New("failing, as instructed by test") }) - waitActiveProbes(t, p, 1) + waitActiveProbes(t, p, clk, 1) check := func(name string, want probeInfo) { t.Helper() @@ -198,7 +241,7 @@ func TestPrometheus(t *testing.T) { return errors.New("failing, as instructed by test") }) - waitActiveProbes(t, p, 1) + waitActiveProbes(t, p, clk, 1) err := tstest.WaitFor(convergenceTimeout, func() error { var b bytes.Buffer @@ -326,6 +369,17 @@ func (t *fakeTime) Advance(d time.Duration) { } } +func (t *fakeTime) activeTickers() (count int) { + t.Lock() + defer t.Unlock() + for _, tick := range t.tickers { + if !tick.stopped { + count += 1 + } + } + return +} + func probeExpvar(t *testing.T, p *Prober) map[string]*probeInfo { t.Helper() s := p.Expvar().String() @@ -336,11 +390,14 @@ func probeExpvar(t *testing.T, p *Prober) map[string]*probeInfo { return ret } -func waitActiveProbes(t *testing.T, p *Prober, want int) { +func waitActiveProbes(t *testing.T, p *Prober, clk *fakeTime, want int) { t.Helper() err := tstest.WaitFor(convergenceTimeout, func() error { if got := p.activeProbes(); got != want { - return fmt.Errorf("active probe count is %d, want %d", got, want) + return fmt.Errorf("installed probe 
count is %d, want %d", got, want) + } + if got := clk.activeTickers(); got != want { + return fmt.Errorf("active ticker count is %d, want %d", got, want) } return nil })