From 533184167579fe4b91e1f35feca80530a653a473 Mon Sep 17 00:00:00 2001 From: Silvan <27845747+adlerhurst@users.noreply.github.com> Date: Mon, 12 May 2025 15:58:23 +0200 Subject: [PATCH] fix(mirror): skip notification projections (#9878) # Which Problems Are Solved During the mirror command execution we saw high wait times for notification projections. # How the Problems Are Solved As the events are skipped anyways because the notifications are sent out by the source Zitadel we skip the projections and just set the current state. --- cmd/mirror/projections.go | 6 +++--- .../eventstore/handler/v2/field_handler.go | 7 ++++++- internal/eventstore/handler/v2/handler.go | 16 +++++++++++++-- internal/notification/projections.go | 20 +++++++++++++++++++ 4 files changed, 43 insertions(+), 6 deletions(-) diff --git a/cmd/mirror/projections.go b/cmd/mirror/projections.go index 24ac0e0289..5d1c523cef 100644 --- a/cmd/mirror/projections.go +++ b/cmd/mirror/projections.go @@ -248,7 +248,7 @@ func projections( }() for i := 0; i < int(config.Projections.ConcurrentInstances); i++ { - go execProjections(ctx, instances, failedInstances, &wg) + go execProjections(ctx, es, instances, failedInstances, &wg) } existingInstances := queryInstanceIDs(ctx, client) @@ -264,7 +264,7 @@ func projections( logging.WithFields("took", time.Since(start)).Info("projections executed") } -func execProjections(ctx context.Context, instances <-chan string, failedInstances chan<- string, wg *sync.WaitGroup) { +func execProjections(ctx context.Context, es *eventstore.Eventstore, instances <-chan string, failedInstances chan<- string, wg *sync.WaitGroup) { for instance := range instances { logging.WithFields("instance", instance).Info("starting projections") ctx = internal_authz.WithInstanceID(ctx, instance) @@ -290,7 +290,7 @@ func execProjections(ctx context.Context, instances <-chan string, failedInstanc continue } - err = notification.ProjectInstance(ctx) + err = notification.SetCurrentState(ctx, es) if err != nil { logging.WithFields("instance", instance).OnError(err).Info("trigger notification failed") failedInstances <- instance diff --git a/internal/eventstore/handler/v2/field_handler.go b/internal/eventstore/handler/v2/field_handler.go index 8b71f32519..6f09c05d74 100644 --- a/internal/eventstore/handler/v2/field_handler.go +++ b/internal/eventstore/handler/v2/field_handler.go @@ -127,6 +127,11 @@ func (h *FieldHandler) processEvents(ctx context.Context, config *triggerConfig) return false, nil } + if config.minPosition > 0 { + currentState.position = config.minPosition + currentState.offset = 0 + } + events, additionalIteration, err := h.fetchEvents(ctx, tx, currentState) if err != nil { return additionalIteration, err @@ -147,7 +152,7 @@ func (h *FieldHandler) processEvents(ctx context.Context, config *triggerConfig) } func (h *FieldHandler) fetchEvents(ctx context.Context, tx *sql.Tx, currentState *state) (_ []eventstore.FillFieldsEvent, additionalIteration bool, err error) { - events, err := h.es.Filter(ctx, h.eventQuery(currentState).SetTx(tx)) + events, err := h.es.Filter(ctx, h.EventQuery(currentState).SetTx(tx)) if err != nil || len(events) == 0 { h.log().OnError(err).Debug("filter eventstore failed") return nil, false, err diff --git a/internal/eventstore/handler/v2/handler.go b/internal/eventstore/handler/v2/handler.go index c2e2b2a355..2eec16a545 100644 --- a/internal/eventstore/handler/v2/handler.go +++ b/internal/eventstore/handler/v2/handler.go @@ -381,6 +381,7 @@ func (h *Handler) existingInstances(ctx context.Context) ([]string, error) { type triggerConfig struct { awaitRunning bool maxPosition float64 + minPosition float64 } type TriggerOpt func(conf *triggerConfig) @@ -397,6 +398,12 @@ func WithMaxPosition(position float64) TriggerOpt { } } +func WithMinPosition(position float64) TriggerOpt { + return func(conf *triggerConfig) { + conf.minPosition = position + } +} + func (h *Handler) Trigger(ctx context.Context, opts ...TriggerOpt) (_ context.Context, err error) { config := new(triggerConfig) for _, opt := range opts { @@ -507,6 +514,11 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add return false, nil } + if config.minPosition > 0 { + currentState.position = config.minPosition + currentState.offset = 0 + } + var statements []*Statement statements, additionalIteration, err = h.generateStatements(ctx, tx, currentState) if err != nil { @@ -554,7 +566,7 @@ func (h *Handler) generateStatements(ctx context.Context, tx *sql.Tx, currentSta return []*Statement{stmt}, false, nil } - events, err := h.es.Filter(ctx, h.eventQuery(currentState).SetTx(tx)) + events, err := h.es.Filter(ctx, h.EventQuery(currentState).SetTx(tx)) if err != nil { h.log().WithError(err).Debug("filter eventstore failed") return nil, false, err @@ -646,7 +658,7 @@ func (h *Handler) executeStatement(ctx context.Context, tx *sql.Tx, statement *S return nil } -func (h *Handler) eventQuery(currentState *state) *eventstore.SearchQueryBuilder { +func (h *Handler) EventQuery(currentState *state) *eventstore.SearchQueryBuilder { builder := eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent). AwaitOpenTransactions(). Limit(uint64(h.bulkLimit)). diff --git a/internal/notification/projections.go b/internal/notification/projections.go index 4383a6b7b1..bc12fc64a7 100644 --- a/internal/notification/projections.go +++ b/internal/notification/projections.go @@ -59,6 +59,26 @@ func Start(ctx context.Context) { } } +func SetCurrentState(ctx context.Context, es *eventstore.Eventstore) error { + if len(projections) == 0 { + return nil + } + position, err := es.LatestSequence(ctx, eventstore.NewSearchQueryBuilder(eventstore.ColumnsMaxSequence).InstanceID(authz.GetInstance(ctx).InstanceID()).OrderDesc().Limit(1)) + if err != nil { + return err + } + + for i, projection := range projections { + logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("set current state of notification projection") + _, err = projection.Trigger(ctx, handler.WithMinPosition(position)) + if err != nil { + return err + } + logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("current state of notification projection set") + } + return nil +} + func ProjectInstance(ctx context.Context) error { for i, projection := range projections { logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("starting notification projection")