fix: add prometheus metrics on projection handlers (#9561)

# Which Problems Are Solved

With the currently provided telemetry it's difficult to predict when a
projection handler is under increased load until it's too late and
causes downstream issues. Importantly, projection updating is in the
critical path for many login flows and increased latency there can
result in system downtime for users.

# How the Problems Are Solved

This PR adds three new prometheus-style metrics:
1. **projection_events_processed** (_labels: projection, success_) -
This metric gives us a counter of the number of events processed per
projection update run and whether they were processed without error. A
high number of events being processed can let us know how busy a
particular projection handler is.

2. **projection_handle_timer** _(labels: projection)_ - This is the time
it takes to process a projection update given a batch of events - time
to take the current_states lock, query for new events, reduce,
update the projection, and update current_states.

3. **projection_state_latency** _(labels: projection)_ - This is the
time from the last event processed in the current_states table for a
given projection. It tells us how old the last processed event is, i.e.
how far behind this projection is running. Higher latencies
could mean high load or stalled projection handling.

# Additional Changes

I also had to initialize the global otel metrics provider (`metrics.M`)
in the `setup` step in addition to `start`, since projection handlers
are initialized at setup. The initialization checks if a metrics
provider is already set (in case of `start-from-setup` or
`start-from-init`) to prevent overwriting, which causes the otel metrics
provider to stop working.

# Additional Context

## Example Dashboards

![image](https://github.com/user-attachments/assets/94ba5c2b-9c62-44cd-83ee-4db4a8859073)

![image](https://github.com/user-attachments/assets/60a1b406-a8c6-48dc-a925-575359f97e1e)

---------

Co-authored-by: Silvan <27845747+adlerhurst@users.noreply.github.com>
Co-authored-by: Livio Spring <livio.a@gmail.com>
(cherry picked from commit c1535b7b49)
This commit is contained in:
Zach Hirschtritt
2025-03-27 02:40:27 -04:00
committed by Livio Spring
parent 12b78e5a36
commit 61ddecee31
9 changed files with 428 additions and 7 deletions

View File

@@ -67,6 +67,8 @@ type Handler struct {
cacheInvalidations []func(ctx context.Context, aggregates []*eventstore.Aggregate)
queryInstances func() ([]string, error)
metrics *ProjectionMetrics
}
var _ migration.Migration = (*Handler)(nil)
@@ -159,6 +161,8 @@ func NewHandler(
aggregates[reducer.Aggregate] = eventTypes
}
metrics := NewProjectionMetrics()
handler := &Handler{
projection: projection,
client: config.Client,
@@ -178,6 +182,7 @@ func NewHandler(
}
return nil, nil
},
metrics: metrics,
}
return handler
@@ -483,6 +488,8 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add
defer cancel()
}
start := time.Now()
tx, err := h.client.BeginTx(txCtx, nil)
if err != nil {
return false, err
@@ -502,7 +509,7 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add
}
return additionalIteration, err
}
// stop execution if currentState.eventTimestamp >= config.maxCreatedAt
// stop execution if currentState.position >= config.maxPosition
if config.maxPosition != 0 && currentState.position >= config.maxPosition {
return false, nil
}
@@ -518,7 +525,14 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add
if err == nil {
err = commitErr
}
h.metrics.ProjectionEventsProcessed(ctx, h.ProjectionName(), int64(len(statements)), err == nil)
if err == nil && currentState.aggregateID != "" && len(statements) > 0 {
// Don't update projection timing or latency unless we successfully processed events
h.metrics.ProjectionUpdateTiming(ctx, h.ProjectionName(), float64(time.Since(start).Seconds()))
h.metrics.ProjectionStateLatency(ctx, h.ProjectionName(), time.Since(currentState.eventTimestamp).Seconds())
h.invalidateCaches(ctx, aggregatesFromStatements(statements))
}
}()
@@ -540,6 +554,7 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add
currentState.aggregateType = statements[lastProcessedIndex].Aggregate.Type
currentState.sequence = statements[lastProcessedIndex].Sequence
currentState.eventTimestamp = statements[lastProcessedIndex].CreationDate
err = h.setState(tx, currentState)
return additionalIteration, err

View File

@@ -0,0 +1,70 @@
package handler
import (
"context"
"github.com/zitadel/logging"
"go.opentelemetry.io/otel/attribute"
"github.com/zitadel/zitadel/internal/telemetry/metrics"
)
// Attribute keys attached to the projection metrics.
const (
	// ProjectionLabel is the attribute key carrying the projection name.
	ProjectionLabel = "projection"
	// SuccessLabel is the attribute key carrying the boolean success flag
	// of the events-processed counter.
	SuccessLabel = "success"
)

// Names under which the projection metrics are registered and reported.
const (
	// ProjectionEventsProcessed is the name of the processed-events counter.
	ProjectionEventsProcessed = "projection_events_processed"
	// ProjectionHandleTimerMetric is the name of the histogram that tracks
	// how long a projection update takes.
	ProjectionHandleTimerMetric = "projection_handle_timer"
	// ProjectionStateLatencyMetric is the name of the histogram that tracks
	// the age of the last processed event.
	ProjectionStateLatencyMetric = "projection_state_latency"
)
// ProjectionMetrics reports projection handler measurements (processed event
// counts, update timing and state latency) through a metrics.Metrics provider.
type ProjectionMetrics struct {
// provider is the metrics backend the counter and histograms are
// registered with and reported to.
provider metrics.Metrics
}
// NewProjectionMetrics builds a ProjectionMetrics backed by the global
// metrics.M provider and registers the processed-events counter and the two
// projection histograms on it.
//
// Registration errors are not returned; each one is handed to
// logging.OnError with a metric-specific message.
// NOTE(review): metrics.M is used as-is, so it is expected to be initialised
// before this constructor runs — confirm against callers.
func NewProjectionMetrics() *ProjectionMetrics {
	pm := &ProjectionMetrics{provider: metrics.M}

	counterErr := pm.provider.RegisterCounter(
		ProjectionEventsProcessed,
		"Number of events reduced to process projection updates",
	)
	logging.OnError(counterErr).Error("failed to register projection events processed counter")

	// Both histograms are measured in seconds; they differ only in name,
	// description, bucket boundaries and the message logged on failure.
	histograms := []struct {
		name        string
		description string
		buckets     []float64
		failureMsg  string
	}{
		{
			name:        ProjectionHandleTimerMetric,
			description: "Time taken to process a projection update",
			buckets:     []float64{0.005, 0.01, 0.05, 0.1, 1, 5, 10, 30, 60, 120},
			failureMsg:  "failed to register projection handle timer metric",
		},
		{
			name:        ProjectionStateLatencyMetric,
			description: "When finishing processing a batch of events, this track the age of the last events seen from current time",
			buckets:     []float64{0.1, 0.5, 1, 5, 10, 30, 60, 300, 600, 1800},
			failureMsg:  "failed to register projection state latency metric",
		},
	}
	for _, hist := range histograms {
		histErr := pm.provider.RegisterHistogram(hist.name, hist.description, "s", hist.buckets)
		logging.OnError(histErr).Error(hist.failureMsg)
	}

	return pm
}
// ProjectionUpdateTiming adds duration as a measurement to the
// ProjectionHandleTimerMetric histogram, labelled with the projection name.
// The histogram is registered with unit "s", so duration is expected in
// seconds. A recording error is handed to logging.OnError, not returned.
func (m *ProjectionMetrics) ProjectionUpdateTiming(ctx context.Context, projection string, duration float64) {
	labels := map[string]attribute.Value{
		ProjectionLabel: attribute.StringValue(projection),
	}
	recordErr := m.provider.AddHistogramMeasurement(ctx, ProjectionHandleTimerMetric, duration, labels)
	logging.OnError(recordErr).Error("failed to add projection trigger timing")
}
// ProjectionEventsProcessed adds count to the ProjectionEventsProcessed
// counter, labelled with the projection name and whether processing
// succeeded. A recording error is handed to logging.OnError, not returned.
func (m *ProjectionMetrics) ProjectionEventsProcessed(ctx context.Context, projection string, count int64, success bool) {
	labels := map[string]attribute.Value{
		ProjectionLabel: attribute.StringValue(projection),
		SuccessLabel:    attribute.BoolValue(success),
	}
	recordErr := m.provider.AddCount(ctx, ProjectionEventsProcessed, count, labels)
	logging.OnError(recordErr).Error("failed to add projection events processed metric")
}
// ProjectionStateLatency adds latency as a measurement to the
// ProjectionStateLatencyMetric histogram, labelled with the projection name.
// The histogram is registered with unit "s", so latency is expected in
// seconds. A recording error is handed to logging.OnError, not returned.
func (m *ProjectionMetrics) ProjectionStateLatency(ctx context.Context, projection string, latency float64) {
	labels := map[string]attribute.Value{
		ProjectionLabel: attribute.StringValue(projection),
	}
	recordErr := m.provider.AddHistogramMeasurement(ctx, ProjectionStateLatencyMetric, latency, labels)
	logging.OnError(recordErr).Error("failed to add projection state latency metric")
}

View File

@@ -0,0 +1,132 @@
package handler
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/zitadel/zitadel/internal/telemetry/metrics"
)
// TestNewProjectionMetrics checks that, with a mock installed as the global
// metrics.M, the constructor returns a non-nil ProjectionMetrics whose
// provider is set.
func TestNewProjectionMetrics(t *testing.T) {
	recorder := metrics.NewMockMetrics()
	metrics.M = recorder

	// Named projectionMetrics so the local does not shadow the metrics package.
	projectionMetrics := NewProjectionMetrics()

	require.NotNil(t, projectionMetrics)
	assert.NotNil(t, projectionMetrics.provider)
}
// TestProjectionMetrics_ProjectionUpdateTiming checks that one timing
// measurement ends up in the handle-timer histogram of the mock with the
// recorded value and the projection label.
func TestProjectionMetrics_ProjectionUpdateTiming(t *testing.T) {
	recorder := metrics.NewMockMetrics()
	metrics.M = recorder
	sut := NewProjectionMetrics()

	var (
		projectionName = "test_projection"
		wantDuration   = 0.5
	)
	sut.ProjectionUpdateTiming(context.Background(), projectionName, wantDuration)

	gotValues := recorder.GetHistogramValues(ProjectionHandleTimerMetric)
	require.Len(t, gotValues, 1)
	assert.Equal(t, wantDuration, gotValues[0])

	gotLabels := recorder.GetHistogramLabels(ProjectionHandleTimerMetric)
	require.Len(t, gotLabels, 1)
	assert.Equal(t, projectionName, gotLabels[0][ProjectionLabel].AsString())
}
// TestProjectionMetrics_ProjectionEventsProcessed checks that a single call
// adds the given count to the events-processed counter of the mock and
// records both the projection and success labels.
func TestProjectionMetrics_ProjectionEventsProcessed(t *testing.T) {
	recorder := metrics.NewMockMetrics()
	metrics.M = recorder
	sut := NewProjectionMetrics()

	var (
		projectionName = "test_projection"
		wantCount      = int64(5)
		wantSuccess    = true
	)
	sut.ProjectionEventsProcessed(context.Background(), projectionName, wantCount, wantSuccess)

	gotCount := recorder.GetCounterValue(ProjectionEventsProcessed)
	assert.Equal(t, wantCount, gotCount)

	gotLabels := recorder.GetCounterLabels(ProjectionEventsProcessed)
	require.Len(t, gotLabels, 1)
	assert.Equal(t, projectionName, gotLabels[0][ProjectionLabel].AsString())
	assert.Equal(t, wantSuccess, gotLabels[0][SuccessLabel].AsBool())
}
// TestProjectionMetrics_ProjectionStateLatency checks that one latency
// measurement ends up in the state-latency histogram of the mock with the
// recorded value and the projection label.
func TestProjectionMetrics_ProjectionStateLatency(t *testing.T) {
	recorder := metrics.NewMockMetrics()
	metrics.M = recorder
	sut := NewProjectionMetrics()

	var (
		projectionName = "test_projection"
		wantLatency    = 10.0
	)
	sut.ProjectionStateLatency(context.Background(), projectionName, wantLatency)

	gotValues := recorder.GetHistogramValues(ProjectionStateLatencyMetric)
	require.Len(t, gotValues, 1)
	assert.Equal(t, wantLatency, gotValues[0])

	gotLabels := recorder.GetHistogramLabels(ProjectionStateLatencyMetric)
	require.Len(t, gotLabels, 1)
	assert.Equal(t, projectionName, gotLabels[0][ProjectionLabel].AsString())
}
// TestProjectionMetrics_Integration drives all three metrics against one
// mock: two counter additions (one successful, one failed), one timing
// measurement and one latency measurement, then checks the recorded values
// and labels of each metric.
func TestProjectionMetrics_Integration(t *testing.T) {
	recorder := metrics.NewMockMetrics()
	metrics.M = recorder
	sut := NewProjectionMetrics()

	ctx := context.Background()
	projectionName := "test_projection"

	// The elapsed time only needs to be some real duration; the assertion
	// below compares it with exactly what was handed to the histogram.
	begin := time.Now()
	sut.ProjectionEventsProcessed(ctx, projectionName, 3, true)
	sut.ProjectionEventsProcessed(ctx, projectionName, 1, false)
	elapsed := time.Since(begin).Seconds()
	sut.ProjectionUpdateTiming(ctx, projectionName, elapsed)

	wantLatency := 5.0
	sut.ProjectionStateLatency(ctx, projectionName, wantLatency)

	// Recorded values.
	gotCount := recorder.GetCounterValue(ProjectionEventsProcessed)
	assert.Equal(t, int64(4), gotCount)

	gotTimings := recorder.GetHistogramValues(ProjectionHandleTimerMetric)
	require.Len(t, gotTimings, 1)
	assert.Equal(t, elapsed, gotTimings[0])

	gotLatencies := recorder.GetHistogramValues(ProjectionStateLatencyMetric)
	require.Len(t, gotLatencies, 1)
	assert.Equal(t, wantLatency, gotLatencies[0])

	// Recorded labels: one counter entry per call, in call order.
	counterLabels := recorder.GetCounterLabels(ProjectionEventsProcessed)
	require.Len(t, counterLabels, 2)
	for i, wantSuccess := range []bool{true, false} {
		assert.Equal(t, projectionName, counterLabels[i][ProjectionLabel].AsString())
		assert.Equal(t, wantSuccess, counterLabels[i][SuccessLabel].AsBool())
	}

	timingLabels := recorder.GetHistogramLabels(ProjectionHandleTimerMetric)
	require.Len(t, timingLabels, 1)
	assert.Equal(t, projectionName, timingLabels[0][ProjectionLabel].AsString())

	latencyLabels := recorder.GetHistogramLabels(ProjectionStateLatencyMetric)
	require.Len(t, latencyLabels, 1)
	assert.Equal(t, projectionName, latencyLabels[0][ProjectionLabel].AsString())
}