This commit is contained in:
adlerhurst
2020-11-11 17:51:44 +01:00
parent 4e0577e74f
commit 720fea4bcc
19 changed files with 556 additions and 301 deletions

View File

@@ -1,34 +1,65 @@
package eventstore
func NewAggregate(id string) *Aggregate {
func NewAggregate(
id string,
typ AggregateType,
resourceOwner string,
version Version,
previousSequence uint64,
) *Aggregate {
return &Aggregate{
ID: id,
Events: []EventPusher{},
id: id,
typ: typ,
resourceOwner: resourceOwner,
version: version,
previousSequence: previousSequence,
events: []EventPusher{},
}
}
//Aggregate is the basic implementation of aggregater
type Aggregate struct {
PreviousSequence uint64 `json:"-"`
ID string `json:"-"`
Events []EventPusher `json:"-"`
id string `json:"-"`
typ AggregateType `json:"-"`
events []EventPusher `json:"-"`
resourceOwner string `json:"-"`
version Version `json:"-"`
previousSequence uint64 `json:"-"`
}
//AppendEvents adds all the events to the aggregate.
// The function doesn't compute the new state of the aggregate
func (a *Aggregate) AppendEvents(events ...EventPusher) *Aggregate {
a.Events = append(a.Events, events...)
//PushEvents adds all the events to the aggregate.
// The added events will be pushed to eventstore
func (a *Aggregate) PushEvents(events ...EventPusher) *Aggregate {
a.events = append(a.events, events...)
return a
}
//Reduce must be the last step in the reduce function of the extension
func (a *Aggregate) Reduce() error {
if len(a.Events) == 0 {
return nil
}
a.PreviousSequence = a.Events[len(a.Events)-1].Sequence()
// all events processed and not needed anymore
a.Events = nil
a.Events = []Event{}
return nil
//ID implements aggregater
func (a *Aggregate) ID() string {
return a.id
}
//Type implements aggregater
func (a *Aggregate) Type() AggregateType {
return a.typ
}
//Events implements aggregater
func (a *Aggregate) Events() []EventPusher {
return a.events
}
//ResourceOwner implements aggregater
func (a *Aggregate) ResourceOwner() string {
return a.resourceOwner
}
//Version implements aggregater
func (a *Aggregate) Version() Version {
return a.version
}
//PreviousSequence implements aggregater
func (a *Aggregate) PreviousSequence() uint64 {
return a.previousSequence
}

View File

@@ -58,7 +58,6 @@ func newTestEvent(description string, data func() interface{}, checkPrevious boo
shouldCheckPrevious: checkPrevious,
BaseEvent: *NewBaseEventForPush(
authz.NewMockContext("resourceOwner", "editorUser"),
"editorService",
"test.event",
),
}

View File

@@ -23,43 +23,20 @@ type UserAggregate struct {
FirstName string
}
func (a *UserAggregate) ID() string {
return a.Aggregate.ID
}
func (a *UserAggregate) Type() eventstore.AggregateType {
return "test.user"
}
func (a *UserAggregate) Events() []eventstore.EventPusher {
events := make([]eventstore.EventPusher, len(a.Aggregate.Events))
for i, event := range a.Aggregate.Events {
events[i] = event
}
return events
}
func (a *UserAggregate) ResourceOwner() string {
return "caos"
}
func (a *UserAggregate) Version() eventstore.Version {
return "v1"
}
func (a *UserAggregate) PreviousSequence() uint64 {
return a.Aggregate.PreviousSequence
}
func NewUserAggregate(id string) *UserAggregate {
return &UserAggregate{
Aggregate: *eventstore.NewAggregate(id),
Aggregate: *eventstore.NewAggregate(
id,
"test.user",
"caos",
"v1",
0,
),
}
}
func (rm *UserAggregate) AppendEvents(events ...eventstore.EventReader) *UserAggregate {
rm.Aggregate.AppendEvents(events...)
return rm
}
func (rm *UserAggregate) Reduce() error {
for _, event := range rm.Aggregate.Events {
for _, event := range rm.Aggregate.Events() {
switch e := event.(type) {
case *UserAddedEvent:
rm.FirstName = e.FirstName
@@ -67,7 +44,7 @@ func (rm *UserAggregate) Reduce() error {
rm.FirstName = e.FirstName
}
}
return rm.Aggregate.Reduce()
return nil
}
// ------------------------------------------------------------
@@ -228,12 +205,13 @@ func (e *UserDeletedEvent) Data() interface{} {
type UsersReadModel struct {
eventstore.ReadModel
Users []*UserReadModel
}
func NewUsersReadModel() *UsersReadModel {
return &UsersReadModel{
ReadModel: *eventstore.NewReadModel(""),
ReadModel: *eventstore.NewReadModel(),
Users: []*UserReadModel{},
}
}
@@ -284,7 +262,7 @@ func (rm *UsersReadModel) Reduce() error {
func (rm *UsersReadModel) userByID(id string) (idx int, user *UserReadModel) {
for idx, user = range rm.Users {
if user.ReadModel.ID == id {
if user.ID == id {
return idx, user
}
}
@@ -298,6 +276,8 @@ func (rm *UsersReadModel) userByID(id string) (idx int, user *UserReadModel) {
type UserReadModel struct {
eventstore.ReadModel
ID string
FirstName string
pwCheckCount int
lastPasswordCheck time.Time
@@ -305,7 +285,8 @@ type UserReadModel struct {
func NewUserReadModel(id string) *UserReadModel {
return &UserReadModel{
ReadModel: *eventstore.NewReadModel(id),
ReadModel: *eventstore.NewReadModel(),
ID: id,
}
}
@@ -342,9 +323,9 @@ func TestUserReadModel(t *testing.T) {
RegisterFilterEventMapper(UserDeletedMapper())
events, err := es.PushAggregates(context.Background(),
NewUserAggregate("1").AppendEvents(NewUserAddedEvent("hodor")),
NewUserAggregate("2").AppendEvents(NewUserAddedEvent("hodor"), NewUserPasswordCheckedEvent(), NewUserPasswordCheckedEvent(), NewUserFirstNameChangedEvent("ueli")),
NewUserAggregate("2").AppendEvents(NewUserDeletedEvent()),
NewUserAggregate("1").PushEvents(NewUserAddedEvent("hodor")),
NewUserAggregate("2").PushEvents(NewUserAddedEvent("hodor"), NewUserPasswordCheckedEvent(), NewUserPasswordCheckedEvent(), NewUserFirstNameChangedEvent("ueli")),
NewUserAggregate("2").PushEvents(NewUserDeletedEvent()),
)
if err != nil {
t.Errorf("unexpected error on push aggregates: %v", err)

View File

@@ -2,21 +2,21 @@ package eventstore
import "time"
func NewReadModel( /*id string*/ ) *ReadModel {
func NewReadModel() *ReadModel {
return &ReadModel{
// ID: id,
Events: []EventReader{},
}
}
//ReadModel is the minimum representation of a View model.
// It implements a basic reducer
// it might be saved in a database or in memory
type ReadModel struct {
ProcessedSequence uint64 `json:"-"`
// ID string `json:"-"`
CreationDate time.Time `json:"-"`
ChangeDate time.Time `json:"-"`
Events []EventReader `json:"-"`
AggregateID string `json:"-"`
ProcessedSequence uint64 `json:"-"`
CreationDate time.Time `json:"-"`
ChangeDate time.Time `json:"-"`
Events []EventReader `json:"-"`
}
//AppendEvents adds all the events to the read model.
@@ -26,12 +26,17 @@ func (rm *ReadModel) AppendEvents(events ...EventReader) *ReadModel {
return rm
}
//Reduce must be the last step in the reduce function of the extension
//Reduce is the basic implementaion of reducer
// If this function is extended the extending function should be the last step
func (rm *ReadModel) Reduce() error {
if len(rm.Events) == 0 {
return nil
}
if rm.AggregateID == "" {
rm.AggregateID = rm.Events[0].AggregateID()
}
if rm.CreationDate.IsZero() {
rm.CreationDate = rm.Events[0].CreationDate()
}