diff --git a/cmd/setup/config.go b/cmd/setup/config.go index 2b74fb2dbc..2a94b7919e 100644 --- a/cmd/setup/config.go +++ b/cmd/setup/config.go @@ -26,6 +26,7 @@ import ( "github.com/zitadel/zitadel/internal/notification/handlers" "github.com/zitadel/zitadel/internal/query/projection" static_config "github.com/zitadel/zitadel/internal/static/config" + metrics "github.com/zitadel/zitadel/internal/telemetry/metrics/config" ) type Config struct { @@ -38,6 +39,7 @@ type Config struct { ExternalPort uint16 ExternalSecure bool Log *logging.Config + Metrics metrics.Config EncryptionKeys *encryption.EncryptionKeyConfig DefaultInstance command.InstanceSetup Machine *id.Config @@ -85,6 +87,9 @@ func MustNewConfig(v *viper.Viper) *Config { err = config.Log.SetLogger() logging.OnError(err).Fatal("unable to set logger") + err = config.Metrics.NewMeter() + logging.OnError(err).Fatal("unable to set meter") + id.Configure(config.Machine) // Copy the global role permissions mappings to the instance until we allow instance-level configuration over the API. diff --git a/internal/eventstore/handler/v2/handler.go b/internal/eventstore/handler/v2/handler.go index 052f965e22..5c1eeef95d 100644 --- a/internal/eventstore/handler/v2/handler.go +++ b/internal/eventstore/handler/v2/handler.go @@ -67,6 +67,8 @@ type Handler struct { cacheInvalidations []func(ctx context.Context, aggregates []*eventstore.Aggregate) queryInstances func() ([]string, error) + + metrics *ProjectionMetrics } var _ migration.Migration = (*Handler)(nil) @@ -159,6 +161,8 @@ func NewHandler( aggregates[reducer.Aggregate] = eventTypes } + metrics := NewProjectionMetrics() + handler := &Handler{ projection: projection, client: config.Client, @@ -178,6 +182,7 @@ func NewHandler( } return nil, nil }, + metrics: metrics, } return handler @@ -483,6 +488,8 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add defer cancel() } + start := time.Now() + tx, err := h.client.BeginTx(txCtx, nil) if err != nil { return false, err @@ -502,7 +509,7 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add } return additionalIteration, err } - // stop execution if currentState.eventTimestamp >= config.maxCreatedAt + // stop execution if currentState.position >= config.maxPosition if config.maxPosition != 0 && currentState.position >= config.maxPosition { return false, nil } @@ -518,7 +525,14 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add if err == nil { err = commitErr } + + h.metrics.ProjectionEventsProcessed(ctx, h.ProjectionName(), int64(len(statements)), err == nil) + if err == nil && currentState.aggregateID != "" && len(statements) > 0 { + // Don't update projection timing or latency unless we successfully processed events + h.metrics.ProjectionUpdateTiming(ctx, h.ProjectionName(), float64(time.Since(start).Seconds())) + h.metrics.ProjectionStateLatency(ctx, h.ProjectionName(), time.Since(currentState.eventTimestamp).Seconds()) + h.invalidateCaches(ctx, aggregatesFromStatements(statements)) } }() @@ -540,6 +554,7 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add currentState.aggregateType = statements[lastProcessedIndex].Aggregate.Type currentState.sequence = statements[lastProcessedIndex].Sequence currentState.eventTimestamp = statements[lastProcessedIndex].CreationDate + err = h.setState(tx, currentState) return additionalIteration, err diff --git a/internal/eventstore/handler/v2/metrics.go b/internal/eventstore/handler/v2/metrics.go new file mode 100644 index 0000000000..6876bb3aa4 --- /dev/null +++ b/internal/eventstore/handler/v2/metrics.go @@ -0,0 +1,70 @@ +package handler + +import ( + "context" + + "github.com/zitadel/logging" + "go.opentelemetry.io/otel/attribute" + + "github.com/zitadel/zitadel/internal/telemetry/metrics" +) + +const ( + ProjectionLabel = "projection" + SuccessLabel = "success" + + ProjectionEventsProcessed = "projection_events_processed" + ProjectionHandleTimerMetric = "projection_handle_timer" + ProjectionStateLatencyMetric = "projection_state_latency" +) + +type ProjectionMetrics struct { + provider metrics.Metrics +} + +func NewProjectionMetrics() *ProjectionMetrics { + projectionMetrics := &ProjectionMetrics{provider: metrics.M} + + err := projectionMetrics.provider.RegisterCounter( + ProjectionEventsProcessed, + "Number of events reduced to process projection updates", + ) + logging.OnError(err).Error("failed to register projection events processed counter") + err = projectionMetrics.provider.RegisterHistogram( + ProjectionHandleTimerMetric, + "Time taken to process a projection update", + "s", + []float64{0.005, 0.01, 0.05, 0.1, 1, 5, 10, 30, 60, 120}, + ) + logging.OnError(err).Error("failed to register projection handle timer metric") + err = projectionMetrics.provider.RegisterHistogram( + ProjectionStateLatencyMetric, + "When finishing processing a batch of events, this track the age of the last events seen from current time", + "s", + []float64{0.1, 0.5, 1, 5, 10, 30, 60, 300, 600, 1800}, + ) + logging.OnError(err).Error("failed to register projection state latency metric") + return projectionMetrics +} + +func (m *ProjectionMetrics) ProjectionUpdateTiming(ctx context.Context, projection string, duration float64) { + err := m.provider.AddHistogramMeasurement(ctx, ProjectionHandleTimerMetric, duration, map[string]attribute.Value{ + ProjectionLabel: attribute.StringValue(projection), + }) + logging.OnError(err).Error("failed to add projection trigger timing") +} + +func (m *ProjectionMetrics) ProjectionEventsProcessed(ctx context.Context, projection string, count int64, success bool) { + err := m.provider.AddCount(ctx, ProjectionEventsProcessed, count, map[string]attribute.Value{ + ProjectionLabel: attribute.StringValue(projection), + SuccessLabel: attribute.BoolValue(success), + }) + logging.OnError(err).Error("failed to add projection events processed metric") +} + +func (m *ProjectionMetrics) ProjectionStateLatency(ctx context.Context, projection string, latency float64) { + err := m.provider.AddHistogramMeasurement(ctx, ProjectionStateLatencyMetric, latency, map[string]attribute.Value{ + ProjectionLabel: attribute.StringValue(projection), + }) + logging.OnError(err).Error("failed to add projection state latency metric") +} diff --git a/internal/eventstore/handler/v2/metrics_test.go b/internal/eventstore/handler/v2/metrics_test.go new file mode 100644 index 0000000000..54e7623462 --- /dev/null +++ b/internal/eventstore/handler/v2/metrics_test.go @@ -0,0 +1,132 @@ +package handler + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/zitadel/zitadel/internal/telemetry/metrics" +) + +func TestNewProjectionMetrics(t *testing.T) { + mockMetrics := metrics.NewMockMetrics() + metrics.M = mockMetrics + + metrics := NewProjectionMetrics() + require.NotNil(t, metrics) + assert.NotNil(t, metrics.provider) +} + +func TestProjectionMetrics_ProjectionUpdateTiming(t *testing.T) { + + mockMetrics := metrics.NewMockMetrics() + metrics.M = mockMetrics + projectionMetrics := NewProjectionMetrics() + + ctx := context.Background() + projection := "test_projection" + duration := 0.5 + + projectionMetrics.ProjectionUpdateTiming(ctx, projection, duration) + + values := mockMetrics.GetHistogramValues(ProjectionHandleTimerMetric) + require.Len(t, values, 1) + assert.Equal(t, duration, values[0]) + + labels := mockMetrics.GetHistogramLabels(ProjectionHandleTimerMetric) + require.Len(t, labels, 1) + assert.Equal(t, projection, labels[0][ProjectionLabel].AsString()) +} + +func TestProjectionMetrics_ProjectionEventsProcessed(t *testing.T) { + + mockMetrics := metrics.NewMockMetrics() + metrics.M = mockMetrics + projectionMetrics := NewProjectionMetrics() + + ctx := context.Background() + projection := "test_projection" + count := int64(5) + success := true + + projectionMetrics.ProjectionEventsProcessed(ctx, projection, count, success) + + value := mockMetrics.GetCounterValue(ProjectionEventsProcessed) + assert.Equal(t, count, value) + + labels := mockMetrics.GetCounterLabels(ProjectionEventsProcessed) + require.Len(t, labels, 1) + assert.Equal(t, projection, labels[0][ProjectionLabel].AsString()) + assert.Equal(t, success, labels[0][SuccessLabel].AsBool()) +} + +func TestProjectionMetrics_ProjectionStateLatency(t *testing.T) { + + mockMetrics := metrics.NewMockMetrics() + metrics.M = mockMetrics + projectionMetrics := NewProjectionMetrics() + + ctx := context.Background() + projection := "test_projection" + latency := 10.0 + + projectionMetrics.ProjectionStateLatency(ctx, projection, latency) + + values := mockMetrics.GetHistogramValues(ProjectionStateLatencyMetric) + require.Len(t, values, 1) + assert.Equal(t, latency, values[0]) + + labels := mockMetrics.GetHistogramLabels(ProjectionStateLatencyMetric) + require.Len(t, labels, 1) + assert.Equal(t, projection, labels[0][ProjectionLabel].AsString()) +} + +func TestProjectionMetrics_Integration(t *testing.T) { + + mockMetrics := metrics.NewMockMetrics() + metrics.M = mockMetrics + projectionMetrics := NewProjectionMetrics() + + ctx := context.Background() + projection := "test_projection" + + start := time.Now() + + projectionMetrics.ProjectionEventsProcessed(ctx, projection, 3, true) + projectionMetrics.ProjectionEventsProcessed(ctx, projection, 1, false) + + duration := time.Since(start).Seconds() + projectionMetrics.ProjectionUpdateTiming(ctx, projection, duration) + + latency := 5.0 + projectionMetrics.ProjectionStateLatency(ctx, projection, latency) + + value := mockMetrics.GetCounterValue(ProjectionEventsProcessed) + assert.Equal(t, int64(4), value) + + timingValues := mockMetrics.GetHistogramValues(ProjectionHandleTimerMetric) + require.Len(t, timingValues, 1) + assert.Equal(t, duration, timingValues[0]) + + latencyValues := mockMetrics.GetHistogramValues(ProjectionStateLatencyMetric) + require.Len(t, latencyValues, 1) + assert.Equal(t, latency, latencyValues[0]) + + eventsLabels := mockMetrics.GetCounterLabels(ProjectionEventsProcessed) + require.Len(t, eventsLabels, 2) + assert.Equal(t, projection, eventsLabels[0][ProjectionLabel].AsString()) + assert.Equal(t, true, eventsLabels[0][SuccessLabel].AsBool()) + assert.Equal(t, projection, eventsLabels[1][ProjectionLabel].AsString()) + assert.Equal(t, false, eventsLabels[1][SuccessLabel].AsBool()) + + timingLabels := mockMetrics.GetHistogramLabels(ProjectionHandleTimerMetric) + require.Len(t, timingLabels, 1) + assert.Equal(t, projection, timingLabels[0][ProjectionLabel].AsString()) + + latencyLabels := mockMetrics.GetHistogramLabels(ProjectionStateLatencyMetric) + require.Len(t, latencyLabels, 1) + assert.Equal(t, projection, latencyLabels[0][ProjectionLabel].AsString()) +} diff --git a/internal/telemetry/metrics/config/config.go b/internal/telemetry/metrics/config/config.go index e9bcbe45c2..9e9ebec52b 100644 --- a/internal/telemetry/metrics/config/config.go +++ b/internal/telemetry/metrics/config/config.go @@ -1,6 +1,7 @@ package config import ( + "github.com/zitadel/zitadel/internal/telemetry/metrics" "github.com/zitadel/zitadel/internal/telemetry/metrics/otel" "github.com/zitadel/zitadel/internal/zerrors" ) @@ -12,11 +13,16 @@ type Config struct { var meter = map[string]func(map[string]interface{}) error{ "otel": otel.NewTracerFromConfig, - "none": NoMetrics, - "": NoMetrics, + "none": registerNoopMetrics, + "": registerNoopMetrics, } func (c *Config) NewMeter() error { + // When using start-from-init or start-from-setup the metric provider + // was already set in the setup phase and the start phase must not overwrite it. + if metrics.M != nil { + return nil + } t, ok := meter[c.Type] if !ok { return zerrors.ThrowInternalf(nil, "METER-Dfqsx", "config type %s not supported", c.Type) @@ -25,6 +31,7 @@ func (c *Config) NewMeter() error { return t(c.Config) } -func NoMetrics(_ map[string]interface{}) error { +func registerNoopMetrics(rawConfig map[string]interface{}) (err error) { + metrics.M = &metrics.NoopMetrics{} return nil } diff --git a/internal/telemetry/metrics/metrics.go b/internal/telemetry/metrics/metrics.go index 503ebc22de..b25dc619c0 100644 --- a/internal/telemetry/metrics/metrics.go +++ b/internal/telemetry/metrics/metrics.go @@ -22,8 +22,10 @@ type Metrics interface { GetMetricsProvider() metric.MeterProvider RegisterCounter(name, description string) error AddCount(ctx context.Context, name string, value int64, labels map[string]attribute.Value) error + AddHistogramMeasurement(ctx context.Context, name string, value float64, labels map[string]attribute.Value) error RegisterUpDownSumObserver(name, description string, callbackFunc metric.Int64Callback) error RegisterValueObserver(name, description string, callbackFunc metric.Int64Callback) error + RegisterHistogram(name, description, unit string, buckets []float64) error } var M Metrics @@ -56,6 +58,20 @@ func AddCount(ctx context.Context, name string, value int64, labels map[string]a return M.AddCount(ctx, name, value, labels) } +func AddHistogramMeasurement(ctx context.Context, name string, value float64, labels map[string]attribute.Value) error { + if M == nil { + return nil + } + return M.AddHistogramMeasurement(ctx, name, value, labels) +} + +func RegisterHistogram(name, description, unit string, buckets []float64) error { + if M == nil { + return nil + } + return M.RegisterHistogram(name, description, unit, buckets) +} + func RegisterUpDownSumObserver(name, description string, callbackFunc metric.Int64Callback) error { if M == nil { return nil diff --git a/internal/telemetry/metrics/mock.go b/internal/telemetry/metrics/mock.go new file mode 100644 index 0000000000..b28a6f5d40 --- /dev/null +++ b/internal/telemetry/metrics/mock.go @@ -0,0 +1,95 @@ +package metrics + +import ( + "context" + "net/http" + "sync" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +// MockMetrics implements the metrics.Metrics interface for testing +type MockMetrics struct { + mu sync.RWMutex + histogramValues map[string][]float64 + counterValues map[string]int64 + histogramLabels map[string][]map[string]attribute.Value + counterLabels map[string][]map[string]attribute.Value +} + +var _ Metrics = new(MockMetrics) + +// NewMockMetrics creates a new Metrics instance for testing +func NewMockMetrics() *MockMetrics { + return &MockMetrics{ + histogramValues: make(map[string][]float64), + counterValues: make(map[string]int64), + histogramLabels: make(map[string][]map[string]attribute.Value), + counterLabels: make(map[string][]map[string]attribute.Value), + } +} + +func (m *MockMetrics) GetExporter() http.Handler { + return nil +} + +func (m *MockMetrics) GetMetricsProvider() metric.MeterProvider { + return nil +} + +func (m *MockMetrics) RegisterCounter(name, description string) error { + return nil +} + +func (m *MockMetrics) AddCount(ctx context.Context, name string, value int64, labels map[string]attribute.Value) error { + m.mu.Lock() + defer m.mu.Unlock() + m.counterValues[name] += value + m.counterLabels[name] = append(m.counterLabels[name], labels) + return nil +} + +func (m *MockMetrics) AddHistogramMeasurement(ctx context.Context, name string, value float64, labels map[string]attribute.Value) error { + m.mu.Lock() + defer m.mu.Unlock() + m.histogramValues[name] = append(m.histogramValues[name], value) + m.histogramLabels[name] = append(m.histogramLabels[name], labels) + return nil +} + +func (m *MockMetrics) RegisterUpDownSumObserver(name, description string, callbackFunc metric.Int64Callback) error { + return nil +} + +func (m *MockMetrics) RegisterValueObserver(name, description string, callbackFunc metric.Int64Callback) error { + return nil +} + +func (m *MockMetrics) RegisterHistogram(name, description, unit string, buckets []float64) error { + return nil +} + +func (m *MockMetrics) GetHistogramValues(name string) []float64 { + m.mu.RLock() + defer m.mu.RUnlock() + return m.histogramValues[name] +} + +func (m *MockMetrics) GetHistogramLabels(name string) []map[string]attribute.Value { + m.mu.RLock() + defer m.mu.RUnlock() + return m.histogramLabels[name] +} + +func (m *MockMetrics) GetCounterValue(name string) int64 { + m.mu.RLock() + defer m.mu.RUnlock() + return m.counterValues[name] +} + +func (m *MockMetrics) GetCounterLabels(name string) []map[string]attribute.Value { + m.mu.RLock() + defer m.mu.RUnlock() + return m.counterLabels[name] +} diff --git a/internal/telemetry/metrics/noop.go b/internal/telemetry/metrics/noop.go new file mode 100644 index 0000000000..954db1d2b9 --- /dev/null +++ b/internal/telemetry/metrics/noop.go @@ -0,0 +1,45 @@ +package metrics + +import ( + "context" + "net/http" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +type NoopMetrics struct{} + +var _ Metrics = new(NoopMetrics) + +func (n *NoopMetrics) GetExporter() http.Handler { + return nil +} + +func (n *NoopMetrics) GetMetricsProvider() metric.MeterProvider { + return nil +} + +func (n *NoopMetrics) RegisterCounter(name, description string) error { + return nil +} + +func (n *NoopMetrics) AddCount(ctx context.Context, name string, value int64, labels map[string]attribute.Value) error { + return nil +} + +func (n *NoopMetrics) AddHistogramMeasurement(ctx context.Context, name string, value float64, labels map[string]attribute.Value) error { + return nil +} + +func (n *NoopMetrics) RegisterUpDownSumObserver(name, description string, callbackFunc metric.Int64Callback) error { + return nil +} + +func (n *NoopMetrics) RegisterValueObserver(name, description string, callbackFunc metric.Int64Callback) error { + return nil +} + +func (n *NoopMetrics) RegisterHistogram(name, description, unit string, buckets []float64) error { + return nil +} diff --git a/internal/telemetry/metrics/otel/open_telemetry.go b/internal/telemetry/metrics/otel/open_telemetry.go index 5335e65234..c4509ed5db 100644 --- a/internal/telemetry/metrics/otel/open_telemetry.go +++ b/internal/telemetry/metrics/otel/open_telemetry.go @@ -24,6 +24,7 @@ type Metrics struct { Counters sync.Map UpDownSumObserver sync.Map ValueObservers sync.Map + Histograms sync.Map } func NewMetrics(meterName string) (metrics.Metrics, error) { @@ -84,6 +85,33 @@ func (m *Metrics) AddCount(ctx context.Context, name string, value int64, labels return nil } +func (m *Metrics) AddHistogramMeasurement(ctx context.Context, name string, value float64, labels map[string]attribute.Value) error { + histogram, exists := m.Histograms.Load(name) + if !exists { + return zerrors.ThrowNotFound(nil, "METER-5wwb1", "Errors.Metrics.Histogram.NotFound") + } + histogram.(metric.Float64Histogram).Record(ctx, value, MapToRecordOption(labels)...) + return nil +} + +func (m *Metrics) RegisterHistogram(name, description, unit string, buckets []float64) error { + if _, exists := m.Histograms.Load(name); exists { + return nil + } + + histogram, err := m.Meter.Float64Histogram(name, + metric.WithDescription(description), + metric.WithUnit(unit), + metric.WithExplicitBucketBoundaries(buckets...), + ) + if err != nil { + return err + } + + m.Histograms.Store(name, histogram) + return nil +} + func (m *Metrics) RegisterUpDownSumObserver(name, description string, callbackFunc metric.Int64Callback) error { if _, exists := m.UpDownSumObserver.Load(name); exists { return nil @@ -113,15 +141,23 @@ func (m *Metrics) RegisterValueObserver(name, description string, callbackFunc m } func MapToAddOption(labels map[string]attribute.Value) []metric.AddOption { + return []metric.AddOption{metric.WithAttributes(labelsToAttributes(labels)...)} +} + +func MapToRecordOption(labels map[string]attribute.Value) []metric.RecordOption { + return []metric.RecordOption{metric.WithAttributes(labelsToAttributes(labels)...)} +} + +func labelsToAttributes(labels map[string]attribute.Value) []attribute.KeyValue { if labels == nil { return nil } - keyValues := make([]attribute.KeyValue, 0, len(labels)) + attributes := make([]attribute.KeyValue, 0, len(labels)) for key, value := range labels { - keyValues = append(keyValues, attribute.KeyValue{ + attributes = append(attributes, attribute.KeyValue{ Key: attribute.Key(key), Value: value, }) } - return []metric.AddOption{metric.WithAttributes(keyValues...)} + return attributes }