mirror of
https://github.com/zitadel/zitadel.git
synced 2025-04-16 16:51:38 +00:00
fix: eventstore tests
This commit is contained in:
parent
faee29bbb6
commit
727d783478
53
internal/eventstore/v2/event.go
Normal file
53
internal/eventstore/v2/event.go
Normal file
@ -0,0 +1,53 @@
|
|||||||
|
package eventstore
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/caos/zitadel/internal/eventstore/v2/repository"
|
||||||
|
)
|
||||||
|
|
||||||
|
//Event is the representation of a state change
|
||||||
|
type Event interface {
|
||||||
|
//CheckPrevious ensures the event order if true
|
||||||
|
// if false the previous sequence is not checked on push
|
||||||
|
CheckPrevious() bool
|
||||||
|
//EditorService must return the name of the service which creates the new event
|
||||||
|
EditorService() string
|
||||||
|
//EditorUser must return the id of the user who created the event
|
||||||
|
EditorUser() string
|
||||||
|
//Type must return an event type which should be unique in the aggregate
|
||||||
|
Type() EventType
|
||||||
|
//Data returns the payload of the event. It represent the changed fields by the event
|
||||||
|
// valid types are:
|
||||||
|
// * nil (no payload),
|
||||||
|
// * json byte array
|
||||||
|
// * struct which can be marshalled to json
|
||||||
|
// * pointer to struct which can be marshalled to json
|
||||||
|
Data() interface{}
|
||||||
|
//MetaData returns all data saved on a event
|
||||||
|
// It must not be set on push
|
||||||
|
// The event mapper function must set this struct
|
||||||
|
MetaData() *EventMetaData
|
||||||
|
}
|
||||||
|
|
||||||
|
func MetaDataFromRepo(event *repository.Event) *EventMetaData {
|
||||||
|
return &EventMetaData{
|
||||||
|
AggregateID: event.AggregateID,
|
||||||
|
AggregateType: AggregateType(event.AggregateType),
|
||||||
|
AggregateVersion: Version(event.Version),
|
||||||
|
PreviouseSequence: event.PreviousSequence,
|
||||||
|
ResourceOwner: event.ResourceOwner,
|
||||||
|
Sequence: event.Sequence,
|
||||||
|
CreationDate: event.CreationDate,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type EventMetaData struct {
|
||||||
|
AggregateID string
|
||||||
|
AggregateType AggregateType
|
||||||
|
ResourceOwner string
|
||||||
|
AggregateVersion Version
|
||||||
|
Sequence uint64
|
||||||
|
PreviouseSequence uint64
|
||||||
|
CreationDate time.Time
|
||||||
|
}
|
@ -5,64 +5,17 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"reflect"
|
"reflect"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/caos/zitadel/internal/errors"
|
"github.com/caos/zitadel/internal/errors"
|
||||||
"github.com/caos/zitadel/internal/eventstore/v2/repository"
|
"github.com/caos/zitadel/internal/eventstore/v2/repository"
|
||||||
)
|
)
|
||||||
|
|
||||||
//Event is the representation of a state change
|
|
||||||
type Event interface {
|
|
||||||
//CheckPrevious ensures the event order if true
|
|
||||||
// if false the previous sequence is not checked on push
|
|
||||||
CheckPrevious() bool
|
|
||||||
//EditorService must return the name of the service which creates the new event
|
|
||||||
EditorService() string
|
|
||||||
//EditorUser must return the id of the user who created the event
|
|
||||||
EditorUser() string
|
|
||||||
//Type must return an event type which should be unique in the aggregate
|
|
||||||
Type() EventType
|
|
||||||
//Data returns the payload of the event. It represent the changed fields by the event
|
|
||||||
// valid types are:
|
|
||||||
// * nil (no payload),
|
|
||||||
// * json byte array
|
|
||||||
// * struct which can be marshalled to json
|
|
||||||
// * pointer to struct which can be marshalled to json
|
|
||||||
Data() interface{}
|
|
||||||
//MetaData returns all data saved on a event
|
|
||||||
// It must not be set on push
|
|
||||||
// The event mapper function must set this struct
|
|
||||||
MetaData() *EventMetaData
|
|
||||||
}
|
|
||||||
|
|
||||||
func MetaDataFromRepo(event *repository.Event) *EventMetaData {
|
|
||||||
return &EventMetaData{
|
|
||||||
AggregateID: event.AggregateID,
|
|
||||||
AggregateType: AggregateType(event.AggregateType),
|
|
||||||
AggregateVersion: Version(event.Version),
|
|
||||||
PreviouseSequence: event.PreviousSequence,
|
|
||||||
ResourceOwner: event.ResourceOwner,
|
|
||||||
Sequence: event.Sequence,
|
|
||||||
CreationDate: event.CreationDate,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type EventMetaData struct {
|
|
||||||
AggregateID string
|
|
||||||
AggregateType AggregateType
|
|
||||||
ResourceOwner string
|
|
||||||
AggregateVersion Version
|
|
||||||
Sequence uint64
|
|
||||||
PreviouseSequence uint64
|
|
||||||
CreationDate time.Time
|
|
||||||
}
|
|
||||||
|
|
||||||
//Eventstore abstracts all functions needed to store valid events
|
//Eventstore abstracts all functions needed to store valid events
|
||||||
// and filters the stored events
|
// and filters the stored events
|
||||||
type Eventstore struct {
|
type Eventstore struct {
|
||||||
repo repository.Repository
|
repo repository.Repository
|
||||||
interceptorMutex sync.Mutex
|
interceptorMutex sync.Mutex
|
||||||
eventMapper map[EventType]eventTypeInterceptors
|
eventInterceptors map[EventType]eventTypeInterceptors
|
||||||
}
|
}
|
||||||
|
|
||||||
type eventTypeInterceptors struct {
|
type eventTypeInterceptors struct {
|
||||||
@ -71,9 +24,9 @@ type eventTypeInterceptors struct {
|
|||||||
|
|
||||||
func NewEventstore(repo repository.Repository) *Eventstore {
|
func NewEventstore(repo repository.Repository) *Eventstore {
|
||||||
return &Eventstore{
|
return &Eventstore{
|
||||||
repo: repo,
|
repo: repo,
|
||||||
eventMapper: map[EventType]eventTypeInterceptors{},
|
eventInterceptors: map[EventType]eventTypeInterceptors{},
|
||||||
interceptorMutex: sync.Mutex{},
|
interceptorMutex: sync.Mutex{},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -169,7 +122,7 @@ func (es *Eventstore) mapEvents(events []*repository.Event) (mappedEvents []Even
|
|||||||
defer es.interceptorMutex.Unlock()
|
defer es.interceptorMutex.Unlock()
|
||||||
|
|
||||||
for i, event := range events {
|
for i, event := range events {
|
||||||
interceptors, ok := es.eventMapper[EventType(event.Type)]
|
interceptors, ok := es.eventInterceptors[EventType(event.Type)]
|
||||||
if !ok || interceptors.eventMapper == nil {
|
if !ok || interceptors.eventMapper == nil {
|
||||||
return nil, errors.ThrowPreconditionFailed(nil, "V2-usujB", "event mapper not defined")
|
return nil, errors.ThrowPreconditionFailed(nil, "V2-usujB", "event mapper not defined")
|
||||||
}
|
}
|
||||||
@ -220,9 +173,9 @@ func (es *Eventstore) RegisterFilterEventMapper(eventType EventType, mapper func
|
|||||||
es.interceptorMutex.Lock()
|
es.interceptorMutex.Lock()
|
||||||
defer es.interceptorMutex.Unlock()
|
defer es.interceptorMutex.Unlock()
|
||||||
|
|
||||||
interceptor := es.eventMapper[eventType]
|
interceptor := es.eventInterceptors[eventType]
|
||||||
interceptor.eventMapper = mapper
|
interceptor.eventMapper = mapper
|
||||||
es.eventMapper[eventType] = interceptor
|
es.eventInterceptors[eventType] = interceptor
|
||||||
|
|
||||||
return es
|
return es
|
||||||
}
|
}
|
||||||
|
@ -176,11 +176,11 @@ func Test_eventstore_RegisterFilterEventMapper(t *testing.T) {
|
|||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
es := &Eventstore{
|
es := &Eventstore{
|
||||||
eventMapper: tt.fields.eventMapper,
|
eventInterceptors: tt.fields.eventMapper,
|
||||||
}
|
}
|
||||||
es = es.RegisterFilterEventMapper(tt.args.eventType, tt.args.mapper)
|
es = es.RegisterFilterEventMapper(tt.args.eventType, tt.args.mapper)
|
||||||
if len(es.eventMapper) != tt.res.mapperCount {
|
if len(es.eventInterceptors) != tt.res.mapperCount {
|
||||||
t.Errorf("unexpected mapper count: want %d, got %d", tt.res.mapperCount, len(es.eventMapper))
|
t.Errorf("unexpected mapper count: want %d, got %d", tt.res.mapperCount, len(es.eventInterceptors))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -188,7 +188,7 @@ func Test_eventstore_RegisterFilterEventMapper(t *testing.T) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
mapper := es.eventMapper[tt.args.eventType]
|
mapper := es.eventInterceptors[tt.args.eventType]
|
||||||
event, err := mapper.eventMapper(nil)
|
event, err := mapper.eventMapper(nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error %v", err)
|
t.Errorf("unexpected error %v", err)
|
||||||
@ -846,15 +846,15 @@ func TestEventstore_Push(t *testing.T) {
|
|||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
es := &Eventstore{
|
es := &Eventstore{
|
||||||
repo: tt.fields.repo,
|
repo: tt.fields.repo,
|
||||||
interceptorMutex: sync.Mutex{},
|
interceptorMutex: sync.Mutex{},
|
||||||
eventMapper: map[EventType]eventTypeInterceptors{},
|
eventInterceptors: map[EventType]eventTypeInterceptors{},
|
||||||
}
|
}
|
||||||
for eventType, mapper := range tt.fields.eventMapper {
|
for eventType, mapper := range tt.fields.eventMapper {
|
||||||
es = es.RegisterFilterEventMapper(eventType, mapper)
|
es = es.RegisterFilterEventMapper(eventType, mapper)
|
||||||
}
|
}
|
||||||
if len(es.eventMapper) != len(tt.fields.eventMapper) {
|
if len(es.eventInterceptors) != len(tt.fields.eventMapper) {
|
||||||
t.Errorf("register event mapper failed expected mapper amount: %d, got: %d", len(tt.fields.eventMapper), len(es.eventMapper))
|
t.Errorf("register event mapper failed expected mapper amount: %d, got: %d", len(tt.fields.eventMapper), len(es.eventInterceptors))
|
||||||
t.FailNow()
|
t.FailNow()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -970,16 +970,16 @@ func TestEventstore_FilterEvents(t *testing.T) {
|
|||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
es := &Eventstore{
|
es := &Eventstore{
|
||||||
repo: tt.fields.repo,
|
repo: tt.fields.repo,
|
||||||
interceptorMutex: sync.Mutex{},
|
interceptorMutex: sync.Mutex{},
|
||||||
eventMapper: map[EventType]eventTypeInterceptors{},
|
eventInterceptors: map[EventType]eventTypeInterceptors{},
|
||||||
}
|
}
|
||||||
|
|
||||||
for eventType, mapper := range tt.fields.eventMapper {
|
for eventType, mapper := range tt.fields.eventMapper {
|
||||||
es = es.RegisterFilterEventMapper(eventType, mapper)
|
es = es.RegisterFilterEventMapper(eventType, mapper)
|
||||||
}
|
}
|
||||||
if len(es.eventMapper) != len(tt.fields.eventMapper) {
|
if len(es.eventInterceptors) != len(tt.fields.eventMapper) {
|
||||||
t.Errorf("register event mapper failed expected mapper amount: %d, got: %d", len(tt.fields.eventMapper), len(es.eventMapper))
|
t.Errorf("register event mapper failed expected mapper amount: %d, got: %d", len(tt.fields.eventMapper), len(es.eventInterceptors))
|
||||||
t.FailNow()
|
t.FailNow()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1257,15 +1257,15 @@ func TestEventstore_FilterToReducer(t *testing.T) {
|
|||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
es := &Eventstore{
|
es := &Eventstore{
|
||||||
repo: tt.fields.repo,
|
repo: tt.fields.repo,
|
||||||
interceptorMutex: sync.Mutex{},
|
interceptorMutex: sync.Mutex{},
|
||||||
eventMapper: map[EventType]eventTypeInterceptors{},
|
eventInterceptors: map[EventType]eventTypeInterceptors{},
|
||||||
}
|
}
|
||||||
for eventType, mapper := range tt.fields.eventMapper {
|
for eventType, mapper := range tt.fields.eventMapper {
|
||||||
es = es.RegisterFilterEventMapper(eventType, mapper)
|
es = es.RegisterFilterEventMapper(eventType, mapper)
|
||||||
}
|
}
|
||||||
if len(es.eventMapper) != len(tt.fields.eventMapper) {
|
if len(es.eventInterceptors) != len(tt.fields.eventMapper) {
|
||||||
t.Errorf("register event mapper failed expected mapper amount: %d, got: %d", len(tt.fields.eventMapper), len(es.eventMapper))
|
t.Errorf("register event mapper failed expected mapper amount: %d, got: %d", len(tt.fields.eventMapper), len(es.eventInterceptors))
|
||||||
t.FailNow()
|
t.FailNow()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1410,14 +1410,14 @@ func TestEventstore_mapEvents(t *testing.T) {
|
|||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
es := &Eventstore{
|
es := &Eventstore{
|
||||||
interceptorMutex: sync.Mutex{},
|
interceptorMutex: sync.Mutex{},
|
||||||
eventMapper: map[EventType]eventTypeInterceptors{},
|
eventInterceptors: map[EventType]eventTypeInterceptors{},
|
||||||
}
|
}
|
||||||
for eventType, mapper := range tt.fields.eventMapper {
|
for eventType, mapper := range tt.fields.eventMapper {
|
||||||
es = es.RegisterFilterEventMapper(eventType, mapper)
|
es = es.RegisterFilterEventMapper(eventType, mapper)
|
||||||
}
|
}
|
||||||
if len(es.eventMapper) != len(tt.fields.eventMapper) {
|
if len(es.eventInterceptors) != len(tt.fields.eventMapper) {
|
||||||
t.Errorf("register event mapper failed expected mapper amount: %d, got: %d", len(tt.fields.eventMapper), len(es.eventMapper))
|
t.Errorf("register event mapper failed expected mapper amount: %d, got: %d", len(tt.fields.eventMapper), len(es.eventInterceptors))
|
||||||
t.FailNow()
|
t.FailNow()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -196,6 +196,46 @@ func (e *UserPasswordCheckedEvent) MetaData() *eventstore.EventMetaData {
|
|||||||
return e.metaData
|
return e.metaData
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ------------------------------------------------------------
|
||||||
|
// User deleted event
|
||||||
|
// ------------------------------------------------------------
|
||||||
|
|
||||||
|
type UserDeletedEvent struct {
|
||||||
|
metaData *eventstore.EventMetaData `json:"-"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func UserDeletedMapper() (eventstore.EventType, func(*repository.Event) (eventstore.Event, error)) {
|
||||||
|
return "user.deleted", func(event *repository.Event) (eventstore.Event, error) {
|
||||||
|
return &UserDeletedEvent{
|
||||||
|
metaData: eventstore.MetaDataFromRepo(event),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *UserDeletedEvent) CheckPrevious() bool {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *UserDeletedEvent) EditorService() string {
|
||||||
|
return "test.suite"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *UserDeletedEvent) EditorUser() string {
|
||||||
|
return "adlerhurst"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *UserDeletedEvent) Type() eventstore.EventType {
|
||||||
|
return "user.deleted"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *UserDeletedEvent) Data() interface{} {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *UserDeletedEvent) MetaData() *eventstore.EventMetaData {
|
||||||
|
return e.metaData
|
||||||
|
}
|
||||||
|
|
||||||
// ------------------------------------------------------------
|
// ------------------------------------------------------------
|
||||||
// Users read model start
|
// Users read model start
|
||||||
// ------------------------------------------------------------
|
// ------------------------------------------------------------
|
||||||
@ -228,15 +268,20 @@ func (rm *UsersReadModel) AppendEvents(events ...eventstore.Event) (err error) {
|
|||||||
return errors.New("user not found")
|
return errors.New("user not found")
|
||||||
}
|
}
|
||||||
err = user.AppendEvents(e)
|
err = user.AppendEvents(e)
|
||||||
|
case *UserDeletedEvent:
|
||||||
|
idx, _ := rm.userByID(e.metaData.AggregateID)
|
||||||
|
if idx < 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
copy(rm.Users[idx:], rm.Users[idx+1:])
|
||||||
|
rm.Users[len(rm.Users)-1] = nil // or the zero value of T
|
||||||
|
rm.Users = rm.Users[:len(rm.Users)-1]
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//begin
|
|
||||||
//for stmt range stmts; exec
|
|
||||||
//commit
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -307,11 +352,13 @@ func TestUserReadModel(t *testing.T) {
|
|||||||
es := eventstore.NewEventstore(sql.NewCRDB(testCRDBClient))
|
es := eventstore.NewEventstore(sql.NewCRDB(testCRDBClient))
|
||||||
es.RegisterFilterEventMapper(UserAddedEventMapper()).
|
es.RegisterFilterEventMapper(UserAddedEventMapper()).
|
||||||
RegisterFilterEventMapper(UserFirstNameChangedMapper()).
|
RegisterFilterEventMapper(UserFirstNameChangedMapper()).
|
||||||
RegisterFilterEventMapper(UserPasswordCheckedMapper())
|
RegisterFilterEventMapper(UserPasswordCheckedMapper()).
|
||||||
|
RegisterFilterEventMapper(UserDeletedMapper())
|
||||||
|
|
||||||
events, err := es.PushAggregates(context.Background(),
|
events, err := es.PushAggregates(context.Background(),
|
||||||
NewUserAggregate("1").AppendEvents(&UserAddedEvent{FirstName: "hodor"}),
|
NewUserAggregate("1").AppendEvents(&UserAddedEvent{FirstName: "hodor"}),
|
||||||
NewUserAggregate("2").AppendEvents(&UserAddedEvent{FirstName: "hodor"}, &UserPasswordCheckedEvent{}, &UserPasswordCheckedEvent{}, &UserFirstNameChangedEvent{FirstName: "ueli"}),
|
NewUserAggregate("2").AppendEvents(&UserAddedEvent{FirstName: "hodor"}, &UserPasswordCheckedEvent{}, &UserPasswordCheckedEvent{}, &UserFirstNameChangedEvent{FirstName: "ueli"}),
|
||||||
|
NewUserAggregate("2").AppendEvents(&UserDeletedEvent{}),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error on push aggregates: %v", err)
|
t.Errorf("unexpected error on push aggregates: %v", err)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user