2020-04-07 16:36:37 +00:00
|
|
|
package sdk
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
|
|
|
|
"github.com/caos/zitadel/internal/errors"
|
|
|
|
"github.com/caos/zitadel/internal/eventstore/models"
|
|
|
|
es_models "github.com/caos/zitadel/internal/eventstore/models"
|
|
|
|
)
|
|
|
|
|
|
|
|
type filterFunc func(context.Context, *es_models.SearchQuery) ([]*es_models.Event, error)
|
|
|
|
type appendFunc func(...*es_models.Event) error
|
2020-05-18 10:06:36 +00:00
|
|
|
type AggregateFunc func(context.Context) (*es_models.Aggregate, error)
|
2020-04-07 16:36:37 +00:00
|
|
|
type pushFunc func(context.Context, ...*es_models.Aggregate) error
|
|
|
|
|
|
|
|
func Filter(ctx context.Context, filter filterFunc, appender appendFunc, query *es_models.SearchQuery) error {
|
|
|
|
events, err := filter(ctx, query)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if len(events) == 0 {
|
|
|
|
return errors.ThrowNotFound(nil, "EVENT-8due3", "no events found")
|
|
|
|
}
|
|
|
|
err = appender(events...)
|
2020-04-28 14:01:00 +00:00
|
|
|
if err != nil {
|
2020-04-07 16:36:37 +00:00
|
|
|
return ThrowAppendEventError(err, "SDK-awiWK", "appender failed")
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2020-04-28 14:01:00 +00:00
|
|
|
// Push is Deprecated use PushAggregates
|
2020-04-07 16:36:37 +00:00
|
|
|
// Push creates the aggregates from aggregater
|
|
|
|
// and pushes the aggregates to the given pushFunc
|
|
|
|
// the given events are appended by the appender
|
2020-05-18 10:06:36 +00:00
|
|
|
func Push(ctx context.Context, push pushFunc, appender appendFunc, aggregaters ...AggregateFunc) (err error) {
|
2020-04-07 16:36:37 +00:00
|
|
|
if len(aggregaters) < 1 {
|
|
|
|
return errors.ThrowPreconditionFailed(nil, "SDK-q9wjp", "no aggregaters passed")
|
|
|
|
}
|
|
|
|
|
|
|
|
aggregates, err := makeAggregates(ctx, aggregaters)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
err = push(ctx, aggregates...)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2020-04-28 14:01:00 +00:00
|
|
|
|
|
|
|
return appendAggregates(appender, aggregates)
|
|
|
|
}
|
|
|
|
|
|
|
|
func PushAggregates(ctx context.Context, push pushFunc, appender appendFunc, aggregates ...*models.Aggregate) (err error) {
|
|
|
|
if len(aggregates) < 1 {
|
|
|
|
return errors.ThrowPreconditionFailed(nil, "SDK-q9wjp", "no aggregaters passed")
|
|
|
|
}
|
|
|
|
|
|
|
|
err = push(ctx, aggregates...)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2020-04-07 16:36:37 +00:00
|
|
|
|
|
|
|
return appendAggregates(appender, aggregates)
|
|
|
|
}
|
|
|
|
|
|
|
|
func appendAggregates(appender appendFunc, aggregates []*models.Aggregate) error {
|
|
|
|
for _, aggregate := range aggregates {
|
|
|
|
err := appender(aggregate.Events...)
|
|
|
|
if err != nil {
|
|
|
|
return ThrowAppendEventError(err, "SDK-o6kzK", "aggregator failed")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2020-05-18 10:06:36 +00:00
|
|
|
func makeAggregates(ctx context.Context, aggregaters []AggregateFunc) (aggregates []*models.Aggregate, err error) {
|
2020-04-07 16:36:37 +00:00
|
|
|
aggregates = make([]*models.Aggregate, len(aggregaters))
|
|
|
|
for i, aggregater := range aggregaters {
|
|
|
|
aggregates[i], err = aggregater(ctx)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return aggregates, nil
|
2020-04-28 14:01:00 +00:00
|
|
|
}
|