fix(mirror): skip notification projections (#9878)

# Which Problems Are Solved

During the mirror command execution we saw high wait times for
notification projections.

# How the Problems Are Solved

As the events are skipped anyways because the notifications are sent out
by the source Zitadel we skip the projections and just set the current
state.
This commit is contained in:
Silvan
2025-05-12 15:58:23 +02:00
committed by GitHub
parent e302591f09
commit 5331841675
4 changed files with 43 additions and 6 deletions

View File

@@ -248,7 +248,7 @@ func projections(
}()
for i := 0; i < int(config.Projections.ConcurrentInstances); i++ {
go execProjections(ctx, instances, failedInstances, &wg)
go execProjections(ctx, es, instances, failedInstances, &wg)
}
existingInstances := queryInstanceIDs(ctx, client)
@@ -264,7 +264,7 @@ func projections(
logging.WithFields("took", time.Since(start)).Info("projections executed")
}
func execProjections(ctx context.Context, instances <-chan string, failedInstances chan<- string, wg *sync.WaitGroup) {
func execProjections(ctx context.Context, es *eventstore.Eventstore, instances <-chan string, failedInstances chan<- string, wg *sync.WaitGroup) {
for instance := range instances {
logging.WithFields("instance", instance).Info("starting projections")
ctx = internal_authz.WithInstanceID(ctx, instance)
@@ -290,7 +290,7 @@ func execProjections(ctx context.Context, instances <-chan string, failedInstanc
continue
}
err = notification.ProjectInstance(ctx)
err = notification.SetCurrentState(ctx, es)
if err != nil {
logging.WithFields("instance", instance).OnError(err).Info("trigger notification failed")
failedInstances <- instance

View File

@@ -127,6 +127,11 @@ func (h *FieldHandler) processEvents(ctx context.Context, config *triggerConfig)
return false, nil
}
if config.minPosition > 0 {
currentState.position = config.minPosition
currentState.offset = 0
}
events, additionalIteration, err := h.fetchEvents(ctx, tx, currentState)
if err != nil {
return additionalIteration, err
@@ -147,7 +152,7 @@ func (h *FieldHandler) processEvents(ctx context.Context, config *triggerConfig)
}
func (h *FieldHandler) fetchEvents(ctx context.Context, tx *sql.Tx, currentState *state) (_ []eventstore.FillFieldsEvent, additionalIteration bool, err error) {
events, err := h.es.Filter(ctx, h.eventQuery(currentState).SetTx(tx))
events, err := h.es.Filter(ctx, h.EventQuery(currentState).SetTx(tx))
if err != nil || len(events) == 0 {
h.log().OnError(err).Debug("filter eventstore failed")
return nil, false, err

View File

@@ -381,6 +381,7 @@ func (h *Handler) existingInstances(ctx context.Context) ([]string, error) {
type triggerConfig struct {
awaitRunning bool
maxPosition float64
minPosition float64
}
type TriggerOpt func(conf *triggerConfig)
@@ -397,6 +398,12 @@ func WithMaxPosition(position float64) TriggerOpt {
}
}
func WithMinPosition(position float64) TriggerOpt {
return func(conf *triggerConfig) {
conf.minPosition = position
}
}
func (h *Handler) Trigger(ctx context.Context, opts ...TriggerOpt) (_ context.Context, err error) {
config := new(triggerConfig)
for _, opt := range opts {
@@ -507,6 +514,11 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add
return false, nil
}
if config.minPosition > 0 {
currentState.position = config.minPosition
currentState.offset = 0
}
var statements []*Statement
statements, additionalIteration, err = h.generateStatements(ctx, tx, currentState)
if err != nil {
@@ -554,7 +566,7 @@ func (h *Handler) generateStatements(ctx context.Context, tx *sql.Tx, currentSta
return []*Statement{stmt}, false, nil
}
events, err := h.es.Filter(ctx, h.eventQuery(currentState).SetTx(tx))
events, err := h.es.Filter(ctx, h.EventQuery(currentState).SetTx(tx))
if err != nil {
h.log().WithError(err).Debug("filter eventstore failed")
return nil, false, err
@@ -646,7 +658,7 @@ func (h *Handler) executeStatement(ctx context.Context, tx *sql.Tx, statement *S
return nil
}
func (h *Handler) eventQuery(currentState *state) *eventstore.SearchQueryBuilder {
func (h *Handler) EventQuery(currentState *state) *eventstore.SearchQueryBuilder {
builder := eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
AwaitOpenTransactions().
Limit(uint64(h.bulkLimit)).

View File

@@ -59,6 +59,26 @@ func Start(ctx context.Context) {
}
}
func SetCurrentState(ctx context.Context, es *eventstore.Eventstore) error {
if len(projections) == 0 {
return nil
}
position, err := es.LatestSequence(ctx, eventstore.NewSearchQueryBuilder(eventstore.ColumnsMaxSequence).InstanceID(authz.GetInstance(ctx).InstanceID()).OrderDesc().Limit(1))
if err != nil {
return err
}
for i, projection := range projections {
logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("set current state of notification projection")
_, err = projection.Trigger(ctx, handler.WithMinPosition(position))
if err != nil {
return err
}
logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("current state of notification projection set")
}
return nil
}
func ProjectInstance(ctx context.Context) error {
for i, projection := range projections {
logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("starting notification projection")