From 35ce02665149f050d6888e8dc4e3d97f6ab2e271 Mon Sep 17 00:00:00 2001 From: adlerhurst Date: Mon, 19 Oct 2020 09:53:32 +0200 Subject: [PATCH] try with goroutines --- internal/eventstore/v2/repository/sql/crdb.go | 79 +-- .../eventstore/v2/repository/sql/crdb_test.go | 506 +++++++++++++++++- .../v2/repository/sql/local_crdb_test.go | 289 +++++----- 3 files changed, 676 insertions(+), 198 deletions(-) diff --git a/internal/eventstore/v2/repository/sql/crdb.go b/internal/eventstore/v2/repository/sql/crdb.go index 6f0e47c33b..1a6e72cbcf 100644 --- a/internal/eventstore/v2/repository/sql/crdb.go +++ b/internal/eventstore/v2/repository/sql/crdb.go @@ -6,6 +6,7 @@ import ( "errors" "regexp" "strconv" + "sync" "github.com/caos/logging" caos_errs "github.com/caos/zitadel/internal/errors" @@ -109,45 +110,59 @@ func (db *CRDB) Push(ctx context.Context, events ...*repository.Event) error { err := crdb.ExecuteTx(ctx, db.client, nil, func(tx *sql.Tx) error { stmt, err := tx.PrepareContext(ctx, crdbInsert) if err != nil { - tx.Rollback() logging.Log("SQL-3to5p").WithError(err).Warn("prepare failed") return caos_errs.ThrowInternal(err, "SQL-OdXRE", "prepare failed") } + wg := sync.WaitGroup{} + errs := make(chan error, len(events)) + for _, event := range events { - previousSequence := Sequence(event.PreviousSequence) - if event.PreviousEvent != nil { - previousSequence = Sequence(event.PreviousEvent.Sequence) - } - err = stmt.QueryRowContext(ctx, - event.Type, - event.AggregateType, - event.AggregateID, - event.Version, - &sql.NullTime{ - Time: event.CreationDate, - Valid: !event.CreationDate.IsZero(), - }, - Data(event.Data), - event.EditorUser, - event.EditorService, - event.ResourceOwner, - previousSequence, - event.CheckPreviousSequence, - ).Scan(&event.ID, &event.Sequence, &previousSequence, &event.CreationDate) + wg.Add(1) + go func(event *repository.Event) { + defer wg.Done() + previousSequence := Sequence(event.PreviousSequence) + if event.PreviousEvent != nil { + if event.PreviousEvent.AggregateType != event.AggregateType || event.PreviousEvent.AggregateID != event.AggregateID { + errs <- caos_errs.ThrowPreconditionFailed(nil, "SQL-J55uR", "aggregate of linked events unequal") + return + } + previousSequence = Sequence(event.PreviousEvent.Sequence) + } + err = stmt.QueryRowContext(ctx, + event.Type, + event.AggregateType, + event.AggregateID, + event.Version, + &sql.NullTime{ + Time: event.CreationDate, + Valid: !event.CreationDate.IsZero(), + }, + Data(event.Data), + event.EditorUser, + event.EditorService, + event.ResourceOwner, + previousSequence, + event.CheckPreviousSequence, + ).Scan(&event.ID, &event.Sequence, &previousSequence, &event.CreationDate) - event.PreviousSequence = uint64(previousSequence) + event.PreviousSequence = uint64(previousSequence) - if err != nil { - tx.Rollback() - - logging.LogWithFields("SQL-IP3js", - "aggregate", event.AggregateType, - "aggregateId", event.AggregateID, - "aggregateType", event.AggregateType, - "eventType", event.Type).WithError(err).Info("query failed") - return caos_errs.ThrowInternal(err, "SQL-SBP37", "unable to create event") - } + if err != nil { + logging.LogWithFields("SQL-IP3js", + "aggregate", event.AggregateType, + "aggregateId", event.AggregateID, + "aggregateType", event.AggregateType, + "eventType", event.Type).WithError(err).Info("query failed") + errs <- caos_errs.ThrowInternal(err, "SQL-SBP37", "unable to create event") + } + }(event) } + wg.Wait() + close(errs) + for err := range errs { + return err + } + return nil }) if err != nil && !errors.Is(err, &caos_errs.CaosError{}) { diff --git a/internal/eventstore/v2/repository/sql/crdb_test.go b/internal/eventstore/v2/repository/sql/crdb_test.go index cf66d1b9d0..772c3f249b 100644 --- a/internal/eventstore/v2/repository/sql/crdb_test.go +++ b/internal/eventstore/v2/repository/sql/crdb_test.go @@ -2,6 +2,7 @@ package sql import ( "context" + "sync" "testing" "github.com/caos/zitadel/internal/eventstore/v2/repository" @@ -265,6 +266,7 @@ func TestCRDB_columnName(t *testing.T) { func TestCRDB_Push_OneAggregate(t *testing.T) { type args struct { + ctx context.Context events []*repository.Event } type eventsRes struct { @@ -281,23 +283,10 @@ func TestCRDB_Push_OneAggregate(t *testing.T) { args args res res }{ - { - name: "push no events", - args: args{ - events: []*repository.Event{}, - }, - res: res{ - wantErr: false, - eventsRes: eventsRes{ - pushedEventsCount: 0, - aggID: []string{"0"}, - aggType: repository.AggregateType(t.Name()), - }, - }, - }, { name: "push 1 event with check previous", args: args{ + ctx: context.Background(), events: []*repository.Event{ generateEvent(t, "1", true, 0), }, @@ -313,6 +302,7 @@ func TestCRDB_Push_OneAggregate(t *testing.T) { { name: "fail push 1 event with check previous wrong sequence", args: args{ + ctx: context.Background(), events: []*repository.Event{ generateEvent(t, "2", true, 5), }, @@ -329,6 +319,7 @@ func TestCRDB_Push_OneAggregate(t *testing.T) { { name: "push 1 event without check previous", args: args{ + ctx: context.Background(), events: []*repository.Event{ generateEvent(t, "3", false, 0), }, @@ -345,6 +336,7 @@ func TestCRDB_Push_OneAggregate(t *testing.T) { { name: "push 1 event without check previous wrong sequence", args: args{ + ctx: context.Background(), events: []*repository.Event{ generateEvent(t, "4", false, 5), }, @@ -361,6 +353,7 @@ func TestCRDB_Push_OneAggregate(t *testing.T) { { name: "fail on push two events on agg without linking", args: args{ + ctx: context.Background(), events: []*repository.Event{ generateEvent(t, "5", true, 0), generateEvent(t, "5", true, 0), @@ -378,6 +371,7 @@ func TestCRDB_Push_OneAggregate(t *testing.T) { { name: "push two events on agg with linking", args: args{ + ctx: context.Background(), events: linkEvents( generateEvent(t, "6", true, 0), generateEvent(t, "6", true, 0), @@ -395,6 +389,7 @@ func TestCRDB_Push_OneAggregate(t *testing.T) { { name: "push two events on agg with linking without check previous", args: args{ + ctx: context.Background(), events: linkEvents( generateEvent(t, "7", false, 0), generateEvent(t, "7", false, 0), @@ -412,6 +407,7 @@ func TestCRDB_Push_OneAggregate(t *testing.T) { { name: "push two events on agg with linking mixed check previous", args: args{ + ctx: context.Background(), events: linkEvents( generateEvent(t, "8", false, 0), generateEvent(t, "8", true, 0), @@ -429,13 +425,30 @@ func TestCRDB_Push_OneAggregate(t *testing.T) { }, }, }, + { + name: "failed push because context canceled", + args: args{ + ctx: canceledCtx(), + events: []*repository.Event{ + generateEvent(t, "9", true, 0), + }, + }, + res: res{ + wantErr: true, + eventsRes: eventsRes{ + pushedEventsCount: 0, + aggID: []string{"9"}, + aggType: repository.AggregateType(t.Name()), + }, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { db := &CRDB{ client: testCRDBClient, } - if err := db.Push(context.Background(), tt.args.events...); (err != nil) != tt.res.wantErr { + if err := db.Push(tt.args.ctx, tt.args.events...); (err != nil) != tt.res.wantErr { t.Errorf("CRDB.Push() error = %v, wantErr %v", err, tt.res.wantErr) } @@ -516,14 +529,60 @@ func TestCRDB_Push_MultipleAggregate(t *testing.T) { args: args{ events: linkEvents( generateEvent(t, "104", false, 0), - generateEvent(t, "104", false, 0), + generateEvent(t, "105", false, 0), ), }, res: res{ - wantErr: false, + wantErr: true, eventsRes: eventsRes{ pushedEventsCount: 0, - aggID: []string{"104"}, + aggID: []string{"104", "105"}, + aggType: []repository.AggregateType{repository.AggregateType(t.Name())}, + }, + }, + }, + { + name: "push two aggregates mixed check previous multiple events", + args: args{ + events: combineEventLists( + linkEvents( + generateEvent(t, "106", true, 0), + generateEvent(t, "106", false, 0), + generateEvent(t, "106", false, 0), + generateEvent(t, "106", true, 0), + ), + linkEvents( + generateEvent(t, "107", false, 0), + generateEvent(t, "107", true, 0), + generateEvent(t, "107", false, 0), + generateEvent(t, "107", true, 0), + ), + linkEvents( + generateEvent(t, "108", true, 0), + generateEvent(t, "108", false, 0), + generateEvent(t, "108", false, 0), + generateEvent(t, "108", true, 0), + ), + ), + }, + }, + { + name: "failed push same aggregate in two transactions", + args: args{ + events: combineEventLists( + linkEvents( + generateEvent(t, "109", true, 0), + ), + linkEvents( + generateEvent(t, "109", true, 0), + ), + ), + }, + res: res{ + wantErr: true, + eventsRes: eventsRes{ + pushedEventsCount: 0, + aggID: []string{"109"}, aggType: []repository.AggregateType{repository.AggregateType(t.Name())}, }, }, @@ -552,8 +611,383 @@ func TestCRDB_Push_MultipleAggregate(t *testing.T) { } } -func combineEventLists(firstList []*repository.Event, secondList []*repository.Event) []*repository.Event { - return append(firstList, secondList...) +func TestCRDB_Push_Parallel(t *testing.T) { + type args struct { + events [][]*repository.Event + } + type eventsRes struct { + pushedEventsCount int + aggTypes []repository.AggregateType + aggIDs []string + } + type res struct { + errCount int + eventsRes eventsRes + } + tests := []struct { + name string + args args + res res + }{ + { + name: "clients push different aggregates", + args: args{ + events: [][]*repository.Event{ + linkEvents( + generateEvent(t, "200", false, 0), + generateEvent(t, "200", true, 0), + generateEvent(t, "200", false, 0), + ), + linkEvents( + generateEvent(t, "201", false, 0), + generateEvent(t, "201", true, 0), + generateEvent(t, "201", false, 0), + ), + combineEventLists( + linkEvents( + generateEvent(t, "202", false, 0), + ), + linkEvents( + generateEvent(t, "203", true, 0), + generateEvent(t, "203", false, 0), + ), + ), + }, + }, + res: res{ + errCount: 0, + eventsRes: eventsRes{ + aggIDs: []string{"200", "201", "202", "203"}, + pushedEventsCount: 9, + aggTypes: []repository.AggregateType{repository.AggregateType(t.Name())}, + }, + }, + }, + { + name: "clients push same aggregates no check previous", + args: args{ + events: [][]*repository.Event{ + linkEvents( + generateEvent(t, "204", false, 0), + generateEvent(t, "204", false, 0), + ), + linkEvents( + generateEvent(t, "204", false, 0), + generateEvent(t, "204", false, 0), + ), + combineEventLists( + linkEvents( + generateEvent(t, "205", false, 0), + generateEvent(t, "205", false, 0), + generateEvent(t, "205", false, 0), + ), + linkEvents( + generateEvent(t, "206", false, 0), + generateEvent(t, "206", false, 0), + generateEvent(t, "206", false, 0), + ), + ), + combineEventLists( + linkEvents( + generateEvent(t, "204", false, 0), + ), + linkEvents( + generateEvent(t, "205", false, 0), + generateEvent(t, "205", false, 0), + ), + linkEvents( + generateEvent(t, "206", false, 0), + ), + ), + }, + }, + res: res{ + errCount: 0, + eventsRes: eventsRes{ + aggIDs: []string{"204", "205", "206"}, + pushedEventsCount: 14, + aggTypes: []repository.AggregateType{repository.AggregateType(t.Name())}, + }, + }, + }, + { + name: "clients push different aggregates one with check previous", + args: args{ + events: [][]*repository.Event{ + linkEvents( + generateEvent(t, "207", false, 0), + generateEvent(t, "207", false, 0), + generateEvent(t, "207", false, 0), + generateEvent(t, "207", false, 0), + generateEvent(t, "207", false, 0), + generateEvent(t, "207", false, 0), + ), + linkEvents( + generateEvent(t, "208", true, 0), + generateEvent(t, "208", true, 0), + generateEvent(t, "208", true, 0), + generateEvent(t, "208", true, 0), + generateEvent(t, "208", true, 0), + ), + }, + }, + res: res{ + errCount: 0, + eventsRes: eventsRes{ + aggIDs: []string{"207", "208"}, + pushedEventsCount: 11, + aggTypes: []repository.AggregateType{repository.AggregateType(t.Name())}, + }, + }, + }, + { + name: "clients push different aggregates all with check previous on first event fail", + args: args{ + events: [][]*repository.Event{ + linkEvents( + generateEventWithData(t, "210", true, 0, []byte(`{ "transaction": 1 }`)), + generateEventWithData(t, "210", false, 0, []byte(`{ "transaction": 1.1 }`)), + ), + linkEvents( + generateEventWithData(t, "210", true, 0, []byte(`{ "transaction": 2 }`)), + generateEventWithData(t, "210", false, 0, []byte(`{ "transaction": 2.1 }`)), + ), + linkEvents( + generateEventWithData(t, "210", true, 0, []byte(`{ "transaction": 3 }`)), + generateEventWithData(t, "210", false, 0, []byte(`{ "transaction": 30.1 }`)), + ), + }, + }, + res: res{ + errCount: 2, + eventsRes: eventsRes{ + aggIDs: []string{"210"}, + pushedEventsCount: 2, + aggTypes: []repository.AggregateType{repository.AggregateType(t.Name())}, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + db := &CRDB{ + client: testCRDBClient, + } + wg := sync.WaitGroup{} + + errs := make([]error, 0, tt.res.errCount) + errsMu := sync.Mutex{} + for _, events := range tt.args.events { + wg.Add(1) + go func(events []*repository.Event) { + err := db.Push(context.Background(), events...) + if err != nil { + errsMu.Lock() + errs = append(errs, err) + errsMu.Unlock() + } + + wg.Done() + }(events) + } + wg.Wait() + + if len(errs) != tt.res.errCount { + t.Errorf("CRDB.Push() error count = %d, wanted err count %d, errs: %v", len(errs), tt.res.errCount, errs) + } + + rows, err := testCRDBClient.Query("SELECT event_data FROM eventstore.events where aggregate_type = ANY($1) AND aggregate_id = ANY($2) order by event_sequence", pq.Array(tt.res.eventsRes.aggTypes), pq.Array(tt.res.eventsRes.aggIDs)) + if err != nil { + t.Error("unable to query inserted rows: ", err) + return + } + var count int + + for rows.Next() { + count++ + data := make(Data, 0) + + err := rows.Scan(&data) + if err != nil { + t.Error("unable to query inserted rows: ", err) + return + } + t.Logf("inserted data: %v", string(data)) + } + if count != tt.res.eventsRes.pushedEventsCount { + t.Errorf("expected push count %d got %d", tt.res.eventsRes.pushedEventsCount, count) + } + }) + } +} + +func TestCRDB_query_events(t *testing.T) { + type args struct { + searchQuery *repository.SearchQuery + } + type fields struct { + existingEvents []*repository.Event + } + type res struct { + events []*repository.Event + } + tests := []struct { + name string + fields fields + args args + res res + wantErr bool + }{ + { + name: "aggregate type filter no events", + args: args{ + searchQuery: &repository.SearchQuery{}, + }, + fields: fields{ + existingEvents: []*repository.Event{}, + }, + res: res{ + events: []*repository.Event{}, + }, + wantErr: false, + }, + // { + // name: "aggregate type filter events found", + // args: args{ + // searchQuery: &repository.SearchQuery{}, + // }, + // fields: fields{ + // existingEvents: []*repository.Event{}, + // }, + // res: res{ + // events: []*repository.Event{}, + // }, + // wantErr: false, + // }, + // { + // name: "aggregate type and id filter events found", + // args: args{ + // searchQuery: &repository.SearchQuery{}, + // }, + // fields: fields{ + // existingEvents: []*repository.Event{}, + // }, + // res: res{ + // events: []*repository.Event{}, + // }, + // wantErr: false, + // }, + // { + // name: "sequence filter events found", + // args: args{ + // searchQuery: &repository.SearchQuery{}, + // }, + // fields: fields{ + // existingEvents: []*repository.Event{}, + // }, + // res: res{ + // events: []*repository.Event{}, + // }, + // wantErr: false, + // }, + // { + // name: "resource owner filter events found", + // args: args{ + // searchQuery: &repository.SearchQuery{}, + // }, + // fields: fields{ + // existingEvents: []*repository.Event{}, + // }, + // res: res{ + // events: []*repository.Event{}, + // }, + // wantErr: false, + // }, + // { + // name: "editor service filter events found", + // args: args{ + // searchQuery: &repository.SearchQuery{}, + // }, + // fields: fields{ + // existingEvents: []*repository.Event{}, + // }, + // res: res{ + // events: []*repository.Event{}, + // }, + // wantErr: false, + // }, + // { + // name: "editor user filter events found", + // args: args{ + // searchQuery: &repository.SearchQuery{}, + // }, + // fields: fields{ + // existingEvents: []*repository.Event{}, + // }, + // res: res{ + // events: []*repository.Event{}, + // }, + // wantErr: false, + // }, + // { + // name: "event type filter events found", + // args: args{ + // searchQuery: &repository.SearchQuery{}, + // }, + // fields: fields{ + // existingEvents: []*repository.Event{}, + // }, + // res: res{ + // events: []*repository.Event{}, + // }, + // wantErr: false, + // }, + // { + // name: "no filter events found", + // args: args{ + // searchQuery: &repository.SearchQuery{}, + // }, + // fields: fields{ + // existingEvents: []*repository.Event{}, + // }, + // res: res{ + // events: []*repository.Event{}, + // }, + // wantErr: false, + // }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + db := &CRDB{ + client: testCRDBClient, + } + + // setup initial data for query + if err := db.Push(context.Background(), tt.fields.existingEvents...); err != nil { + t.Errorf("error in setup = %v", err) + return + } + + events := []*repository.Event{} + if err := db.query(tt.args.searchQuery, &events); (err != nil) != tt.wantErr { + t.Errorf("CRDB.query() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func canceledCtx() context.Context { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + return ctx +} + +func combineEventLists(lists ...[]*repository.Event) []*repository.Event { + combined := make([]*repository.Event, 0) + for _, list := range lists { + combined = append(combined, list...) + } + return combined } func linkEvents(events ...*repository.Event) []*repository.Event { @@ -563,6 +997,21 @@ func linkEvents(events ...*repository.Event) []*repository.Event { return events } +func generateEventForAggregate(aggregateType repository.AggregateType, aggregateID string, checkPrevious bool, previousSeq uint64) *repository.Event { + return &repository.Event{ + AggregateID: aggregateID, + AggregateType: aggregateType, + CheckPreviousSequence: checkPrevious, + EditorService: "svc", + EditorUser: "user", + PreviousEvent: nil, + PreviousSequence: previousSeq, + ResourceOwner: "ro", + Type: "test.created", + Version: "v1", + } +} + func generateEvent(t *testing.T, aggregateID string, checkPrevious bool, previousSeq uint64) *repository.Event { t.Helper() return &repository.Event{ @@ -578,3 +1027,20 @@ func generateEvent(t *testing.T, aggregateID string, checkPrevious bool, previou Version: "v1", } } + +func generateEventWithData(t *testing.T, aggregateID string, checkPrevious bool, previousSeq uint64, data []byte) *repository.Event { + t.Helper() + return &repository.Event{ + AggregateID: aggregateID, + AggregateType: repository.AggregateType(t.Name()), + CheckPreviousSequence: checkPrevious, + EditorService: "svc", + EditorUser: "user", + PreviousEvent: nil, + PreviousSequence: previousSeq, + ResourceOwner: "ro", + Type: "test.created", + Version: "v1", + Data: data, + } +} diff --git a/internal/eventstore/v2/repository/sql/local_crdb_test.go b/internal/eventstore/v2/repository/sql/local_crdb_test.go index df45be8230..478222acbb 100644 --- a/internal/eventstore/v2/repository/sql/local_crdb_test.go +++ b/internal/eventstore/v2/repository/sql/local_crdb_test.go @@ -1,7 +1,6 @@ package sql import ( - "context" "database/sql" "fmt" "io/ioutil" @@ -11,10 +10,8 @@ import ( "strconv" "strings" "testing" - "time" "github.com/caos/logging" - "github.com/caos/zitadel/internal/eventstore/v2/repository" "github.com/cockroachdb/cockroach-go/v2/testserver" ) @@ -46,156 +43,156 @@ func TestMain(m *testing.M) { os.Exit(m.Run()) } -func TestInsert(t *testing.T) { - crdb := &CRDB{client: testCRDBClient} - e1 := &repository.Event{ - AggregateID: "agg.id", - AggregateType: "agg.type", - CheckPreviousSequence: true, - EditorService: "edi.svc", - EditorUser: "edi", - ResourceOwner: "edit", - Type: "type", - Version: "v1", - } - events := []*repository.Event{ - e1, - { - AggregateID: "agg.id", - AggregateType: "agg.type", - CheckPreviousSequence: true, - EditorService: "edi.svc", - EditorUser: "edi", - ResourceOwner: "edit", - Type: "type", - Version: "v1", - CreationDate: time.Now().Add(-2 * time.Second), - PreviousEvent: e1, - }, - { - AggregateID: "agg.id", - AggregateType: "agg.type", - CheckPreviousSequence: false, - EditorService: "edi.svc", - EditorUser: "edi", - ResourceOwner: "edit", - Type: "type", - Version: "v1", - CreationDate: time.Now().Add(-500 * time.Millisecond), - }, - { - AggregateID: "agg.id2", - AggregateType: "agg.type", - CheckPreviousSequence: true, - EditorService: "edi.svc", - EditorUser: "edi", - ResourceOwner: "edit", - Type: "type", - Version: "v1", - CreationDate: time.Now().Add(-500 * time.Millisecond), - }, - { - AggregateID: "agg.id3", - AggregateType: "agg.type", - CheckPreviousSequence: false, - EditorService: "edi.svc", - EditorUser: "edi", - ResourceOwner: "edit", - Type: "type", - Version: "v1", - CreationDate: time.Now().Add(-500 * time.Millisecond), - }, - { - AggregateID: "agg.id", - AggregateType: "agg.type", - CheckPreviousSequence: false, - EditorService: "edi.svc", - EditorUser: "edi", - ResourceOwner: "edit", - Type: "type", - Version: "v1", - CreationDate: time.Now().Add(-500 * time.Millisecond), - }, - // { - // AggregateID: "agg.id4", - // AggregateType: "agg.type", - // CheckPreviousSequence: false, - // EditorService: "edi.svc", - // EditorUser: "edi", - // ResourceOwner: "edit", - // Type: "type", - // Version: "v1", - // CreationDate: time.Now().Add(-500 * time.Millisecond), - // PreviousEvent: e1, - // }, - //fail because wrong previous event - // { - // AggregateID: "agg.id2", - // AggregateType: "agg.type", - // CheckPreviousSequence: true, - // EditorService: "edi.svc", - // EditorUser: "edi", - // ResourceOwner: "edit", - // Type: "type", - // Version: "v1", - // CreationDate: time.Now().Add(-500 * time.Millisecond), - // PreviousEvent: e1, - // }, - } - fmt.Println("==============") - err := crdb.Push(context.Background(), events...) - if err != nil { - t.Error(err) - return - } +// func TestInsert(t *testing.T) { +// crdb := &CRDB{client: testCRDBClient} +// e1 := &repository.Event{ +// AggregateID: "agg.id", +// AggregateType: "agg.type", +// CheckPreviousSequence: true, +// EditorService: "edi.svc", +// EditorUser: "edi", +// ResourceOwner: "edit", +// Type: "type", +// Version: "v1", +// } +// events := []*repository.Event{ +// e1, +// { +// AggregateID: "agg.id", +// AggregateType: "agg.type", +// CheckPreviousSequence: true, +// EditorService: "edi.svc", +// EditorUser: "edi", +// ResourceOwner: "edit", +// Type: "type", +// Version: "v1", +// CreationDate: time.Now().Add(-2 * time.Second), +// PreviousEvent: e1, +// }, +// { +// AggregateID: "agg.id", +// AggregateType: "agg.type", +// CheckPreviousSequence: false, +// EditorService: "edi.svc", +// EditorUser: "edi", +// ResourceOwner: "edit", +// Type: "type", +// Version: "v1", +// CreationDate: time.Now().Add(-500 * time.Millisecond), +// }, +// { +// AggregateID: "agg.id2", +// AggregateType: "agg.type", +// CheckPreviousSequence: true, +// EditorService: "edi.svc", +// EditorUser: "edi", +// ResourceOwner: "edit", +// Type: "type", +// Version: "v1", +// CreationDate: time.Now().Add(-500 * time.Millisecond), +// }, +// { +// AggregateID: "agg.id3", +// AggregateType: "agg.type", +// CheckPreviousSequence: false, +// EditorService: "edi.svc", +// EditorUser: "edi", +// ResourceOwner: "edit", +// Type: "type", +// Version: "v1", +// CreationDate: time.Now().Add(-500 * time.Millisecond), +// }, +// { +// AggregateID: "agg.id", +// AggregateType: "agg.type", +// CheckPreviousSequence: false, +// EditorService: "edi.svc", +// EditorUser: "edi", +// ResourceOwner: "edit", +// Type: "type", +// Version: "v1", +// CreationDate: time.Now().Add(-500 * time.Millisecond), +// }, +// // { +// // AggregateID: "agg.id4", +// // AggregateType: "agg.type", +// // CheckPreviousSequence: false, +// // EditorService: "edi.svc", +// // EditorUser: "edi", +// // ResourceOwner: "edit", +// // Type: "type", +// // Version: "v1", +// // CreationDate: time.Now().Add(-500 * time.Millisecond), +// // PreviousEvent: e1, +// // }, +// //fail because wrong previous event +// // { +// // AggregateID: "agg.id2", +// // AggregateType: "agg.type", +// // CheckPreviousSequence: true, +// // EditorService: "edi.svc", +// // EditorUser: "edi", +// // ResourceOwner: "edit", +// // Type: "type", +// // Version: "v1", +// // CreationDate: time.Now().Add(-500 * time.Millisecond), +// // PreviousEvent: e1, +// // }, +// } +// fmt.Println("==============") +// err := crdb.Push(context.Background(), events...) +// if err != nil { +// t.Error(err) +// return +// } - fmt.Println("inserted ts:") - for _, event := range events { - fmt.Printf("%+v\n", event) - } +// fmt.Println("inserted ts:") +// for _, event := range events { +// fmt.Printf("%+v\n", event) +// } - fmt.Println("====================") - rows, err := testCRDBClient.Query("select * from eventstore.events order by event_sequence") - defer rows.Close() - fmt.Println(err) +// fmt.Println("====================") +// rows, err := testCRDBClient.Query("select * from eventstore.events order by event_sequence") +// defer rows.Close() +// fmt.Println(err) - for rows.Next() { - i := make([]interface{}, 12) - var id string - rows.Scan(&id, &i[1], &i[2], &i[3], &i[4], &i[5], &i[6], &i[7], &i[8], &i[9], &i[10], &i[11]) - i[0] = id +// for rows.Next() { +// i := make([]interface{}, 12) +// var id string +// rows.Scan(&id, &i[1], &i[2], &i[3], &i[4], &i[5], &i[6], &i[7], &i[8], &i[9], &i[10], &i[11]) +// i[0] = id - fmt.Println(i) - } - fmt.Println("====================") - filtededEvents, err := crdb.Filter(context.Background(), &repository.SearchQuery{ - Columns: repository.ColumnsEvent, - Filters: []*repository.Filter{ - { - Field: repository.FieldAggregateType, - Operation: repository.OperationEquals, - Value: repository.AggregateType("agg.type"), - }, - }, - }) - fmt.Println(err) +// fmt.Println(i) +// } +// fmt.Println("====================") +// filtededEvents, err := crdb.Filter(context.Background(), &repository.SearchQuery{ +// Columns: repository.ColumnsEvent, +// Filters: []*repository.Filter{ +// { +// Field: repository.FieldAggregateType, +// Operation: repository.OperationEquals, +// Value: repository.AggregateType("agg.type"), +// }, +// }, +// }) +// fmt.Println(err) - for _, event := range filtededEvents { - fmt.Printf("%+v\n", event) - } - fmt.Println("====================") - rows, err = testCRDBClient.Query("select max(event_sequence), count(*) from eventstore.events where aggregate_type = 'agg.type' and aggregate_id = 'agg.id'") - defer rows.Close() - fmt.Println(err) +// for _, event := range filtededEvents { +// fmt.Printf("%+v\n", event) +// } +// fmt.Println("====================") +// rows, err = testCRDBClient.Query("select max(event_sequence), count(*) from eventstore.events where aggregate_type = 'agg.type' and aggregate_id = 'agg.id'") +// defer rows.Close() +// fmt.Println(err) - for rows.Next() { - i := make([]interface{}, 2) - rows.Scan(&i[0], &i[1]) +// for rows.Next() { +// i := make([]interface{}, 2) +// rows.Scan(&i[0], &i[1]) - fmt.Println(i) - } - t.Fail() -} +// fmt.Println(i) +// } +// t.Fail() +// } func executeMigrations() error { files, err := migrationFilePaths()