diff --git a/internal/eventstore/v2/event.go b/internal/eventstore/v2/event.go new file mode 100644 index 0000000000..2883f95e89 --- /dev/null +++ b/internal/eventstore/v2/event.go @@ -0,0 +1,53 @@ +package eventstore + +import ( + "time" + + "github.com/caos/zitadel/internal/eventstore/v2/repository" +) + +//Event is the representation of a state change +type Event interface { + //CheckPrevious ensures the event order if true + // if false the previous sequence is not checked on push + CheckPrevious() bool + //EditorService must return the name of the service which creates the new event + EditorService() string + //EditorUser must return the id of the user who created the event + EditorUser() string + //Type must return an event type which should be unique in the aggregate + Type() EventType + //Data returns the payload of the event. It represent the changed fields by the event + // valid types are: + // * nil (no payload), + // * json byte array + // * struct which can be marshalled to json + // * pointer to struct which can be marshalled to json + Data() interface{} + //MetaData returns all data saved on a event + // It must not be set on push + // The event mapper function must set this struct + MetaData() *EventMetaData +} + +func MetaDataFromRepo(event *repository.Event) *EventMetaData { + return &EventMetaData{ + AggregateID: event.AggregateID, + AggregateType: AggregateType(event.AggregateType), + AggregateVersion: Version(event.Version), + PreviouseSequence: event.PreviousSequence, + ResourceOwner: event.ResourceOwner, + Sequence: event.Sequence, + CreationDate: event.CreationDate, + } +} + +type EventMetaData struct { + AggregateID string + AggregateType AggregateType + ResourceOwner string + AggregateVersion Version + Sequence uint64 + PreviouseSequence uint64 + CreationDate time.Time +} diff --git a/internal/eventstore/v2/eventstore.go b/internal/eventstore/v2/eventstore.go index 78db97afb0..0d48cb860c 100644 --- a/internal/eventstore/v2/eventstore.go +++ b/internal/eventstore/v2/eventstore.go @@ -5,64 +5,17 @@ import ( "encoding/json" "reflect" "sync" - "time" "github.com/caos/zitadel/internal/errors" "github.com/caos/zitadel/internal/eventstore/v2/repository" ) -//Event is the representation of a state change -type Event interface { - //CheckPrevious ensures the event order if true - // if false the previous sequence is not checked on push - CheckPrevious() bool - //EditorService must return the name of the service which creates the new event - EditorService() string - //EditorUser must return the id of the user who created the event - EditorUser() string - //Type must return an event type which should be unique in the aggregate - Type() EventType - //Data returns the payload of the event. It represent the changed fields by the event - // valid types are: - // * nil (no payload), - // * json byte array - // * struct which can be marshalled to json - // * pointer to struct which can be marshalled to json - Data() interface{} - //MetaData returns all data saved on a event - // It must not be set on push - // The event mapper function must set this struct - MetaData() *EventMetaData -} - -func MetaDataFromRepo(event *repository.Event) *EventMetaData { - return &EventMetaData{ - AggregateID: event.AggregateID, - AggregateType: AggregateType(event.AggregateType), - AggregateVersion: Version(event.Version), - PreviouseSequence: event.PreviousSequence, - ResourceOwner: event.ResourceOwner, - Sequence: event.Sequence, - CreationDate: event.CreationDate, - } -} - -type EventMetaData struct { - AggregateID string - AggregateType AggregateType - ResourceOwner string - AggregateVersion Version - Sequence uint64 - PreviouseSequence uint64 - CreationDate time.Time -} - //Eventstore abstracts all functions needed to store valid events // and filters the stored events type Eventstore struct { - repo repository.Repository - interceptorMutex sync.Mutex - eventMapper map[EventType]eventTypeInterceptors + repo repository.Repository + interceptorMutex sync.Mutex + eventInterceptors map[EventType]eventTypeInterceptors } type eventTypeInterceptors struct { @@ -71,9 +24,9 @@ type eventTypeInterceptors struct { func NewEventstore(repo repository.Repository) *Eventstore { return &Eventstore{ - repo: repo, - eventMapper: map[EventType]eventTypeInterceptors{}, - interceptorMutex: sync.Mutex{}, + repo: repo, + eventInterceptors: map[EventType]eventTypeInterceptors{}, + interceptorMutex: sync.Mutex{}, } } @@ -169,7 +122,7 @@ func (es *Eventstore) mapEvents(events []*repository.Event) (mappedEvents []Even defer es.interceptorMutex.Unlock() for i, event := range events { - interceptors, ok := es.eventMapper[EventType(event.Type)] + interceptors, ok := es.eventInterceptors[EventType(event.Type)] if !ok || interceptors.eventMapper == nil { return nil, errors.ThrowPreconditionFailed(nil, "V2-usujB", "event mapper not defined") } @@ -220,9 +173,9 @@ func (es *Eventstore) RegisterFilterEventMapper(eventType EventType, mapper func es.interceptorMutex.Lock() defer es.interceptorMutex.Unlock() - interceptor := es.eventMapper[eventType] + interceptor := es.eventInterceptors[eventType] interceptor.eventMapper = mapper - es.eventMapper[eventType] = interceptor + es.eventInterceptors[eventType] = interceptor return es } diff --git a/internal/eventstore/v2/eventstore_test.go b/internal/eventstore/v2/eventstore_test.go index 07c4de80bd..e0256c6beb 100644 --- a/internal/eventstore/v2/eventstore_test.go +++ b/internal/eventstore/v2/eventstore_test.go @@ -176,11 +176,11 @@ func Test_eventstore_RegisterFilterEventMapper(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { es := &Eventstore{ - eventMapper: tt.fields.eventMapper, + eventInterceptors: tt.fields.eventMapper, } es = es.RegisterFilterEventMapper(tt.args.eventType, tt.args.mapper) - if len(es.eventMapper) != tt.res.mapperCount { - t.Errorf("unexpected mapper count: want %d, got %d", tt.res.mapperCount, len(es.eventMapper)) + if len(es.eventInterceptors) != tt.res.mapperCount { + t.Errorf("unexpected mapper count: want %d, got %d", tt.res.mapperCount, len(es.eventInterceptors)) return } @@ -188,7 +188,7 @@ func Test_eventstore_RegisterFilterEventMapper(t *testing.T) { return } - mapper := es.eventMapper[tt.args.eventType] + mapper := es.eventInterceptors[tt.args.eventType] event, err := mapper.eventMapper(nil) if err != nil { t.Errorf("unexpected error %v", err) @@ -846,15 +846,15 @@ func TestEventstore_Push(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { es := &Eventstore{ - repo: tt.fields.repo, - interceptorMutex: sync.Mutex{}, - eventMapper: map[EventType]eventTypeInterceptors{}, + repo: tt.fields.repo, + interceptorMutex: sync.Mutex{}, + eventInterceptors: map[EventType]eventTypeInterceptors{}, } for eventType, mapper := range tt.fields.eventMapper { es = es.RegisterFilterEventMapper(eventType, mapper) } - if len(es.eventMapper) != len(tt.fields.eventMapper) { - t.Errorf("register event mapper failed expected mapper amount: %d, got: %d", len(tt.fields.eventMapper), len(es.eventMapper)) + if len(es.eventInterceptors) != len(tt.fields.eventMapper) { + t.Errorf("register event mapper failed expected mapper amount: %d, got: %d", len(tt.fields.eventMapper), len(es.eventInterceptors)) t.FailNow() } @@ -970,16 +970,16 @@ func TestEventstore_FilterEvents(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { es := &Eventstore{ - repo: tt.fields.repo, - interceptorMutex: sync.Mutex{}, - eventMapper: map[EventType]eventTypeInterceptors{}, + repo: tt.fields.repo, + interceptorMutex: sync.Mutex{}, + eventInterceptors: map[EventType]eventTypeInterceptors{}, } for eventType, mapper := range tt.fields.eventMapper { es = es.RegisterFilterEventMapper(eventType, mapper) } - if len(es.eventMapper) != len(tt.fields.eventMapper) { - t.Errorf("register event mapper failed expected mapper amount: %d, got: %d", len(tt.fields.eventMapper), len(es.eventMapper)) + if len(es.eventInterceptors) != len(tt.fields.eventMapper) { + t.Errorf("register event mapper failed expected mapper amount: %d, got: %d", len(tt.fields.eventMapper), len(es.eventInterceptors)) t.FailNow() } @@ -1257,15 +1257,15 @@ func TestEventstore_FilterToReducer(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { es := &Eventstore{ - repo: tt.fields.repo, - interceptorMutex: sync.Mutex{}, - eventMapper: map[EventType]eventTypeInterceptors{}, + repo: tt.fields.repo, + interceptorMutex: sync.Mutex{}, + eventInterceptors: map[EventType]eventTypeInterceptors{}, } for eventType, mapper := range tt.fields.eventMapper { es = es.RegisterFilterEventMapper(eventType, mapper) } - if len(es.eventMapper) != len(tt.fields.eventMapper) { - t.Errorf("register event mapper failed expected mapper amount: %d, got: %d", len(tt.fields.eventMapper), len(es.eventMapper)) + if len(es.eventInterceptors) != len(tt.fields.eventMapper) { + t.Errorf("register event mapper failed expected mapper amount: %d, got: %d", len(tt.fields.eventMapper), len(es.eventInterceptors)) t.FailNow() } @@ -1410,14 +1410,14 @@ func TestEventstore_mapEvents(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { es := &Eventstore{ - interceptorMutex: sync.Mutex{}, - eventMapper: map[EventType]eventTypeInterceptors{}, + interceptorMutex: sync.Mutex{}, + eventInterceptors: map[EventType]eventTypeInterceptors{}, } for eventType, mapper := range tt.fields.eventMapper { es = es.RegisterFilterEventMapper(eventType, mapper) } - if len(es.eventMapper) != len(tt.fields.eventMapper) { - t.Errorf("register event mapper failed expected mapper amount: %d, got: %d", len(tt.fields.eventMapper), len(es.eventMapper)) + if len(es.eventInterceptors) != len(tt.fields.eventMapper) { + t.Errorf("register event mapper failed expected mapper amount: %d, got: %d", len(tt.fields.eventMapper), len(es.eventInterceptors)) t.FailNow() } diff --git a/internal/eventstore/v2/example_test.go b/internal/eventstore/v2/example_test.go index 8b918e8c6a..a41806526c 100644 --- a/internal/eventstore/v2/example_test.go +++ b/internal/eventstore/v2/example_test.go @@ -196,6 +196,46 @@ func (e *UserPasswordCheckedEvent) MetaData() *eventstore.EventMetaData { return e.metaData } +// ------------------------------------------------------------ +// User deleted event +// ------------------------------------------------------------ + +type UserDeletedEvent struct { + metaData *eventstore.EventMetaData `json:"-"` +} + +func UserDeletedMapper() (eventstore.EventType, func(*repository.Event) (eventstore.Event, error)) { + return "user.deleted", func(event *repository.Event) (eventstore.Event, error) { + return &UserDeletedEvent{ + metaData: eventstore.MetaDataFromRepo(event), + }, nil + } +} + +func (e *UserDeletedEvent) CheckPrevious() bool { + return false +} + +func (e *UserDeletedEvent) EditorService() string { + return "test.suite" +} + +func (e *UserDeletedEvent) EditorUser() string { + return "adlerhurst" +} + +func (e *UserDeletedEvent) Type() eventstore.EventType { + return "user.deleted" +} + +func (e *UserDeletedEvent) Data() interface{} { + return nil +} + +func (e *UserDeletedEvent) MetaData() *eventstore.EventMetaData { + return e.metaData +} + // ------------------------------------------------------------ // Users read model start // ------------------------------------------------------------ @@ -228,15 +268,20 @@ func (rm *UsersReadModel) AppendEvents(events ...eventstore.Event) (err error) { return errors.New("user not found") } err = user.AppendEvents(e) + case *UserDeletedEvent: + idx, _ := rm.userByID(e.metaData.AggregateID) + if idx < 0 { + return nil + } + copy(rm.Users[idx:], rm.Users[idx+1:]) + rm.Users[len(rm.Users)-1] = nil // or the zero value of T + rm.Users = rm.Users[:len(rm.Users)-1] } if err != nil { return err } } - //begin - //for stmt range stmts; exec - //commit return nil } @@ -307,11 +352,13 @@ func TestUserReadModel(t *testing.T) { es := eventstore.NewEventstore(sql.NewCRDB(testCRDBClient)) es.RegisterFilterEventMapper(UserAddedEventMapper()). RegisterFilterEventMapper(UserFirstNameChangedMapper()). - RegisterFilterEventMapper(UserPasswordCheckedMapper()) + RegisterFilterEventMapper(UserPasswordCheckedMapper()). + RegisterFilterEventMapper(UserDeletedMapper()) events, err := es.PushAggregates(context.Background(), NewUserAggregate("1").AppendEvents(&UserAddedEvent{FirstName: "hodor"}), NewUserAggregate("2").AppendEvents(&UserAddedEvent{FirstName: "hodor"}, &UserPasswordCheckedEvent{}, &UserPasswordCheckedEvent{}, &UserFirstNameChangedEvent{FirstName: "ueli"}), + NewUserAggregate("2").AppendEvents(&UserDeletedEvent{}), ) if err != nil { t.Errorf("unexpected error on push aggregates: %v", err)