diff --git a/cmd/mirror/projections.go b/cmd/mirror/projections.go index 24ac0e0289..5d1c523cef 100644 --- a/cmd/mirror/projections.go +++ b/cmd/mirror/projections.go @@ -248,7 +248,7 @@ func projections( }() for i := 0; i < int(config.Projections.ConcurrentInstances); i++ { - go execProjections(ctx, instances, failedInstances, &wg) + go execProjections(ctx, es, instances, failedInstances, &wg) } existingInstances := queryInstanceIDs(ctx, client) @@ -264,7 +264,7 @@ func projections( logging.WithFields("took", time.Since(start)).Info("projections executed") } -func execProjections(ctx context.Context, instances <-chan string, failedInstances chan<- string, wg *sync.WaitGroup) { +func execProjections(ctx context.Context, es *eventstore.Eventstore, instances <-chan string, failedInstances chan<- string, wg *sync.WaitGroup) { for instance := range instances { logging.WithFields("instance", instance).Info("starting projections") ctx = internal_authz.WithInstanceID(ctx, instance) @@ -290,7 +290,7 @@ func execProjections(ctx context.Context, instances <-chan string, failedInstanc continue } - err = notification.ProjectInstance(ctx) + err = notification.SetCurrentState(ctx, es) if err != nil { logging.WithFields("instance", instance).OnError(err).Info("trigger notification failed") failedInstances <- instance diff --git a/internal/eventstore/handler/v2/field_handler.go b/internal/eventstore/handler/v2/field_handler.go index 8b71f32519..6f09c05d74 100644 --- a/internal/eventstore/handler/v2/field_handler.go +++ b/internal/eventstore/handler/v2/field_handler.go @@ -127,6 +127,11 @@ func (h *FieldHandler) processEvents(ctx context.Context, config *triggerConfig) return false, nil } + if config.minPosition > 0 { + currentState.position = config.minPosition + currentState.offset = 0 + } + events, additionalIteration, err := h.fetchEvents(ctx, tx, currentState) if err != nil { return additionalIteration, err @@ -147,7 +152,7 @@ func (h *FieldHandler) processEvents(ctx context.Context, config *triggerConfig) } func (h *FieldHandler) fetchEvents(ctx context.Context, tx *sql.Tx, currentState *state) (_ []eventstore.FillFieldsEvent, additionalIteration bool, err error) { - events, err := h.es.Filter(ctx, h.eventQuery(currentState).SetTx(tx)) + events, err := h.es.Filter(ctx, h.EventQuery(currentState).SetTx(tx)) if err != nil || len(events) == 0 { h.log().OnError(err).Debug("filter eventstore failed") return nil, false, err diff --git a/internal/eventstore/handler/v2/handler.go b/internal/eventstore/handler/v2/handler.go index c2e2b2a355..2eec16a545 100644 --- a/internal/eventstore/handler/v2/handler.go +++ b/internal/eventstore/handler/v2/handler.go @@ -381,6 +381,7 @@ func (h *Handler) existingInstances(ctx context.Context) ([]string, error) { type triggerConfig struct { awaitRunning bool maxPosition float64 + minPosition float64 } type TriggerOpt func(conf *triggerConfig) @@ -397,6 +398,12 @@ func WithMaxPosition(position float64) TriggerOpt { } } +func WithMinPosition(position float64) TriggerOpt { + return func(conf *triggerConfig) { + conf.minPosition = position + } +} + func (h *Handler) Trigger(ctx context.Context, opts ...TriggerOpt) (_ context.Context, err error) { config := new(triggerConfig) for _, opt := range opts { @@ -507,6 +514,11 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add return false, nil } + if config.minPosition > 0 { + currentState.position = config.minPosition + currentState.offset = 0 + } + var statements []*Statement statements, additionalIteration, err = h.generateStatements(ctx, tx, currentState) if err != nil { @@ -554,7 +566,7 @@ func (h *Handler) generateStatements(ctx context.Context, tx *sql.Tx, currentSta return []*Statement{stmt}, false, nil } - events, err := h.es.Filter(ctx, h.eventQuery(currentState).SetTx(tx)) + events, err := h.es.Filter(ctx, h.EventQuery(currentState).SetTx(tx)) if err != nil { h.log().WithError(err).Debug("filter eventstore failed") return nil, false, err @@ -646,7 +658,7 @@ func (h *Handler) executeStatement(ctx context.Context, tx *sql.Tx, statement *S return nil } -func (h *Handler) eventQuery(currentState *state) *eventstore.SearchQueryBuilder { +func (h *Handler) EventQuery(currentState *state) *eventstore.SearchQueryBuilder { builder := eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent). AwaitOpenTransactions(). Limit(uint64(h.bulkLimit)). diff --git a/internal/notification/projections.go b/internal/notification/projections.go index 4383a6b7b1..bc12fc64a7 100644 --- a/internal/notification/projections.go +++ b/internal/notification/projections.go @@ -59,6 +59,26 @@ func Start(ctx context.Context) { } } +func SetCurrentState(ctx context.Context, es *eventstore.Eventstore) error { + if len(projections) == 0 { + return nil + } + position, err := es.LatestSequence(ctx, eventstore.NewSearchQueryBuilder(eventstore.ColumnsMaxSequence).InstanceID(authz.GetInstance(ctx).InstanceID()).OrderDesc().Limit(1)) + if err != nil { + return err + } + + for i, projection := range projections { + logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("set current state of notification projection") + _, err = projection.Trigger(ctx, handler.WithMinPosition(position)) + if err != nil { + return err + } + logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("current state of notification projection set") + } + return nil +} + func ProjectInstance(ctx context.Context) error { for i, projection := range projections { logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("starting notification projection")