fix: Previous sequence (#1086)

* feat: remove previous sequence check

* feat: object creation date

* feat: simplify member write model

* feat: simplify write model

* feat: simplify write model
This commit is contained in:
Fabi
2020-12-14 17:24:01 +01:00
committed by GitHub
parent 5b433dcaa3
commit 7f26f1815b
60 changed files with 229 additions and 693 deletions

View File

@@ -21,7 +21,7 @@ func (o *ObjectRoot) AppendEvent(event *Event) {
}
o.ChangeDate = event.CreationDate
if event.PreviousSequence == 0 {
if o.CreationDate.IsZero() {
o.CreationDate = o.ChangeDate
}

View File

@@ -18,9 +18,6 @@ type EventPusher interface {
// * struct which can be marshalled to json
// * pointer to struct which can be marshalled to json
Data() interface{}
//CheckPrevious ensures the event order if true
// if false the previous sequence is not checked on push
CheckPrevious() bool
}
type EventReader interface {

View File

@@ -65,16 +65,15 @@ func (e *BaseEvent) CreationDate() time.Time {
func BaseEventFromRepo(event *repository.Event) *BaseEvent {
return &BaseEvent{
aggregateID: event.AggregateID,
aggregateType: AggregateType(event.AggregateType),
aggregateVersion: Version(event.Version),
EventType: EventType(event.Type),
creationDate: event.CreationDate,
sequence: event.Sequence,
previouseSequence: event.PreviousSequence,
resourceOwner: event.ResourceOwner,
Service: event.EditorService,
User: event.EditorUser,
aggregateID: event.AggregateID,
aggregateType: AggregateType(event.AggregateType),
aggregateVersion: Version(event.Version),
EventType: EventType(event.Type),
creationDate: event.CreationDate,
sequence: event.Sequence,
resourceOwner: event.ResourceOwner,
Service: event.EditorService,
User: event.EditorUser,
}
}

View File

@@ -73,17 +73,16 @@ func (es *Eventstore) aggregatesToEvents(aggregates []aggregater) ([]*repository
return nil, err
}
events = append(events, &repository.Event{
AggregateID: aggregate.ID(),
AggregateType: repository.AggregateType(aggregate.Type()),
ResourceOwner: aggregate.ResourceOwner(),
EditorService: event.EditorService(),
EditorUser: event.EditorUser(),
Type: repository.EventType(event.Type()),
Version: repository.Version(aggregate.Version()),
PreviousEvent: previousEvent,
PreviousSequence: aggregate.PreviousSequence(),
Data: data,
CheckPreviousSequence: event.CheckPrevious(),
AggregateID: aggregate.ID(),
AggregateType: repository.AggregateType(aggregate.Type()),
ResourceOwner: aggregate.ResourceOwner(),
EditorService: event.EditorService(),
EditorUser: event.EditorUser(),
Type: repository.EventType(event.Type()),
Version: repository.Version(aggregate.Version()),
PreviousEvent: previousEvent,
PreviousSequence: aggregate.PreviousSequence(),
Data: data,
})
previousEvent = events[len(events)-1]
}

View File

@@ -64,10 +64,6 @@ func newTestEvent(description string, data func() interface{}, checkPrevious boo
}
}
func (e *testEvent) CheckPrevious() bool {
return e.shouldCheckPrevious
}
func (e *testEvent) Data() interface{} {
return e.data()
}
@@ -392,15 +388,14 @@ func TestEventstore_aggregatesToEvents(t *testing.T) {
wantErr: false,
events: []*repository.Event{
{
AggregateID: "1",
AggregateType: "test.aggregate",
CheckPreviousSequence: false,
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
Type: "test.event",
Version: "v1",
AggregateID: "1",
AggregateType: "test.aggregate",
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
Type: "test.event",
Version: "v1",
},
},
},
@@ -432,26 +427,24 @@ func TestEventstore_aggregatesToEvents(t *testing.T) {
wantErr: false,
events: linkEvents(
&repository.Event{
AggregateID: "1",
AggregateType: "test.aggregate",
CheckPreviousSequence: false,
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
Type: "test.event",
Version: "v1",
AggregateID: "1",
AggregateType: "test.aggregate",
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
Type: "test.event",
Version: "v1",
},
&repository.Event{
AggregateID: "1",
AggregateType: "test.aggregate",
CheckPreviousSequence: false,
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
Type: "test.event",
Version: "v1",
AggregateID: "1",
AggregateType: "test.aggregate",
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
Type: "test.event",
Version: "v1",
},
),
},
@@ -516,39 +509,36 @@ func TestEventstore_aggregatesToEvents(t *testing.T) {
events: combineEventLists(
linkEvents(
&repository.Event{
AggregateID: "1",
AggregateType: "test.aggregate",
CheckPreviousSequence: false,
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
Type: "test.event",
Version: "v1",
AggregateID: "1",
AggregateType: "test.aggregate",
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
Type: "test.event",
Version: "v1",
},
&repository.Event{
AggregateID: "1",
AggregateType: "test.aggregate",
CheckPreviousSequence: false,
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
Type: "test.event",
Version: "v1",
AggregateID: "1",
AggregateType: "test.aggregate",
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
Type: "test.event",
Version: "v1",
},
),
[]*repository.Event{
{
AggregateID: "2",
AggregateType: "test.aggregate",
CheckPreviousSequence: true,
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
Type: "test.event",
Version: "v1",
AggregateID: "2",
AggregateType: "test.aggregate",
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
Type: "test.event",
Version: "v1",
},
},
),
@@ -661,15 +651,14 @@ func TestEventstore_Push(t *testing.T) {
t: t,
events: []*repository.Event{
{
AggregateID: "1",
AggregateType: "test.aggregate",
CheckPreviousSequence: false,
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
Type: "test.event",
Version: "v1",
AggregateID: "1",
AggregateType: "test.aggregate",
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
Type: "test.event",
Version: "v1",
},
},
},
@@ -708,26 +697,24 @@ func TestEventstore_Push(t *testing.T) {
t: t,
events: linkEvents(
&repository.Event{
AggregateID: "1",
AggregateType: "test.aggregate",
CheckPreviousSequence: false,
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
Type: "test.event",
Version: "v1",
AggregateID: "1",
AggregateType: "test.aggregate",
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
Type: "test.event",
Version: "v1",
},
&repository.Event{
AggregateID: "1",
AggregateType: "test.aggregate",
CheckPreviousSequence: false,
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
Type: "test.event",
Version: "v1",
AggregateID: "1",
AggregateType: "test.aggregate",
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
Type: "test.event",
Version: "v1",
},
),
},
@@ -781,39 +768,36 @@ func TestEventstore_Push(t *testing.T) {
events: combineEventLists(
linkEvents(
&repository.Event{
AggregateID: "1",
AggregateType: "test.aggregate",
CheckPreviousSequence: false,
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
Type: "test.event",
Version: "v1",
AggregateID: "1",
AggregateType: "test.aggregate",
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
Type: "test.event",
Version: "v1",
},
&repository.Event{
AggregateID: "1",
AggregateType: "test.aggregate",
CheckPreviousSequence: false,
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
Type: "test.event",
Version: "v1",
AggregateID: "1",
AggregateType: "test.aggregate",
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
Type: "test.event",
Version: "v1",
},
),
[]*repository.Event{
{
AggregateID: "2",
AggregateType: "test.aggregate",
CheckPreviousSequence: true,
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
Type: "test.event",
Version: "v1",
AggregateID: "2",
AggregateType: "test.aggregate",
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
Type: "test.event",
Version: "v1",
},
},
),
@@ -1335,9 +1319,6 @@ func compareEvents(t *testing.T, want, got *repository.Event) {
if want.AggregateType != got.AggregateType {
t.Errorf("wrong aggregateType got %q want %q", want.AggregateType, got.AggregateType)
}
if want.CheckPreviousSequence != got.CheckPreviousSequence {
t.Errorf("wrong check previous got %v want %v", want.CheckPreviousSequence, got.CheckPreviousSequence)
}
if !reflect.DeepEqual(want.Data, got.Data) {
t.Errorf("wrong data got %s want %s", string(want.Data), string(got.Data))
}

View File

@@ -80,10 +80,6 @@ func UserAddedEventMapper() (eventstore.EventType, func(*repository.Event) (even
}
}
func (e *UserAddedEvent) CheckPrevious() bool {
return true
}
func (e *UserAddedEvent) Data() interface{} {
return e
}
@@ -122,10 +118,6 @@ func UserFirstNameChangedMapper() (eventstore.EventType, func(*repository.Event)
}
}
func (e *UserFirstNameChangedEvent) CheckPrevious() bool {
return true
}
func (e *UserFirstNameChangedEvent) Data() interface{} {
return e
}
@@ -156,10 +148,6 @@ func UserPasswordCheckedMapper() (eventstore.EventType, func(*repository.Event)
}
}
func (e *UserPasswordCheckedEvent) CheckPrevious() bool {
return false
}
func (e *UserPasswordCheckedEvent) Data() interface{} {
return nil
}
@@ -190,10 +178,6 @@ func UserDeletedMapper() (eventstore.EventType, func(*repository.Event) (eventst
}
}
func (e *UserDeletedEvent) CheckPrevious() bool {
return false
}
func (e *UserDeletedEvent) Data() interface{} {
return nil
}

View File

@@ -20,10 +20,6 @@ type Event struct {
// it implements a linked list
PreviousEvent *Event
//CheckPreviousSequence decides if the event can only be written
// if event.PreviousSequence == max(event_sequence) of this aggregate
CheckPreviousSequence bool
//CreationDate is the time the event is created
// it's used for human readability.
// Don't use it for event ordering,

View File

@@ -28,7 +28,6 @@ const (
" editor_service, " +
" resource_owner, " +
" previous_sequence, " +
" check_previous, " +
// variables below are calculated
" max_event_seq " +
") AS ( " +
@@ -45,7 +44,6 @@ const (
" $8::VARCHAR, " +
" resource_owner, " +
" $10::BIGINT, " +
" $11::BOOLEAN," +
" MAX(event_sequence) AS max_event_seq " +
" FROM eventstore.events " +
" WHERE " +
@@ -65,7 +63,6 @@ const (
" $8::VARCHAR, " +
" $9::VARCHAR, " +
" $10::BIGINT, " +
" $11::BOOLEAN, " +
" NULL::BIGINT " +
" ) " +
" ) " +
@@ -96,24 +93,8 @@ const (
" editor_user, " +
" editor_service, " +
" resource_owner, " +
" ( " +
" SELECT " +
" CASE " +
" WHEN NOT check_previous " +
" THEN NULL " +
" ELSE previous_sequence " +
" END" +
" ) " +
" previous_sequence " +
" FROM input_event " +
" WHERE 1 = " +
" CASE " +
" WHEN NOT check_previous " +
" THEN 1 " +
" ELSE ( " +
" SELECT 1 FROM input_event " +
" WHERE (max_event_seq IS NULL AND previous_sequence IS NULL) OR (max_event_seq IS NOT NULL AND max_event_seq = previous_sequence) " +
" ) " +
" END " +
" ) " +
"RETURNING id, event_sequence, previous_sequence, creation_date, resource_owner "
)
@@ -160,7 +141,6 @@ func (db *CRDB) Push(ctx context.Context, events ...*repository.Event) error {
event.EditorService,
event.ResourceOwner,
previousSequence,
event.CheckPreviousSequence,
).Scan(&event.ID, &event.Sequence, &previousSequence, &event.CreationDate, &event.ResourceOwner)
event.PreviousSequence = uint64(previousSequence)