refactor(eventstore): rename EventPusher to Command, EventReader to Event, PushEvents to Push and FilterEvents to Filter (#2907)

This commit is contained in:
Silvan
2022-01-03 09:19:07 +01:00
committed by GitHub
parent 9a374f9c5c
commit 09be70949f
339 changed files with 1436 additions and 1346 deletions

View File

@@ -4,7 +4,8 @@ import (
"time"
)
type EventPusher interface {
// Command is the intend to store an event into the eventstore
type Command interface {
//Aggregate is the metadata of an aggregate
Aggregate() Aggregate
// EditorService is the service who wants to push the event
@@ -24,7 +25,8 @@ type EventPusher interface {
UniqueConstraints() []*EventUniqueConstraint
}
type EventReader interface {
// Event is a stored activity
type Event interface {
// EditorService is the service who pushed the event
EditorService() string
//EditorUser is the user who pushed the event

View File

@@ -27,17 +27,17 @@ type BaseEvent struct {
Data []byte `json:"-"`
}
// EditorService implements EventPusher
// EditorService implements Command
func (e *BaseEvent) EditorService() string {
return e.Service
}
//EditorUser implements EventPusher
//EditorUser implements Command
func (e *BaseEvent) EditorUser() string {
return e.User
}
//Type implements EventPusher
//Type implements Command
func (e *BaseEvent) Type() EventType {
return e.EventType
}

View File

@@ -19,7 +19,7 @@ type Eventstore struct {
}
type eventTypeInterceptors struct {
eventMapper func(*repository.Event) (EventReader, error)
eventMapper func(*repository.Event) (Event, error)
}
func NewEventstore(repo repository.Repository) *Eventstore {
@@ -36,10 +36,10 @@ func (es *Eventstore) Health(ctx context.Context) error {
return es.repo.Health(ctx)
}
//PushEvents pushes the events in a single transaction
//Push pushes the events in a single transaction
// an event needs at least an aggregate
func (es *Eventstore) PushEvents(ctx context.Context, pushEvents ...EventPusher) ([]EventReader, error) {
events, constraints, err := eventsToRepository(pushEvents)
func (es *Eventstore) Push(ctx context.Context, cmds ...Command) ([]Event, error) {
events, constraints, err := commandsToRepository(cmds)
if err != nil {
return nil, err
}
@@ -57,37 +57,37 @@ func (es *Eventstore) PushEvents(ctx context.Context, pushEvents ...EventPusher)
return eventReaders, nil
}
func eventsToRepository(pushEvents []EventPusher) (events []*repository.Event, constraints []*repository.UniqueConstraint, err error) {
events = make([]*repository.Event, len(pushEvents))
for i, event := range pushEvents {
data, err := EventData(event)
func commandsToRepository(cmds []Command) (events []*repository.Event, constraints []*repository.UniqueConstraint, err error) {
events = make([]*repository.Event, len(cmds))
for i, cmd := range cmds {
data, err := EventData(cmd)
if err != nil {
return nil, nil, err
}
if event.Aggregate().ID == "" {
if cmd.Aggregate().ID == "" {
return nil, nil, errors.ThrowInvalidArgument(nil, "V2-Afdfe", "aggregate id must not be empty")
}
if event.Aggregate().Type == "" {
if cmd.Aggregate().Type == "" {
return nil, nil, errors.ThrowInvalidArgument(nil, "V2-Dfg32", "aggregate type must not be empty")
}
if event.Type() == "" {
if cmd.Type() == "" {
return nil, nil, errors.ThrowInvalidArgument(nil, "V2-Drg34", "event type must not be empty")
}
if event.Aggregate().Version == "" {
if cmd.Aggregate().Version == "" {
return nil, nil, errors.ThrowInvalidArgument(nil, "V2-Dgfg4", "aggregate version must not be empty")
}
events[i] = &repository.Event{
AggregateID: event.Aggregate().ID,
AggregateType: repository.AggregateType(event.Aggregate().Type),
ResourceOwner: event.Aggregate().ResourceOwner,
EditorService: event.EditorService(),
EditorUser: event.EditorUser(),
Type: repository.EventType(event.Type()),
Version: repository.Version(event.Aggregate().Version),
AggregateID: cmd.Aggregate().ID,
AggregateType: repository.AggregateType(cmd.Aggregate().Type),
ResourceOwner: cmd.Aggregate().ResourceOwner,
EditorService: cmd.EditorService(),
EditorUser: cmd.EditorUser(),
Type: repository.EventType(cmd.Type()),
Version: repository.Version(cmd.Aggregate().Version),
Data: data,
}
if len(event.UniqueConstraints()) > 0 {
constraints = append(constraints, uniqueConstraintsToRepository(event.UniqueConstraints())...)
if len(cmd.UniqueConstraints()) > 0 {
constraints = append(constraints, uniqueConstraintsToRepository(cmd.UniqueConstraints())...)
}
}
@@ -107,9 +107,9 @@ func uniqueConstraintsToRepository(constraints []*EventUniqueConstraint) (unique
return uniqueConstraints
}
//FilterEvents filters the stored events based on the searchQuery
//Filter filters the stored events based on the searchQuery
// and maps the events to the defined event structs
func (es *Eventstore) FilterEvents(ctx context.Context, queryFactory *SearchQueryBuilder) ([]EventReader, error) {
func (es *Eventstore) Filter(ctx context.Context, queryFactory *SearchQueryBuilder) ([]Event, error) {
query, err := queryFactory.build()
if err != nil {
return nil, err
@@ -122,8 +122,8 @@ func (es *Eventstore) FilterEvents(ctx context.Context, queryFactory *SearchQuer
return es.mapEvents(events)
}
func (es *Eventstore) mapEvents(events []*repository.Event) (mappedEvents []EventReader, err error) {
mappedEvents = make([]EventReader, len(events))
func (es *Eventstore) mapEvents(events []*repository.Event) (mappedEvents []Event, err error) {
mappedEvents = make([]Event, len(events))
es.interceptorMutex.Lock()
defer es.interceptorMutex.Unlock()
@@ -150,12 +150,12 @@ type reducer interface {
// it only appends the newly added events
Reduce() error
//AppendEvents appends the passed events to an internal list of events
AppendEvents(...EventReader)
AppendEvents(...Event)
}
//FilterToReducer filters the events based on the search query, appends all events to the reducer and calls it's reduce function
func (es *Eventstore) FilterToReducer(ctx context.Context, searchQuery *SearchQueryBuilder, r reducer) error {
events, err := es.FilterEvents(ctx, searchQuery)
events, err := es.Filter(ctx, searchQuery)
if err != nil {
return err
}
@@ -183,7 +183,7 @@ type queryReducer interface {
//FilterToQueryReducer filters the events based on the search query of the query function,
// appends all events to the reducer and calls it's reduce function
func (es *Eventstore) FilterToQueryReducer(ctx context.Context, r queryReducer) error {
events, err := es.FilterEvents(ctx, r.Query())
events, err := es.Filter(ctx, r.Query())
if err != nil {
return err
}
@@ -193,7 +193,7 @@ func (es *Eventstore) FilterToQueryReducer(ctx context.Context, r queryReducer)
}
//RegisterFilterEventMapper registers a function for mapping an eventstore event to an event
func (es *Eventstore) RegisterFilterEventMapper(eventType EventType, mapper func(*repository.Event) (EventReader, error)) *Eventstore {
func (es *Eventstore) RegisterFilterEventMapper(eventType EventType, mapper func(*repository.Event) (Event, error)) *Eventstore {
if mapper == nil || eventType == "" {
return es
}
@@ -207,7 +207,7 @@ func (es *Eventstore) RegisterFilterEventMapper(eventType EventType, mapper func
return es
}
func EventData(event EventPusher) ([]byte, error) {
func EventData(event Command) ([]byte, error) {
switch data := event.Data().(type) {
case nil:
return nil, nil

View File

@@ -47,7 +47,7 @@ func (e *testEvent) Assets() []*Asset {
return nil
}
func testFilterMapper(event *repository.Event) (EventReader, error) {
func testFilterMapper(event *repository.Event) (Event, error) {
if event == nil {
return newTestEvent("testID", "hodor", nil, false), nil
}
@@ -60,10 +60,10 @@ func Test_eventstore_RegisterFilterEventMapper(t *testing.T) {
}
type args struct {
eventType EventType
mapper func(*repository.Event) (EventReader, error)
mapper func(*repository.Event) (Event, error)
}
type res struct {
event EventReader
event Event
mapperCount int
}
@@ -134,7 +134,7 @@ func Test_eventstore_RegisterFilterEventMapper(t *testing.T) {
fields: fields{
eventMapper: map[EventType]eventTypeInterceptors{
"event.type": {
eventMapper: func(*repository.Event) (EventReader, error) {
eventMapper: func(*repository.Event) (Event, error) {
return nil, errors.ThrowUnimplemented(nil, "V2-1qPvn", "unimplemented")
},
},
@@ -180,7 +180,7 @@ func Test_eventstore_RegisterFilterEventMapper(t *testing.T) {
func Test_eventData(t *testing.T) {
type args struct {
event EventPusher
event Command
}
type res struct {
jsonText []byte
@@ -343,7 +343,7 @@ func Test_eventData(t *testing.T) {
func TestEventstore_aggregatesToEvents(t *testing.T) {
type args struct {
events []EventPusher
events []Command
}
type res struct {
wantErr bool
@@ -357,7 +357,7 @@ func TestEventstore_aggregatesToEvents(t *testing.T) {
{
name: "one aggregate one event",
args: args{
events: []EventPusher{
events: []Command{
newTestEvent(
"1",
"",
@@ -386,7 +386,7 @@ func TestEventstore_aggregatesToEvents(t *testing.T) {
{
name: "one aggregate multiple events",
args: args{
events: []EventPusher{
events: []Command{
newTestEvent(
"1",
"",
@@ -432,7 +432,7 @@ func TestEventstore_aggregatesToEvents(t *testing.T) {
{
name: "invalid data",
args: args{
events: []EventPusher{
events: []Command{
newTestEvent(
"1",
"",
@@ -449,7 +449,7 @@ func TestEventstore_aggregatesToEvents(t *testing.T) {
{
name: "multiple aggregates",
args: args{
events: []EventPusher{
events: []Command{
newTestEvent(
"1",
"",
@@ -516,7 +516,7 @@ func TestEventstore_aggregatesToEvents(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
events, _, err := eventsToRepository(tt.args.events)
events, _, err := commandsToRepository(tt.args.events)
if (err != nil) != tt.res.wantErr {
t.Errorf("Eventstore.aggregatesToEvents() error = %v, wantErr %v", err, tt.res.wantErr)
return
@@ -584,11 +584,11 @@ func (repo *testRepo) LatestSequence(ctx context.Context, queryFactory *reposito
func TestEventstore_Push(t *testing.T) {
type args struct {
events []EventPusher
events []Command
}
type fields struct {
repo *testRepo
eventMapper map[EventType]func(*repository.Event) (EventReader, error)
eventMapper map[EventType]func(*repository.Event) (Event, error)
}
type res struct {
wantErr bool
@@ -602,7 +602,7 @@ func TestEventstore_Push(t *testing.T) {
{
name: "one aggregate one event",
args: args{
events: []EventPusher{
events: []Command{
newTestEvent(
"1",
"",
@@ -628,8 +628,8 @@ func TestEventstore_Push(t *testing.T) {
},
},
},
eventMapper: map[EventType]func(*repository.Event) (EventReader, error){
"test.event": func(e *repository.Event) (EventReader, error) {
eventMapper: map[EventType]func(*repository.Event) (Event, error){
"test.event": func(e *repository.Event) (Event, error) {
return &testEvent{}, nil
},
},
@@ -638,7 +638,7 @@ func TestEventstore_Push(t *testing.T) {
{
name: "one aggregate multiple events",
args: args{
events: []EventPusher{
events: []Command{
newTestEvent(
"1",
"",
@@ -681,8 +681,8 @@ func TestEventstore_Push(t *testing.T) {
},
},
},
eventMapper: map[EventType]func(*repository.Event) (EventReader, error){
"test.event": func(e *repository.Event) (EventReader, error) {
eventMapper: map[EventType]func(*repository.Event) (Event, error){
"test.event": func(e *repository.Event) (Event, error) {
return &testEvent{}, nil
},
},
@@ -694,7 +694,7 @@ func TestEventstore_Push(t *testing.T) {
{
name: "multiple aggregates",
args: args{
events: []EventPusher{
events: []Command{
newTestEvent(
"1",
"",
@@ -758,8 +758,8 @@ func TestEventstore_Push(t *testing.T) {
},
),
},
eventMapper: map[EventType]func(*repository.Event) (EventReader, error){
"test.event": func(e *repository.Event) (EventReader, error) {
eventMapper: map[EventType]func(*repository.Event) (Event, error){
"test.event": func(e *repository.Event) (Event, error) {
return &testEvent{}, nil
},
},
@@ -771,7 +771,7 @@ func TestEventstore_Push(t *testing.T) {
{
name: "push fails",
args: args{
events: []EventPusher{
events: []Command{
newTestEvent(
"1",
"",
@@ -794,7 +794,7 @@ func TestEventstore_Push(t *testing.T) {
{
name: "aggreagtes to events mapping fails",
args: args{
events: []EventPusher{
events: []Command{
newTestEvent(
"1",
"",
@@ -830,7 +830,7 @@ func TestEventstore_Push(t *testing.T) {
t.FailNow()
}
_, err := es.PushEvents(context.Background(), tt.args.events...)
_, err := es.Push(context.Background(), tt.args.events...)
if (err != nil) != tt.res.wantErr {
t.Errorf("Eventstore.aggregatesToEvents() error = %v, wantErr %v", err, tt.res.wantErr)
}
@@ -844,7 +844,7 @@ func TestEventstore_FilterEvents(t *testing.T) {
}
type fields struct {
repo *testRepo
eventMapper map[EventType]func(*repository.Event) (EventReader, error)
eventMapper map[EventType]func(*repository.Event) (Event, error)
}
type res struct {
wantErr bool
@@ -882,8 +882,8 @@ func TestEventstore_FilterEvents(t *testing.T) {
events: []*repository.Event{},
t: t,
},
eventMapper: map[EventType]func(*repository.Event) (EventReader, error){
"test.event": func(e *repository.Event) (EventReader, error) {
eventMapper: map[EventType]func(*repository.Event) (Event, error){
"test.event": func(e *repository.Event) (Event, error) {
return &testEvent{}, nil
},
},
@@ -910,8 +910,8 @@ func TestEventstore_FilterEvents(t *testing.T) {
t: t,
err: errors.ThrowInternal(nil, "V2-RfkBa", "test err"),
},
eventMapper: map[EventType]func(*repository.Event) (EventReader, error){
"test.event": func(e *repository.Event) (EventReader, error) {
eventMapper: map[EventType]func(*repository.Event) (Event, error){
"test.event": func(e *repository.Event) (Event, error) {
return &testEvent{}, nil
},
},
@@ -943,8 +943,8 @@ func TestEventstore_FilterEvents(t *testing.T) {
},
t: t,
},
eventMapper: map[EventType]func(*repository.Event) (EventReader, error){
"test.event": func(e *repository.Event) (EventReader, error) {
eventMapper: map[EventType]func(*repository.Event) (Event, error){
"test.event": func(e *repository.Event) (Event, error) {
return &testEvent{}, nil
},
},
@@ -970,7 +970,7 @@ func TestEventstore_FilterEvents(t *testing.T) {
t.FailNow()
}
_, err := es.FilterEvents(context.Background(), tt.args.query)
_, err := es.Filter(context.Background(), tt.args.query)
if (err != nil) != tt.res.wantErr {
t.Errorf("Eventstore.aggregatesToEvents() error = %v, wantErr %v", err, tt.res.wantErr)
}
@@ -1089,7 +1089,7 @@ func TestEventstore_LatestSequence(t *testing.T) {
type testReducer struct {
t *testing.T
events []EventReader
events []Event
expectedLength int
err error
}
@@ -1105,7 +1105,7 @@ func (r *testReducer) Reduce() error {
return nil
}
func (r *testReducer) AppendEvents(e ...EventReader) {
func (r *testReducer) AppendEvents(e ...Event) {
r.events = append(r.events, e...)
}
@@ -1116,7 +1116,7 @@ func TestEventstore_FilterToReducer(t *testing.T) {
}
type fields struct {
repo *testRepo
eventMapper map[EventType]func(*repository.Event) (EventReader, error)
eventMapper map[EventType]func(*repository.Event) (Event, error)
}
type res struct {
wantErr bool
@@ -1158,8 +1158,8 @@ func TestEventstore_FilterToReducer(t *testing.T) {
events: []*repository.Event{},
t: t,
},
eventMapper: map[EventType]func(*repository.Event) (EventReader, error){
"test.event": func(e *repository.Event) (EventReader, error) {
eventMapper: map[EventType]func(*repository.Event) (Event, error){
"test.event": func(e *repository.Event) (Event, error) {
return &testEvent{}, nil
},
},
@@ -1190,8 +1190,8 @@ func TestEventstore_FilterToReducer(t *testing.T) {
t: t,
err: errors.ThrowInternal(nil, "V2-RfkBa", "test err"),
},
eventMapper: map[EventType]func(*repository.Event) (EventReader, error){
"test.event": func(e *repository.Event) (EventReader, error) {
eventMapper: map[EventType]func(*repository.Event) (Event, error){
"test.event": func(e *repository.Event) (Event, error) {
return &testEvent{}, nil
},
},
@@ -1227,8 +1227,8 @@ func TestEventstore_FilterToReducer(t *testing.T) {
},
t: t,
},
eventMapper: map[EventType]func(*repository.Event) (EventReader, error){
"test.event": func(e *repository.Event) (EventReader, error) {
eventMapper: map[EventType]func(*repository.Event) (Event, error){
"test.event": func(e *repository.Event) (Event, error) {
return &testEvent{}, nil
},
},
@@ -1262,8 +1262,8 @@ func TestEventstore_FilterToReducer(t *testing.T) {
},
t: t,
},
eventMapper: map[EventType]func(*repository.Event) (EventReader, error){
"test.event": func(e *repository.Event) (EventReader, error) {
eventMapper: map[EventType]func(*repository.Event) (Event, error){
"test.event": func(e *repository.Event) (Event, error) {
return &testEvent{}, nil
},
},
@@ -1338,13 +1338,13 @@ func compareEvents(t *testing.T, want, got *repository.Event) {
func TestEventstore_mapEvents(t *testing.T) {
type fields struct {
eventMapper map[EventType]func(*repository.Event) (EventReader, error)
eventMapper map[EventType]func(*repository.Event) (Event, error)
}
type args struct {
events []*repository.Event
}
type res struct {
events []EventReader
events []Event
wantErr bool
}
tests := []struct {
@@ -1381,8 +1381,8 @@ func TestEventstore_mapEvents(t *testing.T) {
},
},
fields: fields{
eventMapper: map[EventType]func(*repository.Event) (EventReader, error){
"test.event": func(*repository.Event) (EventReader, error) {
eventMapper: map[EventType]func(*repository.Event) (Event, error){
"test.event": func(*repository.Event) (Event, error) {
return nil, errors.ThrowInternal(nil, "V2-8FbQk", "test err")
},
},
@@ -1401,14 +1401,14 @@ func TestEventstore_mapEvents(t *testing.T) {
},
},
fields: fields{
eventMapper: map[EventType]func(*repository.Event) (EventReader, error){
"test.event": func(*repository.Event) (EventReader, error) {
eventMapper: map[EventType]func(*repository.Event) (Event, error){
"test.event": func(*repository.Event) (Event, error) {
return &testEvent{}, nil
},
},
},
res: res{
events: []EventReader{
events: []Event{
&testEvent{},
},
wantErr: false,

View File

@@ -45,8 +45,8 @@ func NewUserAddedEvent(id string, firstName string) *UserAddedEvent {
}
}
func UserAddedEventMapper() (eventstore.EventType, func(*repository.Event) (eventstore.EventReader, error)) {
return "user.added", func(event *repository.Event) (eventstore.EventReader, error) {
func UserAddedEventMapper() (eventstore.EventType, func(*repository.Event) (eventstore.Event, error)) {
return "user.added", func(event *repository.Event) (eventstore.Event, error) {
e := &UserAddedEvent{
BaseEvent: *eventstore.BaseEventFromRepo(event),
}
@@ -90,8 +90,8 @@ func NewUserFirstNameChangedEvent(id, firstName string) *UserFirstNameChangedEve
}
}
func UserFirstNameChangedMapper() (eventstore.EventType, func(*repository.Event) (eventstore.EventReader, error)) {
return "user.firstName.changed", func(event *repository.Event) (eventstore.EventReader, error) {
func UserFirstNameChangedMapper() (eventstore.EventType, func(*repository.Event) (eventstore.Event, error)) {
return "user.firstName.changed", func(event *repository.Event) (eventstore.Event, error) {
e := &UserFirstNameChangedEvent{
BaseEvent: *eventstore.BaseEventFromRepo(event),
}
@@ -132,8 +132,8 @@ func NewUserPasswordCheckedEvent(id string) *UserPasswordCheckedEvent {
}
}
func UserPasswordCheckedMapper() (eventstore.EventType, func(*repository.Event) (eventstore.EventReader, error)) {
return "user.password.checked", func(event *repository.Event) (eventstore.EventReader, error) {
func UserPasswordCheckedMapper() (eventstore.EventType, func(*repository.Event) (eventstore.Event, error)) {
return "user.password.checked", func(event *repository.Event) (eventstore.Event, error) {
return &UserPasswordCheckedEvent{
BaseEvent: *eventstore.BaseEventFromRepo(event),
}, nil
@@ -169,8 +169,8 @@ func NewUserDeletedEvent(id string) *UserDeletedEvent {
}
}
func UserDeletedMapper() (eventstore.EventType, func(*repository.Event) (eventstore.EventReader, error)) {
return "user.deleted", func(event *repository.Event) (eventstore.EventReader, error) {
func UserDeletedMapper() (eventstore.EventType, func(*repository.Event) (eventstore.Event, error)) {
return "user.deleted", func(event *repository.Event) (eventstore.Event, error) {
return &UserDeletedEvent{
BaseEvent: *eventstore.BaseEventFromRepo(event),
}, nil
@@ -199,7 +199,7 @@ type UsersReadModel struct {
Users []*UserReadModel
}
func (rm *UsersReadModel) AppendEvents(events ...eventstore.EventReader) {
func (rm *UsersReadModel) AppendEvents(events ...eventstore.Event) {
rm.ReadModel.AppendEvents(events...)
for _, event := range events {
switch e := event.(type) {
@@ -294,7 +294,7 @@ func TestUserReadModel(t *testing.T) {
RegisterFilterEventMapper(UserPasswordCheckedMapper()).
RegisterFilterEventMapper(UserDeletedMapper())
events, err := es.PushEvents(context.Background(),
events, err := es.Push(context.Background(),
NewUserAddedEvent("1", "hodor"),
NewUserAddedEvent("2", "hodor"),
NewUserPasswordCheckedEvent("2"),

View File

@@ -196,7 +196,7 @@ func (h *StatementHandler) fetchPreviousStmts(
return nil, nil
}
events, err := h.Eventstore.FilterEvents(ctx, query)
events, err := h.Eventstore.Filter(ctx, query)
if err != nil {
return nil, err
}

View File

@@ -21,7 +21,7 @@ var (
errReduce = errors.New("reduce err")
)
var _ eventstore.EventReader = &testEvent{}
var _ eventstore.Event = &testEvent{}
type testEvent struct {
eventstore.BaseEvent
@@ -1560,13 +1560,13 @@ func TestStatementHandler_updateCurrentSequence(t *testing.T) {
}
func testReduce() handler.Reduce {
return func(event eventstore.EventReader) (*handler.Statement, error) {
return func(event eventstore.Event) (*handler.Statement, error) {
return NewNoOpStatement(event), nil
}
}
func testReduceErr(err error) handler.Reduce {
return func(event eventstore.EventReader) (*handler.Statement, error) {
return func(event eventstore.Event) (*handler.Statement, error) {
return nil, err
}
}

View File

@@ -6,7 +6,7 @@ import (
)
//reduce implements handler.Reduce function
func (h *StatementHandler) reduce(event eventstore.EventReader) (*handler.Statement, error) {
func (h *StatementHandler) reduce(event eventstore.Event) (*handler.Statement, error) {
reduce, ok := h.reduces[event.Type()]
if !ok {
return NewNoOpStatement(event), nil

View File

@@ -23,7 +23,7 @@ func WithTableSuffix(name string) func(*execConfig) {
}
}
func NewCreateStatement(event eventstore.EventReader, values []handler.Column, opts ...execOption) *handler.Statement {
func NewCreateStatement(event eventstore.Event, values []handler.Column, opts ...execOption) *handler.Statement {
cols, params, args := columnsToQuery(values)
columnNames := strings.Join(cols, ", ")
valuesPlaceholder := strings.Join(params, ", ")
@@ -48,7 +48,7 @@ func NewCreateStatement(event eventstore.EventReader, values []handler.Column, o
}
}
func NewUpsertStatement(event eventstore.EventReader, values []handler.Column, opts ...execOption) *handler.Statement {
func NewUpsertStatement(event eventstore.Event, values []handler.Column, opts ...execOption) *handler.Statement {
cols, params, args := columnsToQuery(values)
columnNames := strings.Join(cols, ", ")
valuesPlaceholder := strings.Join(params, ", ")
@@ -73,7 +73,7 @@ func NewUpsertStatement(event eventstore.EventReader, values []handler.Column, o
}
}
func NewUpdateStatement(event eventstore.EventReader, values []handler.Column, conditions []handler.Condition, opts ...execOption) *handler.Statement {
func NewUpdateStatement(event eventstore.Event, values []handler.Column, conditions []handler.Condition, opts ...execOption) *handler.Statement {
cols, params, args := columnsToQuery(values)
wheres, whereArgs := conditionsToWhere(conditions, len(params))
args = append(args, whereArgs...)
@@ -106,7 +106,7 @@ func NewUpdateStatement(event eventstore.EventReader, values []handler.Column, c
}
}
func NewDeleteStatement(event eventstore.EventReader, conditions []handler.Condition, opts ...execOption) *handler.Statement {
func NewDeleteStatement(event eventstore.Event, conditions []handler.Condition, opts ...execOption) *handler.Statement {
wheres, args := conditionsToWhere(conditions, 0)
wheresPlaceholders := strings.Join(wheres, " AND ")
@@ -131,7 +131,7 @@ func NewDeleteStatement(event eventstore.EventReader, conditions []handler.Condi
}
}
func NewNoOpStatement(event eventstore.EventReader) *handler.Statement {
func NewNoOpStatement(event eventstore.Event) *handler.Statement {
return &handler.Statement{
AggregateType: event.Aggregate().Type,
Sequence: event.Sequence(),
@@ -139,7 +139,7 @@ func NewNoOpStatement(event eventstore.EventReader) *handler.Statement {
}
}
func NewMultiStatement(event eventstore.EventReader, opts ...func(eventstore.EventReader) Exec) *handler.Statement {
func NewMultiStatement(event eventstore.Event, opts ...func(eventstore.Event) Exec) *handler.Statement {
if len(opts) == 0 {
return NewNoOpStatement(event)
}
@@ -157,26 +157,26 @@ func NewMultiStatement(event eventstore.EventReader, opts ...func(eventstore.Eve
type Exec func(ex handler.Executer, projectionName string) error
func AddCreateStatement(columns []handler.Column, opts ...execOption) func(eventstore.EventReader) Exec {
return func(event eventstore.EventReader) Exec {
func AddCreateStatement(columns []handler.Column, opts ...execOption) func(eventstore.Event) Exec {
return func(event eventstore.Event) Exec {
return NewCreateStatement(event, columns, opts...).Execute
}
}
func AddUpsertStatement(values []handler.Column, opts ...execOption) func(eventstore.EventReader) Exec {
return func(event eventstore.EventReader) Exec {
func AddUpsertStatement(values []handler.Column, opts ...execOption) func(eventstore.Event) Exec {
return func(event eventstore.Event) Exec {
return NewUpsertStatement(event, values, opts...).Execute
}
}
func AddUpdateStatement(values []handler.Column, conditions []handler.Condition, opts ...execOption) func(eventstore.EventReader) Exec {
return func(event eventstore.EventReader) Exec {
func AddUpdateStatement(values []handler.Column, conditions []handler.Condition, opts ...execOption) func(eventstore.Event) Exec {
return func(event eventstore.Event) Exec {
return NewUpdateStatement(event, values, conditions, opts...).Execute
}
}
func AddDeleteStatement(conditions []handler.Condition, opts ...execOption) func(eventstore.EventReader) Exec {
return func(event eventstore.EventReader) Exec {
func AddDeleteStatement(conditions []handler.Condition, opts ...execOption) func(eventstore.Event) Exec {
return func(event eventstore.Event) Exec {
return NewDeleteStatement(event, conditions, opts...).Execute
}
}
@@ -206,7 +206,7 @@ func NewArrayRemoveCol(column string, value interface{}) handler.Column {
// if the value of a col is empty the data will be copied from the selected row
// if the value of a col is not empty the data will be set by the static value
// conds represent the conditions for the selection subquery
func NewCopyStatement(event eventstore.EventReader, cols []handler.Column, conds []handler.Condition, opts ...execOption) *handler.Statement {
func NewCopyStatement(event eventstore.Event, cols []handler.Column, conds []handler.Condition, opts ...execOption) *handler.Statement {
columnNames := make([]string, len(cols))
selectColumns := make([]string, len(cols))
argCounter := 0

View File

@@ -631,7 +631,7 @@ func TestNewMultiStatement(t *testing.T) {
type args struct {
table string
event *testEvent
execs []func(eventstore.EventReader) Exec
execs []func(eventstore.Event) Exec
}
type want struct {
@@ -671,7 +671,7 @@ func TestNewMultiStatement(t *testing.T) {
sequence: 1,
previousSequence: 0,
},
execs: []func(eventstore.EventReader) Exec{
execs: []func(eventstore.Event) Exec{
AddDeleteStatement(
[]handler.Condition{},
),
@@ -706,7 +706,7 @@ func TestNewMultiStatement(t *testing.T) {
previousSequence: 0,
aggregateType: "agg",
},
execs: []func(eventstore.EventReader) Exec{
execs: []func(eventstore.Event) Exec{
AddDeleteStatement(
[]handler.Condition{
{

View File

@@ -10,13 +10,13 @@ type HandlerConfig struct {
type Handler struct {
Eventstore *eventstore.Eventstore
Sub *eventstore.Subscription
EventQueue chan eventstore.EventReader
EventQueue chan eventstore.Event
}
func NewHandler(config HandlerConfig) Handler {
return Handler{
Eventstore: config.Eventstore,
EventQueue: make(chan eventstore.EventReader, 100),
EventQueue: make(chan eventstore.Event, 100),
}
}

View File

@@ -23,7 +23,7 @@ type Update func(context.Context, []*Statement, Reduce) (unexecutedStmts []*Stat
//Reduce reduces the given event to a statement
//which is used to update the projection
type Reduce func(eventstore.EventReader) (*Statement, error)
type Reduce func(eventstore.Event) (*Statement, error)
//Lock is used for mutex handling if needed on the projection
type Lock func(context.Context, time.Duration) <-chan error
@@ -156,7 +156,7 @@ func (h *ProjectionHandler) Process(
func (h *ProjectionHandler) processEvent(
ctx context.Context,
event eventstore.EventReader,
event eventstore.Event,
reduce Reduce,
) error {
stmt, err := reduce(event)
@@ -262,7 +262,7 @@ func (h *ProjectionHandler) fetchBulkStmts(
return false, err
}
events, err := h.Eventstore.FilterEvents(ctx, eventQuery)
events, err := h.Eventstore.Filter(ctx, eventQuery)
if err != nil {
logging.LogWithFields("HANDL-X8vlo", "projection", h.ProjectionName).WithError(err).Info("Unable to bulk fetch events")
return false, err

View File

@@ -46,7 +46,7 @@ func TestProjectionHandler_processEvent(t *testing.T) {
}
type args struct {
ctx context.Context
event eventstore.EventReader
event eventstore.Event
reduce Reduce
}
type want struct {
@@ -849,13 +849,13 @@ func testUpdate(t *testing.T, expectedStmtCount int, returnedErr error) Update {
}
func testReduce(stmts *Statement) Reduce {
return func(event eventstore.EventReader) (*Statement, error) {
return func(event eventstore.Event) (*Statement, error) {
return stmts, nil
}
}
func testReduceErr(err error) Reduce {
return func(event eventstore.EventReader) (*Statement, error) {
return func(event eventstore.Event) (*Statement, error) {
return nil, err
}
}

View File

@@ -6,17 +6,17 @@ import "time"
// It implements a basic reducer
// it might be saved in a database or in memory
type ReadModel struct {
AggregateID string `json:"-"`
ProcessedSequence uint64 `json:"-"`
CreationDate time.Time `json:"-"`
ChangeDate time.Time `json:"-"`
Events []EventReader `json:"-"`
ResourceOwner string `json:"-"`
AggregateID string `json:"-"`
ProcessedSequence uint64 `json:"-"`
CreationDate time.Time `json:"-"`
ChangeDate time.Time `json:"-"`
Events []Event `json:"-"`
ResourceOwner string `json:"-"`
}
//AppendEvents adds all the events to the read model.
// The function doesn't compute the new state of the read model
func (rm *ReadModel) AppendEvents(events ...EventReader) *ReadModel {
func (rm *ReadModel) AppendEvents(events ...Event) *ReadModel {
rm.Events = append(rm.Events, events...)
return rm
}
@@ -42,6 +42,6 @@ func (rm *ReadModel) Reduce() error {
rm.ProcessedSequence = rm.Events[len(rm.Events)-1].Sequence()
// all events processed and not needed anymore
rm.Events = nil
rm.Events = []EventReader{}
rm.Events = []Event{}
return nil
}

View File

@@ -13,12 +13,12 @@ var (
)
type Subscription struct {
Events chan EventReader
Events chan Event
types map[AggregateType][]EventType
}
//SubscribeAggregates subscribes for all events on the given aggregates
func SubscribeAggregates(eventQueue chan EventReader, aggregates ...AggregateType) *Subscription {
func SubscribeAggregates(eventQueue chan Event, aggregates ...AggregateType) *Subscription {
types := make(map[AggregateType][]EventType, len(aggregates))
for _, aggregate := range aggregates {
types[aggregate] = nil
@@ -40,7 +40,7 @@ func SubscribeAggregates(eventQueue chan EventReader, aggregates ...AggregateTyp
//SubscribeEventTypes subscribes for the given event types
// if no event types are provided the subscription is for all events of the aggregate
func SubscribeEventTypes(eventQueue chan EventReader, types map[AggregateType][]EventType) *Subscription {
func SubscribeEventTypes(eventQueue chan Event, types map[AggregateType][]EventType) *Subscription {
aggregates := make([]AggregateType, len(types))
sub := &Subscription{
Events: eventQueue,
@@ -57,7 +57,7 @@ func SubscribeEventTypes(eventQueue chan EventReader, types map[AggregateType][]
return sub
}
func notify(events []EventReader) {
func notify(events []Event) {
go v1.Notify(MapEventsToV1Events(events))
subsMutext.Lock()
defer subsMutext.Unlock()
@@ -106,7 +106,7 @@ func (s *Subscription) Unsubscribe() {
}
}
func MapEventsToV1Events(events []EventReader) []*models.Event {
func MapEventsToV1Events(events []Event) []*models.Event {
v1Events := make([]*models.Event, len(events))
for i, event := range events {
v1Events[i] = mapEventToV1Event(event)
@@ -114,7 +114,7 @@ func MapEventsToV1Events(events []EventReader) []*models.Event {
return v1Events
}
func mapEventToV1Event(event EventReader) *models.Event {
func mapEventToV1Event(event Event) *models.Event {
return &models.Event{
Sequence: event.Sequence(),
CreationDate: event.CreationDate(),

View File

@@ -6,16 +6,16 @@ import "time"
// It implements a basic reducer
// it's purpose is to reduce events to create new ones
type WriteModel struct {
AggregateID string `json:"-"`
ProcessedSequence uint64 `json:"-"`
Events []EventReader `json:"-"`
ResourceOwner string `json:"-"`
ChangeDate time.Time `json:"-"`
AggregateID string `json:"-"`
ProcessedSequence uint64 `json:"-"`
Events []Event `json:"-"`
ResourceOwner string `json:"-"`
ChangeDate time.Time `json:"-"`
}
//AppendEvents adds all the events to the read model.
// The function doesn't compute the new state of the read model
func (rm *WriteModel) AppendEvents(events ...EventReader) {
func (rm *WriteModel) AppendEvents(events ...Event) {
rm.Events = append(rm.Events, events...)
}
@@ -38,6 +38,6 @@ func (wm *WriteModel) Reduce() error {
// all events processed and not needed anymore
wm.Events = nil
wm.Events = []EventReader{}
wm.Events = []Event{}
return nil
}