try splitt event

This commit is contained in:
adlerhurst 2020-11-06 13:47:27 +01:00
parent 756a4f1d08
commit f4bd5ddcbc
9 changed files with 428 additions and 245 deletions

View File

@ -2,18 +2,12 @@ package eventstore
import ( import (
"time" "time"
"github.com/caos/zitadel/internal/eventstore/v2/repository"
) )
//Event is the representation of a state change type EventPusher interface {
type Event interface { // EditorService is the service who wants to push the event
//CheckPrevious ensures the event order if true
// if false the previous sequence is not checked on push
CheckPrevious() bool
//EditorService must return the name of the service which creates the new event
EditorService() string EditorService() string
//EditorUser must return the id of the user who created the event //EditorUser is the user who wants to push the event
EditorUser() string EditorUser() string
//Type must return an event type which should be unique in the aggregate //Type must return an event type which should be unique in the aggregate
Type() EventType Type() EventType
@ -24,30 +18,30 @@ type Event interface {
// * struct which can be marshalled to json // * struct which can be marshalled to json
// * pointer to struct which can be marshalled to json // * pointer to struct which can be marshalled to json
Data() interface{} Data() interface{}
//MetaData returns all data saved on a event //CheckPrevious ensures the event order if true
// It must not be set on push // if false the previous sequence is not checked on push
// The event mapper function must set this struct CheckPrevious() bool
MetaData() *EventMetaData
} }
func MetaDataFromRepo(event *repository.Event) *EventMetaData { type EventReader interface {
return &EventMetaData{ // EditorService is the service who pushed the event
AggregateID: event.AggregateID, EditorService() string
AggregateType: AggregateType(event.AggregateType), //EditorUser is the user who pushed the event
AggregateVersion: Version(event.Version), EditorUser() string
PreviouseSequence: event.PreviousSequence, //Type is the type of the event
ResourceOwner: event.ResourceOwner, Type() EventType
Sequence: event.Sequence,
CreationDate: event.CreationDate, AggregateID() string
} AggregateType() AggregateType
ResourceOwner() string
AggregateVersion() Version
Sequence() uint64
PreviousSequence() uint64
CreationDate() time.Time
} }
type EventMetaData struct { //Event is the representation of a state change
AggregateID string type Event interface {
AggregateType AggregateType EventPusher
ResourceOwner string EventReader
AggregateVersion Version
Sequence uint64
PreviouseSequence uint64
CreationDate time.Time
} }

View File

@ -0,0 +1,78 @@
package eventstore
import (
"time"
"github.com/caos/zitadel/internal/eventstore/v2/repository"
)
type BaseEvent struct {
aggregateID string `json:"-"`
aggregateType AggregateType `json:"-"`
EventType EventType `json:"-"`
resourceOwner string `json:"-"`
aggregateVersion Version `json:"-"`
sequence uint64 `json:"-"`
previouseSequence uint64 `json:"-"`
creationDate time.Time `json:"-"`
//User is the user who created the event
User string `json:"-"`
//Service is the service which created the event
Service string `json:"-"`
}
// EditorService implements EventPusher
func (e *BaseEvent) EditorService() string {
return e.Service
}
//EditorUser implements EventPusher
func (e *BaseEvent) EditorUser() string {
return e.User
}
//Type implements EventPusher
func (e *BaseEvent) Type() EventType {
return e.EventType
}
func (e *BaseEvent) AggregateID() string {
return e.aggregateID
}
func (e *BaseEvent) AggregateType() AggregateType {
return e.aggregateType
}
func (e *BaseEvent) ResourceOwner() string {
return e.resourceOwner
}
func (e *BaseEvent) AggregateVersion() Version {
return e.aggregateVersion
}
func (e *BaseEvent) Sequence() uint64 {
return e.sequence
}
func (e *BaseEvent) PreviousSequence() uint64 {
return e.previouseSequence
}
func (e *BaseEvent) CreationDate() time.Time {
return e.creationDate
}
func BaseEventFromRepo(event *repository.Event) *BaseEvent {
return &BaseEvent{
aggregateID: event.AggregateID,
aggregateType: AggregateType(event.AggregateType),
aggregateVersion: Version(event.Version),
EventType: EventType(event.Type),
creationDate: event.CreationDate,
sequence: event.Sequence,
previouseSequence: event.PreviousSequence,
resourceOwner: event.ResourceOwner,
Service: event.EditorService,
User: event.EditorUser,
}
}
func NewBaseEvent()

View File

@ -42,7 +42,7 @@ type aggregater interface {
//Type returns the aggregate type //Type returns the aggregate type
Type() AggregateType Type() AggregateType
//Events returns the events which will be pushed //Events returns the events which will be pushed
Events() []Event Events() []EventPusher
//ResourceOwner returns the organisation id which manages this aggregate //ResourceOwner returns the organisation id which manages this aggregate
// resource owner is only on the inital push needed // resource owner is only on the inital push needed
// afterwards the resource owner of the previous event is taken // afterwards the resource owner of the previous event is taken
@ -58,7 +58,7 @@ type aggregater interface {
//PushAggregates maps the events of all aggregates to an eventstore event //PushAggregates maps the events of all aggregates to an eventstore event
// based on the pushMapper // based on the pushMapper
func (es *Eventstore) PushAggregates(ctx context.Context, aggregates ...aggregater) ([]Event, error) { func (es *Eventstore) PushAggregates(ctx context.Context, aggregates ...aggregater) ([]EventReader, error) {
events, err := es.aggregatesToEvents(aggregates) events, err := es.aggregatesToEvents(aggregates)
if err != nil { if err != nil {
return nil, err return nil, err
@ -102,7 +102,7 @@ func (es *Eventstore) aggregatesToEvents(aggregates []aggregater) ([]*repository
//FilterEvents filters the stored events based on the searchQuery //FilterEvents filters the stored events based on the searchQuery
// and maps the events to the defined event structs // and maps the events to the defined event structs
func (es *Eventstore) FilterEvents(ctx context.Context, queryFactory *SearchQueryFactory) ([]Event, error) { func (es *Eventstore) FilterEvents(ctx context.Context, queryFactory *SearchQueryFactory) ([]EventReader, error) {
query, err := queryFactory.build() query, err := queryFactory.build()
if err != nil { if err != nil {
return nil, err return nil, err
@ -115,8 +115,8 @@ func (es *Eventstore) FilterEvents(ctx context.Context, queryFactory *SearchQuer
return es.mapEvents(events) return es.mapEvents(events)
} }
func (es *Eventstore) mapEvents(events []*repository.Event) (mappedEvents []Event, err error) { func (es *Eventstore) mapEvents(events []*repository.Event) (mappedEvents []EventReader, err error) {
mappedEvents = make([]Event, len(events)) mappedEvents = make([]EventReader, len(events))
es.interceptorMutex.Lock() es.interceptorMutex.Lock()
defer es.interceptorMutex.Unlock() defer es.interceptorMutex.Unlock()
@ -140,7 +140,7 @@ type reducer interface {
// it only appends the newly added events // it only appends the newly added events
Reduce() error Reduce() error
//AppendEvents appends the passed events to an internal list of events //AppendEvents appends the passed events to an internal list of events
AppendEvents(...Event) error AppendEvents(...EventReader) error
} }
//FilterToReducer filters the events based on the search query, appends all events to the reducer and calls it's reduce function //FilterToReducer filters the events based on the search query, appends all events to the reducer and calls it's reduce function
@ -180,7 +180,7 @@ func (es *Eventstore) RegisterFilterEventMapper(eventType EventType, mapper func
return es return es
} }
func eventData(event Event) ([]byte, error) { func eventData(event EventPusher) ([]byte, error) {
switch data := event.Data().(type) { switch data := event.Data().(type) {
case nil: case nil:
return nil, nil return nil, nil

View File

@ -13,7 +13,7 @@ import (
type testAggregate struct { type testAggregate struct {
id string id string
events []Event events []EventPusher
previousSequence uint64 previousSequence uint64
} }
@ -25,7 +25,7 @@ func (a *testAggregate) Type() AggregateType {
return "test.aggregate" return "test.aggregate"
} }
func (a *testAggregate) Events() []Event { func (a *testAggregate) Events() []EventPusher {
return a.events return a.events
} }
@ -43,38 +43,39 @@ func (a *testAggregate) PreviousSequence() uint64 {
// testEvent implements the Event interface // testEvent implements the Event interface
type testEvent struct { type testEvent struct {
BaseEvent
description string description string
shouldCheckPrevious bool shouldCheckPrevious bool
data func() interface{} data func() interface{}
} }
func newTestEvent(description string, data func() interface{}, checkPrevious bool) *testEvent {
return &testEvent{
description: description,
data: data,
shouldCheckPrevious: checkPrevious,
BaseEvent: BaseEvent{
User: "editorUser",
Service: "editorService",
EventType: "test.event",
},
}
}
func (e *testEvent) CheckPrevious() bool { func (e *testEvent) CheckPrevious() bool {
return e.shouldCheckPrevious return e.shouldCheckPrevious
} }
func (e *testEvent) EditorService() string {
return "editorService"
}
func (e *testEvent) EditorUser() string {
return "editorUser"
}
func (e *testEvent) Type() EventType {
return "test.event"
}
func (e *testEvent) Data() interface{} { func (e *testEvent) Data() interface{} {
return e.data() return e.data()
} }
func (e *testEvent) PreviousSequence() uint64 { func testFilterMapper(event *repository.Event) (Event, error) {
return 0 if event == nil {
} return newTestEvent("hodor", nil, false), nil
}
func (e *testEvent) MetaData() *EventMetaData { return &testEvent{description: "hodor", BaseEvent: *BaseEventFromRepo(event)}, nil
return nil
}
func testFilterMapper(*repository.Event) (Event, error) {
return &testEvent{description: "hodor"}, nil
} }
func Test_eventstore_RegisterFilterEventMapper(t *testing.T) { func Test_eventstore_RegisterFilterEventMapper(t *testing.T) {
@ -132,7 +133,7 @@ func Test_eventstore_RegisterFilterEventMapper(t *testing.T) {
mapper: testFilterMapper, mapper: testFilterMapper,
}, },
res: res{ res: res{
event: &testEvent{description: "hodor"}, event: newTestEvent("hodor", nil, false),
mapperCount: 1, mapperCount: 1,
}, },
}, },
@ -148,7 +149,7 @@ func Test_eventstore_RegisterFilterEventMapper(t *testing.T) {
mapper: testFilterMapper, mapper: testFilterMapper,
}, },
res: res{ res: res{
event: &testEvent{description: "hodor"}, event: newTestEvent("hodor", nil, false),
mapperCount: 2, mapperCount: 2,
}, },
}, },
@ -168,7 +169,7 @@ func Test_eventstore_RegisterFilterEventMapper(t *testing.T) {
mapper: testFilterMapper, mapper: testFilterMapper,
}, },
res: res{ res: res{
event: &testEvent{description: "hodor"}, event: newTestEvent("hodor", nil, false),
mapperCount: 2, mapperCount: 2,
}, },
}, },
@ -195,7 +196,7 @@ func Test_eventstore_RegisterFilterEventMapper(t *testing.T) {
} }
if !reflect.DeepEqual(tt.res.event, event) { if !reflect.DeepEqual(tt.res.event, event) {
t.Errorf("events should be deep equal. \ngot %v\nwant %v", event, tt.res.event) t.Errorf("events should be deep equal. \ngot %#v\nwant %#v", event, tt.res.event)
} }
}) })
} }
@ -203,7 +204,7 @@ func Test_eventstore_RegisterFilterEventMapper(t *testing.T) {
func Test_eventData(t *testing.T) { func Test_eventData(t *testing.T) {
type args struct { type args struct {
event Event event EventPusher
} }
type res struct { type res struct {
jsonText []byte jsonText []byte
@ -217,11 +218,12 @@ func Test_eventData(t *testing.T) {
{ {
name: "data as json bytes", name: "data as json bytes",
args: args{ args: args{
event: &testEvent{ event: newTestEvent(
data: func() interface{} { "hodor",
func() interface{} {
return []byte(`{"piff":"paff"}`) return []byte(`{"piff":"paff"}`)
}, },
}, false),
}, },
res: res{ res: res{
jsonText: []byte(`{"piff":"paff"}`), jsonText: []byte(`{"piff":"paff"}`),
@ -231,11 +233,12 @@ func Test_eventData(t *testing.T) {
{ {
name: "data as invalid json bytes", name: "data as invalid json bytes",
args: args{ args: args{
event: &testEvent{ event: newTestEvent(
data: func() interface{} { "hodor",
func() interface{} {
return []byte(`{"piffpaff"}`) return []byte(`{"piffpaff"}`)
}, },
}, false),
}, },
res: res{ res: res{
jsonText: []byte(nil), jsonText: []byte(nil),
@ -245,13 +248,14 @@ func Test_eventData(t *testing.T) {
{ {
name: "data as struct", name: "data as struct",
args: args{ args: args{
event: &testEvent{ event: newTestEvent(
data: func() interface{} { "hodor",
func() interface{} {
return struct { return struct {
Piff string `json:"piff"` Piff string `json:"piff"`
}{Piff: "paff"} }{Piff: "paff"}
}, },
}, false),
}, },
res: res{ res: res{
jsonText: []byte(`{"piff":"paff"}`), jsonText: []byte(`{"piff":"paff"}`),
@ -261,13 +265,14 @@ func Test_eventData(t *testing.T) {
{ {
name: "data as ptr to struct", name: "data as ptr to struct",
args: args{ args: args{
event: &testEvent{ event: newTestEvent(
data: func() interface{} { "hodor",
func() interface{} {
return &struct { return &struct {
Piff string `json:"piff"` Piff string `json:"piff"`
}{Piff: "paff"} }{Piff: "paff"}
}, },
}, false),
}, },
res: res{ res: res{
jsonText: []byte(`{"piff":"paff"}`), jsonText: []byte(`{"piff":"paff"}`),
@ -277,11 +282,12 @@ func Test_eventData(t *testing.T) {
{ {
name: "no data", name: "no data",
args: args{ args: args{
event: &testEvent{ event: newTestEvent(
data: func() interface{} { "hodor",
func() interface{} {
return nil return nil
}, },
}, false),
}, },
res: res{ res: res{
jsonText: []byte(nil), jsonText: []byte(nil),
@ -291,11 +297,12 @@ func Test_eventData(t *testing.T) {
{ {
name: "invalid because primitive", name: "invalid because primitive",
args: args{ args: args{
event: &testEvent{ event: newTestEvent(
data: func() interface{} { "hodor",
func() interface{} {
return "" return ""
}, },
}, false),
}, },
res: res{ res: res{
jsonText: []byte(nil), jsonText: []byte(nil),
@ -305,12 +312,13 @@ func Test_eventData(t *testing.T) {
{ {
name: "invalid because pointer to primitive", name: "invalid because pointer to primitive",
args: args{ args: args{
event: &testEvent{ event: newTestEvent(
data: func() interface{} { "hodor",
func() interface{} {
var s string var s string
return &s return &s
}, },
}, false),
}, },
res: res{ res: res{
jsonText: []byte(nil), jsonText: []byte(nil),
@ -320,13 +328,14 @@ func Test_eventData(t *testing.T) {
{ {
name: "invalid because invalid struct for json", name: "invalid because invalid struct for json",
args: args{ args: args{
event: &testEvent{ event: newTestEvent(
data: func() interface{} { "hodor",
func() interface{} {
return struct { return struct {
Field chan string `json:"field"` Field chan string `json:"field"`
}{} }{}
}, },
}, false),
}, },
res: res{ res: res{
jsonText: []byte(nil), jsonText: []byte(nil),
@ -367,11 +376,13 @@ func TestEventstore_aggregatesToEvents(t *testing.T) {
aggregates: []aggregater{ aggregates: []aggregater{
&testAggregate{ &testAggregate{
id: "1", id: "1",
events: []Event{ events: []EventPusher{
&testEvent{ newTestEvent(
data: func() interface{} { return nil }, "",
shouldCheckPrevious: false, func() interface{} {
}, return nil
},
false),
}, },
}, },
}, },
@ -399,15 +410,19 @@ func TestEventstore_aggregatesToEvents(t *testing.T) {
aggregates: []aggregater{ aggregates: []aggregater{
&testAggregate{ &testAggregate{
id: "1", id: "1",
events: []Event{ events: []EventPusher{
&testEvent{ newTestEvent(
data: func() interface{} { return nil }, "",
shouldCheckPrevious: false, func() interface{} {
}, return nil
&testEvent{ },
data: func() interface{} { return nil }, false),
shouldCheckPrevious: false, newTestEvent(
}, "",
func() interface{} {
return nil
},
false),
}, },
}, },
}, },
@ -446,11 +461,13 @@ func TestEventstore_aggregatesToEvents(t *testing.T) {
aggregates: []aggregater{ aggregates: []aggregater{
&testAggregate{ &testAggregate{
id: "1", id: "1",
events: []Event{ events: []EventPusher{
&testEvent{ newTestEvent(
data: func() interface{} { return `{"data":""` }, "",
shouldCheckPrevious: false, func() interface{} {
}, return `{"data":""`
},
false),
}, },
}, },
}, },
@ -465,24 +482,30 @@ func TestEventstore_aggregatesToEvents(t *testing.T) {
aggregates: []aggregater{ aggregates: []aggregater{
&testAggregate{ &testAggregate{
id: "1", id: "1",
events: []Event{ events: []EventPusher{
&testEvent{ newTestEvent(
data: func() interface{} { return nil }, "",
shouldCheckPrevious: false, func() interface{} {
}, return nil
&testEvent{ },
data: func() interface{} { return nil }, false),
shouldCheckPrevious: false, newTestEvent(
}, "",
func() interface{} {
return nil
},
false),
}, },
}, },
&testAggregate{ &testAggregate{
id: "2", id: "2",
events: []Event{ events: []EventPusher{
&testEvent{ newTestEvent(
data: func() interface{} { return nil }, "",
shouldCheckPrevious: true, func() interface{} {
}, return nil
},
true),
}, },
}, },
}, },
@ -621,11 +644,13 @@ func TestEventstore_Push(t *testing.T) {
aggregates: []aggregater{ aggregates: []aggregater{
&testAggregate{ &testAggregate{
id: "1", id: "1",
events: []Event{ events: []EventPusher{
&testEvent{ newTestEvent(
data: func() interface{} { return nil }, "",
shouldCheckPrevious: false, func() interface{} {
}, return nil
},
false),
}, },
}, },
}, },
@ -660,15 +685,19 @@ func TestEventstore_Push(t *testing.T) {
aggregates: []aggregater{ aggregates: []aggregater{
&testAggregate{ &testAggregate{
id: "1", id: "1",
events: []Event{ events: []EventPusher{
&testEvent{ newTestEvent(
data: func() interface{} { return nil }, "",
shouldCheckPrevious: false, func() interface{} {
}, return nil
&testEvent{ },
data: func() interface{} { return nil }, false),
shouldCheckPrevious: false, newTestEvent(
}, "",
func() interface{} {
return nil
},
false),
}, },
}, },
}, },
@ -717,24 +746,30 @@ func TestEventstore_Push(t *testing.T) {
aggregates: []aggregater{ aggregates: []aggregater{
&testAggregate{ &testAggregate{
id: "1", id: "1",
events: []Event{ events: []EventPusher{
&testEvent{ newTestEvent(
data: func() interface{} { return nil }, "",
shouldCheckPrevious: false, func() interface{} {
}, return nil
&testEvent{ },
data: func() interface{} { return nil }, false),
shouldCheckPrevious: false, newTestEvent(
}, "",
func() interface{} {
return nil
},
false),
}, },
}, },
&testAggregate{ &testAggregate{
id: "2", id: "2",
events: []Event{ events: []EventPusher{
&testEvent{ newTestEvent(
data: func() interface{} { return nil }, "",
shouldCheckPrevious: true, func() interface{} {
}, return nil
},
true),
}, },
}, },
}, },
@ -798,11 +833,13 @@ func TestEventstore_Push(t *testing.T) {
aggregates: []aggregater{ aggregates: []aggregater{
&testAggregate{ &testAggregate{
id: "1", id: "1",
events: []Event{ events: []EventPusher{
&testEvent{ newTestEvent(
data: func() interface{} { return nil }, "",
shouldCheckPrevious: false, func() interface{} {
}, return nil
},
false),
}, },
}, },
}, },
@ -823,11 +860,13 @@ func TestEventstore_Push(t *testing.T) {
aggregates: []aggregater{ aggregates: []aggregater{
&testAggregate{ &testAggregate{
id: "1", id: "1",
events: []Event{ events: []EventPusher{
&testEvent{ newTestEvent(
data: func() interface{} { return `{"data":""` }, "",
shouldCheckPrevious: false, func() interface{} {
}, return `{"data":""`
},
false),
}, },
}, },
}, },

View File

@ -18,6 +18,8 @@ import (
// ------------------------------------------------------------ // ------------------------------------------------------------
type UserAggregate struct { type UserAggregate struct {
eventstore.BaseEvent
eventstore.Aggregate eventstore.Aggregate
FirstName string FirstName string
} }
@ -28,8 +30,13 @@ func (a *UserAggregate) ID() string {
func (a *UserAggregate) Type() eventstore.AggregateType { func (a *UserAggregate) Type() eventstore.AggregateType {
return "test.user" return "test.user"
} }
func (a *UserAggregate) Events() []eventstore.Event { func (a *UserAggregate) Events() []eventstore.EventPusher {
return a.Aggregate.Events events := make([]eventstore.EventPusher, len(a.Aggregate.Events))
for i, event := range a.Aggregate.Events {
events[i] = event
}
return events
} }
func (a *UserAggregate) ResourceOwner() string { func (a *UserAggregate) ResourceOwner() string {
return "caos" return "caos"
@ -69,14 +76,26 @@ func (rm *UserAggregate) Reduce() error {
// ------------------------------------------------------------ // ------------------------------------------------------------
type UserAddedEvent struct { type UserAddedEvent struct {
eventstore.BaseEvent `json:"-"`
FirstName string `json:"firstName"` FirstName string `json:"firstName"`
metaData *eventstore.EventMetaData }
func NewUserAddedEvent(firstName string) *UserAddedEvent {
return &UserAddedEvent{
FirstName: firstName,
BaseEvent: eventstore.BaseEvent{
Service: "test.suite",
User: "adlerhurst",
EventType: "user.added",
},
}
} }
func UserAddedEventMapper() (eventstore.EventType, func(*repository.Event) (eventstore.Event, error)) { func UserAddedEventMapper() (eventstore.EventType, func(*repository.Event) (eventstore.Event, error)) {
return "user.added", func(event *repository.Event) (eventstore.Event, error) { return "user.added", func(event *repository.Event) (eventstore.Event, error) {
e := &UserAddedEvent{ e := &UserAddedEvent{
metaData: eventstore.MetaDataFromRepo(event), BaseEvent: *eventstore.BaseEventFromRepo(event),
} }
err := json.Unmarshal(event.Data, e) err := json.Unmarshal(event.Data, e)
if err != nil { if err != nil {
@ -90,39 +109,35 @@ func (e *UserAddedEvent) CheckPrevious() bool {
return true return true
} }
func (e *UserAddedEvent) EditorService() string {
return "test.suite"
}
func (e *UserAddedEvent) EditorUser() string {
return "adlerhurst"
}
func (e *UserAddedEvent) Type() eventstore.EventType {
return "user.added"
}
func (e *UserAddedEvent) Data() interface{} { func (e *UserAddedEvent) Data() interface{} {
return e return e
} }
func (e *UserAddedEvent) MetaData() *eventstore.EventMetaData {
return e.metaData
}
// ------------------------------------------------------------ // ------------------------------------------------------------
// User first name changed event start // User first name changed event start
// ------------------------------------------------------------ // ------------------------------------------------------------
type UserFirstNameChangedEvent struct { type UserFirstNameChangedEvent struct {
FirstName string `json:"firstName"` eventstore.BaseEvent `json:"-"`
metaData *eventstore.EventMetaData `json:"-"`
FirstName string `json:"firstName"`
}
func NewUserFirstNameChangedEvent(firstName string) *UserFirstNameChangedEvent {
return &UserFirstNameChangedEvent{
FirstName: firstName,
BaseEvent: eventstore.BaseEvent{
Service: "test.suite",
User: "adlerhurst",
EventType: "user.firstName.changed",
},
}
} }
func UserFirstNameChangedMapper() (eventstore.EventType, func(*repository.Event) (eventstore.Event, error)) { func UserFirstNameChangedMapper() (eventstore.EventType, func(*repository.Event) (eventstore.Event, error)) {
return "user.firstName.changed", func(event *repository.Event) (eventstore.Event, error) { return "user.firstName.changed", func(event *repository.Event) (eventstore.Event, error) {
e := &UserFirstNameChangedEvent{ e := &UserFirstNameChangedEvent{
metaData: eventstore.MetaDataFromRepo(event), BaseEvent: *eventstore.BaseEventFromRepo(event),
} }
err := json.Unmarshal(event.Data, e) err := json.Unmarshal(event.Data, e)
if err != nil { if err != nil {
@ -136,38 +151,32 @@ func (e *UserFirstNameChangedEvent) CheckPrevious() bool {
return true return true
} }
func (e *UserFirstNameChangedEvent) EditorService() string {
return "test.suite"
}
func (e *UserFirstNameChangedEvent) EditorUser() string {
return "adlerhurst"
}
func (e *UserFirstNameChangedEvent) Type() eventstore.EventType {
return "user.firstName.changed"
}
func (e *UserFirstNameChangedEvent) Data() interface{} { func (e *UserFirstNameChangedEvent) Data() interface{} {
return e return e
} }
func (e *UserFirstNameChangedEvent) MetaData() *eventstore.EventMetaData {
return e.metaData
}
// ------------------------------------------------------------ // ------------------------------------------------------------
// User password checked event start // User password checked event start
// ------------------------------------------------------------ // ------------------------------------------------------------
type UserPasswordCheckedEvent struct { type UserPasswordCheckedEvent struct {
metaData *eventstore.EventMetaData `json:"-"` eventstore.BaseEvent `json:"-"`
}
func NewUserPasswordCheckedEvent() *UserPasswordCheckedEvent {
return &UserPasswordCheckedEvent{
BaseEvent: eventstore.BaseEvent{
Service: "test.suite",
User: "adlerhurst",
EventType: "user.password.checked",
},
}
} }
func UserPasswordCheckedMapper() (eventstore.EventType, func(*repository.Event) (eventstore.Event, error)) { func UserPasswordCheckedMapper() (eventstore.EventType, func(*repository.Event) (eventstore.Event, error)) {
return "user.password.checked", func(event *repository.Event) (eventstore.Event, error) { return "user.password.checked", func(event *repository.Event) (eventstore.Event, error) {
return &UserPasswordCheckedEvent{ return &UserPasswordCheckedEvent{
metaData: eventstore.MetaDataFromRepo(event), BaseEvent: *eventstore.BaseEventFromRepo(event),
}, nil }, nil
} }
} }
@ -176,38 +185,32 @@ func (e *UserPasswordCheckedEvent) CheckPrevious() bool {
return false return false
} }
func (e *UserPasswordCheckedEvent) EditorService() string {
return "test.suite"
}
func (e *UserPasswordCheckedEvent) EditorUser() string {
return "adlerhurst"
}
func (e *UserPasswordCheckedEvent) Type() eventstore.EventType {
return "user.password.checked"
}
func (e *UserPasswordCheckedEvent) Data() interface{} { func (e *UserPasswordCheckedEvent) Data() interface{} {
return nil return nil
} }
func (e *UserPasswordCheckedEvent) MetaData() *eventstore.EventMetaData {
return e.metaData
}
// ------------------------------------------------------------ // ------------------------------------------------------------
// User deleted event // User deleted event
// ------------------------------------------------------------ // ------------------------------------------------------------
type UserDeletedEvent struct { type UserDeletedEvent struct {
metaData *eventstore.EventMetaData `json:"-"` eventstore.BaseEvent `json:"-"`
}
func NewUserDeletedEvent() *UserDeletedEvent {
return &UserDeletedEvent{
BaseEvent: eventstore.BaseEvent{
Service: "test.suite",
User: "adlerhurst",
EventType: "user.deleted",
},
}
} }
func UserDeletedMapper() (eventstore.EventType, func(*repository.Event) (eventstore.Event, error)) { func UserDeletedMapper() (eventstore.EventType, func(*repository.Event) (eventstore.Event, error)) {
return "user.deleted", func(event *repository.Event) (eventstore.Event, error) { return "user.deleted", func(event *repository.Event) (eventstore.Event, error) {
return &UserDeletedEvent{ return &UserDeletedEvent{
metaData: eventstore.MetaDataFromRepo(event), BaseEvent: *eventstore.BaseEventFromRepo(event),
}, nil }, nil
} }
} }
@ -216,26 +219,10 @@ func (e *UserDeletedEvent) CheckPrevious() bool {
return false return false
} }
func (e *UserDeletedEvent) EditorService() string {
return "test.suite"
}
func (e *UserDeletedEvent) EditorUser() string {
return "adlerhurst"
}
func (e *UserDeletedEvent) Type() eventstore.EventType {
return "user.deleted"
}
func (e *UserDeletedEvent) Data() interface{} { func (e *UserDeletedEvent) Data() interface{} {
return nil return nil
} }
func (e *UserDeletedEvent) MetaData() *eventstore.EventMetaData {
return e.metaData
}
// ------------------------------------------------------------ // ------------------------------------------------------------
// Users read model start // Users read model start
// ------------------------------------------------------------ // ------------------------------------------------------------
@ -258,18 +245,18 @@ func (rm *UsersReadModel) AppendEvents(events ...eventstore.Event) (err error) {
switch e := event.(type) { switch e := event.(type) {
case *UserAddedEvent: case *UserAddedEvent:
//insert //insert
user := NewUserReadModel(e.MetaData().AggregateID) user := NewUserReadModel(e.Base().AggregateID)
rm.Users = append(rm.Users, user) rm.Users = append(rm.Users, user)
err = user.AppendEvents(e) err = user.AppendEvents(e)
case *UserFirstNameChangedEvent, *UserPasswordCheckedEvent: case *UserFirstNameChangedEvent, *UserPasswordCheckedEvent:
//update //update
_, user := rm.userByID(e.MetaData().AggregateID) _, user := rm.userByID(e.Base().aggregateID)
if user == nil { if user == nil {
return errors.New("user not found") return errors.New("user not found")
} }
err = user.AppendEvents(e) err = user.AppendEvents(e)
case *UserDeletedEvent: case *UserDeletedEvent:
idx, _ := rm.userByID(e.metaData.AggregateID) idx, _ := rm.userByID(e.Base().AggregateID)
if idx < 0 { if idx < 0 {
return nil return nil
} }
@ -337,7 +324,7 @@ func (rm *UserReadModel) Reduce() error {
rm.FirstName = e.FirstName rm.FirstName = e.FirstName
case *UserPasswordCheckedEvent: case *UserPasswordCheckedEvent:
rm.pwCheckCount++ rm.pwCheckCount++
rm.lastPasswordCheck = e.metaData.CreationDate rm.lastPasswordCheck = e.Base().CreationDate
} }
} }
rm.ReadModel.Reduce() rm.ReadModel.Reduce()
@ -356,9 +343,9 @@ func TestUserReadModel(t *testing.T) {
RegisterFilterEventMapper(UserDeletedMapper()) RegisterFilterEventMapper(UserDeletedMapper())
events, err := es.PushAggregates(context.Background(), events, err := es.PushAggregates(context.Background(),
NewUserAggregate("1").AppendEvents(&UserAddedEvent{FirstName: "hodor"}), NewUserAggregate("1").AppendEvents(NewUserAddedEvent("hodor")),
NewUserAggregate("2").AppendEvents(&UserAddedEvent{FirstName: "hodor"}, &UserPasswordCheckedEvent{}, &UserPasswordCheckedEvent{}, &UserFirstNameChangedEvent{FirstName: "ueli"}), NewUserAggregate("2").AppendEvents(NewUserAddedEvent("hodor"), NewUserPasswordCheckedEvent(), NewUserPasswordCheckedEvent(), NewUserFirstNameChangedEvent("ueli")),
NewUserAggregate("2").AppendEvents(&UserDeletedEvent{}), NewUserAggregate("2").AppendEvents(NewUserDeletedEvent()),
) )
if err != nil { if err != nil {
t.Errorf("unexpected error on push aggregates: %v", err) t.Errorf("unexpected error on push aggregates: %v", err)

View File

@ -33,10 +33,10 @@ func (rm *ReadModel) Reduce() error {
} }
if rm.CreationDate.IsZero() { if rm.CreationDate.IsZero() {
rm.CreationDate = rm.Events[0].MetaData().CreationDate rm.CreationDate = rm.Events[0].Base().creationDate
} }
rm.ChangeDate = rm.Events[len(rm.Events)-1].MetaData().CreationDate rm.ChangeDate = rm.Events[len(rm.Events)-1].Base().creationDate
rm.ProcessedSequence = rm.Events[len(rm.Events)-1].MetaData().Sequence rm.ProcessedSequence = rm.Events[len(rm.Events)-1].Base().sequence
// all events processed and not needed anymore // all events processed and not needed anymore
rm.Events = nil rm.Events = nil
rm.Events = []Event{} rm.Events = []Event{}
@ -69,7 +69,7 @@ func (a *Aggregate) Reduce() error {
return nil return nil
} }
a.PreviousSequence = a.Events[len(a.Events)-1].MetaData().Sequence a.PreviousSequence = a.Events[len(a.Events)-1].Base().sequence
// all events processed and not needed anymore // all events processed and not needed anymore
a.Events = nil a.Events = nil
a.Events = []Event{} a.Events = []Event{}

View File

@ -0,0 +1,36 @@
package iam
import "github.com/caos/zitadel/internal/eventstore/v2"
type Aggregate struct {
eventstore.Aggregate
}
type Step int8
type SetupStepEvent struct {
eventstore.BaseEvent `json:"-"`
Step Step
//Done if the setup is started earlier
Done bool `json:"-"`
}
func (e *SetupStepEvent) CheckPrevious() bool {
return e.Type() == "iam.setup.started"
}
//Type implements event
func (e *SetupStepEvent) Type() eventstore.EventType {
if e.Done {
return "iam.setup.done"
}
return "iam.setup.started"
}
func (e *SetupStepEvent) Data() interface{} {
return e
}
type MemberAddedEvent struct {
}

View File

@ -0,0 +1,30 @@
package member
import (
"context"
"github.com/caos/zitadel/internal/eventstore/v2"
)
func NewMemberAddedEvent(ctx context.Context, userID string, roles ...string) *MemberAddedEvent {
return &MemberAddedEvent{
BaseEvent: eventstore.BaseEvent{},
Roles: roles,
UserID: userID,
}
}
type MemberAddedEvent struct {
eventstore.BaseEvent `json:"-"`
Roles []string `json:"roles"`
UserID string `json:"userId"`
}
func (e *MemberAddedEvent) CheckPrevious() bool {
return true
}
func (e *MemberAddedEvent) Data() interface{} {
return e
}

View File

@ -0,0 +1,19 @@
package member
import (
"github.com/caos/zitadel/internal/eventstore/v2"
)
type MemberRemovedEvent struct {
eventstore.BaseEvent `json:"-"`
UserID string `json:"userId"`
}
func (e *MemberRemovedEvent) CheckPrevious() bool {
return true
}
func (e *MemberRemovedEvent) Data() interface{} {
return e
}