2023-03-28 22:09:06 +00:00
|
|
|
package handlers
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
|
|
|
|
"github.com/zitadel/zitadel/internal/eventstore"
|
|
|
|
)
|
|
|
|
|
2024-03-27 17:48:14 +00:00
|
|
|
type alreadyHandled struct {
|
|
|
|
event eventstore.Event
|
|
|
|
eventTypes []eventstore.EventType
|
|
|
|
data map[string]interface{}
|
|
|
|
|
|
|
|
handled bool
|
|
|
|
}
|
|
|
|
|
|
|
|
func (a *alreadyHandled) Reduce() error {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (a *alreadyHandled) AppendEvents(event ...eventstore.Event) {
|
|
|
|
if len(event) > 0 {
|
|
|
|
a.handled = true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (a *alreadyHandled) Query() *eventstore.SearchQueryBuilder {
|
|
|
|
return eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
|
|
|
|
InstanceID(a.event.Aggregate().InstanceID).
|
|
|
|
SequenceGreater(a.event.Sequence()).
|
|
|
|
AddQuery().
|
|
|
|
AggregateTypes(a.event.Aggregate().Type).
|
|
|
|
AggregateIDs(a.event.Aggregate().ID).
|
|
|
|
EventTypes(a.eventTypes...).
|
|
|
|
EventData(a.data).
|
|
|
|
Builder()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (n *NotificationQueries) IsAlreadyHandled(ctx context.Context, event eventstore.Event, data map[string]interface{}, eventTypes ...eventstore.EventType) (bool, error) {
|
|
|
|
already := &alreadyHandled{
|
|
|
|
event: event,
|
|
|
|
eventTypes: eventTypes,
|
|
|
|
data: data,
|
|
|
|
}
|
|
|
|
err := n.es.FilterToQueryReducer(ctx, already)
|
2023-03-28 22:09:06 +00:00
|
|
|
if err != nil {
|
|
|
|
return false, err
|
|
|
|
}
|
2024-03-27 17:48:14 +00:00
|
|
|
return already.handled, nil
|
2023-03-28 22:09:06 +00:00
|
|
|
}
|