From dfb8c266d760f96cb01a198c7ab35047c31a39ee Mon Sep 17 00:00:00 2001 From: adlerhurst Date: Fri, 23 Oct 2020 16:16:46 +0200 Subject: [PATCH] test: example for eventstore --- internal/eventstore/v2/eventstore.go | 53 +++- internal/eventstore/v2/eventstore_test.go | 8 +- internal/eventstore/v2/example_test.go | 282 +++++++++++++++--- internal/eventstore/v2/local_crdb_test.go | 128 ++++++++ internal/eventstore/v2/read_model.go | 74 ++++- internal/eventstore/v2/repository/sql/crdb.go | 4 + 6 files changed, 484 insertions(+), 65 deletions(-) create mode 100644 internal/eventstore/v2/local_crdb_test.go diff --git a/internal/eventstore/v2/eventstore.go b/internal/eventstore/v2/eventstore.go index 369313103d..b9e3fb212c 100644 --- a/internal/eventstore/v2/eventstore.go +++ b/internal/eventstore/v2/eventstore.go @@ -5,6 +5,7 @@ import ( "encoding/json" "reflect" "sync" + "time" "github.com/caos/zitadel/internal/errors" "github.com/caos/zitadel/internal/eventstore/v2/repository" @@ -28,6 +29,32 @@ type Event interface { // * struct which can be marshalled to json // * pointer to struct which can be marshalled to json Data() interface{} + //MetaData returns all data saved on a event + // It must not be set on push + // The event mapper function must set this struct + MetaData() *EventMetaData +} + +func MetaDataFromRepo(event *repository.Event) *EventMetaData { + return &EventMetaData{ + AggregateID: event.AggregateID, + AggregateType: AggregateType(event.AggregateType), + AggregateVersion: Version(event.Version), + PreviouseSequence: event.PreviousSequence, + ResourceOwner: event.ResourceOwner, + Sequence: event.Sequence, + CreationDate: event.CreationDate, + } +} + +type EventMetaData struct { + AggregateID string + AggregateType AggregateType + ResourceOwner string + AggregateVersion Version + Sequence uint64 + PreviouseSequence uint64 + CreationDate time.Time } //Eventstore abstracts all functions needed to store valid events @@ -39,7 +66,15 @@ type Eventstore struct { } type eventTypeInterceptors struct { - filterMapper func(*repository.Event) (Event, error) + eventMapper func(*repository.Event) (Event, error) +} + +func NewEventstore(repo repository.Repository) *Eventstore { + return &Eventstore{ + repo: repo, + eventMapper: map[EventType]eventTypeInterceptors{}, + interceptorMutex: sync.Mutex{}, + } } //Health checks if the eventstore can properly work @@ -56,6 +91,8 @@ type aggregater interface { //Events returns the events which will be pushed Events() []Event //ResourceOwner returns the organisation id which manages this aggregate + // resource owner is only on the inital push needed + // afterwards the resource owner of the previous event is taken ResourceOwner() string //Version represents the semantic version of the aggregate Version() Version @@ -133,10 +170,10 @@ func (es *Eventstore) mapEvents(events []*repository.Event) (mappedEvents []Even for i, event := range events { interceptors, ok := es.eventMapper[EventType(event.Type)] - if !ok || interceptors.filterMapper == nil { + if !ok || interceptors.eventMapper == nil { return nil, errors.ThrowPreconditionFailed(nil, "V2-usujB", "event mapper not defined") } - mappedEvents[i], err = interceptors.filterMapper(event) + mappedEvents[i], err = interceptors.eventMapper(event) if err != nil { return nil, err } @@ -176,19 +213,15 @@ func (es *Eventstore) LatestSequence(ctx context.Context, queryFactory *SearchQu } //RegisterFilterEventMapper registers a function for mapping an eventstore event to an event -func (es *Eventstore) RegisterFilterEventMapper(eventType EventType, mapper func(*repository.Event) (Event, error)) error { - if eventType == "" || mapper == nil { - return errors.ThrowInvalidArgument(nil, "V2-IPpUR", "eventType and mapper must be filled") - } - +func (es *Eventstore) RegisterFilterEventMapper(eventType EventType, mapper func(*repository.Event) (Event, error)) *Eventstore { es.interceptorMutex.Lock() defer es.interceptorMutex.Unlock() interceptor := es.eventMapper[eventType] - interceptor.filterMapper = mapper + interceptor.eventMapper = mapper es.eventMapper[eventType] = interceptor - return nil + return es } func eventData(event Event) ([]byte, error) { diff --git a/internal/eventstore/v2/eventstore_test.go b/internal/eventstore/v2/eventstore_test.go index 5e3957a3ea..1248509ce4 100644 --- a/internal/eventstore/v2/eventstore_test.go +++ b/internal/eventstore/v2/eventstore_test.go @@ -69,6 +69,10 @@ func (e *testEvent) PreviousSequence() uint64 { return 0 } +func (e *testEvent) MetaData() *EventMetaData { + return nil +} + func testFilterMapper(*repository.Event) (Event, error) { return &testEvent{description: "hodor"}, nil } @@ -151,7 +155,7 @@ func Test_eventstore_RegisterFilterEventMapper(t *testing.T) { fields: fields{ eventMapper: map[EventType]eventTypeInterceptors{ "event.type": { - filterMapper: func(*repository.Event) (Event, error) { + eventMapper: func(*repository.Event) (Event, error) { return nil, errors.ThrowUnimplemented(nil, "V2-1qPvn", "unimplemented") }, }, @@ -181,7 +185,7 @@ func Test_eventstore_RegisterFilterEventMapper(t *testing.T) { } mapper := es.eventMapper[tt.args.eventType] - event, err := mapper.filterMapper(nil) + event, err := mapper.eventMapper(nil) if err != nil { t.Errorf("unexpected error %v", err) } diff --git a/internal/eventstore/v2/example_test.go b/internal/eventstore/v2/example_test.go index cf5ee32703..e475478fcb 100644 --- a/internal/eventstore/v2/example_test.go +++ b/internal/eventstore/v2/example_test.go @@ -2,60 +2,34 @@ package eventstore_test import ( "context" + "encoding/json" + "errors" + "fmt" + "testing" + "time" - "github.com/caos/zitadel/internal/errors" "github.com/caos/zitadel/internal/eventstore/v2" "github.com/caos/zitadel/internal/eventstore/v2/repository" + "github.com/caos/zitadel/internal/eventstore/v2/repository/sql" ) -type singleAggregateRepo struct { - events []*repository.Event -} - -//Health checks if the connection to the storage is available -func (r *singleAggregateRepo) Health(ctx context.Context) error { - return nil -} - -// PushEvents adds all events of the given aggregates to the eventstreams of the aggregates. -// This call is transaction save. The transaction will be rolled back if one event fails -func (r *singleAggregateRepo) Push(ctx context.Context, events ...*repository.Event) error { - for _, event := range events { - if event.AggregateType != "test.agg" || event.AggregateID != "test" { - return errors.ThrowPreconditionFailed(nil, "V2-ZVDcA", "wrong aggregate") - } - } - - r.events = append(r.events, events...) - - return nil -} - -// Filter returns all events matching the given search query -func (r *singleAggregateRepo) Filter(ctx context.Context, searchQuery *repository.SearchQuery) (events []*repository.Event, err error) { - return r.events, nil -} - -//LatestSequence returns the latests sequence found by the the search query -func (r *singleAggregateRepo) LatestSequence(ctx context.Context, queryFactory *repository.SearchQuery) (uint64, error) { - if len(r.events) == 0 { - return 0, nil - } - return r.events[len(r.events)-1].Sequence, nil -} +// ------------------------------------------------------------ +// User aggregate start +// ------------------------------------------------------------ type UserAggregate struct { + eventstore.Aggregate FirstName string } func (a *UserAggregate) ID() string { - return "test" + return a.Aggregate.ID } func (a *UserAggregate) Type() eventstore.AggregateType { - return "test.agg" + return "test.user" } func (a *UserAggregate) Events() []eventstore.Event { - return nil + return a.Aggregate.Events } func (a *UserAggregate) ResourceOwner() string { return "caos" @@ -64,15 +38,56 @@ func (a *UserAggregate) Version() eventstore.Version { return "v1" } func (a *UserAggregate) PreviousSequence() uint64 { - return 0 + return a.Aggregate.PreviousSequence } +func NewUserAggregate(id string) *UserAggregate { + return &UserAggregate{ + Aggregate: *eventstore.NewAggregate(id), + } +} + +func (rm *UserAggregate) AppendEvents(events ...eventstore.Event) *UserAggregate { + rm.Aggregate.AppendEvents(events...) + return rm +} + +func (rm *UserAggregate) Reduce() error { + for _, event := range rm.Aggregate.Events { + switch e := event.(type) { + case *UserAddedEvent: + rm.FirstName = e.FirstName + case *UserFirstNameChangedEvent: + rm.FirstName = e.FirstName + } + } + return rm.Aggregate.Reduce() +} + +// ------------------------------------------------------------ +// User added event start +// ------------------------------------------------------------ + type UserAddedEvent struct { - FirstName string + FirstName string `json:"firstName"` + metaData *eventstore.EventMetaData +} + +func UserAddedEventMapper() (eventstore.EventType, func(*repository.Event) (eventstore.Event, error)) { + return "user.added", func(event *repository.Event) (eventstore.Event, error) { + e := &UserAddedEvent{ + metaData: eventstore.MetaDataFromRepo(event), + } + err := json.Unmarshal(event.Data, e) + if err != nil { + return nil, err + } + return e, nil + } } func (e *UserAddedEvent) CheckPrevious() bool { - return false + return true } func (e *UserAddedEvent) EditorService() string { @@ -86,16 +101,39 @@ func (e *UserAddedEvent) EditorUser() string { func (e *UserAddedEvent) Type() eventstore.EventType { return "user.added" } + func (e *UserAddedEvent) Data() interface{} { return e } +func (e *UserAddedEvent) MetaData() *eventstore.EventMetaData { + return e.metaData +} + +// ------------------------------------------------------------ +// User first name changed event start +// ------------------------------------------------------------ + type UserFirstNameChangedEvent struct { - FirstName string + FirstName string `json:"firstName"` + metaData *eventstore.EventMetaData `json:"-"` +} + +func UserFirstNameChangedMapper() (eventstore.EventType, func(*repository.Event) (eventstore.Event, error)) { + return "user.firstName.changed", func(event *repository.Event) (eventstore.Event, error) { + e := &UserFirstNameChangedEvent{ + metaData: eventstore.MetaDataFromRepo(event), + } + err := json.Unmarshal(event.Data, e) + if err != nil { + return nil, err + } + return e, nil + } } func (e *UserFirstNameChangedEvent) CheckPrevious() bool { - return false + return true } func (e *UserFirstNameChangedEvent) EditorService() string { @@ -107,19 +145,135 @@ func (e *UserFirstNameChangedEvent) EditorUser() string { } func (e *UserFirstNameChangedEvent) Type() eventstore.EventType { - return "user.changed" + return "user.firstName.changed" } + func (e *UserFirstNameChangedEvent) Data() interface{} { return e } +func (e *UserFirstNameChangedEvent) MetaData() *eventstore.EventMetaData { + return e.metaData +} + +// ------------------------------------------------------------ +// User password checked event start +// ------------------------------------------------------------ + +type UserPasswordCheckedEvent struct { + metaData *eventstore.EventMetaData `json:"-"` +} + +func UserPasswordCheckedMapper() (eventstore.EventType, func(*repository.Event) (eventstore.Event, error)) { + return "user.password.checked", func(event *repository.Event) (eventstore.Event, error) { + return &UserPasswordCheckedEvent{ + metaData: eventstore.MetaDataFromRepo(event), + }, nil + } +} + +func (e *UserPasswordCheckedEvent) CheckPrevious() bool { + return false +} + +func (e *UserPasswordCheckedEvent) EditorService() string { + return "test.suite" +} + +func (e *UserPasswordCheckedEvent) EditorUser() string { + return "adlerhurst" +} + +func (e *UserPasswordCheckedEvent) Type() eventstore.EventType { + return "user.password.checked" +} + +func (e *UserPasswordCheckedEvent) Data() interface{} { + return nil +} + +func (e *UserPasswordCheckedEvent) MetaData() *eventstore.EventMetaData { + return e.metaData +} + +// ------------------------------------------------------------ +// Users read model start +// ------------------------------------------------------------ + +type UsersReadModel struct { + eventstore.ReadModel + Users []*UserReadModel +} + +func NewUsersReadModel() *UsersReadModel { + return &UsersReadModel{ + ReadModel: *eventstore.NewReadModel(""), + Users: []*UserReadModel{}, + } +} + +func (rm *UsersReadModel) AppendEvents(events ...eventstore.Event) (err error) { + rm.ReadModel.AppendEvents(events...) + for _, event := range events { + switch e := event.(type) { + case *UserAddedEvent: + user := NewUserReadModel(e.MetaData().AggregateID) + rm.Users = append(rm.Users, user) + err = user.AppendEvents(e) + case *UserFirstNameChangedEvent, *UserPasswordCheckedEvent: + _, user := rm.userByID(e.MetaData().AggregateID) + if user == nil { + return errors.New("user not found") + } + err = user.AppendEvents(e) + } + if err != nil { + return err + } + } + return nil +} + +func (rm *UsersReadModel) Reduce() error { + for _, user := range rm.Users { + err := user.Reduce() + if err != nil { + return err + } + } + rm.ReadModel.Reduce() + return nil +} + +func (rm *UsersReadModel) userByID(id string) (idx int, user *UserReadModel) { + for idx, user = range rm.Users { + if user.ReadModel.ID == id { + return idx, user + } + } + + return -1, nil +} + +// ------------------------------------------------------------ +// User read model start +// ------------------------------------------------------------ + type UserReadModel struct { eventstore.ReadModel - FirstName string + FirstName string + pwCheckCount int + lastPasswordCheck time.Time +} + +func NewUserReadModel(id string) *UserReadModel { + return &UserReadModel{ + ReadModel: *eventstore.NewReadModel(id), + } } func (rm *UserReadModel) AppendEvents(events ...eventstore.Event) error { - rm.ReadModel.Append(events...) + rm.ReadModel.AppendEvents(events...) return nil } @@ -130,7 +284,41 @@ func (rm *UserReadModel) Reduce() error { rm.FirstName = e.FirstName case *UserFirstNameChangedEvent: rm.FirstName = e.FirstName + case *UserPasswordCheckedEvent: + rm.pwCheckCount++ + rm.lastPasswordCheck = e.metaData.CreationDate } } + rm.ReadModel.Reduce() return nil } + +// ------------------------------------------------------------ +// Tests +// ------------------------------------------------------------ + +func TestUserReadModel(t *testing.T) { + es := eventstore.NewEventstore(sql.NewCRDB(testCRDBClient)) + es.RegisterFilterEventMapper(UserAddedEventMapper()). + RegisterFilterEventMapper(UserFirstNameChangedMapper()). + RegisterFilterEventMapper(UserPasswordCheckedMapper()) + + events, err := es.PushAggregates(context.Background(), + NewUserAggregate("1").AppendEvents(&UserAddedEvent{FirstName: "hodor"}), + NewUserAggregate("2").AppendEvents(&UserAddedEvent{FirstName: "hodor"}, &UserPasswordCheckedEvent{}, &UserPasswordCheckedEvent{}, &UserFirstNameChangedEvent{FirstName: "ueli"}), + ) + if err != nil { + t.Errorf("unexpected error on push aggregates: %v", err) + } + + events = append(events, nil) + + fmt.Printf("%+v\n", events) + + users := NewUsersReadModel() + err = es.FilterToReducer(context.Background(), eventstore.NewSearchQueryFactory(eventstore.ColumnsEvent, "test.user"), users) + if err != nil { + t.Errorf("unexpected error on filter to reducer: %v", err) + } + fmt.Printf("%+v", users) +} diff --git a/internal/eventstore/v2/local_crdb_test.go b/internal/eventstore/v2/local_crdb_test.go new file mode 100644 index 0000000000..ceaec04b4b --- /dev/null +++ b/internal/eventstore/v2/local_crdb_test.go @@ -0,0 +1,128 @@ +package eventstore_test + +import ( + "database/sql" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "sort" + "strconv" + "strings" + "testing" + + "github.com/caos/logging" + "github.com/cockroachdb/cockroach-go/v2/testserver" +) + +var ( + migrationsPath = os.ExpandEnv("${GOPATH}/src/github.com/caos/zitadel/migrations/cockroach") + testCRDBClient *sql.DB +) + +func TestMain(m *testing.M) { + ts, err := testserver.NewTestServer() + if err != nil { + logging.LogWithFields("REPOS-RvjLG", "error", err).Fatal("unable to start db") + } + + testCRDBClient, err = sql.Open("postgres", ts.PGURL().String()) + if err != nil { + logging.LogWithFields("REPOS-CF6dQ", "error", err).Fatal("unable to connect to db") + } + + defer func() { + testCRDBClient.Close() + ts.Stop() + }() + + if err = executeMigrations(); err != nil { + logging.LogWithFields("REPOS-jehDD", "error", err).Fatal("migrations failed") + } + + os.Exit(m.Run()) +} + +func executeMigrations() error { + files, err := migrationFilePaths() + if err != nil { + return err + } + sort.Sort(files) + for _, file := range files { + migration, err := ioutil.ReadFile(string(file)) + if err != nil { + return err + } + transactionInMigration := strings.Contains(string(migration), "BEGIN;") + exec := testCRDBClient.Exec + var tx *sql.Tx + if !transactionInMigration { + tx, err = testCRDBClient.Begin() + if err != nil { + return fmt.Errorf("begin file: %v || err: %w", file, err) + } + exec = tx.Exec + } + if _, err = exec(string(migration)); err != nil { + return fmt.Errorf("exec file: %v || err: %w", file, err) + } + if !transactionInMigration { + if err = tx.Commit(); err != nil { + return fmt.Errorf("commit file: %v || err: %w", file, err) + } + } + } + return nil +} + +type migrationPaths []string + +type version struct { + major int + minor int +} + +func versionFromPath(s string) version { + v := s[strings.Index(s, "/V")+2 : strings.Index(s, "__")] + splitted := strings.Split(v, ".") + res := version{} + var err error + if len(splitted) >= 1 { + res.major, err = strconv.Atoi(splitted[0]) + if err != nil { + panic(err) + } + } + + if len(splitted) >= 2 { + res.minor, err = strconv.Atoi(splitted[1]) + if err != nil { + panic(err) + } + } + + return res +} + +func (a migrationPaths) Len() int { return len(a) } +func (a migrationPaths) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a migrationPaths) Less(i, j int) bool { + versionI := versionFromPath(a[i]) + versionJ := versionFromPath(a[j]) + + return versionI.major < versionJ.major || + (versionI.major == versionJ.major && versionI.minor < versionJ.minor) +} + +func migrationFilePaths() (migrationPaths, error) { + files := make(migrationPaths, 0) + err := filepath.Walk(migrationsPath, func(path string, info os.FileInfo, err error) error { + if err != nil || info.IsDir() || !strings.HasSuffix(info.Name(), ".sql") { + return err + } + files = append(files, path) + return nil + }) + return files, err +} diff --git a/internal/eventstore/v2/read_model.go b/internal/eventstore/v2/read_model.go index 289f462826..127f8c1639 100644 --- a/internal/eventstore/v2/read_model.go +++ b/internal/eventstore/v2/read_model.go @@ -1,15 +1,77 @@ package eventstore +import "time" + +func NewReadModel(id string) *ReadModel { + return &ReadModel{ + ID: id, + Events: []Event{}, + } +} + //ReadModel is the minimum representation of a View model. // it might be saved in a database or in memory type ReadModel struct { - ProcessedSequence uint64 - ID string - Events []Event + ProcessedSequence uint64 `json:"-"` + ID string `json:"-"` + CreationDate time.Time `json:"-"` + ChangeDate time.Time `json:"-"` + Events []Event `json:"-"` } -//Append adds all the events to the aggregate. +//AppendEvents adds all the events to the read model. // The function doesn't compute the new state of the read model -func (a *ReadModel) Append(events ...Event) { - a.Events = append(a.Events, events...) +func (rm *ReadModel) AppendEvents(events ...Event) *ReadModel { + rm.Events = append(rm.Events, events...) + return rm +} + +//Reduce must be the last step in the reduce function of the extension +func (rm *ReadModel) Reduce() error { + if len(rm.Events) == 0 { + return nil + } + + if rm.CreationDate.IsZero() { + rm.CreationDate = rm.Events[0].MetaData().CreationDate + } + rm.ChangeDate = rm.Events[len(rm.Events)-1].MetaData().CreationDate + rm.ProcessedSequence = rm.Events[len(rm.Events)-1].MetaData().Sequence + // all events processed and not needed anymore + rm.Events = nil + rm.Events = []Event{} + return nil +} + +func NewAggregate(id string) *Aggregate { + return &Aggregate{ + ID: id, + Events: []Event{}, + } +} + +type Aggregate struct { + PreviousSequence uint64 `json:"-"` + ID string `json:"-"` + Events []Event `json:"-"` +} + +//AppendEvents adds all the events to the aggregate. +// The function doesn't compute the new state of the aggregate +func (a *Aggregate) AppendEvents(events ...Event) *Aggregate { + a.Events = append(a.Events, events...) + return a +} + +//Reduce must be the last step in the reduce function of the extension +func (a *Aggregate) Reduce() error { + if len(a.Events) == 0 { + return nil + } + + a.PreviousSequence = a.Events[len(a.Events)-1].MetaData().Sequence + // all events processed and not needed anymore + a.Events = nil + a.Events = []Event{} + return nil } diff --git a/internal/eventstore/v2/repository/sql/crdb.go b/internal/eventstore/v2/repository/sql/crdb.go index 928bc2ed45..fadc0fb84c 100644 --- a/internal/eventstore/v2/repository/sql/crdb.go +++ b/internal/eventstore/v2/repository/sql/crdb.go @@ -122,6 +122,10 @@ type CRDB struct { client *sql.DB } +func NewCRDB(client *sql.DB) *CRDB { + return &CRDB{client} +} + func (db *CRDB) Health(ctx context.Context) error { return db.client.Ping() } // Push adds all events to the eventstreams of the aggregates.