tstime/rate: add Value (#7491)

Add Value, which measures the rate at which an event occurs,
exponentially weighted towards recent activity.
It is guaranteed to occupy O(1) memory, operate in O(1) runtime,
and is safe for concurrent use.

Signed-off-by: Joe Tsai <joetsai@digital-static.net>
This commit is contained in:
Joe Tsai 2023-03-09 11:13:09 -08:00 committed by GitHub
parent 4c2f67a1d0
commit 87b4bbb94f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 427 additions and 6 deletions

View File

@ -31,12 +31,14 @@ func Every(interval time.Duration) Limit {
}
// A Limiter controls how frequently events are allowed to happen.
// It implements a "token bucket" of size b, initially full and refilled
// at rate r tokens per second.
// Informally, in any large enough time interval, the Limiter limits the
// rate to r tokens per second, with a maximum burst size of b events.
// See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets.
// It implements a [token bucket] of a particular size b,
// initially full and refilled at rate r tokens per second.
// Informally, in any large enough time interval,
// the Limiter limits the rate to r tokens per second,
// with a maximum burst size of b events.
// Use NewLimiter to create non-zero Limiters.
//
// [token bucket]: https://en.wikipedia.org/wiki/Token_bucket
type Limiter struct {
limit Limit
burst float64
@ -54,7 +56,7 @@ func NewLimiter(r Limit, b int) *Limiter {
return &Limiter{limit: r, burst: float64(b)}
}
// AllowN reports whether an event may happen now.
// Allow reports whether an event may happen now.
func (lim *Limiter) Allow() bool {
return lim.allow(mono.Now())
}

183
tstime/rate/value.go Normal file
View File

@ -0,0 +1,183 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
package rate
import (
"fmt"
"math"
"sync"
"time"
"tailscale.com/tstime/mono"
)
// Value measures the rate at which events occur,
// exponentially weighted towards recent activity.
// It is guaranteed to occupy O(1) memory, operate in O(1) runtime,
// and is safe for concurrent use.
// The zero value is safe for immediate use.
//
// The algorithm is based on and semantically equivalent to
// [exponentially weighted moving averages (EWMAs)],
// but modified to avoid assuming that event samples are gathered
// at fixed and discrete time-step intervals.
//
// In EWMA literature, the average is typically tuned with a λ parameter
// that determines how much weight to give to recent event samples.
// A high λ value reacts quickly to new events favoring recent history,
// while a low λ value reacts more slowly to new events.
// The EWMA is computed as:
//
// zᵢ = λxᵢ + (1-λ)zᵢ₋₁
//
// where:
// - λ is the weight parameter, where 0 ≤ λ ≤ 1
// - xᵢ is the number of events that has since occurred
// - zᵢ is the newly computed moving average
// - zᵢ₋₁ is the previous moving average one time-step ago
//
// As mentioned, this implementation does not assume that the average
// is updated periodically on a fixed time-step interval,
// but allows the application to indicate that events occurred
// at any point in time by simply calling Value.Add.
// Thus, for every time Value.Add is called, it takes into consideration
// the amount of time elapsed since the last call to Value.Add as
// opposed to assuming that every call to Value.Add is evenly spaced
// some fixed time-step interval apart.
//
// Since time is critical to this measurement, we tune the metric not
// with the weight parameter λ (a unit-less constant between 0 and 1),
// but rather as a half-life period t½. The half-life period is
// mathematically equivalent but easier for humans to reason about.
// The parameters λ and t½ and directly related in the following way:
//
// t½ = -(ln(2) · ΔT) / ln(1 - λ)
//
// λ = 1 - 2^-(ΔT / t½)
//
// where:
// - t½ is the half-life commonly used with exponential decay
// - λ is the unit-less weight parameter commonly used with EWMAs
// - ΔT is the discrete time-step interval used with EWMAs
//
// The internal algorithm does not use the EWMA formula,
// but is rather based on [half-life decay].
// The formula for half-life decay is mathematically related
// to the formula for computing the EWMA.
// The calculation of an EWMA is a geometric progression [[1]] and
// is essentially a discrete version of an exponential function [[2]],
// for which half-life decay is one particular expression.
// Given sufficiently small time-steps, the EWMA and half-life
// algorithms provide equivalent results.
//
// The Value type does not take ΔT as a parameter since it relies
// on a timer with nanosecond resolution. In a way, one could treat
// this algorithm as operating on a ΔT of 1ns. Practically speaking,
// the computation operates on non-discrete time intervals.
//
// [exponentially weighted moving averages (EWMAs)]: https://en.wikipedia.org/wiki/EWMA_chart
// [half-life decay]: https://en.wikipedia.org/wiki/Half-life
// [1]: https://en.wikipedia.org/wiki/Exponential_smoothing#%22Exponential%22_naming
// [2]: https://en.wikipedia.org/wiki/Exponential_decay
type Value struct {
// HalfLife specifies how quickly the rate reacts to rate changes.
//
// Specifically, if there is currently a steady-state rate of
// 0 events per second, and then immediately the rate jumped to
// N events per second, then it will take HalfLife seconds until
// the Value represents a rate of N/2 events per second and
// 2*HalfLife seconds until the Value represents a rate of 3*N/4
// events per second, and so forth. The rate represented by Value
// will asymptotically approach N events per second over time.
//
// In order for Value to stably represent a steady-state rate,
// the HalfLife should be larger than the average period between
// calls to Value.Add.
//
// A zero or negative HalfLife is by default 1 second.
HalfLife time.Duration
mu sync.Mutex
updated mono.Time
value float64 // adjusted count of events
}
// halfLife returns the half-life period in seconds.
func (r *Value) halfLife() float64 {
if r.HalfLife <= 0 {
return time.Second.Seconds()
}
return time.Duration(r.HalfLife).Seconds()
}
// Add records that n number of events just occurred,
// which must be a finite and non-negative number.
func (r *Value) Add(n float64) {
r.mu.Lock()
defer r.mu.Unlock()
r.addNow(mono.Now(), n)
}
func (r *Value) addNow(now mono.Time, n float64) {
if n < 0 || math.IsInf(n, 0) || math.IsNaN(n) {
panic(fmt.Sprintf("invalid count %f; must be a finite, non-negative number", n))
}
r.value = r.valueNow(now) + n
r.updated = now
}
// valueNow computes the number of events after some elapsed time.
// The total count of events decay exponentially so that
// the computed rate is biased towards recent history.
func (r *Value) valueNow(now mono.Time) float64 {
// This uses the half-life formula:
// N(t) = N₀ · 2^-(t / t½)
// where:
// N(t) is the amount remaining after time t,
// N₀ is the initial quantity, and
// t½ is the half-life of the decaying quantity.
//
// See https://en.wikipedia.org/wiki/Half-life
age := now.Sub(r.updated).Seconds()
return r.value * math.Exp2(-age/r.halfLife())
}
// Rate computes the rate as events per second.
func (r *Value) Rate() float64 {
r.mu.Lock()
defer r.mu.Unlock()
return r.rateNow(mono.Now())
}
func (r *Value) rateNow(now mono.Time) float64 {
// The stored value carries the units "events"
// while we want to compute "events / second".
//
// In the trivial case where the events never decay,
// the average rate can be computed by dividing the total events
// by the total elapsed time since the start of the Value.
// This works because the weight distribution is uniform such that
// the weight of an event in the distant past is equal to
// the weight of a recent event. This is not the case with
// exponentially decaying weights, which complicates computation.
//
// Since our events are decaying, we can divide the number of events
// by the total possible accumulated value, which we determine
// by integrating the half-life formula from t=0 until t=∞,
// assuming that N₀ is 1:
// ∫ N(t) dt = t½ / ln(2)
//
// Recall that the integral of a curve is the area under a curve,
// which carries the units of the X-axis multiplied by the Y-axis.
// In our case this would be the units "events · seconds".
// By normalizing N₀ to 1, the Y-axis becomes a unit-less quantity,
// resulting in a integral unit of just "seconds".
// Dividing the events by the integral quantity correctly produces
// the units of "events / second".
return r.valueNow(now) / r.normalizedIntegral()
}
// normalizedIntegral computes the quantity t½ / ln(2).
// It carries the units of "seconds".
func (r *Value) normalizedIntegral() float64 {
return r.halfLife() / math.Ln2
}

236
tstime/rate/value_test.go Normal file
View File

@ -0,0 +1,236 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
package rate
import (
"flag"
"math"
"testing"
"time"
qt "github.com/frankban/quicktest"
"github.com/google/go-cmp/cmp/cmpopts"
"tailscale.com/tstime/mono"
)
const (
min = mono.Time(time.Minute)
sec = mono.Time(time.Second)
msec = mono.Time(time.Millisecond)
usec = mono.Time(time.Microsecond)
nsec = mono.Time(time.Nanosecond)
val = 1.0e6
)
var longNumericalStabilityTest = flag.Bool("long-numerical-stability-test", false, "")
func TestValue(t *testing.T) {
// When performing many small calculations, the accuracy of the
// result can drift due to accumulated errors in the calculation.
// Verify that the result is correct even with many small updates.
// See https://en.wikipedia.org/wiki/Numerical_stability.
t.Run("NumericalStability", func(t *testing.T) {
step := usec
if *longNumericalStabilityTest {
step = nsec
}
numStep := int(sec / step)
c := qt.New(t)
var v Value
var now mono.Time
for i := 0; i < numStep; i++ {
v.addNow(now, float64(step))
now += step
}
c.Assert(v.rateNow(now), qt.CmpEquals(cmpopts.EquateApprox(1e-6, 0)), 1e9/2)
})
halfLives := []struct {
name string
period time.Duration
}{
{"½s", time.Second / 2},
{"1s", time.Second},
{"2s", 2 * time.Second},
}
for _, halfLife := range halfLives {
t.Run(halfLife.name+"/SpikeDecay", func(t *testing.T) {
testValueSpikeDecay(t, halfLife.period, false)
})
t.Run(halfLife.name+"/SpikeDecayAddZero", func(t *testing.T) {
testValueSpikeDecay(t, halfLife.period, true)
})
t.Run(halfLife.name+"/HighThenLow", func(t *testing.T) {
testValueHighThenLow(t, halfLife.period)
})
t.Run(halfLife.name+"/LowFrequency", func(t *testing.T) {
testLowFrequency(t, halfLife.period)
})
}
}
// testValueSpikeDecay starts with a target rate and ensure that it
// exponentially decays according to the half-life formula.
func testValueSpikeDecay(t *testing.T, halfLife time.Duration, addZero bool) {
c := qt.New(t)
v := Value{HalfLife: halfLife}
v.addNow(0, val*v.normalizedIntegral())
var now mono.Time
var prevRate float64
step := 100 * msec
wantHalfRate := float64(val)
for now < 10*sec {
// Adding zero for every time-step will repeatedly trigger the
// computation to decay the value, which may cause the result
// to become more numerically unstable.
if addZero {
v.addNow(now, 0)
}
currRate := v.rateNow(now)
t.Logf("%0.1fs:\t%0.3f", time.Duration(now).Seconds(), currRate)
// At every multiple of a half-life period,
// the current rate should be half the value of what
// it was at the last half-life period.
if time.Duration(now)%halfLife == 0 {
c.Assert(currRate, qt.CmpEquals(cmpopts.EquateApprox(1e-12, 0)), wantHalfRate)
wantHalfRate = currRate / 2
}
// Without any newly added events,
// the rate should be decaying over time.
if now > 0 && prevRate < currRate {
t.Errorf("%v: rate is not decaying: %0.1f < %0.1f", time.Duration(now), prevRate, currRate)
}
if currRate < 0 {
t.Errorf("%v: rate too low: %0.1f < %0.1f", time.Duration(now), currRate, 0.0)
}
prevRate = currRate
now += step
}
}
// testValueHighThenLow targets a steady-state rate that is high,
// then switches to a target steady-state rate that is low.
func testValueHighThenLow(t *testing.T, halfLife time.Duration) {
c := qt.New(t)
v := Value{HalfLife: halfLife}
var now mono.Time
var prevRate float64
var wantRate float64
const step = 10 * msec
const stepsPerSecond = int(sec / step)
// Target a higher steady-state rate.
wantRate = 2 * val
wantHalfRate := float64(0.0)
eventsPerStep := wantRate / float64(stepsPerSecond)
for now < 10*sec {
currRate := v.rateNow(now)
v.addNow(now, eventsPerStep)
t.Logf("%0.1fs:\t%0.3f", time.Duration(now).Seconds(), currRate)
// At every multiple of a half-life period,
// the current rate should be half-way more towards
// the target rate relative to before.
if time.Duration(now)%halfLife == 0 {
c.Assert(currRate, qt.CmpEquals(cmpopts.EquateApprox(0.1, 0)), wantHalfRate)
wantHalfRate += (wantRate - currRate) / 2
}
// Rate should approach wantRate from below,
// but never exceed it.
if now > 0 && prevRate > currRate {
t.Errorf("%v: rate is not growing: %0.1f > %0.1f", time.Duration(now), prevRate, currRate)
}
if currRate > 1.01*wantRate {
t.Errorf("%v: rate too high: %0.1f > %0.1f", time.Duration(now), currRate, wantRate)
}
prevRate = currRate
now += step
}
c.Assert(prevRate, qt.CmpEquals(cmpopts.EquateApprox(0.05, 0)), wantRate)
// Target a lower steady-state rate.
wantRate = val / 3
wantHalfRate = prevRate
eventsPerStep = wantRate / float64(stepsPerSecond)
for now < 20*sec {
currRate := v.rateNow(now)
v.addNow(now, eventsPerStep)
t.Logf("%0.1fs:\t%0.3f", time.Duration(now).Seconds(), currRate)
// At every multiple of a half-life period,
// the current rate should be half-way more towards
// the target rate relative to before.
if time.Duration(now)%halfLife == 0 {
c.Assert(currRate, qt.CmpEquals(cmpopts.EquateApprox(0.1, 0)), wantHalfRate)
wantHalfRate += (wantRate - currRate) / 2
}
// Rate should approach wantRate from above,
// but never exceed it.
if now > 10*sec && prevRate < currRate {
t.Errorf("%v: rate is not decaying: %0.1f < %0.1f", time.Duration(now), prevRate, currRate)
}
if currRate < 0.99*wantRate {
t.Errorf("%v: rate too low: %0.1f < %0.1f", time.Duration(now), currRate, wantRate)
}
prevRate = currRate
now += step
}
c.Assert(prevRate, qt.CmpEquals(cmpopts.EquateApprox(0.15, 0)), wantRate)
}
// testLowFrequency fires an event at a frequency much slower than
// the specified half-life period. While the average rate over time
// should be accurate, the standard deviation gets worse.
func testLowFrequency(t *testing.T, halfLife time.Duration) {
v := Value{HalfLife: halfLife}
var now mono.Time
var rates []float64
for now < 20*min {
if now%(10*sec) == 0 {
v.addNow(now, 1) // 1 event every 10 seconds
}
now += 50 * msec
rates = append(rates, v.rateNow(now))
now += 50 * msec
}
mean, stddev := stats(rates)
c := qt.New(t)
c.Assert(mean, qt.CmpEquals(cmpopts.EquateApprox(0.001, 0)), 0.1)
t.Logf("mean:%v stddev:%v", mean, stddev)
}
func stats(fs []float64) (mean, stddev float64) {
for _, rate := range fs {
mean += rate
}
mean /= float64(len(fs))
for _, rate := range fs {
stddev += (rate - mean) * (rate - mean)
}
stddev = math.Sqrt(stddev / float64(len(fs)))
return mean, stddev
}
// BenchmarkValue benchmarks the cost of Value.Add,
// which is called often and makes extensive use of floating-point math.
func BenchmarkValue(b *testing.B) {
b.ReportAllocs()
v := Value{HalfLife: time.Second}
for i := 0; i < b.N; i++ {
v.Add(1)
}
}