mirror of
https://github.com/tailscale/tailscale.git
synced 2024-11-27 12:05:40 +00:00
f49b9f75b8
Adds NewGaugeFunc and NewCounterFunc (inspired by expvar.Func) which change the current value to be reported by a function. This allows some client metric values to be computed on-demand during uploading (at most every 15 seconds), instead of being continuously updated. clientmetric uploading had a bunch of micro-optimizations for memory access (#3331) which are not possible with this approach. However, any performance hit from function-based metrics is contained to those metrics only, and we expect to have very few. Also adds a DisableDeltas() option for client metrics, so that absolute values are always reported. This makes server-side processing of some metrics easier to reason about. Updates tailscale/corp#9230 Signed-off-by: Mihai Parparita <mihai@tailscale.com>
386 lines
10 KiB
Go
386 lines
10 KiB
Go
// Copyright (c) Tailscale Inc & AUTHORS
|
|
// SPDX-License-Identifier: BSD-3-Clause
|
|
|
|
// Package clientmetric provides client-side metrics whose values
|
|
// get occasionally logged.
|
|
package clientmetric
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/binary"
|
|
"encoding/hex"
|
|
"fmt"
|
|
"io"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
var (
|
|
mu sync.Mutex // guards vars in this block
|
|
metrics = map[string]*Metric{}
|
|
numWireID int // how many wireIDs have been allocated
|
|
lastDelta time.Time // time of last call to EncodeLogTailMetricsDelta
|
|
sortedDirty bool // whether sorted needs to be rebuilt
|
|
sorted []*Metric // by name
|
|
lastLogVal []scanEntry // by Metric.regIdx
|
|
unsorted []*Metric // by Metric.regIdx
|
|
|
|
// valFreeList is a set of free contiguous int64s whose
|
|
// element addresses get assigned to Metric.v.
|
|
// Any memory address in len(valFreeList) is free for use.
|
|
// They're contiguous to reduce cache churn during diff scans.
|
|
// When out of length, a new backing array is made.
|
|
valFreeList []int64
|
|
)
|
|
|
|
// scanEntry holds the minimal per-metric state needed to quickly scan
// for values that changed since they were last logged. It is kept small
// to reduce memory pressure.
type scanEntry struct {
	// v is the metric's Metric.v.
	v *int64

	// f is the metric's Metric.f.
	f func() int64

	// lastLogged is the value that was last logged.
	lastLogged int64
}
|
|
|
|
// Type is the kind of a metric: either a gauge or a counter.
type Type uint8

const (
	// TypeGauge is the type of a metric that can both increment and
	// decrement.
	TypeGauge Type = iota
	// TypeCounter is the type of a metric that can only increment.
	TypeCounter
)
|
|
|
|
// Metric is an integer metric value that's tracked over time.
|
|
//
|
|
// It's safe for concurrent use.
|
|
type Metric struct {
|
|
v *int64 // atomic; the metric value
|
|
f func() int64 // value function (v is ignored if f is non-nil)
|
|
regIdx int // index into lastLogVal and unsorted
|
|
name string
|
|
typ Type
|
|
deltasDisabled bool
|
|
|
|
// The following fields are owned by the package-level 'mu':
|
|
|
|
// wireID is the lazily-allocated "wire ID". Until a metric is encoded
|
|
// in the logs (by EncodeLogTailMetricsDelta), it has no wireID. This
|
|
// ensures that unused metrics don't waste valuable low numbers, which
|
|
// encode with varints with fewer bytes.
|
|
wireID int
|
|
|
|
// lastNamed is the last time the name of this metric was
|
|
// written on the wire.
|
|
lastNamed time.Time
|
|
}
|
|
|
|
// Name returns the name the metric was created with.
func (m *Metric) Name() string {
	return m.name
}
|
|
|
|
func (m *Metric) Value() int64 {
|
|
if m.f != nil {
|
|
return m.f()
|
|
}
|
|
return atomic.LoadInt64(m.v)
|
|
}
|
|
|
|
// Type returns the metric's type (gauge or counter).
func (m *Metric) Type() Type {
	return m.typ
}
|
|
|
|
// DisableDeltas disables uploading of deltas for this metric (absolute values
|
|
// are always uploaded).
|
|
func (m *Metric) DisableDeltas() {
|
|
m.deltasDisabled = true
|
|
}
|
|
|
|
// Add increments m's value by n.
|
|
//
|
|
// If m is of type counter, n should not be negative.
|
|
func (m *Metric) Add(n int64) {
|
|
if m.f != nil {
|
|
panic("Add() called on metric with value function")
|
|
}
|
|
atomic.AddInt64(m.v, n)
|
|
}
|
|
|
|
// Set sets m's value to v.
|
|
//
|
|
// If m is of type counter, Set should not be used.
|
|
func (m *Metric) Set(v int64) {
|
|
if m.f != nil {
|
|
panic("Set() called on metric with value function")
|
|
}
|
|
atomic.StoreInt64(m.v, v)
|
|
}
|
|
|
|
// Publish registers a metric in the global map.
|
|
// It panics if the name is a duplicate anywhere in the process.
|
|
func (m *Metric) Publish() {
|
|
mu.Lock()
|
|
defer mu.Unlock()
|
|
if m.name == "" {
|
|
panic("unnamed Metric")
|
|
}
|
|
if _, dup := metrics[m.name]; dup {
|
|
panic("duplicate metric " + m.name)
|
|
}
|
|
metrics[m.name] = m
|
|
sortedDirty = true
|
|
|
|
if m.f != nil {
|
|
lastLogVal = append(lastLogVal, scanEntry{f: m.f})
|
|
} else {
|
|
if len(valFreeList) == 0 {
|
|
valFreeList = make([]int64, 256)
|
|
}
|
|
m.v = &valFreeList[0]
|
|
valFreeList = valFreeList[1:]
|
|
lastLogVal = append(lastLogVal, scanEntry{v: m.v})
|
|
}
|
|
|
|
m.regIdx = len(unsorted)
|
|
unsorted = append(unsorted, m)
|
|
}
|
|
|
|
// Metrics returns the sorted list of metrics.
|
|
//
|
|
// The returned slice should not be mutated.
|
|
func Metrics() []*Metric {
|
|
mu.Lock()
|
|
defer mu.Unlock()
|
|
if sortedDirty {
|
|
sortedDirty = false
|
|
sorted = make([]*Metric, 0, len(metrics))
|
|
for _, m := range metrics {
|
|
sorted = append(sorted, m)
|
|
}
|
|
sort.Slice(sorted, func(i, j int) bool {
|
|
return sorted[i].name < sorted[j].name
|
|
})
|
|
}
|
|
return sorted
|
|
}
|
|
|
|
// HasPublished reports whether a metric with the given name has already been
|
|
// published.
|
|
func HasPublished(name string) bool {
|
|
mu.Lock()
|
|
defer mu.Unlock()
|
|
_, ok := metrics[name]
|
|
return ok
|
|
}
|
|
|
|
// NewUnpublished initializes a new Metric without calling Publish on
|
|
// it.
|
|
func NewUnpublished(name string, typ Type) *Metric {
|
|
if i := strings.IndexFunc(name, isIllegalMetricRune); name == "" || i != -1 {
|
|
panic(fmt.Sprintf("illegal metric name %q (index %v)", name, i))
|
|
}
|
|
return &Metric{
|
|
name: name,
|
|
typ: typ,
|
|
}
|
|
}
|
|
|
|
// isIllegalMetricRune reports whether r may not appear in a metric name.
// Only ASCII letters, ASCII digits, and '_' are legal.
func isIllegalMetricRune(r rune) bool {
	switch {
	case 'a' <= r && r <= 'z',
		'A' <= r && r <= 'Z',
		'0' <= r && r <= '9',
		r == '_':
		return false
	}
	return true
}
|
|
|
|
// NewCounter returns a new metric that can only increment.
|
|
func NewCounter(name string) *Metric {
|
|
m := NewUnpublished(name, TypeCounter)
|
|
m.Publish()
|
|
return m
|
|
}
|
|
|
|
// NewGauge returns a new metric that can both increment and decrement.
|
|
func NewGauge(name string) *Metric {
|
|
m := NewUnpublished(name, TypeGauge)
|
|
m.Publish()
|
|
return m
|
|
}
|
|
|
|
// NewCounterFunc returns a counter metric that has its value determined by
|
|
// calling the provided function (calling Add() and Set() will panic). No
|
|
// locking guarantees are made for the invocation.
|
|
func NewCounterFunc(name string, f func() int64) *Metric {
|
|
m := NewUnpublished(name, TypeCounter)
|
|
m.f = f
|
|
m.Publish()
|
|
return m
|
|
}
|
|
|
|
// NewGaugeFunc returns a gauge metric that has its value determined by
|
|
// calling the provided function (calling Add() and Set() will panic). No
|
|
// locking guarantees are made for the invocation.
|
|
func NewGaugeFunc(name string, f func() int64) *Metric {
|
|
m := NewUnpublished(name, TypeGauge)
|
|
m.f = f
|
|
m.Publish()
|
|
return m
|
|
}
|
|
|
|
// WritePrometheusExpositionFormat writes all client metrics to w in
|
|
// the Prometheus text-based exposition format.
|
|
//
|
|
// See https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md
|
|
func WritePrometheusExpositionFormat(w io.Writer) {
|
|
for _, m := range Metrics() {
|
|
switch m.Type() {
|
|
case TypeGauge:
|
|
fmt.Fprintf(w, "# TYPE %s gauge\n", m.Name())
|
|
case TypeCounter:
|
|
fmt.Fprintf(w, "# TYPE %s counter\n", m.Name())
|
|
}
|
|
fmt.Fprintf(w, "%s %v\n", m.Name(), m.Value())
|
|
}
|
|
}
|
|
|
|
const (
	// metricLogNameFrequency is how often a metric's name=>id mapping
	// is redundantly written to the logs. Put differently, it is how
	// far back in the logs one must fetch from a given point in time
	// in order to recompute the metrics at that point in time.
	metricLogNameFrequency = 4 * time.Hour

	// minMetricEncodeInterval is the minimum time between scans of the
	// metrics for changes to encode for logtail.
	minMetricEncodeInterval = 15 * time.Second
)
|
|
|
|
// EncodeLogTailMetricsDelta return an encoded string representing the metrics
|
|
// differences since the previous call.
|
|
//
|
|
// It implements the requirements of a logtail.Config.MetricsDelta
|
|
// func. Notably, its output is safe to embed in a JSON string literal
|
|
// without further escaping.
|
|
//
|
|
// The current encoding is:
|
|
// - name immediately following metric:
|
|
// 'N' + hex(varint(len(name))) + name
|
|
// - set value of a metric:
|
|
// 'S' + hex(varint(wireid)) + hex(varint(value))
|
|
// - increment a metric: (decrements if negative)
|
|
// 'I' + hex(varint(wireid)) + hex(varint(value))
|
|
func EncodeLogTailMetricsDelta() string {
|
|
mu.Lock()
|
|
defer mu.Unlock()
|
|
|
|
now := time.Now()
|
|
if !lastDelta.IsZero() && now.Sub(lastDelta) < minMetricEncodeInterval {
|
|
return ""
|
|
}
|
|
lastDelta = now
|
|
|
|
var enc *deltaEncBuf // lazy
|
|
for i, ent := range lastLogVal {
|
|
var val int64
|
|
if ent.f != nil {
|
|
val = ent.f()
|
|
} else {
|
|
val = atomic.LoadInt64(ent.v)
|
|
}
|
|
delta := val - ent.lastLogged
|
|
if delta == 0 {
|
|
continue
|
|
}
|
|
lastLogVal[i].lastLogged = val
|
|
m := unsorted[i]
|
|
if enc == nil {
|
|
enc = deltaPool.Get().(*deltaEncBuf)
|
|
enc.buf.Reset()
|
|
}
|
|
if m.wireID == 0 {
|
|
numWireID++
|
|
m.wireID = numWireID
|
|
}
|
|
|
|
writeValue := m.deltasDisabled
|
|
if m.lastNamed.IsZero() || now.Sub(m.lastNamed) > metricLogNameFrequency {
|
|
enc.writeName(m.Name(), m.Type())
|
|
m.lastNamed = now
|
|
writeValue = true
|
|
}
|
|
if writeValue {
|
|
enc.writeValue(m.wireID, val)
|
|
} else {
|
|
enc.writeDelta(m.wireID, delta)
|
|
}
|
|
}
|
|
if enc == nil {
|
|
return ""
|
|
}
|
|
defer deltaPool.Put(enc)
|
|
return enc.buf.String()
|
|
}
|
|
|
|
var deltaPool = &sync.Pool{
|
|
New: func() any {
|
|
return new(deltaEncBuf)
|
|
},
|
|
}
|
|
|
|
// deltaEncBuf accumulates encoded metric records in the format
// described on EncodeLogTailMetricsDelta above.
type deltaEncBuf struct {
	// buf holds the encoded output.
	buf bytes.Buffer

	// scratch is working space large enough for one 64-bit varint.
	scratch [binary.MaxVarintLen64]byte
}
|
|
|
|
// writeName writes a "name" (N) record to the buffer, which notes
|
|
// that the immediately following record's wireID has the provided
|
|
// name.
|
|
func (b *deltaEncBuf) writeName(name string, typ Type) {
|
|
var namePrefix string
|
|
if typ == TypeGauge {
|
|
// Add the gauge_ prefix so that tsweb knows that this is a gauge metric
|
|
// when generating the Prometheus version.
|
|
namePrefix = "gauge_"
|
|
}
|
|
b.buf.WriteByte('N')
|
|
b.writeHexVarint(int64(len(namePrefix) + len(name)))
|
|
b.buf.WriteString(namePrefix)
|
|
b.buf.WriteString(name)
|
|
}
|
|
|
|
// writeDelta writes a "set" (S) record to the buffer, noting that the
|
|
// metric with the given wireID now has value v.
|
|
func (b *deltaEncBuf) writeValue(wireID int, v int64) {
|
|
b.buf.WriteByte('S')
|
|
b.writeHexVarint(int64(wireID))
|
|
b.writeHexVarint(v)
|
|
}
|
|
|
|
// writeDelta writes an "increment" (I) delta value record to the
|
|
// buffer, noting that the metric with the given wireID now has a
|
|
// value that's v larger (or smaller if v is negative).
|
|
func (b *deltaEncBuf) writeDelta(wireID int, v int64) {
|
|
b.buf.WriteByte('I')
|
|
b.writeHexVarint(int64(wireID))
|
|
b.writeHexVarint(v)
|
|
}
|
|
|
|
// writeHexVarint writes v to the buffer as a hex-encoded varint.
|
|
func (b *deltaEncBuf) writeHexVarint(v int64) {
|
|
n := binary.PutVarint(b.scratch[:], v)
|
|
hexLen := n * 2
|
|
oldLen := b.buf.Len()
|
|
b.buf.Grow(hexLen)
|
|
hexBuf := b.buf.Bytes()[oldLen : oldLen+hexLen]
|
|
hex.Encode(hexBuf, b.scratch[:n])
|
|
b.buf.Write(hexBuf)
|
|
}
|
|
|
|
var TestHooks testHooks
|
|
|
|
type testHooks struct{}
|
|
|
|
func (testHooks) ResetLastDelta() {
|
|
lastDelta = time.Time{}
|
|
}
|