mirror of
				https://github.com/tailscale/tailscale.git
				synced 2025-10-20 15:10:43 +00:00 
			
		
		
		
	 f49b9f75b8
			
		
	
	f49b9f75b8
	
	
	
		
			
			Adds NewGaugeFunc and NewCounterFunc (inspired by expvar.Func) which allow the current value to be reported by a function. This allows some client metric values to be computed on-demand during uploading (at most every 15 seconds), instead of being continuously updated. clientmetric uploading had a bunch of micro-optimizations for memory access (#3331) which are not possible with this approach. However, any performance hit from function-based metrics is contained to those metrics only, and we expect to have very few. Also adds a DisableDeltas() option for client metrics, so that absolute values are always reported. This makes server-side processing of some metrics easier to reason about. Updates tailscale/corp#9230 Signed-off-by: Mihai Parparita <mihai@tailscale.com>
		
			
				
	
	
		
			386 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			386 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright (c) Tailscale Inc & AUTHORS
 | |
| // SPDX-License-Identifier: BSD-3-Clause
 | |
| 
 | |
| // Package clientmetric provides client-side metrics whose values
 | |
| // get occasionally logged.
 | |
| package clientmetric
 | |
| 
 | |
| import (
 | |
| 	"bytes"
 | |
| 	"encoding/binary"
 | |
| 	"encoding/hex"
 | |
| 	"fmt"
 | |
| 	"io"
 | |
| 	"sort"
 | |
| 	"strings"
 | |
| 	"sync"
 | |
| 	"sync/atomic"
 | |
| 	"time"
 | |
| )
 | |
| 
 | |
| var (
 | |
| 	mu          sync.Mutex // guards vars in this block
 | |
| 	metrics     = map[string]*Metric{}
 | |
| 	numWireID   int         // how many wireIDs have been allocated
 | |
| 	lastDelta   time.Time   // time of last call to EncodeLogTailMetricsDelta
 | |
| 	sortedDirty bool        // whether sorted needs to be rebuilt
 | |
| 	sorted      []*Metric   // by name
 | |
| 	lastLogVal  []scanEntry // by Metric.regIdx
 | |
| 	unsorted    []*Metric   // by Metric.regIdx
 | |
| 
 | |
| 	// valFreeList is a set of free contiguous int64s whose
 | |
| 	// element addresses get assigned to Metric.v.
 | |
| 	// Any memory address in len(valFreeList) is free for use.
 | |
| 	// They're contiguous to reduce cache churn during diff scans.
 | |
| 	// When out of length, a new backing array is made.
 | |
| 	valFreeList []int64
 | |
| )
 | |
| 
 | |
// scanEntry is the per-metric state EncodeLogTailMetricsDelta walks when
// looking for changed values. It is deliberately small to keep the scan
// light on memory traffic.
//
// Publish fills in exactly one of v and f, mirroring the owning Metric.
type scanEntry struct {
	v          *int64       // same pointer as Metric.v; nil for function-backed metrics
	f          func() int64 // same func as Metric.f; nil for stored-value metrics
	lastLogged int64        // value most recently reported to the log (initially zero)
}
 | |
| 
 | |
// Type identifies the kind of a Metric: gauge or counter.
type Type uint8

// The supported metric kinds.
const (
	// TypeGauge is a metric whose value may move up or down.
	TypeGauge Type = 0
	// TypeCounter is a metric whose value is meant only to increase.
	TypeCounter Type = 1
)
 | |
| 
 | |
| // Metric is an integer metric value that's tracked over time.
 | |
| //
 | |
| // It's safe for concurrent use.
 | |
| type Metric struct {
 | |
| 	v              *int64       // atomic; the metric value
 | |
| 	f              func() int64 // value function (v is ignored if f is non-nil)
 | |
| 	regIdx         int          // index into lastLogVal and unsorted
 | |
| 	name           string
 | |
| 	typ            Type
 | |
| 	deltasDisabled bool
 | |
| 
 | |
| 	// The following fields are owned by the package-level 'mu':
 | |
| 
 | |
| 	// wireID is the lazily-allocated "wire ID". Until a metric is encoded
 | |
| 	// in the logs (by EncodeLogTailMetricsDelta), it has no wireID. This
 | |
| 	// ensures that unused metrics don't waste valuable low numbers, which
 | |
| 	// encode with varints with fewer bytes.
 | |
| 	wireID int
 | |
| 
 | |
| 	// lastNamed is the last time the name of this metric was
 | |
| 	// written on the wire.
 | |
| 	lastNamed time.Time
 | |
| }
 | |
| 
 | |
| func (m *Metric) Name() string { return m.name }
 | |
| 
 | |
| func (m *Metric) Value() int64 {
 | |
| 	if m.f != nil {
 | |
| 		return m.f()
 | |
| 	}
 | |
| 	return atomic.LoadInt64(m.v)
 | |
| }
 | |
| 
 | |
| func (m *Metric) Type() Type { return m.typ }
 | |
| 
 | |
| // DisableDeltas disables uploading of deltas for this metric (absolute values
 | |
| // are always uploaded).
 | |
| func (m *Metric) DisableDeltas() {
 | |
| 	m.deltasDisabled = true
 | |
| }
 | |
| 
 | |
| // Add increments m's value by n.
 | |
| //
 | |
| // If m is of type counter, n should not be negative.
 | |
| func (m *Metric) Add(n int64) {
 | |
| 	if m.f != nil {
 | |
| 		panic("Add() called on metric with value function")
 | |
| 	}
 | |
| 	atomic.AddInt64(m.v, n)
 | |
| }
 | |
| 
 | |
| // Set sets m's value to v.
 | |
| //
 | |
| // If m is of type counter, Set should not be used.
 | |
| func (m *Metric) Set(v int64) {
 | |
| 	if m.f != nil {
 | |
| 		panic("Set() called on metric with value function")
 | |
| 	}
 | |
| 	atomic.StoreInt64(m.v, v)
 | |
| }
 | |
| 
 | |
| // Publish registers a metric in the global map.
 | |
| // It panics if the name is a duplicate anywhere in the process.
 | |
| func (m *Metric) Publish() {
 | |
| 	mu.Lock()
 | |
| 	defer mu.Unlock()
 | |
| 	if m.name == "" {
 | |
| 		panic("unnamed Metric")
 | |
| 	}
 | |
| 	if _, dup := metrics[m.name]; dup {
 | |
| 		panic("duplicate metric " + m.name)
 | |
| 	}
 | |
| 	metrics[m.name] = m
 | |
| 	sortedDirty = true
 | |
| 
 | |
| 	if m.f != nil {
 | |
| 		lastLogVal = append(lastLogVal, scanEntry{f: m.f})
 | |
| 	} else {
 | |
| 		if len(valFreeList) == 0 {
 | |
| 			valFreeList = make([]int64, 256)
 | |
| 		}
 | |
| 		m.v = &valFreeList[0]
 | |
| 		valFreeList = valFreeList[1:]
 | |
| 		lastLogVal = append(lastLogVal, scanEntry{v: m.v})
 | |
| 	}
 | |
| 
 | |
| 	m.regIdx = len(unsorted)
 | |
| 	unsorted = append(unsorted, m)
 | |
| }
 | |
| 
 | |
| // Metrics returns the sorted list of metrics.
 | |
| //
 | |
| // The returned slice should not be mutated.
 | |
| func Metrics() []*Metric {
 | |
| 	mu.Lock()
 | |
| 	defer mu.Unlock()
 | |
| 	if sortedDirty {
 | |
| 		sortedDirty = false
 | |
| 		sorted = make([]*Metric, 0, len(metrics))
 | |
| 		for _, m := range metrics {
 | |
| 			sorted = append(sorted, m)
 | |
| 		}
 | |
| 		sort.Slice(sorted, func(i, j int) bool {
 | |
| 			return sorted[i].name < sorted[j].name
 | |
| 		})
 | |
| 	}
 | |
| 	return sorted
 | |
| }
 | |
| 
 | |
| // HasPublished reports whether a metric with the given name has already been
 | |
| // published.
 | |
| func HasPublished(name string) bool {
 | |
| 	mu.Lock()
 | |
| 	defer mu.Unlock()
 | |
| 	_, ok := metrics[name]
 | |
| 	return ok
 | |
| }
 | |
| 
 | |
| // NewUnpublished initializes a new Metric without calling Publish on
 | |
| // it.
 | |
| func NewUnpublished(name string, typ Type) *Metric {
 | |
| 	if i := strings.IndexFunc(name, isIllegalMetricRune); name == "" || i != -1 {
 | |
| 		panic(fmt.Sprintf("illegal metric name %q (index %v)", name, i))
 | |
| 	}
 | |
| 	return &Metric{
 | |
| 		name: name,
 | |
| 		typ:  typ,
 | |
| 	}
 | |
| }
 | |
| 
 | |
// isIllegalMetricRune reports whether r may not appear in a metric name.
// Only ASCII letters, ASCII digits and '_' are permitted.
func isIllegalMetricRune(r rune) bool {
	switch {
	case 'a' <= r && r <= 'z',
		'A' <= r && r <= 'Z',
		'0' <= r && r <= '9',
		r == '_':
		return false
	default:
		return true
	}
}
 | |
| 
 | |
| // NewCounter returns a new metric that can only increment.
 | |
| func NewCounter(name string) *Metric {
 | |
| 	m := NewUnpublished(name, TypeCounter)
 | |
| 	m.Publish()
 | |
| 	return m
 | |
| }
 | |
| 
 | |
| // NewGauge returns a new metric that can both increment and decrement.
 | |
| func NewGauge(name string) *Metric {
 | |
| 	m := NewUnpublished(name, TypeGauge)
 | |
| 	m.Publish()
 | |
| 	return m
 | |
| }
 | |
| 
 | |
| // NewCounterFunc returns a counter metric that has its value determined by
 | |
| // calling the provided function (calling Add() and Set() will panic). No
 | |
| // locking guarantees are made for the invocation.
 | |
| func NewCounterFunc(name string, f func() int64) *Metric {
 | |
| 	m := NewUnpublished(name, TypeCounter)
 | |
| 	m.f = f
 | |
| 	m.Publish()
 | |
| 	return m
 | |
| }
 | |
| 
 | |
| // NewGaugeFunc returns a gauge metric that has its value determined by
 | |
| // calling the provided function (calling Add() and Set() will panic). No
 | |
| // locking guarantees are made for the invocation.
 | |
| func NewGaugeFunc(name string, f func() int64) *Metric {
 | |
| 	m := NewUnpublished(name, TypeGauge)
 | |
| 	m.f = f
 | |
| 	m.Publish()
 | |
| 	return m
 | |
| }
 | |
| 
 | |
| // WritePrometheusExpositionFormat writes all client metrics to w in
 | |
| // the Prometheus text-based exposition format.
 | |
| //
 | |
| // See https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md
 | |
| func WritePrometheusExpositionFormat(w io.Writer) {
 | |
| 	for _, m := range Metrics() {
 | |
| 		switch m.Type() {
 | |
| 		case TypeGauge:
 | |
| 			fmt.Fprintf(w, "# TYPE %s gauge\n", m.Name())
 | |
| 		case TypeCounter:
 | |
| 			fmt.Fprintf(w, "# TYPE %s counter\n", m.Name())
 | |
| 		}
 | |
| 		fmt.Fprintf(w, "%s %v\n", m.Name(), m.Value())
 | |
| 	}
 | |
| }
 | |
| 
 | |
// Timing parameters for EncodeLogTailMetricsDelta.
const (
	// minMetricEncodeInterval rate-limits EncodeLogTailMetricsDelta: a call
	// arriving sooner than this after the previous (non-rate-limited) call
	// returns "" without scanning the metrics.
	minMetricEncodeInterval = 15 * time.Second

	// metricLogNameFrequency is how stale a metric's last name=>wireID
	// record may become before EncodeLogTailMetricsDelta repeats it (along
	// with the metric's absolute value) the next time the metric changes.
	// The mapping is thus redundantly re-logged, bounding how far back in
	// the logs a reader must look to learn a changing metric's name.
	metricLogNameFrequency = 4 * time.Hour
)
 | |
| 
 | |
| // EncodeLogTailMetricsDelta return an encoded string representing the metrics
 | |
| // differences since the previous call.
 | |
| //
 | |
| // It implements the requirements of a logtail.Config.MetricsDelta
 | |
| // func. Notably, its output is safe to embed in a JSON string literal
 | |
| // without further escaping.
 | |
| //
 | |
| // The current encoding is:
 | |
| //   - name immediately following metric:
 | |
| //     'N' + hex(varint(len(name))) + name
 | |
| //   - set value of a metric:
 | |
| //     'S' + hex(varint(wireid)) + hex(varint(value))
 | |
| //   - increment a metric: (decrements if negative)
 | |
| //     'I' + hex(varint(wireid)) + hex(varint(value))
 | |
| func EncodeLogTailMetricsDelta() string {
 | |
| 	mu.Lock()
 | |
| 	defer mu.Unlock()
 | |
| 
 | |
| 	now := time.Now()
 | |
| 	if !lastDelta.IsZero() && now.Sub(lastDelta) < minMetricEncodeInterval {
 | |
| 		return ""
 | |
| 	}
 | |
| 	lastDelta = now
 | |
| 
 | |
| 	var enc *deltaEncBuf // lazy
 | |
| 	for i, ent := range lastLogVal {
 | |
| 		var val int64
 | |
| 		if ent.f != nil {
 | |
| 			val = ent.f()
 | |
| 		} else {
 | |
| 			val = atomic.LoadInt64(ent.v)
 | |
| 		}
 | |
| 		delta := val - ent.lastLogged
 | |
| 		if delta == 0 {
 | |
| 			continue
 | |
| 		}
 | |
| 		lastLogVal[i].lastLogged = val
 | |
| 		m := unsorted[i]
 | |
| 		if enc == nil {
 | |
| 			enc = deltaPool.Get().(*deltaEncBuf)
 | |
| 			enc.buf.Reset()
 | |
| 		}
 | |
| 		if m.wireID == 0 {
 | |
| 			numWireID++
 | |
| 			m.wireID = numWireID
 | |
| 		}
 | |
| 
 | |
| 		writeValue := m.deltasDisabled
 | |
| 		if m.lastNamed.IsZero() || now.Sub(m.lastNamed) > metricLogNameFrequency {
 | |
| 			enc.writeName(m.Name(), m.Type())
 | |
| 			m.lastNamed = now
 | |
| 			writeValue = true
 | |
| 		}
 | |
| 		if writeValue {
 | |
| 			enc.writeValue(m.wireID, val)
 | |
| 		} else {
 | |
| 			enc.writeDelta(m.wireID, delta)
 | |
| 		}
 | |
| 	}
 | |
| 	if enc == nil {
 | |
| 		return ""
 | |
| 	}
 | |
| 	defer deltaPool.Put(enc)
 | |
| 	return enc.buf.String()
 | |
| }
 | |
| 
 | |
| var deltaPool = &sync.Pool{
 | |
| 	New: func() any {
 | |
| 		return new(deltaEncBuf)
 | |
| 	},
 | |
| }
 | |
| 
 | |
// deltaEncBuf accumulates metric records in the format documented on
// EncodeLogTailMetricsDelta.
//
// buf holds the encoded output; scratch is room for one raw varint before
// it is hex-encoded into buf.
type deltaEncBuf struct {
	buf     bytes.Buffer
	scratch [binary.MaxVarintLen64]byte
}
 | |
| 
 | |
| // writeName writes a "name" (N) record to the buffer, which notes
 | |
| // that the immediately following record's wireID has the provided
 | |
| // name.
 | |
| func (b *deltaEncBuf) writeName(name string, typ Type) {
 | |
| 	var namePrefix string
 | |
| 	if typ == TypeGauge {
 | |
| 		// Add the gauge_ prefix so that tsweb knows that this is a gauge metric
 | |
| 		// when generating the Prometheus version.
 | |
| 		namePrefix = "gauge_"
 | |
| 	}
 | |
| 	b.buf.WriteByte('N')
 | |
| 	b.writeHexVarint(int64(len(namePrefix) + len(name)))
 | |
| 	b.buf.WriteString(namePrefix)
 | |
| 	b.buf.WriteString(name)
 | |
| }
 | |
| 
 | |
| // writeDelta writes a "set" (S) record to the buffer, noting that the
 | |
| // metric with the given wireID now has value v.
 | |
| func (b *deltaEncBuf) writeValue(wireID int, v int64) {
 | |
| 	b.buf.WriteByte('S')
 | |
| 	b.writeHexVarint(int64(wireID))
 | |
| 	b.writeHexVarint(v)
 | |
| }
 | |
| 
 | |
| // writeDelta writes an "increment" (I) delta value record to the
 | |
| // buffer, noting that the metric with the given wireID now has a
 | |
| // value that's v larger (or smaller if v is negative).
 | |
| func (b *deltaEncBuf) writeDelta(wireID int, v int64) {
 | |
| 	b.buf.WriteByte('I')
 | |
| 	b.writeHexVarint(int64(wireID))
 | |
| 	b.writeHexVarint(v)
 | |
| }
 | |
| 
 | |
| // writeHexVarint writes v to the buffer as a hex-encoded varint.
 | |
| func (b *deltaEncBuf) writeHexVarint(v int64) {
 | |
| 	n := binary.PutVarint(b.scratch[:], v)
 | |
| 	hexLen := n * 2
 | |
| 	oldLen := b.buf.Len()
 | |
| 	b.buf.Grow(hexLen)
 | |
| 	hexBuf := b.buf.Bytes()[oldLen : oldLen+hexLen]
 | |
| 	hex.Encode(hexBuf, b.scratch[:n])
 | |
| 	b.buf.Write(hexBuf)
 | |
| }
 | |
| 
 | |
| var TestHooks testHooks
 | |
| 
 | |
| type testHooks struct{}
 | |
| 
 | |
| func (testHooks) ResetLastDelta() {
 | |
| 	lastDelta = time.Time{}
 | |
| }
 |