mirror of
https://github.com/zitadel/zitadel.git
synced 2025-08-12 01:37:31 +00:00
Merge branch 'main' into package-structure
This commit is contained in:
@@ -67,6 +67,8 @@ type Handler struct {
|
||||
cacheInvalidations []func(ctx context.Context, aggregates []*eventstore.Aggregate)
|
||||
|
||||
queryInstances func() ([]string, error)
|
||||
|
||||
metrics *ProjectionMetrics
|
||||
}
|
||||
|
||||
var _ migration.Migration = (*Handler)(nil)
|
||||
@@ -159,6 +161,8 @@ func NewHandler(
|
||||
aggregates[reducer.Aggregate] = eventTypes
|
||||
}
|
||||
|
||||
metrics := NewProjectionMetrics()
|
||||
|
||||
handler := &Handler{
|
||||
projection: projection,
|
||||
client: config.Client,
|
||||
@@ -178,6 +182,7 @@ func NewHandler(
|
||||
}
|
||||
return nil, nil
|
||||
},
|
||||
metrics: metrics,
|
||||
}
|
||||
|
||||
return handler
|
||||
@@ -263,12 +268,16 @@ func (h *Handler) triggerInstances(ctx context.Context, instances []string, trig
|
||||
|
||||
// simple implementation of do while
|
||||
_, err := h.Trigger(instanceCtx, triggerOpts...)
|
||||
h.log().WithField("instance", instance).OnError(err).Debug("trigger failed")
|
||||
// skip retry if everything is fine
|
||||
if err == nil {
|
||||
continue
|
||||
}
|
||||
h.log().WithField("instance", instance).WithError(err).Debug("trigger failed")
|
||||
time.Sleep(h.retryFailedAfter)
|
||||
// retry if trigger failed
|
||||
for ; err != nil; _, err = h.Trigger(instanceCtx, triggerOpts...) {
|
||||
time.Sleep(h.retryFailedAfter)
|
||||
h.log().WithField("instance", instance).OnError(err).Debug("trigger failed")
|
||||
h.log().WithField("instance", instance).WithError(err).Debug("trigger failed")
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -479,6 +488,8 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add
|
||||
defer cancel()
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
|
||||
tx, err := h.client.BeginTx(txCtx, nil)
|
||||
if err != nil {
|
||||
return false, err
|
||||
@@ -498,7 +509,7 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add
|
||||
}
|
||||
return additionalIteration, err
|
||||
}
|
||||
// stop execution if currentState.eventTimestamp >= config.maxCreatedAt
|
||||
// stop execution if currentState.position >= config.maxPosition
|
||||
if config.maxPosition != 0 && currentState.position >= config.maxPosition {
|
||||
return false, nil
|
||||
}
|
||||
@@ -514,7 +525,14 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add
|
||||
if err == nil {
|
||||
err = commitErr
|
||||
}
|
||||
|
||||
h.metrics.ProjectionEventsProcessed(ctx, h.ProjectionName(), int64(len(statements)), err == nil)
|
||||
|
||||
if err == nil && currentState.aggregateID != "" && len(statements) > 0 {
|
||||
// Don't update projection timing or latency unless we successfully processed events
|
||||
h.metrics.ProjectionUpdateTiming(ctx, h.ProjectionName(), float64(time.Since(start).Seconds()))
|
||||
h.metrics.ProjectionStateLatency(ctx, h.ProjectionName(), time.Since(currentState.eventTimestamp).Seconds())
|
||||
|
||||
h.invalidateCaches(ctx, aggregatesFromStatements(statements))
|
||||
}
|
||||
}()
|
||||
@@ -536,6 +554,7 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add
|
||||
currentState.aggregateType = statements[lastProcessedIndex].Aggregate.Type
|
||||
currentState.sequence = statements[lastProcessedIndex].Sequence
|
||||
currentState.eventTimestamp = statements[lastProcessedIndex].CreationDate
|
||||
|
||||
err = h.setState(tx, currentState)
|
||||
|
||||
return additionalIteration, err
|
||||
@@ -646,7 +665,6 @@ func (h *Handler) eventQuery(currentState *state) *eventstore.SearchQueryBuilder
|
||||
builder := eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
|
||||
AwaitOpenTransactions().
|
||||
Limit(uint64(h.bulkLimit)).
|
||||
AllowTimeTravel().
|
||||
OrderAsc().
|
||||
InstanceID(currentState.instanceID)
|
||||
|
||||
@@ -658,15 +676,15 @@ func (h *Handler) eventQuery(currentState *state) *eventstore.SearchQueryBuilder
|
||||
}
|
||||
}
|
||||
|
||||
for aggregateType, eventTypes := range h.eventTypes {
|
||||
builder = builder.
|
||||
AddQuery().
|
||||
AggregateTypes(aggregateType).
|
||||
EventTypes(eventTypes...).
|
||||
Builder()
|
||||
aggregateTypes := make([]eventstore.AggregateType, 0, len(h.eventTypes))
|
||||
eventTypes := make([]eventstore.EventType, 0, len(h.eventTypes))
|
||||
|
||||
for aggregate, events := range h.eventTypes {
|
||||
aggregateTypes = append(aggregateTypes, aggregate)
|
||||
eventTypes = append(eventTypes, events...)
|
||||
}
|
||||
|
||||
return builder
|
||||
return builder.AddQuery().AggregateTypes(aggregateTypes...).EventTypes(eventTypes...).Builder()
|
||||
}
|
||||
|
||||
// ProjectionName returns the name of the underlying projection.
|
||||
|
70
internal/eventstore/handler/v2/metrics.go
Normal file
70
internal/eventstore/handler/v2/metrics.go
Normal file
@@ -0,0 +1,70 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/zitadel/logging"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
|
||||
"github.com/zitadel/zitadel/internal/telemetry/metrics"
|
||||
)
|
||||
|
||||
const (
|
||||
ProjectionLabel = "projection"
|
||||
SuccessLabel = "success"
|
||||
|
||||
ProjectionEventsProcessed = "projection_events_processed"
|
||||
ProjectionHandleTimerMetric = "projection_handle_timer"
|
||||
ProjectionStateLatencyMetric = "projection_state_latency"
|
||||
)
|
||||
|
||||
type ProjectionMetrics struct {
|
||||
provider metrics.Metrics
|
||||
}
|
||||
|
||||
func NewProjectionMetrics() *ProjectionMetrics {
|
||||
projectionMetrics := &ProjectionMetrics{provider: metrics.M}
|
||||
|
||||
err := projectionMetrics.provider.RegisterCounter(
|
||||
ProjectionEventsProcessed,
|
||||
"Number of events reduced to process projection updates",
|
||||
)
|
||||
logging.OnError(err).Error("failed to register projection events processed counter")
|
||||
err = projectionMetrics.provider.RegisterHistogram(
|
||||
ProjectionHandleTimerMetric,
|
||||
"Time taken to process a projection update",
|
||||
"s",
|
||||
[]float64{0.005, 0.01, 0.05, 0.1, 1, 5, 10, 30, 60, 120},
|
||||
)
|
||||
logging.OnError(err).Error("failed to register projection handle timer metric")
|
||||
err = projectionMetrics.provider.RegisterHistogram(
|
||||
ProjectionStateLatencyMetric,
|
||||
"When finishing processing a batch of events, this track the age of the last events seen from current time",
|
||||
"s",
|
||||
[]float64{0.1, 0.5, 1, 5, 10, 30, 60, 300, 600, 1800},
|
||||
)
|
||||
logging.OnError(err).Error("failed to register projection state latency metric")
|
||||
return projectionMetrics
|
||||
}
|
||||
|
||||
func (m *ProjectionMetrics) ProjectionUpdateTiming(ctx context.Context, projection string, duration float64) {
|
||||
err := m.provider.AddHistogramMeasurement(ctx, ProjectionHandleTimerMetric, duration, map[string]attribute.Value{
|
||||
ProjectionLabel: attribute.StringValue(projection),
|
||||
})
|
||||
logging.OnError(err).Error("failed to add projection trigger timing")
|
||||
}
|
||||
|
||||
func (m *ProjectionMetrics) ProjectionEventsProcessed(ctx context.Context, projection string, count int64, success bool) {
|
||||
err := m.provider.AddCount(ctx, ProjectionEventsProcessed, count, map[string]attribute.Value{
|
||||
ProjectionLabel: attribute.StringValue(projection),
|
||||
SuccessLabel: attribute.BoolValue(success),
|
||||
})
|
||||
logging.OnError(err).Error("failed to add projection events processed metric")
|
||||
}
|
||||
|
||||
func (m *ProjectionMetrics) ProjectionStateLatency(ctx context.Context, projection string, latency float64) {
|
||||
err := m.provider.AddHistogramMeasurement(ctx, ProjectionStateLatencyMetric, latency, map[string]attribute.Value{
|
||||
ProjectionLabel: attribute.StringValue(projection),
|
||||
})
|
||||
logging.OnError(err).Error("failed to add projection state latency metric")
|
||||
}
|
132
internal/eventstore/handler/v2/metrics_test.go
Normal file
132
internal/eventstore/handler/v2/metrics_test.go
Normal file
@@ -0,0 +1,132 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/zitadel/zitadel/internal/telemetry/metrics"
|
||||
)
|
||||
|
||||
func TestNewProjectionMetrics(t *testing.T) {
|
||||
mockMetrics := metrics.NewMockMetrics()
|
||||
metrics.M = mockMetrics
|
||||
|
||||
metrics := NewProjectionMetrics()
|
||||
require.NotNil(t, metrics)
|
||||
assert.NotNil(t, metrics.provider)
|
||||
}
|
||||
|
||||
func TestProjectionMetrics_ProjectionUpdateTiming(t *testing.T) {
|
||||
|
||||
mockMetrics := metrics.NewMockMetrics()
|
||||
metrics.M = mockMetrics
|
||||
projectionMetrics := NewProjectionMetrics()
|
||||
|
||||
ctx := context.Background()
|
||||
projection := "test_projection"
|
||||
duration := 0.5
|
||||
|
||||
projectionMetrics.ProjectionUpdateTiming(ctx, projection, duration)
|
||||
|
||||
values := mockMetrics.GetHistogramValues(ProjectionHandleTimerMetric)
|
||||
require.Len(t, values, 1)
|
||||
assert.Equal(t, duration, values[0])
|
||||
|
||||
labels := mockMetrics.GetHistogramLabels(ProjectionHandleTimerMetric)
|
||||
require.Len(t, labels, 1)
|
||||
assert.Equal(t, projection, labels[0][ProjectionLabel].AsString())
|
||||
}
|
||||
|
||||
func TestProjectionMetrics_ProjectionEventsProcessed(t *testing.T) {
|
||||
|
||||
mockMetrics := metrics.NewMockMetrics()
|
||||
metrics.M = mockMetrics
|
||||
projectionMetrics := NewProjectionMetrics()
|
||||
|
||||
ctx := context.Background()
|
||||
projection := "test_projection"
|
||||
count := int64(5)
|
||||
success := true
|
||||
|
||||
projectionMetrics.ProjectionEventsProcessed(ctx, projection, count, success)
|
||||
|
||||
value := mockMetrics.GetCounterValue(ProjectionEventsProcessed)
|
||||
assert.Equal(t, count, value)
|
||||
|
||||
labels := mockMetrics.GetCounterLabels(ProjectionEventsProcessed)
|
||||
require.Len(t, labels, 1)
|
||||
assert.Equal(t, projection, labels[0][ProjectionLabel].AsString())
|
||||
assert.Equal(t, success, labels[0][SuccessLabel].AsBool())
|
||||
}
|
||||
|
||||
func TestProjectionMetrics_ProjectionStateLatency(t *testing.T) {
|
||||
|
||||
mockMetrics := metrics.NewMockMetrics()
|
||||
metrics.M = mockMetrics
|
||||
projectionMetrics := NewProjectionMetrics()
|
||||
|
||||
ctx := context.Background()
|
||||
projection := "test_projection"
|
||||
latency := 10.0
|
||||
|
||||
projectionMetrics.ProjectionStateLatency(ctx, projection, latency)
|
||||
|
||||
values := mockMetrics.GetHistogramValues(ProjectionStateLatencyMetric)
|
||||
require.Len(t, values, 1)
|
||||
assert.Equal(t, latency, values[0])
|
||||
|
||||
labels := mockMetrics.GetHistogramLabels(ProjectionStateLatencyMetric)
|
||||
require.Len(t, labels, 1)
|
||||
assert.Equal(t, projection, labels[0][ProjectionLabel].AsString())
|
||||
}
|
||||
|
||||
func TestProjectionMetrics_Integration(t *testing.T) {
|
||||
|
||||
mockMetrics := metrics.NewMockMetrics()
|
||||
metrics.M = mockMetrics
|
||||
projectionMetrics := NewProjectionMetrics()
|
||||
|
||||
ctx := context.Background()
|
||||
projection := "test_projection"
|
||||
|
||||
start := time.Now()
|
||||
|
||||
projectionMetrics.ProjectionEventsProcessed(ctx, projection, 3, true)
|
||||
projectionMetrics.ProjectionEventsProcessed(ctx, projection, 1, false)
|
||||
|
||||
duration := time.Since(start).Seconds()
|
||||
projectionMetrics.ProjectionUpdateTiming(ctx, projection, duration)
|
||||
|
||||
latency := 5.0
|
||||
projectionMetrics.ProjectionStateLatency(ctx, projection, latency)
|
||||
|
||||
value := mockMetrics.GetCounterValue(ProjectionEventsProcessed)
|
||||
assert.Equal(t, int64(4), value)
|
||||
|
||||
timingValues := mockMetrics.GetHistogramValues(ProjectionHandleTimerMetric)
|
||||
require.Len(t, timingValues, 1)
|
||||
assert.Equal(t, duration, timingValues[0])
|
||||
|
||||
latencyValues := mockMetrics.GetHistogramValues(ProjectionStateLatencyMetric)
|
||||
require.Len(t, latencyValues, 1)
|
||||
assert.Equal(t, latency, latencyValues[0])
|
||||
|
||||
eventsLabels := mockMetrics.GetCounterLabels(ProjectionEventsProcessed)
|
||||
require.Len(t, eventsLabels, 2)
|
||||
assert.Equal(t, projection, eventsLabels[0][ProjectionLabel].AsString())
|
||||
assert.Equal(t, true, eventsLabels[0][SuccessLabel].AsBool())
|
||||
assert.Equal(t, projection, eventsLabels[1][ProjectionLabel].AsString())
|
||||
assert.Equal(t, false, eventsLabels[1][SuccessLabel].AsBool())
|
||||
|
||||
timingLabels := mockMetrics.GetHistogramLabels(ProjectionHandleTimerMetric)
|
||||
require.Len(t, timingLabels, 1)
|
||||
assert.Equal(t, projection, timingLabels[0][ProjectionLabel].AsString())
|
||||
|
||||
latencyLabels := mockMetrics.GetHistogramLabels(ProjectionStateLatencyMetric)
|
||||
require.Len(t, latencyLabels, 1)
|
||||
assert.Equal(t, projection, latencyLabels[0][ProjectionLabel].AsString())
|
||||
}
|
Reference in New Issue
Block a user