mirror of
https://github.com/zitadel/zitadel.git
synced 2025-03-01 02:27:24 +00:00
refactor: eventstore v2
This commit is contained in:
parent
f2559c2027
commit
9342efa834
@ -14,27 +14,40 @@ type Event interface {
|
||||
//CheckPrevious ensures the event order if true
|
||||
// if false the previous sequence is not checked on push
|
||||
CheckPrevious() bool
|
||||
|
||||
//EditorService must return the name of the service which creates the new event
|
||||
EditorService() string
|
||||
//EditorUser must return the id of the user who created the event
|
||||
EditorUser() string
|
||||
//Type must return an event type which should be unique in the aggregate
|
||||
Type() EventType
|
||||
//Data returns the payload of the event. It represent the changed fields by the event
|
||||
// valid types are:
|
||||
// * nil (no payload),
|
||||
// * json byte array
|
||||
// * struct which can be marshalled to json
|
||||
// * pointer to struct which can be marshalled to json
|
||||
Data() interface{}
|
||||
PreviousSequence() uint64
|
||||
}
|
||||
|
||||
type eventAppender interface {
|
||||
//AppendEvents appends the passed events to an internal list of events
|
||||
AppendEvents(...Event) error
|
||||
//Eventstore abstracts all functions needed to store valid events
|
||||
// and filters the stored events
|
||||
type Eventstore struct {
|
||||
repo repository.Repository
|
||||
interceptorMutex sync.Mutex
|
||||
eventMapper map[EventType]eventTypeInterceptors
|
||||
}
|
||||
|
||||
type reducer interface {
|
||||
//Reduce handles the events of the internal events list
|
||||
// it only appends the newly added events
|
||||
Reduce() error
|
||||
type eventTypeInterceptors struct {
|
||||
filterMapper func(*repository.Event) (Event, error)
|
||||
}
|
||||
|
||||
//Health checks if the eventstore can properly work
|
||||
// It checks if the repository can serve load
|
||||
func (es *Eventstore) Health(ctx context.Context) error {
|
||||
return es.repo.Health(ctx)
|
||||
}
|
||||
|
||||
type aggregater interface {
|
||||
eventAppender
|
||||
reducer
|
||||
//ID returns the aggreagte id
|
||||
ID() string
|
||||
//Type returns the aggregate type
|
||||
@ -45,27 +58,11 @@ type aggregater interface {
|
||||
ResourceOwner() string
|
||||
//Version represents the semantic version of the aggregate
|
||||
Version() Version
|
||||
}
|
||||
type readModeler interface {
|
||||
eventAppender
|
||||
reducer
|
||||
}
|
||||
|
||||
type Eventstore struct {
|
||||
repo repository.Repository
|
||||
interceptorMutex sync.Mutex
|
||||
eventMapper map[EventType]eventTypeInterceptors
|
||||
}
|
||||
|
||||
type eventTypeInterceptors struct {
|
||||
pushMapper func(Event) (*repository.Event, error)
|
||||
filterMapper func(*repository.Event) (Event, error)
|
||||
}
|
||||
|
||||
//Health checks if the eventstore can properly work
|
||||
// It checks if the repository can serve load
|
||||
func (es *Eventstore) Health(ctx context.Context) error {
|
||||
return es.repo.Health(ctx)
|
||||
//PreviouseSequence should return the sequence of the latest event of this aggregate
|
||||
// stored in the eventstore
|
||||
// it's set to the first event of this push transaction,
|
||||
// later events consume the sequence of the previously pushed event
|
||||
PreviousSequence() uint64
|
||||
}
|
||||
|
||||
//PushAggregates maps the events of all aggregates to an eventstore event
|
||||
@ -102,8 +99,8 @@ func (es *Eventstore) aggregatesToEvents(aggregates []aggregater) ([]*repository
|
||||
Type: repository.EventType(event.Type()),
|
||||
Version: repository.Version(aggregate.Version()),
|
||||
PreviousEvent: previousEvent,
|
||||
PreviousSequence: aggregate.PreviousSequence(),
|
||||
Data: data,
|
||||
PreviousSequence: event.PreviousSequence(),
|
||||
})
|
||||
previousEvent = events[len(events)-1]
|
||||
}
|
||||
@ -146,32 +143,28 @@ func (es *Eventstore) mapEvents(events []*repository.Event) (mappedEvents []Even
|
||||
return mappedEvents, nil
|
||||
}
|
||||
|
||||
//FilterToAggregate filters the events based on the searchQuery, appends all events to the aggregate and reduces the aggregate
|
||||
func (es *Eventstore) FilterToAggregate(ctx context.Context, searchQuery *SearchQueryFactory, aggregate aggregater) (err error) {
|
||||
type reducer interface {
|
||||
//Reduce handles the events of the internal events list
|
||||
// it only appends the newly added events
|
||||
Reduce() error
|
||||
//AppendEvents appends the passed events to an internal list of events
|
||||
AppendEvents(...Event) error
|
||||
}
|
||||
|
||||
//FilterToReducer filters the events based on the search query, appends all events to the reducer and calls it's reduce function
|
||||
func (es *Eventstore) FilterToReducer(ctx context.Context, searchQuery *SearchQueryFactory, r reducer) error {
|
||||
events, err := es.FilterEvents(ctx, searchQuery)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err = aggregate.AppendEvents(events...); err != nil {
|
||||
if err = r.AppendEvents(events...); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return aggregate.Reduce()
|
||||
}
|
||||
|
||||
//FilterToReadModel filters the events based on the searchQuery, appends all events to the readModel and reduces the readModel
|
||||
func (es *Eventstore) FilterToReadModel(ctx context.Context, searchQuery *SearchQueryFactory, readModel readModeler) (err error) {
|
||||
events, err := es.FilterEvents(ctx, searchQuery)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err = readModel.AppendEvents(events...); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return readModel.Reduce()
|
||||
return r.Reduce()
|
||||
}
|
||||
|
||||
//LatestSequence filters the latest sequence for the given search query
|
||||
func (es *Eventstore) LatestSequence(ctx context.Context, queryFactory *SearchQueryFactory) (uint64, error) {
|
||||
query, err := queryFactory.Build()
|
||||
if err != nil {
|
||||
@ -180,7 +173,7 @@ func (es *Eventstore) LatestSequence(ctx context.Context, queryFactory *SearchQu
|
||||
return es.repo.LatestSequence(ctx, query)
|
||||
}
|
||||
|
||||
//RegisterPushEventMapper registers a function for mapping an eventstore event to an event
|
||||
//RegisterFilterEventMapper registers a function for mapping an eventstore event to an event
|
||||
func (es *Eventstore) RegisterFilterEventMapper(eventType EventType, mapper func(*repository.Event) (Event, error)) error {
|
||||
if eventType == "" || mapper == nil {
|
||||
return errors.ThrowInvalidArgument(nil, "V2-IPpUR", "eventType and mapper must be filled")
|
||||
@ -196,22 +189,6 @@ func (es *Eventstore) RegisterFilterEventMapper(eventType EventType, mapper func
|
||||
return nil
|
||||
}
|
||||
|
||||
//RegisterPushEventMapper registers a function for mapping an event to an eventstore event
|
||||
func (es *Eventstore) RegisterPushEventMapper(eventType EventType, mapper func(Event) (*repository.Event, error)) error {
|
||||
if eventType == "" || mapper == nil {
|
||||
return errors.ThrowInvalidArgument(nil, "V2-Kexpp", "eventType and mapper must be filled")
|
||||
}
|
||||
|
||||
es.interceptorMutex.Lock()
|
||||
defer es.interceptorMutex.Unlock()
|
||||
|
||||
interceptor := es.eventMapper[eventType]
|
||||
interceptor.pushMapper = mapper
|
||||
es.eventMapper[eventType] = interceptor
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func eventData(event Event) ([]byte, error) {
|
||||
switch data := event.Data().(type) {
|
||||
case nil:
|
||||
|
@ -36,130 +36,6 @@ func (e *testEvent) PreviousSequence() uint64 {
|
||||
return 0
|
||||
}
|
||||
|
||||
func testPushMapper(Event) (*repository.Event, error) {
|
||||
return &repository.Event{AggregateID: "aggregateID"}, nil
|
||||
}
|
||||
|
||||
func Test_eventstore_RegisterPushEventMapper(t *testing.T) {
|
||||
type fields struct {
|
||||
eventMapper map[EventType]eventTypeInterceptors
|
||||
}
|
||||
type args struct {
|
||||
eventType EventType
|
||||
mapper func(Event) (*repository.Event, error)
|
||||
}
|
||||
type res struct {
|
||||
event *repository.Event
|
||||
isErr func(error) bool
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
res res
|
||||
}{
|
||||
{
|
||||
name: "no event type",
|
||||
args: args{
|
||||
eventType: "",
|
||||
mapper: testPushMapper,
|
||||
},
|
||||
fields: fields{
|
||||
eventMapper: map[EventType]eventTypeInterceptors{},
|
||||
},
|
||||
res: res{
|
||||
isErr: errors.IsErrorInvalidArgument,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "no event mapper",
|
||||
args: args{
|
||||
eventType: "event.type",
|
||||
mapper: nil,
|
||||
},
|
||||
fields: fields{
|
||||
eventMapper: map[EventType]eventTypeInterceptors{},
|
||||
},
|
||||
res: res{
|
||||
isErr: errors.IsErrorInvalidArgument,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "new interceptor",
|
||||
fields: fields{
|
||||
eventMapper: map[EventType]eventTypeInterceptors{},
|
||||
},
|
||||
args: args{
|
||||
eventType: "new.event",
|
||||
mapper: testPushMapper,
|
||||
},
|
||||
res: res{
|
||||
event: &repository.Event{AggregateID: "aggregateID"},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "existing interceptor new push mapper",
|
||||
fields: fields{
|
||||
eventMapper: map[EventType]eventTypeInterceptors{
|
||||
"existing": {},
|
||||
},
|
||||
},
|
||||
args: args{
|
||||
eventType: "new.event",
|
||||
mapper: testPushMapper,
|
||||
},
|
||||
res: res{
|
||||
event: &repository.Event{AggregateID: "aggregateID"},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "existing interceptor existing push mapper",
|
||||
fields: fields{
|
||||
eventMapper: map[EventType]eventTypeInterceptors{
|
||||
"existing": {
|
||||
pushMapper: func(Event) (*repository.Event, error) {
|
||||
return nil, errors.ThrowUnimplemented(nil, "V2-1qPvn", "unimplemented")
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
args: args{
|
||||
eventType: "new.event",
|
||||
mapper: testPushMapper,
|
||||
},
|
||||
res: res{
|
||||
event: &repository.Event{AggregateID: "aggregateID"},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
es := &Eventstore{
|
||||
eventMapper: tt.fields.eventMapper,
|
||||
}
|
||||
err := es.RegisterPushEventMapper(tt.args.eventType, tt.args.mapper)
|
||||
if (tt.res.isErr != nil && !tt.res.isErr(err)) || (tt.res.isErr == nil && err != nil) {
|
||||
t.Errorf("wrong error got: %v", err)
|
||||
return
|
||||
}
|
||||
if tt.res.isErr != nil {
|
||||
return
|
||||
}
|
||||
|
||||
mapper := es.eventMapper[tt.args.eventType]
|
||||
event, err := mapper.pushMapper(nil)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error %v", err)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(tt.res.event, event) {
|
||||
t.Errorf("events should be deep equal. \ngot %v\nwant %v", event, tt.res.event)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func testFilterMapper(*repository.Event) (Event, error) {
|
||||
return &testEvent{description: "hodor"}, nil
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user