feat(eventstore): Precondition (#69)

* start org

* refactor(eventstore): filter in sql for querier

* feat(eventstore): Aggregate precondition

preconditions are checked right before insert. Insert is still transaction save

* feat(eventstore): check preconditions in repository

* test(eventstore): test precondition in models

* test(eventstore): precondition-tests

* refactor(eventstore): querier as type

* fix(precondition): rename validation from precondition to validation

* test(eventstore): isErr func instead of wantErr bool

* fix: delete org files

* remove comment

Co-authored-by: Livio Amstutz <livio.a@gmail.com>
This commit is contained in:
Silvan 2020-04-28 16:01:00 +02:00 committed by GitHub
parent ff11cdba40
commit 33a4802425
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 451 additions and 133 deletions

View File

@ -46,3 +46,8 @@ func (err *CaosError) GetMessage() string {
func (err *CaosError) GetID() string {
return err.ID
}
func (err *CaosError) Is(target error) bool {
_, ok := target.(*CaosError)
return ok
}

View File

@ -2,6 +2,7 @@ package sql
import (
"context"
"database/sql"
"fmt"
"strconv"
"strings"
@ -30,23 +31,18 @@ const (
" FROM eventstore.events"
)
type Querier interface {
Query(query string, args ...interface{}) (*sql.Rows, error)
}
func (db *SQL) Filter(ctx context.Context, searchQuery *es_models.SearchQuery) (events []*models.Event, err error) {
where, values := prepareWhere(searchQuery)
query := selectStmt + where
return filter(db.client, searchQuery)
}
query += " ORDER BY event_sequence"
if searchQuery.Desc {
query += " DESC"
}
func filter(querier Querier, searchQuery *es_models.SearchQuery) (events []*es_models.Event, err error) {
query, values := prepareQuery(searchQuery)
if searchQuery.Limit > 0 {
values = append(values, searchQuery.Limit)
query += " LIMIT ?"
}
query = numberPlaceholder(query, "?", "$")
rows, err := db.client.Query(query, values...)
rows, err := querier.Query(query, values...)
if err != nil {
logging.Log("SQL-HP3Uk").WithError(err).Info("query failed")
return nil, errors.ThrowInternal(err, "SQL-IJuyR", "unable to filter events")
@ -86,6 +82,25 @@ func (db *SQL) Filter(ctx context.Context, searchQuery *es_models.SearchQuery) (
return events, nil
}
func prepareQuery(searchQuery *es_models.SearchQuery) (query string, values []interface{}) {
where, values := prepareWhere(searchQuery)
query = selectStmt + where
query += " ORDER BY event_sequence"
if searchQuery.Desc {
query += " DESC"
}
if searchQuery.Limit > 0 {
values = append(values, searchQuery.Limit)
query += " LIMIT ?"
}
query = numberPlaceholder(query, "?", "$")
return query, values
}
func numberPlaceholder(query, old, new string) string {
for i, hasChanged := 1, true; hasChanged; i++ {
newQuery := strings.Replace(query, old, new+strconv.Itoa(i), 1)

View File

@ -3,9 +3,10 @@ package sql
import (
"context"
"database/sql"
"errors"
"github.com/caos/logging"
"github.com/caos/zitadel/internal/errors"
caos_errs "github.com/caos/zitadel/internal/errors"
"github.com/caos/zitadel/internal/eventstore/models"
"github.com/cockroachdb/cockroach-go/crdb"
)
@ -31,11 +32,16 @@ func (db *SQL) PushAggregates(ctx context.Context, aggregates ...*models.Aggrega
if err != nil {
tx.Rollback()
logging.Log("SQL-9ctx5").WithError(err).Warn("prepare failed")
return errors.ThrowInternal(err, "SQL-juCgA", "prepare failed")
return caos_errs.ThrowInternal(err, "SQL-juCgA", "prepare failed")
}
for _, aggregate := range aggregates {
err = insertEvents(stmt, aggregate.Events)
err = precondtion(tx, aggregate)
if err != nil {
tx.Rollback()
return err
}
err = insertEvents(stmt, Sequence(aggregate.PreviousSequence), aggregate.Events)
if err != nil {
tx.Rollback()
return err
@ -44,15 +50,29 @@ func (db *SQL) PushAggregates(ctx context.Context, aggregates ...*models.Aggrega
return nil
})
if _, ok := err.(*errors.CaosError); !ok && err != nil {
err = errors.ThrowInternal(err, "SQL-DjgtG", "unable to store events")
if err != nil && !errors.Is(err, &caos_errs.CaosError{}) {
err = caos_errs.ThrowInternal(err, "SQL-DjgtG", "unable to store events")
}
return err
}
func insertEvents(stmt *sql.Stmt, events []*models.Event) error {
currentSequence := Sequence(events[0].PreviousSequence)
func precondtion(tx *sql.Tx, aggregate *models.Aggregate) error {
if aggregate.Precondition == nil {
return nil
}
events, err := filter(tx, aggregate.Precondition.Query)
if err != nil {
return caos_errs.ThrowPreconditionFailed(err, "SQL-oBPxB", "filter failed")
}
err = aggregate.Precondition.Validation(events...)
if err != nil {
return caos_errs.ThrowPreconditionFailed(err, "SQL-s6hqU", "validation failed")
}
return nil
}
func insertEvents(stmt *sql.Stmt, previousSequence Sequence, events []*models.Event) error {
for _, event := range events {
if event.Data == nil || len(event.Data) == 0 {
//json decoder failes with EOF if json text is empty
@ -62,27 +82,27 @@ func insertEvents(stmt *sql.Stmt, events []*models.Event) error {
rows, err := stmt.Query(event.Type, event.AggregateType, event.AggregateID, event.AggregateVersion, event.CreationDate, event.Data, event.EditorUser, event.EditorService, event.ResourceOwner,
event.AggregateType, event.AggregateID,
event.AggregateType, event.AggregateID,
currentSequence, event.AggregateType, event.AggregateID,
event.AggregateType, event.AggregateID, currentSequence)
previousSequence, event.AggregateType, event.AggregateID,
event.AggregateType, event.AggregateID, previousSequence)
if err != nil {
logging.Log("SQL-EXA0q").WithError(err).Info("query failed")
return errors.ThrowInternal(err, "SQL-SBP37", "unable to create event")
return caos_errs.ThrowInternal(err, "SQL-SBP37", "unable to create event")
}
defer rows.Close()
rowInserted := false
for rows.Next() {
rowInserted = true
err = rows.Scan(&event.ID, &currentSequence, &event.CreationDate)
err = rows.Scan(&event.ID, &previousSequence, &event.CreationDate)
logging.Log("SQL-rAvLD").OnError(err).Info("unable to scan result into event")
}
if !rowInserted {
return errors.ThrowAlreadyExists(nil, "SQL-GKcAa", "wrong sequence")
return caos_errs.ThrowAlreadyExists(nil, "SQL-GKcAa", "wrong sequence")
}
event.Sequence = uint64(currentSequence)
event.Sequence = uint64(previousSequence)
}
return nil

View File

@ -59,6 +59,21 @@ func TestSQL_PushAggregates(t *testing.T) {
isError: errors.IsInternal,
shouldCheckEvents: false,
},
{
name: "aggregate precondtion fails",
fields: fields{
client: mockDB(t).
expectBegin(nil).
expectSavepoint().
expectPrepareInsert().
expectFilterEventsError(errors.CreateCaosError(nil, "SQL-IzJOf", "err")).
expectRollback(nil),
},
args: args{aggregates: []*models.Aggregate{aggregateWithPrecondition(&models.Aggregate{}, models.NewSearchQuery().SetLimit(1), nil)}},
isError: errors.IsPreconditionFailed,
shouldCheckEvents: false,
},
{
name: "one aggregate two events success",
fields: fields{
@ -94,9 +109,10 @@ func TestSQL_PushAggregates(t *testing.T) {
},
args: args{
aggregates: []*models.Aggregate{
&models.Aggregate{
{
PreviousSequence: 34,
Events: []*models.Event{
&models.Event{
{
AggregateID: "aggID",
AggregateType: "aggType",
AggregateVersion: "v0.0.1",
@ -104,9 +120,8 @@ func TestSQL_PushAggregates(t *testing.T) {
EditorUser: "usr",
ResourceOwner: "ro",
Type: "eventTyp",
PreviousSequence: 34,
},
&models.Event{
{
AggregateID: "aggID",
AggregateType: "aggType",
AggregateVersion: "v0.0.1",
@ -114,7 +129,6 @@ func TestSQL_PushAggregates(t *testing.T) {
EditorUser: "usr2",
ResourceOwner: "ro2",
Type: "eventTyp",
PreviousSequence: 0,
},
},
},
@ -157,9 +171,10 @@ func TestSQL_PushAggregates(t *testing.T) {
},
args: args{
aggregates: []*models.Aggregate{
&models.Aggregate{
{
PreviousSequence: 34,
Events: []*models.Event{
&models.Event{
{
AggregateID: "aggID",
AggregateType: "aggType",
AggregateVersion: "v0.0.1",
@ -167,13 +182,13 @@ func TestSQL_PushAggregates(t *testing.T) {
EditorUser: "usr",
ResourceOwner: "ro",
Type: "eventTyp",
PreviousSequence: 34,
},
},
},
&models.Aggregate{
{
PreviousSequence: 40,
Events: []*models.Event{
&models.Event{
{
AggregateID: "aggID2",
AggregateType: "aggType2",
AggregateVersion: "v0.0.1",
@ -181,7 +196,6 @@ func TestSQL_PushAggregates(t *testing.T) {
EditorUser: "usr",
ResourceOwner: "ro",
Type: "eventTyp",
PreviousSequence: 40,
},
},
},
@ -212,9 +226,9 @@ func TestSQL_PushAggregates(t *testing.T) {
},
args: args{
aggregates: []*models.Aggregate{
&models.Aggregate{
{
Events: []*models.Event{
&models.Event{
{
AggregateID: "aggID",
AggregateType: "aggType",
AggregateVersion: "v0.0.1",
@ -224,7 +238,7 @@ func TestSQL_PushAggregates(t *testing.T) {
Type: "eventTyp",
PreviousSequence: 34,
},
&models.Event{
{
AggregateID: "aggID",
AggregateType: "aggType",
AggregateVersion: "v0.0.1",
@ -265,9 +279,9 @@ func TestSQL_PushAggregates(t *testing.T) {
},
args: args{
aggregates: []*models.Aggregate{
&models.Aggregate{
{
Events: []*models.Event{
&models.Event{
{
AggregateID: "aggID",
AggregateType: "aggType",
AggregateVersion: "v0.0.1",
@ -321,3 +335,95 @@ func noErr(err error) bool {
func functionName(i interface{}) string {
return runtime.FuncForPC(reflect.ValueOf(i).Pointer()).Name()
}
func Test_precondtion(t *testing.T) {
type fields struct {
client *dbMock
}
type args struct {
aggregate *models.Aggregate
}
tests := []struct {
name string
fields fields
args args
isErr func(error) bool
}{
{
name: "no precondition",
fields: fields{
client: mockDB(t).
expectBegin(nil),
},
args: args{
aggregate: &models.Aggregate{},
},
},
{
name: "precondition fails",
fields: fields{
client: mockDB(t).
expectBegin(nil).expectFilterEventsLimit(5, 0),
},
args: args{
aggregate: aggregateWithPrecondition(&models.Aggregate{}, models.NewSearchQuery().SetLimit(5), validationFunc(errors.CreateCaosError(nil, "SQL-LBIKm", "err"))),
},
isErr: errors.IsPreconditionFailed,
},
{
name: "precondition with filter error",
fields: fields{
client: mockDB(t).
expectBegin(nil).expectFilterEventsError(errors.ThrowInternal(nil, "SQL-ac9EW", "err")),
},
args: args{
aggregate: aggregateWithPrecondition(&models.Aggregate{}, models.NewSearchQuery().SetLimit(5), validationFunc(errors.CreateCaosError(nil, "SQL-LBIKm", "err"))),
},
isErr: errors.IsPreconditionFailed,
},
{
name: "precondition no events",
fields: fields{
client: mockDB(t).
expectBegin(nil).expectFilterEventsLimit(5, 0),
},
args: args{
aggregate: aggregateWithPrecondition(&models.Aggregate{}, models.NewSearchQuery().SetLimit(5), validationFunc(nil)),
},
},
{
name: "precondition with events",
fields: fields{
client: mockDB(t).
expectBegin(nil).expectFilterEventsLimit(5, 3),
},
args: args{
aggregate: aggregateWithPrecondition(&models.Aggregate{}, models.NewSearchQuery().SetLimit(5), validationFunc(nil)),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tx, err := tt.fields.client.sqlClient.Begin()
if err != nil {
t.Errorf("unable to start tx %v", err)
t.FailNow()
}
err = precondtion(tx, tt.args.aggregate)
if (tt.isErr != nil && err == nil) || (tt.isErr != nil && !tt.isErr(err)) {
t.Error("precondtion() wrong error", err)
}
})
}
}
func aggregateWithPrecondition(aggregate *models.Aggregate, query *models.SearchQuery, precondition func(...*models.Event) error) *models.Aggregate {
aggregate.SetPrecondition(query, precondition)
return aggregate
}
func validationFunc(err error) func(events ...*models.Event) error {
return func(events ...*models.Event) error {
return err
}
}

View File

@ -15,15 +15,21 @@ func (at AggregateType) String() string {
type Aggregates []*Aggregate
type Aggregate struct {
id string
typ AggregateType
latestSequence uint64
version Version
id string
typ AggregateType
PreviousSequence uint64
version Version
editorService string
editorUser string
resourceOwner string
Events []*Event
Precondition *precondition
}
type precondition struct {
Query *SearchQuery
Validation func(...*Event) error
}
func (a *Aggregate) AppendEvent(typ EventType, payload interface{}) (*Aggregate, error) {
@ -39,7 +45,6 @@ func (a *Aggregate) AppendEvent(typ EventType, payload interface{}) (*Aggregate,
CreationDate: time.Now(),
Data: data,
Type: typ,
PreviousSequence: a.latestSequence,
AggregateID: a.id,
AggregateType: a.typ,
AggregateVersion: a.version,
@ -52,6 +57,11 @@ func (a *Aggregate) AppendEvent(typ EventType, payload interface{}) (*Aggregate,
return a, nil
}
func (a *Aggregate) SetPrecondition(query *SearchQuery, validateFunc func(...*Event) error) *Aggregate {
a.Precondition = &precondition{Query: query, Validation: validateFunc}
return a
}
func (a *Aggregate) Validate() error {
if a == nil {
return errors.ThrowPreconditionFailed(nil, "MODEL-yi5AC", "aggregate is nil")
@ -75,6 +85,9 @@ func (a *Aggregate) Validate() error {
if a.resourceOwner == "" {
return errors.ThrowPreconditionFailed(nil, "MODEL-eBYUW", "resource owner not set")
}
if a.Precondition != nil && (a.Precondition.Query == nil || a.Precondition.Query.Validate() != nil || a.Precondition.Validation == nil) {
return errors.ThrowPreconditionFailed(nil, "MODEL-EEUvA", "invalid precondition")
}
return nil
}

View File

@ -16,20 +16,20 @@ func NewAggregateCreator(serviceName string) *AggregateCreator {
type option func(*Aggregate)
func (c *AggregateCreator) NewAggregate(ctx context.Context, id string, typ AggregateType, version Version, latestSequence uint64, opts ...option) (*Aggregate, error) {
func (c *AggregateCreator) NewAggregate(ctx context.Context, id string, typ AggregateType, version Version, previousSequence uint64, opts ...option) (*Aggregate, error) {
ctxData := auth.GetCtxData(ctx)
editorUser := ctxData.UserID
resourceOwner := ctxData.OrgID
aggregate := &Aggregate{
id: id,
typ: typ,
latestSequence: latestSequence,
version: version,
Events: make([]*Event, 0, 2),
editorService: c.serviceName,
editorUser: editorUser,
resourceOwner: resourceOwner,
id: id,
typ: typ,
PreviousSequence: previousSequence,
version: version,
Events: make([]*Event, 0, 2),
editorService: c.serviceName,
editorUser: editorUser,
resourceOwner: resourceOwner,
}
for _, opt := range opts {

View File

@ -2,6 +2,8 @@ package models
import (
"testing"
"github.com/caos/zitadel/internal/errors"
)
func TestAggregate_AppendEvent(t *testing.T) {
@ -34,17 +36,24 @@ func TestAggregate_AppendEvent(t *testing.T) {
wantErr: true,
},
{
name: "event added",
fields: fields{aggregate: &Aggregate{Events: []*Event{}}},
args: args{typ: "user.deactivated"},
want: &Aggregate{Events: []*Event{&Event{Type: "user.deactivated"}}},
name: "event added",
fields: fields{aggregate: &Aggregate{Events: []*Event{}}},
args: args{typ: "user.deactivated"},
want: &Aggregate{Events: []*Event{
{Type: "user.deactivated"},
}},
wantErr: false,
},
{
name: "event added",
fields: fields{aggregate: &Aggregate{Events: []*Event{&Event{}}}},
args: args{typ: "user.deactivated"},
want: &Aggregate{Events: []*Event{&Event{}, &Event{Type: "user.deactivated"}}},
name: "event added",
fields: fields{aggregate: &Aggregate{Events: []*Event{
{},
}}},
args: args{typ: "user.deactivated"},
want: &Aggregate{Events: []*Event{
{},
{Type: "user.deactivated"},
}},
wantErr: false,
},
}
@ -84,90 +93,218 @@ func TestAggregate_Validate(t *testing.T) {
name: "no id error",
wantErr: true,
fields: fields{aggregate: &Aggregate{
typ: "user",
version: "v1.0.0",
editorService: "svc",
editorUser: "hodor",
resourceOwner: "org",
latestSequence: 5,
Events: []*Event{&Event{
AggregateType: "user",
AggregateVersion: "v1.0.0",
EditorService: "management",
EditorUser: "hodor",
ResourceOwner: "org",
Type: "born",
}},
typ: "user",
version: "v1.0.0",
editorService: "svc",
editorUser: "hodor",
resourceOwner: "org",
PreviousSequence: 5,
Events: []*Event{
{
AggregateType: "user",
AggregateVersion: "v1.0.0",
EditorService: "management",
EditorUser: "hodor",
ResourceOwner: "org",
Type: "born",
}},
}},
},
{
name: "no type error",
wantErr: true,
fields: fields{aggregate: &Aggregate{
id: "aggID",
version: "v1.0.0",
editorService: "svc",
editorUser: "hodor",
resourceOwner: "org",
latestSequence: 5,
Events: []*Event{&Event{
AggregateID: "hodor",
AggregateVersion: "v1.0.0",
EditorService: "management",
EditorUser: "hodor",
ResourceOwner: "org",
Type: "born",
}},
id: "aggID",
version: "v1.0.0",
editorService: "svc",
editorUser: "hodor",
resourceOwner: "org",
PreviousSequence: 5,
Events: []*Event{
{
AggregateID: "hodor",
AggregateVersion: "v1.0.0",
EditorService: "management",
EditorUser: "hodor",
ResourceOwner: "org",
Type: "born",
}},
}},
},
{
name: "invalid version error",
wantErr: true,
fields: fields{aggregate: &Aggregate{
id: "aggID",
typ: "user",
editorService: "svc",
editorUser: "hodor",
resourceOwner: "org",
latestSequence: 5,
Events: []*Event{&Event{
AggregateID: "hodor",
AggregateType: "user",
EditorService: "management",
EditorUser: "hodor",
ResourceOwner: "org",
Type: "born",
}},
id: "aggID",
typ: "user",
editorService: "svc",
editorUser: "hodor",
resourceOwner: "org",
PreviousSequence: 5,
Events: []*Event{
{
AggregateID: "hodor",
AggregateType: "user",
EditorService: "management",
EditorUser: "hodor",
ResourceOwner: "org",
Type: "born",
}},
}},
},
{
name: "validation ok",
name: "no query in precondition error",
wantErr: true,
fields: fields{aggregate: &Aggregate{
id: "aggID",
typ: "user",
version: "v1.0.0",
editorService: "svc",
editorUser: "hodor",
resourceOwner: "org",
PreviousSequence: 5,
Precondition: &precondition{
Validation: func(...*Event) error { return nil },
},
Events: []*Event{
{
AggregateID: "hodor",
AggregateType: "user",
AggregateVersion: "v1.0.0",
EditorService: "management",
EditorUser: "hodor",
ResourceOwner: "org",
Type: "born",
}},
}},
},
{
name: "no func in precondition error",
wantErr: true,
fields: fields{aggregate: &Aggregate{
id: "aggID",
typ: "user",
version: "v1.0.0",
editorService: "svc",
editorUser: "hodor",
resourceOwner: "org",
PreviousSequence: 5,
Precondition: &precondition{
Query: NewSearchQuery().AggregateIDFilter("hodor"),
},
Events: []*Event{
{
AggregateID: "hodor",
AggregateType: "user",
AggregateVersion: "v1.0.0",
EditorService: "management",
EditorUser: "hodor",
ResourceOwner: "org",
Type: "born",
}},
}},
},
{
name: "validation without precondition ok",
wantErr: false,
fields: fields{aggregate: &Aggregate{
id: "aggID",
typ: "user",
version: "v1.0.0",
editorService: "svc",
editorUser: "hodor",
resourceOwner: "org",
latestSequence: 5,
Events: []*Event{&Event{
AggregateID: "hodor",
AggregateType: "user",
AggregateVersion: "v1.0.0",
EditorService: "management",
EditorUser: "hodor",
ResourceOwner: "org",
Type: "born",
}},
id: "aggID",
typ: "user",
version: "v1.0.0",
editorService: "svc",
editorUser: "hodor",
resourceOwner: "org",
PreviousSequence: 5,
Events: []*Event{
{
AggregateID: "hodor",
AggregateType: "user",
AggregateVersion: "v1.0.0",
EditorService: "management",
EditorUser: "hodor",
ResourceOwner: "org",
Type: "born",
}},
}},
},
{
name: "validation with precondition ok",
wantErr: false,
fields: fields{aggregate: &Aggregate{
id: "aggID",
typ: "user",
version: "v1.0.0",
editorService: "svc",
editorUser: "hodor",
resourceOwner: "org",
PreviousSequence: 5,
Precondition: &precondition{
Validation: func(...*Event) error { return nil },
Query: NewSearchQuery().AggregateIDFilter("hodor"),
},
Events: []*Event{
{
AggregateID: "hodor",
AggregateType: "user",
AggregateVersion: "v1.0.0",
EditorService: "management",
EditorUser: "hodor",
ResourceOwner: "org",
Type: "born",
}},
}},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err := tt.fields.aggregate.Validate(); (err != nil) != tt.wantErr {
err := tt.fields.aggregate.Validate()
if (err != nil) != tt.wantErr {
t.Errorf("Aggregate.Validate() error = %v, wantErr %v", err, tt.wantErr)
}
if tt.wantErr && !errors.IsPreconditionFailed(err) {
t.Errorf("error must extend precondition failed: %v", err)
}
})
}
}
func TestAggregate_SetPrecondition(t *testing.T) {
type fields struct {
aggregate *Aggregate
}
type args struct {
query *SearchQuery
validateFunc func(...*Event) error
}
tests := []struct {
name string
fields fields
args args
want *Aggregate
}{
{
name: "set precondition",
fields: fields{aggregate: &Aggregate{}},
args: args{
query: &SearchQuery{},
validateFunc: func(...*Event) error { return nil },
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := tt.fields.aggregate.SetPrecondition(tt.args.query, tt.args.validateFunc)
if got.Precondition == nil {
t.Error("precondition must not be nil")
t.FailNow()
}
if got.Precondition.Query == nil {
t.Error("query of precondition must not be nil")
}
if got.Precondition.Validation == nil {
t.Error("precondition func must not be nil")
}
})
}
}

View File

@ -64,6 +64,9 @@ func (q *SearchQuery) Validate() error {
if q == nil {
return errors.ThrowPreconditionFailed(nil, "MODEL-J5xQi", "search query is nil")
}
if len(q.Filters) == 0 {
return errors.ThrowPreconditionFailed(nil, "MODEL-pF3DR", "no filters set")
}
for _, filter := range q.Filters {
if err := filter.Validate(); err != nil {
return err

View File

@ -21,17 +21,23 @@ func TestSearchQuery_setFilter(t *testing.T) {
{
name: "set idFilter",
fields: fields{query: NewSearchQuery()},
args: args{filters: []*Filter{&Filter{field: Field_AggregateID, operation: Operation_Equals, value: "hodor"}}},
want: &SearchQuery{Filters: []*Filter{&Filter{field: Field_AggregateID, operation: Operation_Equals, value: "hodor"}}},
args: args{filters: []*Filter{
{field: Field_AggregateID, operation: Operation_Equals, value: "hodor"},
}},
want: &SearchQuery{Filters: []*Filter{
{field: Field_AggregateID, operation: Operation_Equals, value: "hodor"},
}},
},
{
name: "overwrite idFilter",
fields: fields{query: NewSearchQuery()},
args: args{filters: []*Filter{
&Filter{field: Field_AggregateID, operation: Operation_Equals, value: "hodor"},
&Filter{field: Field_AggregateID, operation: Operation_Equals, value: "ursli"},
{field: Field_AggregateID, operation: Operation_Equals, value: "hodor"},
{field: Field_AggregateID, operation: Operation_Equals, value: "ursli"},
}},
want: &SearchQuery{Filters: []*Filter{
{field: Field_AggregateID, operation: Operation_Equals, value: "ursli"},
}},
want: &SearchQuery{Filters: []*Filter{&Filter{field: Field_AggregateID, operation: Operation_Equals, value: "ursli"}}},
},
}
for _, tt := range tests {

View File

@ -22,12 +22,13 @@ func Filter(ctx context.Context, filter filterFunc, appender appendFunc, query *
return errors.ThrowNotFound(nil, "EVENT-8due3", "no events found")
}
err = appender(events...)
if err != nil{
if err != nil {
return ThrowAppendEventError(err, "SDK-awiWK", "appender failed")
}
return nil
}
// Push is Deprecated use PushAggregates
// Push creates the aggregates from aggregater
// and pushes the aggregates to the given pushFunc
// the given events are appended by the appender
@ -45,7 +46,19 @@ func Push(ctx context.Context, push pushFunc, appender appendFunc, aggregaters .
if err != nil {
return err
}
return appendAggregates(appender, aggregates)
}
func PushAggregates(ctx context.Context, push pushFunc, appender appendFunc, aggregates ...*models.Aggregate) (err error) {
if len(aggregates) < 1 {
return errors.ThrowPreconditionFailed(nil, "SDK-q9wjp", "no aggregaters passed")
}
err = push(ctx, aggregates...)
if err != nil {
return err
}
return appendAggregates(appender, aggregates)
}
@ -69,4 +82,4 @@ func makeAggregates(ctx context.Context, aggregaters []aggregateFunc) (aggregate
}
}
return aggregates, nil
}
}