iam events

This commit is contained in:
adlerhurst
2020-11-06 17:25:07 +01:00
parent f4bd5ddcbc
commit f7f810caa5
17 changed files with 496 additions and 107 deletions

View File

@@ -1,8 +1,10 @@
package eventstore
import (
"context"
"time"
"github.com/caos/zitadel/internal/api/authz"
"github.com/caos/zitadel/internal/eventstore/v2/repository"
)
@@ -75,4 +77,10 @@ func BaseEventFromRepo(event *repository.Event) *BaseEvent {
}
}
func NewBaseEvent()
func NewBaseEventForPush(ctx context.Context, service string, typ EventType) *BaseEvent {
return &BaseEvent{
User: authz.GetCtxData(ctx).UserID,
Service: service,
EventType: typ,
}
}

View File

@@ -19,7 +19,7 @@ type Eventstore struct {
}
type eventTypeInterceptors struct {
eventMapper func(*repository.Event) (Event, error)
eventMapper func(*repository.Event) (EventReader, error)
}
func NewEventstore(repo repository.Repository) *Eventstore {
@@ -166,7 +166,7 @@ func (es *Eventstore) LatestSequence(ctx context.Context, queryFactory *SearchQu
}
//RegisterFilterEventMapper registers a function for mapping an eventstore event to an event
func (es *Eventstore) RegisterFilterEventMapper(eventType EventType, mapper func(*repository.Event) (Event, error)) *Eventstore {
func (es *Eventstore) RegisterFilterEventMapper(eventType EventType, mapper func(*repository.Event) (EventReader, error)) *Eventstore {
if mapper == nil || eventType == "" {
return es
}

View File

@@ -55,11 +55,11 @@ func newTestEvent(description string, data func() interface{}, checkPrevious boo
description: description,
data: data,
shouldCheckPrevious: checkPrevious,
BaseEvent: BaseEvent{
User: "editorUser",
Service: "editorService",
EventType: "test.event",
},
BaseEvent: *NewBaseEventForPush(
"editorUser",
"editorService",
"test.event",
),
}
}
@@ -71,7 +71,7 @@ func (e *testEvent) Data() interface{} {
return e.data()
}
func testFilterMapper(event *repository.Event) (Event, error) {
func testFilterMapper(event *repository.Event) (EventReader, error) {
if event == nil {
return newTestEvent("hodor", nil, false), nil
}
@@ -84,7 +84,7 @@ func Test_eventstore_RegisterFilterEventMapper(t *testing.T) {
}
type args struct {
eventType EventType
mapper func(*repository.Event) (Event, error)
mapper func(*repository.Event) (EventReader, error)
}
type res struct {
event Event
@@ -158,7 +158,7 @@ func Test_eventstore_RegisterFilterEventMapper(t *testing.T) {
fields: fields{
eventMapper: map[EventType]eventTypeInterceptors{
"event.type": {
eventMapper: func(*repository.Event) (Event, error) {
eventMapper: func(*repository.Event) (EventReader, error) {
return nil, errors.ThrowUnimplemented(nil, "V2-1qPvn", "unimplemented")
},
},
@@ -627,7 +627,7 @@ func TestEventstore_Push(t *testing.T) {
}
type fields struct {
repo *testRepo
eventMapper map[EventType]func(*repository.Event) (Event, error)
eventMapper map[EventType]func(*repository.Event) (EventReader, error)
}
type res struct {
wantErr bool
@@ -672,8 +672,8 @@ func TestEventstore_Push(t *testing.T) {
},
},
},
eventMapper: map[EventType]func(*repository.Event) (Event, error){
"test.event": func(e *repository.Event) (Event, error) {
eventMapper: map[EventType]func(*repository.Event) (EventReader, error){
"test.event": func(e *repository.Event) (EventReader, error) {
return &testEvent{}, nil
},
},
@@ -730,8 +730,8 @@ func TestEventstore_Push(t *testing.T) {
},
),
},
eventMapper: map[EventType]func(*repository.Event) (Event, error){
"test.event": func(e *repository.Event) (Event, error) {
eventMapper: map[EventType]func(*repository.Event) (EventReader, error){
"test.event": func(e *repository.Event) (EventReader, error) {
return &testEvent{}, nil
},
},
@@ -817,8 +817,8 @@ func TestEventstore_Push(t *testing.T) {
},
),
},
eventMapper: map[EventType]func(*repository.Event) (Event, error){
"test.event": func(e *repository.Event) (Event, error) {
eventMapper: map[EventType]func(*repository.Event) (EventReader, error){
"test.event": func(e *repository.Event) (EventReader, error) {
return &testEvent{}, nil
},
},
@@ -911,7 +911,7 @@ func TestEventstore_FilterEvents(t *testing.T) {
}
type fields struct {
repo *testRepo
eventMapper map[EventType]func(*repository.Event) (Event, error)
eventMapper map[EventType]func(*repository.Event) (EventReader, error)
}
type res struct {
wantErr bool
@@ -944,8 +944,8 @@ func TestEventstore_FilterEvents(t *testing.T) {
events: []*repository.Event{},
t: t,
},
eventMapper: map[EventType]func(*repository.Event) (Event, error){
"test.event": func(e *repository.Event) (Event, error) {
eventMapper: map[EventType]func(*repository.Event) (EventReader, error){
"test.event": func(e *repository.Event) (EventReader, error) {
return &testEvent{}, nil
},
},
@@ -967,8 +967,8 @@ func TestEventstore_FilterEvents(t *testing.T) {
t: t,
err: errors.ThrowInternal(nil, "V2-RfkBa", "test err"),
},
eventMapper: map[EventType]func(*repository.Event) (Event, error){
"test.event": func(e *repository.Event) (Event, error) {
eventMapper: map[EventType]func(*repository.Event) (EventReader, error){
"test.event": func(e *repository.Event) (EventReader, error) {
return &testEvent{}, nil
},
},
@@ -995,8 +995,8 @@ func TestEventstore_FilterEvents(t *testing.T) {
},
t: t,
},
eventMapper: map[EventType]func(*repository.Event) (Event, error){
"test.event": func(e *repository.Event) (Event, error) {
eventMapper: map[EventType]func(*repository.Event) (EventReader, error){
"test.event": func(e *repository.Event) (EventReader, error) {
return &testEvent{}, nil
},
},
@@ -1126,7 +1126,7 @@ func TestEventstore_LatestSequence(t *testing.T) {
type testReducer struct {
t *testing.T
events []Event
events []EventReader
expectedLength int
err error
}
@@ -1142,7 +1142,7 @@ func (r *testReducer) Reduce() error {
return nil
}
func (r *testReducer) AppendEvents(e ...Event) error {
func (r *testReducer) AppendEvents(e ...EventReader) error {
if r.err != nil {
return r.err
}
@@ -1157,7 +1157,7 @@ func TestEventstore_FilterToReducer(t *testing.T) {
}
type fields struct {
repo *testRepo
eventMapper map[EventType]func(*repository.Event) (Event, error)
eventMapper map[EventType]func(*repository.Event) (EventReader, error)
}
type res struct {
wantErr bool
@@ -1194,8 +1194,8 @@ func TestEventstore_FilterToReducer(t *testing.T) {
events: []*repository.Event{},
t: t,
},
eventMapper: map[EventType]func(*repository.Event) (Event, error){
"test.event": func(e *repository.Event) (Event, error) {
eventMapper: map[EventType]func(*repository.Event) (EventReader, error){
"test.event": func(e *repository.Event) (EventReader, error) {
return &testEvent{}, nil
},
},
@@ -1221,8 +1221,8 @@ func TestEventstore_FilterToReducer(t *testing.T) {
t: t,
err: errors.ThrowInternal(nil, "V2-RfkBa", "test err"),
},
eventMapper: map[EventType]func(*repository.Event) (Event, error){
"test.event": func(e *repository.Event) (Event, error) {
eventMapper: map[EventType]func(*repository.Event) (EventReader, error){
"test.event": func(e *repository.Event) (EventReader, error) {
return &testEvent{}, nil
},
},
@@ -1253,8 +1253,8 @@ func TestEventstore_FilterToReducer(t *testing.T) {
},
t: t,
},
eventMapper: map[EventType]func(*repository.Event) (Event, error){
"test.event": func(e *repository.Event) (Event, error) {
eventMapper: map[EventType]func(*repository.Event) (EventReader, error){
"test.event": func(e *repository.Event) (EventReader, error) {
return &testEvent{}, nil
},
},
@@ -1282,8 +1282,8 @@ func TestEventstore_FilterToReducer(t *testing.T) {
},
t: t,
},
eventMapper: map[EventType]func(*repository.Event) (Event, error){
"test.event": func(e *repository.Event) (Event, error) {
eventMapper: map[EventType]func(*repository.Event) (EventReader, error){
"test.event": func(e *repository.Event) (EventReader, error) {
return &testEvent{}, nil
},
},
@@ -1371,13 +1371,13 @@ func compareEvents(t *testing.T, want, got *repository.Event) {
func TestEventstore_mapEvents(t *testing.T) {
type fields struct {
eventMapper map[EventType]func(*repository.Event) (Event, error)
eventMapper map[EventType]func(*repository.Event) (EventReader, error)
}
type args struct {
events []*repository.Event
}
type res struct {
events []Event
events []EventReader
wantErr bool
}
tests := []struct {
@@ -1396,7 +1396,7 @@ func TestEventstore_mapEvents(t *testing.T) {
},
},
fields: fields{
eventMapper: map[EventType]func(*repository.Event) (Event, error){},
eventMapper: map[EventType]func(*repository.Event) (EventReader, error){},
},
res: res{
wantErr: true,
@@ -1412,8 +1412,8 @@ func TestEventstore_mapEvents(t *testing.T) {
},
},
fields: fields{
eventMapper: map[EventType]func(*repository.Event) (Event, error){
"test.event": func(*repository.Event) (Event, error) {
eventMapper: map[EventType]func(*repository.Event) (EventReader, error){
"test.event": func(*repository.Event) (EventReader, error) {
return nil, errors.ThrowInternal(nil, "V2-8FbQk", "test err")
},
},
@@ -1432,14 +1432,14 @@ func TestEventstore_mapEvents(t *testing.T) {
},
},
fields: fields{
eventMapper: map[EventType]func(*repository.Event) (Event, error){
"test.event": func(*repository.Event) (Event, error) {
eventMapper: map[EventType]func(*repository.Event) (EventReader, error){
"test.event": func(*repository.Event) (EventReader, error) {
return &testEvent{}, nil
},
},
},
res: res{
events: []Event{
events: []EventReader{
&testEvent{},
},
wantErr: false,

View File

@@ -92,8 +92,8 @@ func NewUserAddedEvent(firstName string) *UserAddedEvent {
}
}
func UserAddedEventMapper() (eventstore.EventType, func(*repository.Event) (eventstore.Event, error)) {
return "user.added", func(event *repository.Event) (eventstore.Event, error) {
func UserAddedEventMapper() (eventstore.EventType, func(*repository.Event) (eventstore.EventReader, error)) {
return "user.added", func(event *repository.Event) (eventstore.EventReader, error) {
e := &UserAddedEvent{
BaseEvent: *eventstore.BaseEventFromRepo(event),
}
@@ -134,8 +134,8 @@ func NewUserFirstNameChangedEvent(firstName string) *UserFirstNameChangedEvent {
}
}
func UserFirstNameChangedMapper() (eventstore.EventType, func(*repository.Event) (eventstore.Event, error)) {
return "user.firstName.changed", func(event *repository.Event) (eventstore.Event, error) {
func UserFirstNameChangedMapper() (eventstore.EventType, func(*repository.Event) (eventstore.EventReader, error)) {
return "user.firstName.changed", func(event *repository.Event) (eventstore.EventReader, error) {
e := &UserFirstNameChangedEvent{
BaseEvent: *eventstore.BaseEventFromRepo(event),
}
@@ -173,8 +173,8 @@ func NewUserPasswordCheckedEvent() *UserPasswordCheckedEvent {
}
}
func UserPasswordCheckedMapper() (eventstore.EventType, func(*repository.Event) (eventstore.Event, error)) {
return "user.password.checked", func(event *repository.Event) (eventstore.Event, error) {
func UserPasswordCheckedMapper() (eventstore.EventType, func(*repository.Event) (eventstore.EventReader, error)) {
return "user.password.checked", func(event *repository.Event) (eventstore.EventReader, error) {
return &UserPasswordCheckedEvent{
BaseEvent: *eventstore.BaseEventFromRepo(event),
}, nil
@@ -207,8 +207,8 @@ func NewUserDeletedEvent() *UserDeletedEvent {
}
}
func UserDeletedMapper() (eventstore.EventType, func(*repository.Event) (eventstore.Event, error)) {
return "user.deleted", func(event *repository.Event) (eventstore.Event, error) {
func UserDeletedMapper() (eventstore.EventType, func(*repository.Event) (eventstore.EventReader, error)) {
return "user.deleted", func(event *repository.Event) (eventstore.EventReader, error) {
return &UserDeletedEvent{
BaseEvent: *eventstore.BaseEventFromRepo(event),
}, nil
@@ -239,24 +239,24 @@ func NewUsersReadModel() *UsersReadModel {
}
}
func (rm *UsersReadModel) AppendEvents(events ...eventstore.Event) (err error) {
func (rm *UsersReadModel) AppendEvents(events ...eventstore.EventReader) (err error) {
rm.ReadModel.AppendEvents(events...)
for _, event := range events {
switch e := event.(type) {
case *UserAddedEvent:
//insert
user := NewUserReadModel(e.Base().AggregateID)
user := NewUserReadModel(e.AggregateID())
rm.Users = append(rm.Users, user)
err = user.AppendEvents(e)
case *UserFirstNameChangedEvent, *UserPasswordCheckedEvent:
//update
_, user := rm.userByID(e.Base().aggregateID)
_, user := rm.userByID(e.AggregateID())
if user == nil {
return errors.New("user not found")
}
err = user.AppendEvents(e)
case *UserDeletedEvent:
idx, _ := rm.userByID(e.Base().AggregateID)
idx, _ := rm.userByID(e.AggregateID())
if idx < 0 {
return nil
}
@@ -310,7 +310,7 @@ func NewUserReadModel(id string) *UserReadModel {
}
}
func (rm *UserReadModel) AppendEvents(events ...eventstore.Event) error {
func (rm *UserReadModel) AppendEvents(events ...eventstore.EventReader) error {
rm.ReadModel.AppendEvents(events...)
return nil
}
@@ -324,7 +324,7 @@ func (rm *UserReadModel) Reduce() error {
rm.FirstName = e.FirstName
case *UserPasswordCheckedEvent:
rm.pwCheckCount++
rm.lastPasswordCheck = e.Base().CreationDate
rm.lastPasswordCheck = e.CreationDate()
}
}
rm.ReadModel.Reduce()

View File

@@ -5,23 +5,23 @@ import "time"
func NewReadModel(id string) *ReadModel {
return &ReadModel{
ID: id,
Events: []Event{},
Events: []EventReader{},
}
}
//ReadModel is the minimum representation of a View model.
// it might be saved in a database or in memory
type ReadModel struct {
ProcessedSequence uint64 `json:"-"`
ID string `json:"-"`
CreationDate time.Time `json:"-"`
ChangeDate time.Time `json:"-"`
Events []Event `json:"-"`
ProcessedSequence uint64 `json:"-"`
ID string `json:"-"`
CreationDate time.Time `json:"-"`
ChangeDate time.Time `json:"-"`
Events []EventReader `json:"-"`
}
//AppendEvents adds all the events to the read model.
// The function doesn't compute the new state of the read model
func (rm *ReadModel) AppendEvents(events ...Event) *ReadModel {
func (rm *ReadModel) AppendEvents(events ...EventReader) *ReadModel {
rm.Events = append(rm.Events, events...)
return rm
}
@@ -33,13 +33,13 @@ func (rm *ReadModel) Reduce() error {
}
if rm.CreationDate.IsZero() {
rm.CreationDate = rm.Events[0].Base().creationDate
rm.CreationDate = rm.Events[0].CreationDate()
}
rm.ChangeDate = rm.Events[len(rm.Events)-1].Base().creationDate
rm.ProcessedSequence = rm.Events[len(rm.Events)-1].Base().sequence
rm.ChangeDate = rm.Events[len(rm.Events)-1].CreationDate()
rm.ProcessedSequence = rm.Events[len(rm.Events)-1].Sequence()
// all events processed and not needed anymore
rm.Events = nil
rm.Events = []Event{}
rm.Events = []EventReader{}
return nil
}
@@ -69,7 +69,7 @@ func (a *Aggregate) Reduce() error {
return nil
}
a.PreviousSequence = a.Events[len(a.Events)-1].Base().sequence
a.PreviousSequence = a.Events[len(a.Events)-1].Sequence()
// all events processed and not needed anymore
a.Events = nil
a.Events = []Event{}