test: eventstore

This commit is contained in:
adlerhurst 2020-10-21 19:00:41 +02:00
parent 83121ab44d
commit 3c9c2806c8
8 changed files with 1138 additions and 348 deletions

View File

@ -92,16 +92,17 @@ func (es *Eventstore) aggregatesToEvents(aggregates []aggregater) ([]*repository
return nil, err
}
events = append(events, &repository.Event{
AggregateID: aggregate.ID(),
AggregateType: repository.AggregateType(aggregate.Type()),
ResourceOwner: aggregate.ResourceOwner(),
EditorService: event.EditorService(),
EditorUser: event.EditorUser(),
Type: repository.EventType(event.Type()),
Version: repository.Version(aggregate.Version()),
PreviousEvent: previousEvent,
PreviousSequence: aggregate.PreviousSequence(),
Data: data,
AggregateID: aggregate.ID(),
AggregateType: repository.AggregateType(aggregate.Type()),
ResourceOwner: aggregate.ResourceOwner(),
EditorService: event.EditorService(),
EditorUser: event.EditorUser(),
Type: repository.EventType(event.Type()),
Version: repository.Version(aggregate.Version()),
PreviousEvent: previousEvent,
PreviousSequence: aggregate.PreviousSequence(),
Data: data,
CheckPreviousSequence: event.CheckPrevious(),
})
previousEvent = events[len(events)-1]
}

View File

@ -1,13 +1,46 @@
package eventstore
import (
"context"
"fmt"
"reflect"
"sync"
"testing"
"github.com/caos/zitadel/internal/errors"
"github.com/caos/zitadel/internal/eventstore/v2/repository"
)
type testAggregate struct {
id string
events []Event
previousSequence uint64
}
func (a *testAggregate) ID() string {
return a.id
}
func (a *testAggregate) Type() AggregateType {
return "test.aggregate"
}
func (a *testAggregate) Events() []Event {
return a.events
}
func (a *testAggregate) ResourceOwner() string {
return "ro"
}
func (a *testAggregate) Version() Version {
return "v1"
}
func (a *testAggregate) PreviousSequence() uint64 {
return a.previousSequence
}
// testEvent implements the Event interface
type testEvent struct {
description string
@ -306,3 +339,574 @@ func Test_eventData(t *testing.T) {
})
}
}
func TestEventstore_aggregatesToEvents(t *testing.T) {
type args struct {
aggregates []aggregater
}
type res struct {
wantErr bool
events []*repository.Event
}
tests := []struct {
name string
args args
res res
}{
{
name: "one aggregate one event",
args: args{
aggregates: []aggregater{
&testAggregate{
id: "1",
events: []Event{
&testEvent{
data: func() interface{} { return nil },
shouldCheckPrevious: false,
},
},
},
},
},
res: res{
wantErr: false,
events: []*repository.Event{
{
AggregateID: "1",
AggregateType: "test.aggregate",
CheckPreviousSequence: false,
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
Type: "test.event",
Version: "v1",
},
},
},
},
{
name: "one aggregate multiple events",
args: args{
aggregates: []aggregater{
&testAggregate{
id: "1",
events: []Event{
&testEvent{
data: func() interface{} { return nil },
shouldCheckPrevious: false,
},
&testEvent{
data: func() interface{} { return nil },
shouldCheckPrevious: false,
},
},
},
},
},
res: res{
wantErr: false,
events: linkEvents(
&repository.Event{
AggregateID: "1",
AggregateType: "test.aggregate",
CheckPreviousSequence: false,
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
Type: "test.event",
Version: "v1",
},
&repository.Event{
AggregateID: "1",
AggregateType: "test.aggregate",
CheckPreviousSequence: false,
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
Type: "test.event",
Version: "v1",
},
),
},
},
{
name: "invalid data",
args: args{
aggregates: []aggregater{
&testAggregate{
id: "1",
events: []Event{
&testEvent{
data: func() interface{} { return `{"data":""` },
shouldCheckPrevious: false,
},
},
},
},
},
res: res{
wantErr: true,
},
},
{
name: "multiple aggregates",
args: args{
aggregates: []aggregater{
&testAggregate{
id: "1",
events: []Event{
&testEvent{
data: func() interface{} { return nil },
shouldCheckPrevious: false,
},
&testEvent{
data: func() interface{} { return nil },
shouldCheckPrevious: false,
},
},
},
&testAggregate{
id: "2",
events: []Event{
&testEvent{
data: func() interface{} { return nil },
shouldCheckPrevious: true,
},
},
},
},
},
res: res{
wantErr: false,
events: combineEventLists(
linkEvents(
&repository.Event{
AggregateID: "1",
AggregateType: "test.aggregate",
CheckPreviousSequence: false,
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
Type: "test.event",
Version: "v1",
},
&repository.Event{
AggregateID: "1",
AggregateType: "test.aggregate",
CheckPreviousSequence: false,
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
Type: "test.event",
Version: "v1",
},
),
[]*repository.Event{
{
AggregateID: "2",
AggregateType: "test.aggregate",
CheckPreviousSequence: true,
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
Type: "test.event",
Version: "v1",
},
},
),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
es := &Eventstore{}
events, err := es.aggregatesToEvents(tt.args.aggregates)
if (err != nil) != tt.res.wantErr {
t.Errorf("Eventstore.aggregatesToEvents() error = %v, wantErr %v", err, tt.res.wantErr)
return
}
if err != nil {
return
}
if len(tt.res.events) != len(events) {
t.Errorf("length of events unequal want: %d got %d", len(tt.res.events), len(events))
return
}
for i := 0; i < len(tt.res.events); i++ {
compareEvents(t, tt.res.events[i], events[i])
}
})
}
}
type testRepo struct {
events []*repository.Event
sequence uint64
err error
t *testing.T
}
func (repo *testRepo) Health(ctx context.Context) error {
return nil
}
func (repo *testRepo) Push(ctx context.Context, events ...*repository.Event) error {
if repo.err != nil {
return repo.err
}
if len(repo.events) != len(events) {
repo.t.Errorf("length of events unequal want: %d got %d", len(repo.events), len(events))
return fmt.Errorf("")
}
for i := 0; i < len(repo.events); i++ {
compareEvents(repo.t, repo.events[i], events[i])
}
return nil
}
func (repo *testRepo) Filter(ctx context.Context, searchQuery *repository.SearchQuery) (events []*repository.Event, err error) {
if repo.err != nil {
return nil, err
}
return repo.events, nil
}
func (repo *testRepo) LatestSequence(ctx context.Context, queryFactory *repository.SearchQuery) (uint64, error) {
if repo.err != nil {
return 0, repo.err
}
return repo.sequence, nil
}
func TestEventstore_Push(t *testing.T) {
type args struct {
aggregates []aggregater
}
type fields struct {
repo *testRepo
eventMapper map[EventType]func(*repository.Event) (Event, error)
}
type res struct {
wantErr bool
}
tests := []struct {
name string
args args
fields fields
res res
}{
{
name: "one aggregate one event",
args: args{
aggregates: []aggregater{
&testAggregate{
id: "1",
events: []Event{
&testEvent{
data: func() interface{} { return nil },
shouldCheckPrevious: false,
},
},
},
},
},
fields: fields{
repo: &testRepo{
t: t,
events: []*repository.Event{
{
AggregateID: "1",
AggregateType: "test.aggregate",
CheckPreviousSequence: false,
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
Type: "test.event",
Version: "v1",
},
},
},
eventMapper: map[EventType]func(*repository.Event) (Event, error){
"test.event": func(e *repository.Event) (Event, error) {
return &testEvent{}, nil
},
},
},
},
{
name: "one aggregate multiple events",
args: args{
aggregates: []aggregater{
&testAggregate{
id: "1",
events: []Event{
&testEvent{
data: func() interface{} { return nil },
shouldCheckPrevious: false,
},
&testEvent{
data: func() interface{} { return nil },
shouldCheckPrevious: false,
},
},
},
},
},
fields: fields{
repo: &testRepo{
t: t,
events: linkEvents(
&repository.Event{
AggregateID: "1",
AggregateType: "test.aggregate",
CheckPreviousSequence: false,
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
Type: "test.event",
Version: "v1",
},
&repository.Event{
AggregateID: "1",
AggregateType: "test.aggregate",
CheckPreviousSequence: false,
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
Type: "test.event",
Version: "v1",
},
),
},
eventMapper: map[EventType]func(*repository.Event) (Event, error){
"test.event": func(e *repository.Event) (Event, error) {
return &testEvent{}, nil
},
},
},
res: res{
wantErr: false,
},
},
{
name: "multiple aggregates",
args: args{
aggregates: []aggregater{
&testAggregate{
id: "1",
events: []Event{
&testEvent{
data: func() interface{} { return nil },
shouldCheckPrevious: false,
},
&testEvent{
data: func() interface{} { return nil },
shouldCheckPrevious: false,
},
},
},
&testAggregate{
id: "2",
events: []Event{
&testEvent{
data: func() interface{} { return nil },
shouldCheckPrevious: true,
},
},
},
},
},
fields: fields{
repo: &testRepo{
t: t,
events: combineEventLists(
linkEvents(
&repository.Event{
AggregateID: "1",
AggregateType: "test.aggregate",
CheckPreviousSequence: false,
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
Type: "test.event",
Version: "v1",
},
&repository.Event{
AggregateID: "1",
AggregateType: "test.aggregate",
CheckPreviousSequence: false,
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
Type: "test.event",
Version: "v1",
},
),
[]*repository.Event{
{
AggregateID: "2",
AggregateType: "test.aggregate",
CheckPreviousSequence: true,
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
Type: "test.event",
Version: "v1",
},
},
),
},
eventMapper: map[EventType]func(*repository.Event) (Event, error){
"test.event": func(e *repository.Event) (Event, error) {
return &testEvent{}, nil
},
},
},
res: res{
wantErr: false,
},
},
{
name: "push fails",
args: args{
aggregates: []aggregater{
&testAggregate{
id: "1",
events: []Event{
&testEvent{
data: func() interface{} { return nil },
shouldCheckPrevious: false,
},
},
},
},
},
fields: fields{
repo: &testRepo{
t: t,
err: errors.ThrowInternal(nil, "V2-qaa4S", "test err"),
},
},
res: res{
wantErr: true,
},
},
{
name: "aggreagtes to events mapping fails",
args: args{
aggregates: []aggregater{
&testAggregate{
id: "1",
events: []Event{
&testEvent{
data: func() interface{} { return `{"data":""` },
shouldCheckPrevious: false,
},
},
},
},
},
fields: fields{
repo: &testRepo{
t: t,
err: errors.ThrowInternal(nil, "V2-qaa4S", "test err"),
},
},
res: res{
wantErr: true,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
es := &Eventstore{
repo: tt.fields.repo,
interceptorMutex: sync.Mutex{},
eventMapper: map[EventType]eventTypeInterceptors{},
}
for eventType, mapper := range tt.fields.eventMapper {
err := es.RegisterFilterEventMapper(eventType, mapper)
if err != nil {
t.Errorf("register event mapper failed: %v", err)
t.FailNow()
}
}
_, err := es.PushAggregates(context.Background(), tt.args.aggregates...)
if (err != nil) != tt.res.wantErr {
t.Errorf("Eventstore.aggregatesToEvents() error = %v, wantErr %v", err, tt.res.wantErr)
}
})
}
}
func combineEventLists(lists ...[]*repository.Event) []*repository.Event {
events := []*repository.Event{}
for _, list := range lists {
events = append(events, list...)
}
return events
}
func linkEvents(events ...*repository.Event) []*repository.Event {
for i := 1; i < len(events); i++ {
events[i].PreviousEvent = events[i-1]
}
return events
}
func compareEvents(t *testing.T, want, got *repository.Event) {
t.Helper()
if want.AggregateID != got.AggregateID {
t.Errorf("wrong aggregateID got %q want %q", want.AggregateID, got.AggregateID)
}
if want.AggregateType != got.AggregateType {
t.Errorf("wrong aggregateType got %q want %q", want.AggregateType, got.AggregateType)
}
if want.CheckPreviousSequence != got.CheckPreviousSequence {
t.Errorf("wrong check previous got %v want %v", want.CheckPreviousSequence, got.CheckPreviousSequence)
}
if !reflect.DeepEqual(want.Data, got.Data) {
t.Errorf("wrong data got %s want %s", string(want.Data), string(got.Data))
}
if want.EditorService != got.EditorService {
t.Errorf("wrong editor service got %q want %q", got.EditorService, want.EditorService)
}
if want.EditorUser != got.EditorUser {
t.Errorf("wrong editor user got %q want %q", got.EditorUser, want.EditorUser)
}
if want.ResourceOwner != got.ResourceOwner {
t.Errorf("wrong resource owner got %q want %q", got.ResourceOwner, want.ResourceOwner)
}
if want.Type != got.Type {
t.Errorf("wrong event type got %q want %q", got.Type, want.Type)
}
if want.Version != got.Version {
t.Errorf("wrong version got %q want %q", got.Version, want.Version)
}
if (want.PreviousEvent == nil) != (got.PreviousEvent == nil) {
t.Errorf("linking failed got was linked: %v want was linked: %v", (got.PreviousEvent != nil), (want.PreviousEvent != nil))
}
if want.PreviousSequence != got.PreviousSequence {
t.Errorf("wrong previous sequence got %d want %d", got.PreviousSequence, want.PreviousSequence)
}
}

View File

@ -162,7 +162,7 @@ func (db *CRDB) Push(ctx context.Context, events ...*repository.Event) error {
// Filter returns all events matching the given search query
func (db *CRDB) Filter(ctx context.Context, searchQuery *repository.SearchQuery) (events []*repository.Event, err error) {
events = []*repository.Event{}
err = db.query(searchQuery, &events)
err = query(ctx, db, searchQuery, &events)
if err != nil {
return nil, err
}
@ -173,34 +173,23 @@ func (db *CRDB) Filter(ctx context.Context, searchQuery *repository.SearchQuery)
//LatestSequence returns the latests sequence found by the the search query
func (db *CRDB) LatestSequence(ctx context.Context, searchQuery *repository.SearchQuery) (uint64, error) {
var seq Sequence
err := db.query(searchQuery, &seq)
err := query(ctx, db, searchQuery, &seq)
if err != nil {
return 0, err
}
return uint64(seq), nil
}
func (db *CRDB) query(searchQuery *repository.SearchQuery, dest interface{}) error {
query, values, rowScanner := buildQuery(db, searchQuery)
if query == "" {
return caos_errs.ThrowInvalidArgument(nil, "SQL-rWeBw", "invalid query factory")
func (db *CRDB) db() *sql.DB {
return db.client
}
func (db *CRDB) orderByEventSequence(desc bool) string {
if desc {
return " ORDER BY event_sequence DESC"
}
rows, err := db.client.Query(query, values...)
if err != nil {
logging.Log("SQL-HP3Uk").WithError(err).Info("query failed")
return caos_errs.ThrowInternal(err, "SQL-IJuyR", "unable to filter events")
}
defer rows.Close()
for rows.Next() {
err = rowScanner(rows.Scan, dest)
if err != nil {
return err
}
}
return nil
return " ORDER BY event_sequence"
}
func (db *CRDB) eventQuery() string {

View File

@ -821,7 +821,7 @@ func TestCRDB_Push_Parallel(t *testing.T) {
}
}
func TestCRDB_query_events(t *testing.T) {
func TestCRDB_Filter(t *testing.T) {
type args struct {
searchQuery *repository.SearchQuery
}
@ -860,29 +860,6 @@ func TestCRDB_query_events(t *testing.T) {
},
wantErr: false,
},
{
name: "aggregate type filter events found",
args: args{
searchQuery: &repository.SearchQuery{
Columns: repository.ColumnsEvent,
Filters: []*repository.Filter{
repository.NewFilter(repository.FieldAggregateType, t.Name(), repository.OperationEquals),
},
},
},
fields: fields{
existingEvents: []*repository.Event{
generateEvent(t, "301", false, 0),
generateEvent(t, "302", false, 0),
generateEvent(t, "302", false, 0),
generateEventForAggregate("not in list", "303", false, 0),
},
},
res: res{
eventCount: 3,
},
wantErr: false,
},
{
name: "aggregate type and id filter events found",
args: args{
@ -899,7 +876,6 @@ func TestCRDB_query_events(t *testing.T) {
generateEvent(t, "303", false, 0),
generateEvent(t, "303", false, 0),
generateEvent(t, "303", false, 0),
generateEventForAggregate("not in list", "304", false, 0),
generateEvent(t, "305", false, 0),
},
},
@ -908,84 +884,6 @@ func TestCRDB_query_events(t *testing.T) {
},
wantErr: false,
},
{
name: "sequence filter events found",
args: args{
searchQuery: &repository.SearchQuery{},
},
fields: fields{
existingEvents: []*repository.Event{},
},
res: res{
events: []*repository.Event{},
},
wantErr: false,
},
// {
// name: "resource owner filter events found",
// args: args{
// searchQuery: &repository.SearchQuery{},
// },
// fields: fields{
// existingEvents: []*repository.Event{},
// },
// res: res{
// events: []*repository.Event{},
// },
// wantErr: false,
// },
// {
// name: "editor service filter events found",
// args: args{
// searchQuery: &repository.SearchQuery{},
// },
// fields: fields{
// existingEvents: []*repository.Event{},
// },
// res: res{
// events: []*repository.Event{},
// },
// wantErr: false,
// },
// {
// name: "editor user filter events found",
// args: args{
// searchQuery: &repository.SearchQuery{},
// },
// fields: fields{
// existingEvents: []*repository.Event{},
// },
// res: res{
// events: []*repository.Event{},
// },
// wantErr: false,
// },
// {
// name: "event type filter events found",
// args: args{
// searchQuery: &repository.SearchQuery{},
// },
// fields: fields{
// existingEvents: []*repository.Event{},
// },
// res: res{
// events: []*repository.Event{},
// },
// wantErr: false,
// },
// {
// name: "no filter events found",
// args: args{
// searchQuery: &repository.SearchQuery{},
// },
// fields: fields{
// existingEvents: []*repository.Event{},
// },
// res: res{
// events: []*repository.Event{},
// },
// wantErr: false,
// },
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@ -999,10 +897,100 @@ func TestCRDB_query_events(t *testing.T) {
return
}
events := []*repository.Event{}
if err := db.query(tt.args.searchQuery, &events); (err != nil) != tt.wantErr {
events, err := db.Filter(context.Background(), tt.args.searchQuery)
if (err != nil) != tt.wantErr {
t.Errorf("CRDB.query() error = %v, wantErr %v", err, tt.wantErr)
}
if len(events) != tt.res.eventCount {
t.Errorf("CRDB.query() expected event count: %d got %d", tt.res.eventCount, len(events))
}
})
}
}
func TestCRDB_LatestSequence(t *testing.T) {
type args struct {
searchQuery *repository.SearchQuery
}
type fields struct {
existingEvents []*repository.Event
}
type res struct {
sequence uint64
}
tests := []struct {
name string
fields fields
args args
res res
wantErr bool
}{
{
name: "aggregate type filter no sequence",
args: args{
searchQuery: &repository.SearchQuery{
Columns: repository.ColumnsMaxSequence,
Filters: []*repository.Filter{
repository.NewFilter(repository.FieldAggregateType, "not found", repository.OperationEquals),
},
},
},
fields: fields{
existingEvents: []*repository.Event{
generateEvent(t, "400", false, 0),
generateEvent(t, "400", false, 0),
generateEvent(t, "400", false, 0),
},
},
res: res{
sequence: 0,
},
wantErr: false,
},
{
name: "aggregate type filter sequence",
args: args{
searchQuery: &repository.SearchQuery{
Columns: repository.ColumnsMaxSequence,
Filters: []*repository.Filter{
repository.NewFilter(repository.FieldAggregateType, t.Name(), repository.OperationEquals),
},
},
},
fields: fields{
existingEvents: []*repository.Event{
generateEvent(t, "401", false, 0),
generateEvent(t, "401", false, 0),
generateEvent(t, "401", false, 0),
},
},
res: res{
sequence: 3,
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
db := &CRDB{
client: testCRDBClient,
}
// setup initial data for query
if err := db.Push(context.Background(), tt.fields.existingEvents...); err != nil {
t.Errorf("error in setup = %v", err)
return
}
sequence, err := db.LatestSequence(context.Background(), tt.args.searchQuery)
if (err != nil) != tt.wantErr {
t.Errorf("CRDB.query() error = %v, wantErr %v", err, tt.wantErr)
}
if sequence < tt.res.sequence {
t.Errorf("CRDB.query() expected sequence: %d got %d", tt.res.sequence, sequence)
}
})
}
}
@ -1028,24 +1016,9 @@ func linkEvents(events ...*repository.Event) []*repository.Event {
return events
}
func generateEventForAggregate(aggregateType repository.AggregateType, aggregateID string, checkPrevious bool, previousSeq uint64) *repository.Event {
return &repository.Event{
AggregateID: aggregateID,
AggregateType: aggregateType,
CheckPreviousSequence: checkPrevious,
EditorService: "svc",
EditorUser: "user",
PreviousEvent: nil,
PreviousSequence: previousSeq,
ResourceOwner: "ro",
Type: "test.created",
Version: "v1",
}
}
func generateEvent(t *testing.T, aggregateID string, checkPrevious bool, previousSeq uint64) *repository.Event {
func generateEvent(t *testing.T, aggregateID string, checkPrevious bool, previousSeq uint64, opts ...func(*repository.Event)) *repository.Event {
t.Helper()
return &repository.Event{
e := &repository.Event{
AggregateID: aggregateID,
AggregateType: repository.AggregateType(t.Name()),
CheckPreviousSequence: checkPrevious,
@ -1057,6 +1030,12 @@ func generateEvent(t *testing.T, aggregateID string, checkPrevious bool, previou
Type: "test.created",
Version: "v1",
}
for _, opt := range opts {
opt(e)
}
return e
}
func generateEventWithData(t *testing.T, aggregateID string, checkPrevious bool, previousSeq uint64, data []byte) *repository.Event {

View File

@ -43,157 +43,6 @@ func TestMain(m *testing.M) {
os.Exit(m.Run())
}
// func TestInsert(t *testing.T) {
// crdb := &CRDB{client: testCRDBClient}
// e1 := &repository.Event{
// AggregateID: "agg.id",
// AggregateType: "agg.type",
// CheckPreviousSequence: true,
// EditorService: "edi.svc",
// EditorUser: "edi",
// ResourceOwner: "edit",
// Type: "type",
// Version: "v1",
// }
// events := []*repository.Event{
// e1,
// {
// AggregateID: "agg.id",
// AggregateType: "agg.type",
// CheckPreviousSequence: true,
// EditorService: "edi.svc",
// EditorUser: "edi",
// ResourceOwner: "edit",
// Type: "type",
// Version: "v1",
// CreationDate: time.Now().Add(-2 * time.Second),
// PreviousEvent: e1,
// },
// {
// AggregateID: "agg.id",
// AggregateType: "agg.type",
// CheckPreviousSequence: false,
// EditorService: "edi.svc",
// EditorUser: "edi",
// ResourceOwner: "edit",
// Type: "type",
// Version: "v1",
// CreationDate: time.Now().Add(-500 * time.Millisecond),
// },
// {
// AggregateID: "agg.id2",
// AggregateType: "agg.type",
// CheckPreviousSequence: true,
// EditorService: "edi.svc",
// EditorUser: "edi",
// ResourceOwner: "edit",
// Type: "type",
// Version: "v1",
// CreationDate: time.Now().Add(-500 * time.Millisecond),
// },
// {
// AggregateID: "agg.id3",
// AggregateType: "agg.type",
// CheckPreviousSequence: false,
// EditorService: "edi.svc",
// EditorUser: "edi",
// ResourceOwner: "edit",
// Type: "type",
// Version: "v1",
// CreationDate: time.Now().Add(-500 * time.Millisecond),
// },
// {
// AggregateID: "agg.id",
// AggregateType: "agg.type",
// CheckPreviousSequence: false,
// EditorService: "edi.svc",
// EditorUser: "edi",
// ResourceOwner: "edit",
// Type: "type",
// Version: "v1",
// CreationDate: time.Now().Add(-500 * time.Millisecond),
// },
// // {
// // AggregateID: "agg.id4",
// // AggregateType: "agg.type",
// // CheckPreviousSequence: false,
// // EditorService: "edi.svc",
// // EditorUser: "edi",
// // ResourceOwner: "edit",
// // Type: "type",
// // Version: "v1",
// // CreationDate: time.Now().Add(-500 * time.Millisecond),
// // PreviousEvent: e1,
// // },
// //fail because wrong previous event
// // {
// // AggregateID: "agg.id2",
// // AggregateType: "agg.type",
// // CheckPreviousSequence: true,
// // EditorService: "edi.svc",
// // EditorUser: "edi",
// // ResourceOwner: "edit",
// // Type: "type",
// // Version: "v1",
// // CreationDate: time.Now().Add(-500 * time.Millisecond),
// // PreviousEvent: e1,
// // },
// }
// fmt.Println("==============")
// err := crdb.Push(context.Background(), events...)
// if err != nil {
// t.Error(err)
// return
// }
// fmt.Println("inserted ts:")
// for _, event := range events {
// fmt.Printf("%+v\n", event)
// }
// fmt.Println("====================")
// rows, err := testCRDBClient.Query("select * from eventstore.events order by event_sequence")
// defer rows.Close()
// fmt.Println(err)
// for rows.Next() {
// i := make([]interface{}, 12)
// var id string
// rows.Scan(&id, &i[1], &i[2], &i[3], &i[4], &i[5], &i[6], &i[7], &i[8], &i[9], &i[10], &i[11])
// i[0] = id
// fmt.Println(i)
// }
// fmt.Println("====================")
// filtededEvents, err := crdb.Filter(context.Background(), &repository.SearchQuery{
// Columns: repository.ColumnsEvent,
// Filters: []*repository.Filter{
// {
// Field: repository.FieldAggregateType,
// Operation: repository.OperationEquals,
// Value: repository.AggregateType("agg.type"),
// },
// },
// })
// fmt.Println(err)
// for _, event := range filtededEvents {
// fmt.Printf("%+v\n", event)
// }
// fmt.Println("====================")
// rows, err = testCRDBClient.Query("select max(event_sequence), count(*) from eventstore.events where aggregate_type = 'agg.type' and aggregate_id = 'agg.id'")
// defer rows.Close()
// fmt.Println(err)
// for rows.Next() {
// i := make([]interface{}, 2)
// rows.Scan(&i[0], &i[1])
// fmt.Println(i)
// }
// t.Fail()
// }
func executeMigrations() error {
files, err := migrationFilePaths()
if err != nil {

View File

@ -1,12 +1,14 @@
package sql
import (
"context"
"database/sql"
"errors"
"fmt"
"strings"
"github.com/caos/logging"
caos_errs "github.com/caos/zitadel/internal/errors"
z_errors "github.com/caos/zitadel/internal/errors"
"github.com/caos/zitadel/internal/eventstore/v2/repository"
"github.com/lib/pq"
@ -19,24 +21,23 @@ type querier interface {
placeholder(query string) string
eventQuery() string
maxSequenceQuery() string
db() *sql.DB
orderByEventSequence(desc bool) string
}
type rowScan func(scan, interface{}) error
type scan func(dest ...interface{}) error
func buildQuery(criteria querier, searchQuery *repository.SearchQuery) (query string, values []interface{}, rowScanner rowScan) {
query, rowScanner = prepareColumns(criteria, searchQuery.Columns)
func query(ctx context.Context, criteria querier, searchQuery *repository.SearchQuery, dest interface{}) error {
query, rowScanner := prepareColumns(criteria, searchQuery.Columns)
where, values := prepareCondition(criteria, searchQuery.Filters)
if where == "" || query == "" {
return "", nil, nil
return caos_errs.ThrowInvalidArgument(nil, "SQL-rWeBw", "invalid query factory")
}
query += where
if searchQuery.Columns != repository.ColumnsMaxSequence {
query += " ORDER BY event_sequence"
if searchQuery.Desc {
query += " DESC"
}
query += criteria.orderByEventSequence(searchQuery.Desc)
}
if searchQuery.Limit > 0 {
@ -46,7 +47,21 @@ func buildQuery(criteria querier, searchQuery *repository.SearchQuery) (query st
query = criteria.placeholder(query)
return query, values, rowScanner
rows, err := criteria.db().QueryContext(ctx, query, values...)
if err != nil {
logging.Log("SQL-HP3Uk").WithError(err).Info("query failed")
return caos_errs.ThrowInternal(err, "SQL-IJuyR", "unable to filter events")
}
defer rows.Close()
for rows.Next() {
err = rowScanner(rows.Scan, dest)
if err != nil {
return err
}
}
return nil
}
func prepareColumns(criteria querier, columns repository.Columns) (string, func(s scan, dest interface{}) error) {

View File

@ -1,11 +1,14 @@
package sql
import (
"context"
"database/sql"
"database/sql/driver"
"reflect"
"testing"
"time"
"github.com/DATA-DOG/go-sqlmock"
"github.com/caos/zitadel/internal/errors"
"github.com/caos/zitadel/internal/eventstore/v2/repository"
"github.com/lib/pq"
@ -295,24 +298,263 @@ func Test_prepareCondition(t *testing.T) {
}
}
func Test_buildQuery(t *testing.T) {
func Test_query_events_with_crdb(t *testing.T) {
type args struct {
query *repository.SearchQuery
searchQuery *repository.SearchQuery
}
type fields struct {
existingEvents []*repository.Event
client *sql.DB
}
type res struct {
query string
values []interface{}
rowScanner bool
eventCount int
}
tests := []struct {
name string
args args
res res
name string
fields fields
args args
res res
wantErr bool
}{
{
name: "aggregate type filter no events",
args: args{
searchQuery: &repository.SearchQuery{
Columns: repository.ColumnsEvent,
Filters: []*repository.Filter{
repository.NewFilter(repository.FieldAggregateType, "not found", repository.OperationEquals),
},
},
},
fields: fields{
client: testCRDBClient,
existingEvents: []*repository.Event{
generateEvent(t, "300", false, 0),
generateEvent(t, "300", false, 0),
generateEvent(t, "300", false, 0),
},
},
res: res{
eventCount: 0,
},
wantErr: false,
},
{
name: "aggregate type filter events found",
args: args{
searchQuery: &repository.SearchQuery{
Columns: repository.ColumnsEvent,
Filters: []*repository.Filter{
repository.NewFilter(repository.FieldAggregateType, t.Name(), repository.OperationEquals),
},
},
},
fields: fields{
client: testCRDBClient,
existingEvents: []*repository.Event{
generateEvent(t, "301", false, 0),
generateEvent(t, "302", false, 0),
generateEvent(t, "302", false, 0),
generateEvent(t, "303", false, 0, func(e *repository.Event) { e.AggregateType = "not in list" }),
},
},
res: res{
eventCount: 3,
},
wantErr: false,
},
{
name: "aggregate type and id filter events found",
args: args{
searchQuery: &repository.SearchQuery{
Columns: repository.ColumnsEvent,
Filters: []*repository.Filter{
repository.NewFilter(repository.FieldAggregateType, t.Name(), repository.OperationEquals),
repository.NewFilter(repository.FieldAggregateID, "303", repository.OperationEquals),
},
},
},
fields: fields{
client: testCRDBClient,
existingEvents: []*repository.Event{
generateEvent(t, "303", false, 0),
generateEvent(t, "303", false, 0),
generateEvent(t, "303", false, 0),
generateEvent(t, "304", false, 0, func(e *repository.Event) { e.AggregateType = "not in list" }),
generateEvent(t, "305", false, 0),
},
},
res: res{
eventCount: 3,
},
wantErr: false,
},
{
name: "resource owner filter events found",
args: args{
searchQuery: &repository.SearchQuery{
Columns: repository.ColumnsEvent,
Filters: []*repository.Filter{
repository.NewFilter(repository.FieldResourceOwner, "caos", repository.OperationEquals),
},
},
},
fields: fields{
client: testCRDBClient,
existingEvents: []*repository.Event{
generateEvent(t, "306", false, 0, func(e *repository.Event) { e.ResourceOwner = "caos" }),
generateEvent(t, "307", false, 0, func(e *repository.Event) { e.ResourceOwner = "caos" }),
generateEvent(t, "308", false, 0, func(e *repository.Event) { e.ResourceOwner = "caos" }),
generateEvent(t, "309", false, 0, func(e *repository.Event) { e.ResourceOwner = "orgID" }),
generateEvent(t, "309", false, 0, func(e *repository.Event) { e.ResourceOwner = "orgID" }),
},
},
res: res{
eventCount: 3,
},
wantErr: false,
},
{
name: "editor service filter events found",
args: args{
searchQuery: &repository.SearchQuery{
Columns: repository.ColumnsEvent,
Filters: []*repository.Filter{
repository.NewFilter(repository.FieldEditorService, "MANAGEMENT-API", repository.OperationEquals),
repository.NewFilter(repository.FieldEditorService, "ADMIN-API", repository.OperationEquals),
},
},
},
fields: fields{
client: testCRDBClient,
existingEvents: []*repository.Event{
generateEvent(t, "307", false, 0, func(e *repository.Event) { e.EditorService = "MANAGEMENT-API" }),
generateEvent(t, "307", false, 0, func(e *repository.Event) { e.EditorService = "MANAGEMENT-API" }),
generateEvent(t, "308", false, 0, func(e *repository.Event) { e.EditorService = "ADMIN-API" }),
generateEvent(t, "309", false, 0, func(e *repository.Event) { e.EditorService = "AUTHAPI" }),
generateEvent(t, "309", false, 0, func(e *repository.Event) { e.EditorService = "AUTHAPI" }),
},
},
res: res{
eventCount: 3,
},
wantErr: false,
},
{
name: "editor user filter events found",
args: args{
searchQuery: &repository.SearchQuery{
Columns: repository.ColumnsEvent,
Filters: []*repository.Filter{
repository.NewFilter(repository.FieldEditorUser, "adlerhurst", repository.OperationEquals),
repository.NewFilter(repository.FieldEditorUser, "nobody", repository.OperationEquals),
repository.NewFilter(repository.FieldEditorUser, "", repository.OperationEquals),
},
},
},
fields: fields{
client: testCRDBClient,
existingEvents: []*repository.Event{
generateEvent(t, "310", false, 0, func(e *repository.Event) { e.EditorUser = "adlerhurst" }),
generateEvent(t, "310", false, 0, func(e *repository.Event) { e.EditorUser = "adlerhurst" }),
generateEvent(t, "310", false, 0, func(e *repository.Event) { e.EditorUser = "nobody" }),
generateEvent(t, "311", false, 0, func(e *repository.Event) { e.EditorUser = "" }),
generateEvent(t, "311", false, 0, func(e *repository.Event) { e.EditorUser = "" }),
generateEvent(t, "312", false, 0, func(e *repository.Event) { e.EditorUser = "fforootd" }),
generateEvent(t, "312", false, 0, func(e *repository.Event) { e.EditorUser = "fforootd" }),
},
},
res: res{
eventCount: 5,
},
wantErr: false,
},
{
name: "event type filter events found",
args: args{
searchQuery: &repository.SearchQuery{
Columns: repository.ColumnsEvent,
Filters: []*repository.Filter{
repository.NewFilter(repository.FieldEventType, repository.EventType("user.created"), repository.OperationEquals),
repository.NewFilter(repository.FieldEventType, repository.EventType("user.updated"), repository.OperationEquals),
},
},
},
fields: fields{
client: testCRDBClient,
existingEvents: []*repository.Event{
generateEvent(t, "311", false, 0, func(e *repository.Event) { e.Type = "user.created" }),
generateEvent(t, "311", false, 0, func(e *repository.Event) { e.Type = "user.updated" }),
generateEvent(t, "311", false, 0, func(e *repository.Event) { e.Type = "user.deactivated" }),
generateEvent(t, "311", false, 0, func(e *repository.Event) { e.Type = "user.locked" }),
generateEvent(t, "312", false, 0, func(e *repository.Event) { e.Type = "user.created" }),
generateEvent(t, "312", false, 0, func(e *repository.Event) { e.Type = "user.updated" }),
generateEvent(t, "312", false, 0, func(e *repository.Event) { e.Type = "user.deactivated" }),
generateEvent(t, "312", false, 0, func(e *repository.Event) { e.Type = "user.reactivated" }),
generateEvent(t, "313", false, 0, func(e *repository.Event) { e.Type = "user.locked" }),
},
},
res: res{
eventCount: 7,
},
wantErr: false,
},
{
name: "fail because no filter",
args: args{
searchQuery: &repository.SearchQuery{},
},
fields: fields{
client: testCRDBClient,
existingEvents: []*repository.Event{},
},
res: res{
eventCount: 0,
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
db := &CRDB{
client: tt.fields.client,
}
// setup initial data for query
if err := db.Push(context.Background(), tt.fields.existingEvents...); err != nil {
t.Errorf("error in setup = %v", err)
return
}
events := []*repository.Event{}
if err := query(context.Background(), db, tt.args.searchQuery, &events); (err != nil) != tt.wantErr {
t.Errorf("CRDB.query() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
func Test_query_events_mocked(t *testing.T) {
type args struct {
query *repository.SearchQuery
dest interface{}
}
type res struct {
wantErr bool
}
type fields struct {
mock *dbMock
}
tests := []struct {
name string
args args
fields fields
res res
}{
{
name: "with order by desc",
args: args{
// NewSearchQueryFactory("user").OrderDesc()
dest: &[]*repository.Event{},
query: &repository.SearchQuery{
Columns: repository.ColumnsEvent,
Desc: true,
@ -325,15 +567,20 @@ func Test_buildQuery(t *testing.T) {
},
},
},
fields: fields{
mock: newMockClient(t).expectQuery(t,
`SELECT creation_date, event_type, event_sequence, previous_sequence, event_data, editor_service, editor_user, resource_owner, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = \$1 ORDER BY event_sequence DESC`,
[]driver.Value{repository.AggregateType("user")},
),
},
res: res{
query: "SELECT creation_date, event_type, event_sequence, previous_sequence, event_data, editor_service, editor_user, resource_owner, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = $1 ORDER BY event_sequence DESC",
rowScanner: true,
values: []interface{}{repository.AggregateType("user")},
wantErr: false,
},
},
{
name: "with limit",
args: args{
dest: &[]*repository.Event{},
query: &repository.SearchQuery{
Columns: repository.ColumnsEvent,
Desc: false,
@ -347,15 +594,20 @@ func Test_buildQuery(t *testing.T) {
},
},
},
fields: fields{
mock: newMockClient(t).expectQuery(t,
`SELECT creation_date, event_type, event_sequence, previous_sequence, event_data, editor_service, editor_user, resource_owner, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = \$1 ORDER BY event_sequence LIMIT \$2`,
[]driver.Value{repository.AggregateType("user"), uint64(5)},
),
},
res: res{
query: "SELECT creation_date, event_type, event_sequence, previous_sequence, event_data, editor_service, editor_user, resource_owner, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = $1 ORDER BY event_sequence LIMIT $2",
rowScanner: true,
values: []interface{}{repository.AggregateType("user"), uint64(5)},
wantErr: false,
},
},
{
name: "with limit and order by desc",
args: args{
dest: &[]*repository.Event{},
query: &repository.SearchQuery{
Columns: repository.ColumnsEvent,
Desc: true,
@ -369,10 +621,68 @@ func Test_buildQuery(t *testing.T) {
},
},
},
fields: fields{
mock: newMockClient(t).expectQuery(t,
`SELECT creation_date, event_type, event_sequence, previous_sequence, event_data, editor_service, editor_user, resource_owner, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = \$1 ORDER BY event_sequence DESC LIMIT \$2`,
[]driver.Value{repository.AggregateType("user"), uint64(5)},
),
},
res: res{
query: "SELECT creation_date, event_type, event_sequence, previous_sequence, event_data, editor_service, editor_user, resource_owner, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = $1 ORDER BY event_sequence DESC LIMIT $2",
rowScanner: true,
values: []interface{}{repository.AggregateType("user"), uint64(5)},
wantErr: false,
},
},
{
name: "error sql conn closed",
args: args{
dest: &[]*repository.Event{},
query: &repository.SearchQuery{
Columns: repository.ColumnsEvent,
Desc: true,
Limit: 0,
Filters: []*repository.Filter{
{
Field: repository.FieldAggregateType,
Value: repository.AggregateType("user"),
Operation: repository.OperationEquals,
},
},
},
},
fields: fields{
mock: newMockClient(t).expectQueryErr(t,
`SELECT creation_date, event_type, event_sequence, previous_sequence, event_data, editor_service, editor_user, resource_owner, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = \$1 ORDER BY event_sequence DESC`,
[]driver.Value{repository.AggregateType("user")},
sql.ErrConnDone),
},
res: res{
wantErr: true,
},
},
{
name: "error unexpected dest",
args: args{
dest: nil,
query: &repository.SearchQuery{
Columns: repository.ColumnsEvent,
Desc: true,
Limit: 0,
Filters: []*repository.Filter{
{
Field: repository.FieldAggregateType,
Value: repository.AggregateType("user"),
Operation: repository.OperationEquals,
},
},
},
},
fields: fields{
mock: newMockClient(t).expectQuery(t,
`SELECT creation_date, event_type, event_sequence, previous_sequence, event_data, editor_service, editor_user, resource_owner, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = \$1 ORDER BY event_sequence DESC`,
[]driver.Value{repository.AggregateType("user")},
&repository.Event{Sequence: 100}),
},
res: res{
wantErr: true,
},
},
{
@ -383,9 +693,7 @@ func Test_buildQuery(t *testing.T) {
},
},
res: res{
query: "",
rowScanner: false,
values: []interface{}(nil),
wantErr: true,
},
},
{
@ -399,29 +707,64 @@ func Test_buildQuery(t *testing.T) {
},
},
res: res{
query: "",
rowScanner: false,
values: []interface{}(nil),
wantErr: true,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
crdb := &CRDB{}
gotQuery, gotValues, gotRowScanner := buildQuery(crdb, tt.args.query)
if gotQuery != tt.res.query {
t.Errorf("buildQuery() gotQuery = %v, want %v", gotQuery, tt.res.query)
if tt.fields.mock != nil {
crdb.client = tt.fields.mock.client
}
if len(gotValues) != len(tt.res.values) {
t.Errorf("wrong length of gotten values got = %d, want %d", len(gotValues), len(tt.res.values))
err := query(context.Background(), crdb, tt.args.query, tt.args.dest)
if (err != nil) != tt.res.wantErr {
t.Errorf("query() error = %v, wantErr %v", err, tt.res.wantErr)
}
if tt.fields.mock == nil {
return
}
if !reflect.DeepEqual(gotValues, tt.res.values) {
t.Errorf("prepareCondition() gotValues = %T: %v, want %T: %v", gotValues, gotValues, tt.res.values, tt.res.values)
}
if (tt.res.rowScanner && gotRowScanner == nil) || (!tt.res.rowScanner && gotRowScanner != nil) {
t.Errorf("rowScanner should be nil==%v got nil==%v", tt.res.rowScanner, gotRowScanner == nil)
if err := tt.fields.mock.mock.ExpectationsWereMet(); err != nil {
t.Errorf("not all expectaions met: %v", err)
}
})
}
}
type dbMock struct {
mock sqlmock.Sqlmock
client *sql.DB
}
func (m *dbMock) expectQuery(t *testing.T, expectedQuery string, args []driver.Value, events ...*repository.Event) *dbMock {
query := m.mock.ExpectQuery(expectedQuery).WithArgs(args...)
rows := sqlmock.NewRows([]string{"event_sequence"})
for _, event := range events {
rows = rows.AddRow(event.Sequence)
}
query.WillReturnRows(rows).RowsWillBeClosed()
return m
}
func (m *dbMock) expectQueryErr(t *testing.T, expectedQuery string, args []driver.Value, err error) *dbMock {
m.mock.ExpectQuery(expectedQuery).WithArgs(args...).WillReturnError(err)
return m
}
func newMockClient(t *testing.T) *dbMock {
t.Helper()
db, mock, err := sqlmock.New()
if err != nil {
t.Errorf("unable to create mock client: %v", err)
t.FailNow()
return nil
}
return &dbMock{
mock: mock,
client: db,
}
}

View File

@ -460,6 +460,16 @@ func TestSearchQueryFactoryBuild(t *testing.T) {
},
},
},
{
name: "column invalid",
args: args{
columns: Columns(-1),
aggregateTypes: []AggregateType{"user"},
},
res: res{
isErr: errors.IsPreconditionFailed,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@ -469,7 +479,7 @@ func TestSearchQueryFactoryBuild(t *testing.T) {
}
query, err := factory.build()
if tt.res.isErr != nil && !tt.res.isErr(err) {
t.Errorf("wrong error: %v", err)
t.Errorf("wrong error(%T): %v", err, err)
return
}
if err != nil && tt.res.isErr == nil {
@ -478,7 +488,7 @@ func TestSearchQueryFactoryBuild(t *testing.T) {
}
if !reflect.DeepEqual(query, tt.res.query) {
t.Errorf("NewSearchQueryFactory() = %v, want %v", factory, tt.res)
t.Errorf("NewSearchQueryFactory() = %+v, want %+v", factory, tt.res)
}
})
}