diff --git a/internal/notification/handlers/already_handled.go b/internal/notification/handlers/already_handled.go index 9745661ace..d4aca53aea 100644 --- a/internal/notification/handlers/already_handled.go +++ b/internal/notification/handlers/already_handled.go @@ -6,21 +6,45 @@ import ( "github.com/zitadel/zitadel/internal/eventstore" ) -func (n *NotificationQueries) IsAlreadyHandled(ctx context.Context, event eventstore.Event, data map[string]interface{}, aggregateType eventstore.AggregateType, eventTypes ...eventstore.EventType) (bool, error) { - events, err := n.es.Filter( - ctx, - eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent). - InstanceID(event.Aggregate().InstanceID). - SequenceGreater(event.Sequence()). - AddQuery(). - AggregateTypes(aggregateType). - AggregateIDs(event.Aggregate().ID). - EventTypes(eventTypes...). - EventData(data). - Builder(), - ) +type alreadyHandled struct { + event eventstore.Event + eventTypes []eventstore.EventType + data map[string]interface{} + + handled bool +} + +func (a *alreadyHandled) Reduce() error { + return nil +} + +func (a *alreadyHandled) AppendEvents(event ...eventstore.Event) { + if len(event) > 0 { + a.handled = true + } +} + +func (a *alreadyHandled) Query() *eventstore.SearchQueryBuilder { + return eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent). + InstanceID(a.event.Aggregate().InstanceID). + SequenceGreater(a.event.Sequence()). + AddQuery(). + AggregateTypes(a.event.Aggregate().Type). + AggregateIDs(a.event.Aggregate().ID). + EventTypes(a.eventTypes...). + EventData(a.data). + Builder() +} + +func (n *NotificationQueries) IsAlreadyHandled(ctx context.Context, event eventstore.Event, data map[string]interface{}, eventTypes ...eventstore.EventType) (bool, error) { + already := &alreadyHandled{ + event: event, + eventTypes: eventTypes, + data: data, + } + err := n.es.FilterToQueryReducer(ctx, already) if err != nil { return false, err } - return len(events) > 0, nil + return already.handled, nil } diff --git a/internal/notification/handlers/quota_notifier.go b/internal/notification/handlers/quota_notifier.go index d0ed12baa2..c455b9955c 100644 --- a/internal/notification/handlers/quota_notifier.go +++ b/internal/notification/handlers/quota_notifier.go @@ -65,7 +65,7 @@ func (u *quotaNotifier) reduceNotificationDue(event eventstore.Event) (*handler. return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error { ctx := HandlerContext(event.Aggregate()) - alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, map[string]interface{}{"dueEventID": e.ID}, quota.AggregateType, quota.NotifiedEventType) + alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, map[string]interface{}{"dueEventID": e.ID}, quota.NotifiedEventType) if err != nil { return err } diff --git a/internal/notification/handlers/user_notifier.go b/internal/notification/handlers/user_notifier.go index 9208fbc4b5..f55345bff6 100644 --- a/internal/notification/handlers/user_notifier.go +++ b/internal/notification/handlers/user_notifier.go @@ -378,7 +378,7 @@ func (u *userNotifier) reduceOTPSMS( if err != nil { return nil, err } - err = sentCommand(ctx, userID, resourceOwner) + err = sentCommand(ctx, event.Aggregate().ID, event.Aggregate().ResourceOwner) if err != nil { return nil, err } @@ -514,7 +514,7 @@ func (u *userNotifier) reduceDomainClaimed(event eventstore.Event) (*handler.Sta } return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error { ctx := HandlerContext(event.Aggregate()) - alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, nil, user.AggregateType, + alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, nil, user.UserDomainClaimedType, user.UserDomainClaimedSentType) if err != nil { return err @@ -616,7 +616,7 @@ func (u *userNotifier) reducePasswordChanged(event eventstore.Event) (*handler.S return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error { ctx := HandlerContext(event.Aggregate()) - alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, nil, user.AggregateType, user.HumanPasswordChangeSentType) + alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, nil, user.HumanPasswordChangeSentType) if err != nil { return err } @@ -722,5 +722,5 @@ func (u *userNotifier) checkIfCodeAlreadyHandledOrExpired(ctx context.Context, e if event.CreatedAt().Add(expiry).Before(time.Now().UTC()) { return true, nil } - return u.queries.IsAlreadyHandled(ctx, event, data, user.AggregateType, eventTypes...) + return u.queries.IsAlreadyHandled(ctx, event, data, eventTypes...) }