From 61ddecee313ea0b9526c63bc886226eb50afab7a Mon Sep 17 00:00:00 2001 From: Zach Hirschtritt Date: Thu, 27 Mar 2025 02:40:27 -0400 Subject: [PATCH] fix: add prometheus metrics on projection handlers (#9561) # Which Problems Are Solved With current provided telemetry it's difficult to predict when a projection handler is under increased load until it's too late and causes downstream issues. Importantly, projection updating is in the critical path for many login flows and increased latency there can result in system downtime for users. # How the Problems Are Solved This PR adds three new prometheus-style metrics: 1. **projection_events_processed** (_labels: projection, success_) - This metric gives us a counter of the number of events processed per projection update run and whether they we're processed without error. A high number of events being processed can let us know how busy a particular projection handler is. 2. **projection_handle_timer** _(labels: projection)_ - This is the time it takes to process a projection update given a batch of events - time to take the current_states lock, query for new events, reduce, update_the projection, and update current_states. 3. **projection_state_latency** _(labels: projection)_ - This is the time from the last event processed in the current_states table for a given projection. It tells us how old was the last event you processed? Or, how far behind are you running for this projection? Higher latencies could mean high load or stalled projection handling. # Additional Changes I also had to initialize the global otel metrics provider (`metrics.M`) in the `setup` step additionally to `start` since projection handlers are initialized at setup. The initialization checks if a metrics provider is already set (in case of `start-from-setup` or `start-from-init` to prevent overwriting, which causes the otel metrics provider to stop working. # Additional Context ## Example Dashboards ![image](https://github.com/user-attachments/assets/94ba5c2b-9c62-44cd-83ee-4db4a8859073) ![image](https://github.com/user-attachments/assets/60a1b406-a8c6-48dc-a925-575359f97e1e) --------- Co-authored-by: Silvan <27845747+adlerhurst@users.noreply.github.com> Co-authored-by: Livio Spring (cherry picked from commit c1535b7b4916abd415a29e034d266c4ea1dc3645) --- cmd/setup/config.go | 5 + internal/eventstore/handler/v2/handler.go | 17 ++- internal/eventstore/handler/v2/metrics.go | 70 ++++++++++ .../eventstore/handler/v2/metrics_test.go | 132 ++++++++++++++++++ internal/telemetry/metrics/config/config.go | 13 +- internal/telemetry/metrics/metrics.go | 16 +++ internal/telemetry/metrics/mock.go | 95 +++++++++++++ internal/telemetry/metrics/noop.go | 45 ++++++ .../telemetry/metrics/otel/open_telemetry.go | 42 +++++- 9 files changed, 428 insertions(+), 7 deletions(-) create mode 100644 internal/eventstore/handler/v2/metrics.go create mode 100644 internal/eventstore/handler/v2/metrics_test.go create mode 100644 internal/telemetry/metrics/mock.go create mode 100644 internal/telemetry/metrics/noop.go diff --git a/cmd/setup/config.go b/cmd/setup/config.go index 2b74fb2dbc..2a94b7919e 100644 --- a/cmd/setup/config.go +++ b/cmd/setup/config.go @@ -26,6 +26,7 @@ import ( "github.com/zitadel/zitadel/internal/notification/handlers" "github.com/zitadel/zitadel/internal/query/projection" static_config "github.com/zitadel/zitadel/internal/static/config" + metrics "github.com/zitadel/zitadel/internal/telemetry/metrics/config" ) type Config struct { @@ -38,6 +39,7 @@ type Config struct { ExternalPort uint16 ExternalSecure bool Log *logging.Config + Metrics metrics.Config EncryptionKeys *encryption.EncryptionKeyConfig DefaultInstance command.InstanceSetup Machine *id.Config @@ -85,6 +87,9 @@ func MustNewConfig(v *viper.Viper) *Config { err = config.Log.SetLogger() logging.OnError(err).Fatal("unable to set logger") + err = config.Metrics.NewMeter() + logging.OnError(err).Fatal("unable to set meter") + id.Configure(config.Machine) // Copy the global role permissions mappings to the instance until we allow instance-level configuration over the API. diff --git a/internal/eventstore/handler/v2/handler.go b/internal/eventstore/handler/v2/handler.go index 052f965e22..5c1eeef95d 100644 --- a/internal/eventstore/handler/v2/handler.go +++ b/internal/eventstore/handler/v2/handler.go @@ -67,6 +67,8 @@ type Handler struct { cacheInvalidations []func(ctx context.Context, aggregates []*eventstore.Aggregate) queryInstances func() ([]string, error) + + metrics *ProjectionMetrics } var _ migration.Migration = (*Handler)(nil) @@ -159,6 +161,8 @@ func NewHandler( aggregates[reducer.Aggregate] = eventTypes } + metrics := NewProjectionMetrics() + handler := &Handler{ projection: projection, client: config.Client, @@ -178,6 +182,7 @@ func NewHandler( } return nil, nil }, + metrics: metrics, } return handler @@ -483,6 +488,8 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add defer cancel() } + start := time.Now() + tx, err := h.client.BeginTx(txCtx, nil) if err != nil { return false, err @@ -502,7 +509,7 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add } return additionalIteration, err } - // stop execution if currentState.eventTimestamp >= config.maxCreatedAt + // stop execution if currentState.position >= config.maxPosition if config.maxPosition != 0 && currentState.position >= config.maxPosition { return false, nil } @@ -518,7 +525,14 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add if err == nil { err = commitErr } + + h.metrics.ProjectionEventsProcessed(ctx, h.ProjectionName(), int64(len(statements)), err == nil) + if err == nil && currentState.aggregateID != "" && len(statements) > 0 { + // Don't update projection timing or latency unless we successfully processed events + h.metrics.ProjectionUpdateTiming(ctx, h.ProjectionName(), float64(time.Since(start).Seconds())) + h.metrics.ProjectionStateLatency(ctx, h.ProjectionName(), time.Since(currentState.eventTimestamp).Seconds()) + h.invalidateCaches(ctx, aggregatesFromStatements(statements)) } }() @@ -540,6 +554,7 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add currentState.aggregateType = statements[lastProcessedIndex].Aggregate.Type currentState.sequence = statements[lastProcessedIndex].Sequence currentState.eventTimestamp = statements[lastProcessedIndex].CreationDate + err = h.setState(tx, currentState) return additionalIteration, err diff --git a/internal/eventstore/handler/v2/metrics.go b/internal/eventstore/handler/v2/metrics.go new file mode 100644 index 0000000000..6876bb3aa4 --- /dev/null +++ b/internal/eventstore/handler/v2/metrics.go @@ -0,0 +1,70 @@ +package handler + +import ( + "context" + + "github.com/zitadel/logging" + "go.opentelemetry.io/otel/attribute" + + "github.com/zitadel/zitadel/internal/telemetry/metrics" +) + +const ( + ProjectionLabel = "projection" + SuccessLabel = "success" + + ProjectionEventsProcessed = "projection_events_processed" + ProjectionHandleTimerMetric = "projection_handle_timer" + ProjectionStateLatencyMetric = "projection_state_latency" +) + +type ProjectionMetrics struct { + provider metrics.Metrics +} + +func NewProjectionMetrics() *ProjectionMetrics { + projectionMetrics := &ProjectionMetrics{provider: metrics.M} + + err := projectionMetrics.provider.RegisterCounter( + ProjectionEventsProcessed, + "Number of events reduced to process projection updates", + ) + logging.OnError(err).Error("failed to register projection events processed counter") + err = projectionMetrics.provider.RegisterHistogram( + ProjectionHandleTimerMetric, + "Time taken to process a projection update", + "s", + []float64{0.005, 0.01, 0.05, 0.1, 1, 5, 10, 30, 60, 120}, + ) + logging.OnError(err).Error("failed to register projection handle timer metric") + err = projectionMetrics.provider.RegisterHistogram( + ProjectionStateLatencyMetric, + "When finishing processing a batch of events, this track the age of the last events seen from current time", + "s", + []float64{0.1, 0.5, 1, 5, 10, 30, 60, 300, 600, 1800}, + ) + logging.OnError(err).Error("failed to register projection state latency metric") + return projectionMetrics +} + +func (m *ProjectionMetrics) ProjectionUpdateTiming(ctx context.Context, projection string, duration float64) { + err := m.provider.AddHistogramMeasurement(ctx, ProjectionHandleTimerMetric, duration, map[string]attribute.Value{ + ProjectionLabel: attribute.StringValue(projection), + }) + logging.OnError(err).Error("failed to add projection trigger timing") +} + +func (m *ProjectionMetrics) ProjectionEventsProcessed(ctx context.Context, projection string, count int64, success bool) { + err := m.provider.AddCount(ctx, ProjectionEventsProcessed, count, map[string]attribute.Value{ + ProjectionLabel: attribute.StringValue(projection), + SuccessLabel: attribute.BoolValue(success), + }) + logging.OnError(err).Error("failed to add projection events processed metric") +} + +func (m *ProjectionMetrics) ProjectionStateLatency(ctx context.Context, projection string, latency float64) { + err := m.provider.AddHistogramMeasurement(ctx, ProjectionStateLatencyMetric, latency, map[string]attribute.Value{ + ProjectionLabel: attribute.StringValue(projection), + }) + logging.OnError(err).Error("failed to add projection state latency metric") +} diff --git a/internal/eventstore/handler/v2/metrics_test.go b/internal/eventstore/handler/v2/metrics_test.go new file mode 100644 index 0000000000..54e7623462 --- /dev/null +++ b/internal/eventstore/handler/v2/metrics_test.go @@ -0,0 +1,132 @@ +package handler + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/zitadel/zitadel/internal/telemetry/metrics" +) + +func TestNewProjectionMetrics(t *testing.T) { + mockMetrics := metrics.NewMockMetrics() + metrics.M = mockMetrics + + metrics := NewProjectionMetrics() + require.NotNil(t, metrics) + assert.NotNil(t, metrics.provider) +} + +func TestProjectionMetrics_ProjectionUpdateTiming(t *testing.T) { + + mockMetrics := metrics.NewMockMetrics() + metrics.M = mockMetrics + projectionMetrics := NewProjectionMetrics() + + ctx := context.Background() + projection := "test_projection" + duration := 0.5 + + projectionMetrics.ProjectionUpdateTiming(ctx, projection, duration) + + values := mockMetrics.GetHistogramValues(ProjectionHandleTimerMetric) + require.Len(t, values, 1) + assert.Equal(t, duration, values[0]) + + labels := mockMetrics.GetHistogramLabels(ProjectionHandleTimerMetric) + require.Len(t, labels, 1) + assert.Equal(t, projection, labels[0][ProjectionLabel].AsString()) +} + +func TestProjectionMetrics_ProjectionEventsProcessed(t *testing.T) { + + mockMetrics := metrics.NewMockMetrics() + metrics.M = mockMetrics + projectionMetrics := NewProjectionMetrics() + + ctx := context.Background() + projection := "test_projection" + count := int64(5) + success := true + + projectionMetrics.ProjectionEventsProcessed(ctx, projection, count, success) + + value := mockMetrics.GetCounterValue(ProjectionEventsProcessed) + assert.Equal(t, count, value) + + labels := mockMetrics.GetCounterLabels(ProjectionEventsProcessed) + require.Len(t, labels, 1) + assert.Equal(t, projection, labels[0][ProjectionLabel].AsString()) + assert.Equal(t, success, labels[0][SuccessLabel].AsBool()) +} + +func TestProjectionMetrics_ProjectionStateLatency(t *testing.T) { + + mockMetrics := metrics.NewMockMetrics() + metrics.M = mockMetrics + projectionMetrics := NewProjectionMetrics() + + ctx := context.Background() + projection := "test_projection" + latency := 10.0 + + projectionMetrics.ProjectionStateLatency(ctx, projection, latency) + + values := mockMetrics.GetHistogramValues(ProjectionStateLatencyMetric) + require.Len(t, values, 1) + assert.Equal(t, latency, values[0]) + + labels := mockMetrics.GetHistogramLabels(ProjectionStateLatencyMetric) + require.Len(t, labels, 1) + assert.Equal(t, projection, labels[0][ProjectionLabel].AsString()) +} + +func TestProjectionMetrics_Integration(t *testing.T) { + + mockMetrics := metrics.NewMockMetrics() + metrics.M = mockMetrics + projectionMetrics := NewProjectionMetrics() + + ctx := context.Background() + projection := "test_projection" + + start := time.Now() + + projectionMetrics.ProjectionEventsProcessed(ctx, projection, 3, true) + projectionMetrics.ProjectionEventsProcessed(ctx, projection, 1, false) + + duration := time.Since(start).Seconds() + projectionMetrics.ProjectionUpdateTiming(ctx, projection, duration) + + latency := 5.0 + projectionMetrics.ProjectionStateLatency(ctx, projection, latency) + + value := mockMetrics.GetCounterValue(ProjectionEventsProcessed) + assert.Equal(t, int64(4), value) + + timingValues := mockMetrics.GetHistogramValues(ProjectionHandleTimerMetric) + require.Len(t, timingValues, 1) + assert.Equal(t, duration, timingValues[0]) + + latencyValues := mockMetrics.GetHistogramValues(ProjectionStateLatencyMetric) + require.Len(t, latencyValues, 1) + assert.Equal(t, latency, latencyValues[0]) + + eventsLabels := mockMetrics.GetCounterLabels(ProjectionEventsProcessed) + require.Len(t, eventsLabels, 2) + assert.Equal(t, projection, eventsLabels[0][ProjectionLabel].AsString()) + assert.Equal(t, true, eventsLabels[0][SuccessLabel].AsBool()) + assert.Equal(t, projection, eventsLabels[1][ProjectionLabel].AsString()) + assert.Equal(t, false, eventsLabels[1][SuccessLabel].AsBool()) + + timingLabels := mockMetrics.GetHistogramLabels(ProjectionHandleTimerMetric) + require.Len(t, timingLabels, 1) + assert.Equal(t, projection, timingLabels[0][ProjectionLabel].AsString()) + + latencyLabels := mockMetrics.GetHistogramLabels(ProjectionStateLatencyMetric) + require.Len(t, latencyLabels, 1) + assert.Equal(t, projection, latencyLabels[0][ProjectionLabel].AsString()) +} diff --git a/internal/telemetry/metrics/config/config.go b/internal/telemetry/metrics/config/config.go index e9bcbe45c2..9e9ebec52b 100644 --- a/internal/telemetry/metrics/config/config.go +++ b/internal/telemetry/metrics/config/config.go @@ -1,6 +1,7 @@ package config import ( + "github.com/zitadel/zitadel/internal/telemetry/metrics" "github.com/zitadel/zitadel/internal/telemetry/metrics/otel" "github.com/zitadel/zitadel/internal/zerrors" ) @@ -12,11 +13,16 @@ type Config struct { var meter = map[string]func(map[string]interface{}) error{ "otel": otel.NewTracerFromConfig, - "none": NoMetrics, - "": NoMetrics, + "none": registerNoopMetrics, + "": registerNoopMetrics, } func (c *Config) NewMeter() error { + // When using start-from-init or start-from-setup the metric provider + // was already set in the setup phase and the start phase must not overwrite it. + if metrics.M != nil { + return nil + } t, ok := meter[c.Type] if !ok { return zerrors.ThrowInternalf(nil, "METER-Dfqsx", "config type %s not supported", c.Type) @@ -25,6 +31,7 @@ func (c *Config) NewMeter() error { return t(c.Config) } -func NoMetrics(_ map[string]interface{}) error { +func registerNoopMetrics(rawConfig map[string]interface{}) (err error) { + metrics.M = &metrics.NoopMetrics{} return nil } diff --git a/internal/telemetry/metrics/metrics.go b/internal/telemetry/metrics/metrics.go index 503ebc22de..b25dc619c0 100644 --- a/internal/telemetry/metrics/metrics.go +++ b/internal/telemetry/metrics/metrics.go @@ -22,8 +22,10 @@ type Metrics interface { GetMetricsProvider() metric.MeterProvider RegisterCounter(name, description string) error AddCount(ctx context.Context, name string, value int64, labels map[string]attribute.Value) error + AddHistogramMeasurement(ctx context.Context, name string, value float64, labels map[string]attribute.Value) error RegisterUpDownSumObserver(name, description string, callbackFunc metric.Int64Callback) error RegisterValueObserver(name, description string, callbackFunc metric.Int64Callback) error + RegisterHistogram(name, description, unit string, buckets []float64) error } var M Metrics @@ -56,6 +58,20 @@ func AddCount(ctx context.Context, name string, value int64, labels map[string]a return M.AddCount(ctx, name, value, labels) } +func AddHistogramMeasurement(ctx context.Context, name string, value float64, labels map[string]attribute.Value) error { + if M == nil { + return nil + } + return M.AddHistogramMeasurement(ctx, name, value, labels) +} + +func RegisterHistogram(name, description, unit string, buckets []float64) error { + if M == nil { + return nil + } + return M.RegisterHistogram(name, description, unit, buckets) +} + func RegisterUpDownSumObserver(name, description string, callbackFunc metric.Int64Callback) error { if M == nil { return nil diff --git a/internal/telemetry/metrics/mock.go b/internal/telemetry/metrics/mock.go new file mode 100644 index 0000000000..b28a6f5d40 --- /dev/null +++ b/internal/telemetry/metrics/mock.go @@ -0,0 +1,95 @@ +package metrics + +import ( + "context" + "net/http" + "sync" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +// MockMetrics implements the metrics.Metrics interface for testing +type MockMetrics struct { + mu sync.RWMutex + histogramValues map[string][]float64 + counterValues map[string]int64 + histogramLabels map[string][]map[string]attribute.Value + counterLabels map[string][]map[string]attribute.Value +} + +var _ Metrics = new(MockMetrics) + +// NewMockMetrics creates a new Metrics instance for testing +func NewMockMetrics() *MockMetrics { + return &MockMetrics{ + histogramValues: make(map[string][]float64), + counterValues: make(map[string]int64), + histogramLabels: make(map[string][]map[string]attribute.Value), + counterLabels: make(map[string][]map[string]attribute.Value), + } +} + +func (m *MockMetrics) GetExporter() http.Handler { + return nil +} + +func (m *MockMetrics) GetMetricsProvider() metric.MeterProvider { + return nil +} + +func (m *MockMetrics) RegisterCounter(name, description string) error { + return nil +} + +func (m *MockMetrics) AddCount(ctx context.Context, name string, value int64, labels map[string]attribute.Value) error { + m.mu.Lock() + defer m.mu.Unlock() + m.counterValues[name] += value + m.counterLabels[name] = append(m.counterLabels[name], labels) + return nil +} + +func (m *MockMetrics) AddHistogramMeasurement(ctx context.Context, name string, value float64, labels map[string]attribute.Value) error { + m.mu.Lock() + defer m.mu.Unlock() + m.histogramValues[name] = append(m.histogramValues[name], value) + m.histogramLabels[name] = append(m.histogramLabels[name], labels) + return nil +} + +func (m *MockMetrics) RegisterUpDownSumObserver(name, description string, callbackFunc metric.Int64Callback) error { + return nil +} + +func (m *MockMetrics) RegisterValueObserver(name, description string, callbackFunc metric.Int64Callback) error { + return nil +} + +func (m *MockMetrics) RegisterHistogram(name, description, unit string, buckets []float64) error { + return nil +} + +func (m *MockMetrics) GetHistogramValues(name string) []float64 { + m.mu.RLock() + defer m.mu.RUnlock() + return m.histogramValues[name] +} + +func (m *MockMetrics) GetHistogramLabels(name string) []map[string]attribute.Value { + m.mu.RLock() + defer m.mu.RUnlock() + return m.histogramLabels[name] +} + +func (m *MockMetrics) GetCounterValue(name string) int64 { + m.mu.RLock() + defer m.mu.RUnlock() + return m.counterValues[name] +} + +func (m *MockMetrics) GetCounterLabels(name string) []map[string]attribute.Value { + m.mu.RLock() + defer m.mu.RUnlock() + return m.counterLabels[name] +} diff --git a/internal/telemetry/metrics/noop.go b/internal/telemetry/metrics/noop.go new file mode 100644 index 0000000000..954db1d2b9 --- /dev/null +++ b/internal/telemetry/metrics/noop.go @@ -0,0 +1,45 @@ +package metrics + +import ( + "context" + "net/http" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +type NoopMetrics struct{} + +var _ Metrics = new(NoopMetrics) + +func (n *NoopMetrics) GetExporter() http.Handler { + return nil +} + +func (n *NoopMetrics) GetMetricsProvider() metric.MeterProvider { + return nil +} + +func (n *NoopMetrics) RegisterCounter(name, description string) error { + return nil +} + +func (n *NoopMetrics) AddCount(ctx context.Context, name string, value int64, labels map[string]attribute.Value) error { + return nil +} + +func (n *NoopMetrics) AddHistogramMeasurement(ctx context.Context, name string, value float64, labels map[string]attribute.Value) error { + return nil +} + +func (n *NoopMetrics) RegisterUpDownSumObserver(name, description string, callbackFunc metric.Int64Callback) error { + return nil +} + +func (n *NoopMetrics) RegisterValueObserver(name, description string, callbackFunc metric.Int64Callback) error { + return nil +} + +func (n *NoopMetrics) RegisterHistogram(name, description, unit string, buckets []float64) error { + return nil +} diff --git a/internal/telemetry/metrics/otel/open_telemetry.go b/internal/telemetry/metrics/otel/open_telemetry.go index 5335e65234..c4509ed5db 100644 --- a/internal/telemetry/metrics/otel/open_telemetry.go +++ b/internal/telemetry/metrics/otel/open_telemetry.go @@ -24,6 +24,7 @@ type Metrics struct { Counters sync.Map UpDownSumObserver sync.Map ValueObservers sync.Map + Histograms sync.Map } func NewMetrics(meterName string) (metrics.Metrics, error) { @@ -84,6 +85,33 @@ func (m *Metrics) AddCount(ctx context.Context, name string, value int64, labels return nil } +func (m *Metrics) AddHistogramMeasurement(ctx context.Context, name string, value float64, labels map[string]attribute.Value) error { + histogram, exists := m.Histograms.Load(name) + if !exists { + return zerrors.ThrowNotFound(nil, "METER-5wwb1", "Errors.Metrics.Histogram.NotFound") + } + histogram.(metric.Float64Histogram).Record(ctx, value, MapToRecordOption(labels)...) + return nil +} + +func (m *Metrics) RegisterHistogram(name, description, unit string, buckets []float64) error { + if _, exists := m.Histograms.Load(name); exists { + return nil + } + + histogram, err := m.Meter.Float64Histogram(name, + metric.WithDescription(description), + metric.WithUnit(unit), + metric.WithExplicitBucketBoundaries(buckets...), + ) + if err != nil { + return err + } + + m.Histograms.Store(name, histogram) + return nil +} + func (m *Metrics) RegisterUpDownSumObserver(name, description string, callbackFunc metric.Int64Callback) error { if _, exists := m.UpDownSumObserver.Load(name); exists { return nil @@ -113,15 +141,23 @@ func (m *Metrics) RegisterValueObserver(name, description string, callbackFunc m } func MapToAddOption(labels map[string]attribute.Value) []metric.AddOption { + return []metric.AddOption{metric.WithAttributes(labelsToAttributes(labels)...)} +} + +func MapToRecordOption(labels map[string]attribute.Value) []metric.RecordOption { + return []metric.RecordOption{metric.WithAttributes(labelsToAttributes(labels)...)} +} + +func labelsToAttributes(labels map[string]attribute.Value) []attribute.KeyValue { if labels == nil { return nil } - keyValues := make([]attribute.KeyValue, 0, len(labels)) + attributes := make([]attribute.KeyValue, 0, len(labels)) for key, value := range labels { - keyValues = append(keyValues, attribute.KeyValue{ + attributes = append(attributes, attribute.KeyValue{ Key: attribute.Key(key), Value: value, }) } - return []metric.AddOption{metric.WithAttributes(keyValues...)} + return attributes }