fix: push events (#1262)

* fix: push events instead of aggregates

* fix: tests

* try without aggregate methods and with aggregate methods

* fix: change push aggregate to push events

* fix: change push aggregate to push events

* fix: change push aggregate to push events

* fix: change push aggregate to push events

* fix: change push aggregate to push events

* fix: change push aggregate to push events

* fix: change push aggregate to push events

* fix: change push aggregate to push events

* fix: change push aggregate to push events

* fix: change push aggregate to push events

* fix: client secret

* fix: query eventtypes

* fix: query eventtypes

* fix: eventstore index

* fix: index

* fix: merge new eventstore

* fix: remove unnecessary todos

* fix: remove unnecessary todos

Co-authored-by: Fabiennne <fabienne.gerschwiler@gmail.com>
This commit is contained in:
Silvan
2021-02-18 14:48:27 +01:00
committed by GitHub
parent 027a6386c0
commit 00fec8830a
187 changed files with 3000 additions and 2161 deletions

View File

@@ -1,86 +1,66 @@
package eventstore
type Aggregater interface {
//ID returns the aggreagte id
ID() string
//KeyType returns the aggregate type
Type() AggregateType
//Events returns the events which will be pushed
Events() []EventPusher
//ResourceOwner returns the organisation id which manages this aggregate
// resource owner is only on the inital push needed
// afterwards the resource owner of the previous event is taken
ResourceOwner() string
//Version represents the semantic version of the aggregate
Version() Version
}
import (
"context"
"github.com/caos/zitadel/internal/api/authz"
)
type aggregateOpt func(*Aggregate)
//NewAggregate is the default constructor of an aggregate
// opts overwrite values calculated by given parameters
func NewAggregate(
ctx context.Context,
id string,
typ AggregateType,
resourceOwner string,
version Version,
opts ...aggregateOpt,
) *Aggregate {
return &Aggregate{
id: id,
typ: typ,
resourceOwner: resourceOwner,
version: version,
events: []EventPusher{},
a := &Aggregate{
ID: id,
Typ: typ,
ResourceOwner: authz.GetCtxData(ctx).OrgID,
Version: version,
}
for _, opt := range opts {
opt(a)
}
return a
}
//WithResourceOwner overwrites the resource owner of the aggregate
// by default the resource owner is set by the context
func WithResourceOwner(resourceOwner string) aggregateOpt {
return func(aggregate *Aggregate) {
aggregate.ResourceOwner = resourceOwner
}
}
//AggregateFromWriteModel maps the given WriteModel to an Aggregate
func AggregateFromWriteModel(
wm *WriteModel,
typ AggregateType,
version Version,
) *Aggregate {
return &Aggregate{
id: wm.AggregateID,
typ: typ,
resourceOwner: wm.ResourceOwner,
version: version,
events: []EventPusher{},
ID: wm.AggregateID,
Typ: typ,
ResourceOwner: wm.ResourceOwner,
Version: version,
}
}
//Aggregate is the basic implementation of Aggregater
type Aggregate struct {
id string `json:"-"`
typ AggregateType `json:"-"`
events []EventPusher `json:"-"`
resourceOwner string `json:"-"`
version Version `json:"-"`
}
//PushEvents adds all the events to the aggregate.
// The added events will be pushed to eventstore
func (a *Aggregate) PushEvents(events ...EventPusher) *Aggregate {
a.events = append(a.events, events...)
return a
}
//ID implements Aggregater
func (a *Aggregate) ID() string {
return a.id
}
//KeyType implements Aggregater
func (a *Aggregate) Type() AggregateType {
return a.typ
}
//Events implements Aggregater
func (a *Aggregate) Events() []EventPusher {
return a.events
}
//ResourceOwner implements Aggregater
func (a *Aggregate) ResourceOwner() string {
return a.resourceOwner
}
//Version implements Aggregater
func (a *Aggregate) Version() Version {
return a.version
//ID is the unique identitfier of this aggregate
ID string `json:"-"`
//Typ is the name of the aggregate.
Typ AggregateType `json:"-"`
//ResourceOwner is the org this aggregates belongs to
ResourceOwner string `json:"-"`
//Version is the semver this aggregate represents
Version Version `json:"-"`
}

View File

@@ -5,6 +5,8 @@ import (
)
type EventPusher interface {
//Aggregate is the metadata of an aggregate
Aggregate() Aggregate
// EditorService is the service who wants to push the event
EditorService() string
//EditorUser is the user who wants to push the event
@@ -30,10 +32,8 @@ type EventReader interface {
//KeyType is the type of the event
Type() EventType
AggregateID() string
AggregateType() AggregateType
ResourceOwner() string
AggregateVersion() Version
Aggregate() Aggregate
Sequence() uint64
CreationDate() time.Time
}

View File

@@ -9,15 +9,14 @@ import (
"github.com/caos/zitadel/internal/eventstore/v2/repository"
)
//BaseEvent represents the minimum metadata of an event
type BaseEvent struct {
aggregateID string `json:"-"`
aggregateType AggregateType `json:"-"`
EventType EventType `json:"-"`
EventType EventType
resourceOwner string `json:"-"`
aggregateVersion Version `json:"-"`
sequence uint64 `json:"-"`
creationDate time.Time `json:"-"`
aggregate Aggregate
sequence uint64
creationDate time.Time
//User is the user who created the event
User string `json:"-"`
@@ -35,59 +34,51 @@ func (e *BaseEvent) EditorUser() string {
return e.User
}
//KeyType implements EventPusher
//Type implements EventPusher
func (e *BaseEvent) Type() EventType {
return e.EventType
}
func (e *BaseEvent) AggregateID() string {
return e.aggregateID
}
func (e *BaseEvent) AggregateType() AggregateType {
return e.aggregateType
}
func (e *BaseEvent) ResourceOwner() string {
return e.resourceOwner
}
func (e *BaseEvent) AggregateVersion() Version {
return e.aggregateVersion
}
//Sequence is an upcounting unique number of the event
func (e *BaseEvent) Sequence() uint64 {
return e.sequence
}
//CreationDate is the the time, the event is inserted into the eventstore
func (e *BaseEvent) CreationDate() time.Time {
return e.creationDate
}
//Aggregate represents the metadata of the event's aggregate
func (e *BaseEvent) Aggregate() Aggregate {
return e.aggregate
}
//BaseEventFromRepo maps a stored event to a BaseEvent
func BaseEventFromRepo(event *repository.Event) *BaseEvent {
return &BaseEvent{
aggregateID: event.AggregateID,
aggregateType: AggregateType(event.AggregateType),
aggregateVersion: Version(event.Version),
EventType: EventType(event.Type),
creationDate: event.CreationDate,
sequence: event.Sequence,
resourceOwner: event.ResourceOwner,
Service: event.EditorService,
User: event.EditorUser,
aggregate: Aggregate{
ID: event.AggregateID,
Typ: AggregateType(event.AggregateType),
ResourceOwner: event.ResourceOwner,
Version: Version(event.Version),
},
EventType: EventType(event.Type),
creationDate: event.CreationDate,
sequence: event.Sequence,
Service: event.EditorService,
User: event.EditorUser,
}
}
func NewBaseEventForPush(ctx context.Context, typ EventType) *BaseEvent {
svcName := service.FromContext(ctx)
//NewBaseEventForPush is the constructor for event's which will be pushed into the eventstore
// the resource owner of the aggregate is only used if it's the first event of this aggregateroot
// afterwards the resource owner of the first previous events is taken
func NewBaseEventForPush(ctx context.Context, aggregate *Aggregate, typ EventType) *BaseEvent {
return &BaseEvent{
aggregate: *aggregate,
User: authz.GetCtxData(ctx).UserID,
Service: svcName,
Service: service.FromContext(ctx),
EventType: typ,
}
}
func NewBaseEventForPushWithResourceOwner(ctx context.Context, typ EventType, resourceOwner string) *BaseEvent {
svcName := service.FromContext(ctx)
return &BaseEvent{
User: authz.GetCtxData(ctx).UserID,
Service: svcName,
EventType: typ,
resourceOwner: resourceOwner,
}
}

View File

@@ -36,67 +36,56 @@ func (es *Eventstore) Health(ctx context.Context) error {
return es.repo.Health(ctx)
}
//PushAggregate pushes the aggregate and reduces the new events on the aggregate
func (es *Eventstore) PushAggregate(ctx context.Context, writeModel queryReducer, aggregate Aggregater) error {
events, err := es.PushAggregates(ctx, aggregate)
if err != nil {
return err
}
writeModel.AppendEvents(events...)
return writeModel.Reduce()
}
//PushAggregates maps the events of all aggregates to an eventstore event
// based on the pushMapper
func (es *Eventstore) PushAggregates(ctx context.Context, aggregates ...Aggregater) ([]EventReader, error) {
events, uniqueConstraints, err := es.aggregatesToEvents(aggregates)
//PushEvents pushes the events in a single transaction
// an event needs at least an aggregate
func (es *Eventstore) PushEvents(ctx context.Context, pushEvents ...EventPusher) ([]EventReader, error) {
events, constraints, err := eventsToRepository(pushEvents)
if err != nil {
return nil, err
}
err = es.repo.Push(ctx, events, uniqueConstraints...)
err = es.repo.Push(ctx, events, constraints...)
if err != nil {
return nil, err
}
return es.mapEvents(events)
}
func (es *Eventstore) aggregatesToEvents(aggregates []Aggregater) ([]*repository.Event, []*repository.UniqueConstraint, error) {
events := make([]*repository.Event, 0, len(aggregates))
uniqueConstraints := make([]*repository.UniqueConstraint, 0)
for _, aggregate := range aggregates {
for _, event := range aggregate.Events() {
data, err := eventData(event)
if err != nil {
return nil, nil, err
}
events = append(events, &repository.Event{
AggregateID: aggregate.ID(),
AggregateType: repository.AggregateType(aggregate.Type()),
ResourceOwner: aggregate.ResourceOwner(),
EditorService: event.EditorService(),
EditorUser: event.EditorUser(),
Type: repository.EventType(event.Type()),
Version: repository.Version(aggregate.Version()),
Data: data,
})
if event.UniqueConstraints() != nil {
for _, constraint := range event.UniqueConstraints() {
uniqueConstraints = append(uniqueConstraints,
&repository.UniqueConstraint{
UniqueType: constraint.UniqueType,
UniqueField: constraint.UniqueField,
Action: uniqueConstraintActionToRepository(constraint.Action),
ErrorMessage: constraint.ErrorMessage,
},
)
}
}
func eventsToRepository(pushEvents []EventPusher) (events []*repository.Event, constraints []*repository.UniqueConstraint, err error) {
events = make([]*repository.Event, len(pushEvents))
for i, event := range pushEvents {
data, err := eventData(event)
if err != nil {
return nil, nil, err
}
events[i] = &repository.Event{
AggregateID: event.Aggregate().ID,
AggregateType: repository.AggregateType(event.Aggregate().Typ),
ResourceOwner: event.Aggregate().ResourceOwner,
EditorService: event.EditorService(),
EditorUser: event.EditorUser(),
Type: repository.EventType(event.Type()),
Version: repository.Version(event.Aggregate().Version),
Data: data,
}
if len(event.UniqueConstraints()) > 0 {
constraints = append(constraints, uniqueConstraintsToRepository(event.UniqueConstraints())...)
}
}
return events, uniqueConstraints, nil
return events, constraints, nil
}
func uniqueConstraintsToRepository(constraints []*EventUniqueConstraint) (uniqueConstraints []*repository.UniqueConstraint) {
uniqueConstraints = make([]*repository.UniqueConstraint, len(constraints))
for i, constraint := range constraints {
uniqueConstraints[i] = &repository.UniqueConstraint{
UniqueType: constraint.UniqueType,
UniqueField: constraint.UniqueField,
Action: uniqueConstraintActionToRepository(constraint.Action),
ErrorMessage: constraint.ErrorMessage,
}
}
return uniqueConstraints
}
//FilterEvents filters the stored events based on the searchQuery

View File

@@ -31,7 +31,7 @@ func (a *testAggregate) Events() []EventPusher {
}
func (a *testAggregate) ResourceOwner() string {
return "ro"
return "caos"
}
func (a *testAggregate) Version() Version {
@@ -47,13 +47,14 @@ type testEvent struct {
data func() interface{}
}
func newTestEvent(description string, data func() interface{}, checkPrevious bool) *testEvent {
func newTestEvent(id, description string, data func() interface{}, checkPrevious bool) *testEvent {
return &testEvent{
description: description,
data: data,
shouldCheckPrevious: checkPrevious,
BaseEvent: *NewBaseEventForPush(
service.WithService(authz.NewMockContext("resourceOwner", "editorUser"), "editorService"),
NewAggregate(authz.NewMockContext("caos", "adlerhurst"), id, "test.aggregate", "v1"),
"test.event",
),
}
@@ -69,7 +70,7 @@ func (e *testEvent) UniqueConstraints() []*EventUniqueConstraint {
func testFilterMapper(event *repository.Event) (EventReader, error) {
if event == nil {
return newTestEvent("hodor", nil, false), nil
return newTestEvent("testID", "hodor", nil, false), nil
}
return &testEvent{description: "hodor", BaseEvent: *BaseEventFromRepo(event)}, nil
}
@@ -129,7 +130,7 @@ func Test_eventstore_RegisterFilterEventMapper(t *testing.T) {
mapper: testFilterMapper,
},
res: res{
event: newTestEvent("hodor", nil, false),
event: newTestEvent("testID", "hodor", nil, false),
mapperCount: 1,
},
},
@@ -145,7 +146,7 @@ func Test_eventstore_RegisterFilterEventMapper(t *testing.T) {
mapper: testFilterMapper,
},
res: res{
event: newTestEvent("hodor", nil, false),
event: newTestEvent("testID", "hodor", nil, false),
mapperCount: 2,
},
},
@@ -165,7 +166,7 @@ func Test_eventstore_RegisterFilterEventMapper(t *testing.T) {
mapper: testFilterMapper,
},
res: res{
event: newTestEvent("hodor", nil, false),
event: newTestEvent("testID", "hodor", nil, false),
mapperCount: 2,
},
},
@@ -215,6 +216,7 @@ func Test_eventData(t *testing.T) {
name: "data as json bytes",
args: args{
event: newTestEvent(
"id",
"hodor",
func() interface{} {
return []byte(`{"piff":"paff"}`)
@@ -230,6 +232,7 @@ func Test_eventData(t *testing.T) {
name: "data as invalid json bytes",
args: args{
event: newTestEvent(
"id",
"hodor",
func() interface{} {
return []byte(`{"piffpaff"}`)
@@ -245,6 +248,7 @@ func Test_eventData(t *testing.T) {
name: "data as struct",
args: args{
event: newTestEvent(
"id",
"hodor",
func() interface{} {
return struct {
@@ -262,6 +266,7 @@ func Test_eventData(t *testing.T) {
name: "data as ptr to struct",
args: args{
event: newTestEvent(
"id",
"hodor",
func() interface{} {
return &struct {
@@ -279,6 +284,7 @@ func Test_eventData(t *testing.T) {
name: "no data",
args: args{
event: newTestEvent(
"id",
"hodor",
func() interface{} {
return nil
@@ -294,6 +300,7 @@ func Test_eventData(t *testing.T) {
name: "invalid because primitive",
args: args{
event: newTestEvent(
"id",
"hodor",
func() interface{} {
return ""
@@ -309,6 +316,7 @@ func Test_eventData(t *testing.T) {
name: "invalid because pointer to primitive",
args: args{
event: newTestEvent(
"id",
"hodor",
func() interface{} {
var s string
@@ -325,6 +333,7 @@ func Test_eventData(t *testing.T) {
name: "invalid because invalid struct for json",
args: args{
event: newTestEvent(
"id",
"hodor",
func() interface{} {
return struct {
@@ -355,7 +364,8 @@ func Test_eventData(t *testing.T) {
func TestEventstore_aggregatesToEvents(t *testing.T) {
type args struct {
aggregates []Aggregater
aggregates []Aggregate
events []EventPusher
}
type res struct {
wantErr bool
@@ -369,18 +379,14 @@ func TestEventstore_aggregatesToEvents(t *testing.T) {
{
name: "one aggregate one event",
args: args{
aggregates: []Aggregater{
&testAggregate{
id: "1",
events: []EventPusher{
newTestEvent(
"",
func() interface{} {
return nil
},
false),
events: []EventPusher{
newTestEvent(
"1",
"",
func() interface{} {
return nil
},
},
false),
},
},
res: res{
@@ -392,7 +398,7 @@ func TestEventstore_aggregatesToEvents(t *testing.T) {
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
ResourceOwner: "caos",
Type: "test.event",
Version: "v1",
},
@@ -402,24 +408,21 @@ func TestEventstore_aggregatesToEvents(t *testing.T) {
{
name: "one aggregate multiple events",
args: args{
aggregates: []Aggregater{
&testAggregate{
id: "1",
events: []EventPusher{
newTestEvent(
"",
func() interface{} {
return nil
},
false),
newTestEvent(
"",
func() interface{} {
return nil
},
false),
events: []EventPusher{
newTestEvent(
"1",
"",
func() interface{} {
return nil
},
},
false),
newTestEvent(
"1",
"",
func() interface{} {
return nil
},
false),
},
},
res: res{
@@ -431,7 +434,7 @@ func TestEventstore_aggregatesToEvents(t *testing.T) {
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
ResourceOwner: "caos",
Type: "test.event",
Version: "v1",
},
@@ -441,7 +444,7 @@ func TestEventstore_aggregatesToEvents(t *testing.T) {
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
ResourceOwner: "caos",
Type: "test.event",
Version: "v1",
},
@@ -451,18 +454,14 @@ func TestEventstore_aggregatesToEvents(t *testing.T) {
{
name: "invalid data",
args: args{
aggregates: []Aggregater{
&testAggregate{
id: "1",
events: []EventPusher{
newTestEvent(
"",
func() interface{} {
return `{"data":""`
},
false),
events: []EventPusher{
newTestEvent(
"1",
"",
func() interface{} {
return `{"data":""`
},
},
false),
},
},
res: res{
@@ -472,35 +471,28 @@ func TestEventstore_aggregatesToEvents(t *testing.T) {
{
name: "multiple aggregates",
args: args{
aggregates: []Aggregater{
&testAggregate{
id: "1",
events: []EventPusher{
newTestEvent(
"",
func() interface{} {
return nil
},
false),
newTestEvent(
"",
func() interface{} {
return nil
},
false),
events: []EventPusher{
newTestEvent(
"1",
"",
func() interface{} {
return nil
},
},
&testAggregate{
id: "2",
events: []EventPusher{
newTestEvent(
"",
func() interface{} {
return nil
},
true),
false),
newTestEvent(
"1",
"",
func() interface{} {
return nil
},
},
false),
newTestEvent(
"2",
"",
func() interface{} {
return nil
},
true),
},
},
res: res{
@@ -513,7 +505,7 @@ func TestEventstore_aggregatesToEvents(t *testing.T) {
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
ResourceOwner: "caos",
Type: "test.event",
Version: "v1",
},
@@ -523,7 +515,7 @@ func TestEventstore_aggregatesToEvents(t *testing.T) {
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
ResourceOwner: "caos",
Type: "test.event",
Version: "v1",
},
@@ -535,7 +527,7 @@ func TestEventstore_aggregatesToEvents(t *testing.T) {
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
ResourceOwner: "caos",
Type: "test.event",
Version: "v1",
},
@@ -546,8 +538,7 @@ func TestEventstore_aggregatesToEvents(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
es := &Eventstore{}
events, _, err := es.aggregatesToEvents(tt.args.aggregates)
events, _, err := eventsToRepository(tt.args.events)
if (err != nil) != tt.res.wantErr {
t.Errorf("Eventstore.aggregatesToEvents() error = %v, wantErr %v", err, tt.res.wantErr)
return
@@ -613,7 +604,7 @@ func (repo *testRepo) LatestSequence(ctx context.Context, queryFactory *reposito
func TestEventstore_Push(t *testing.T) {
type args struct {
aggregates []Aggregater
events []EventPusher
}
type fields struct {
repo *testRepo
@@ -631,18 +622,14 @@ func TestEventstore_Push(t *testing.T) {
{
name: "one aggregate one event",
args: args{
aggregates: []Aggregater{
&testAggregate{
id: "1",
events: []EventPusher{
newTestEvent(
"",
func() interface{} {
return nil
},
false),
events: []EventPusher{
newTestEvent(
"1",
"",
func() interface{} {
return nil
},
},
false),
},
},
fields: fields{
@@ -655,7 +642,7 @@ func TestEventstore_Push(t *testing.T) {
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
ResourceOwner: "caos",
Type: "test.event",
Version: "v1",
},
@@ -671,24 +658,21 @@ func TestEventstore_Push(t *testing.T) {
{
name: "one aggregate multiple events",
args: args{
aggregates: []Aggregater{
&testAggregate{
id: "1",
events: []EventPusher{
newTestEvent(
"",
func() interface{} {
return nil
},
false),
newTestEvent(
"",
func() interface{} {
return nil
},
false),
events: []EventPusher{
newTestEvent(
"1",
"",
func() interface{} {
return nil
},
},
false),
newTestEvent(
"1",
"",
func() interface{} {
return nil
},
false),
},
},
fields: fields{
@@ -701,7 +685,7 @@ func TestEventstore_Push(t *testing.T) {
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
ResourceOwner: "caos",
Type: "test.event",
Version: "v1",
},
@@ -711,7 +695,7 @@ func TestEventstore_Push(t *testing.T) {
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
ResourceOwner: "caos",
Type: "test.event",
Version: "v1",
},
@@ -730,35 +714,28 @@ func TestEventstore_Push(t *testing.T) {
{
name: "multiple aggregates",
args: args{
aggregates: []Aggregater{
&testAggregate{
id: "1",
events: []EventPusher{
newTestEvent(
"",
func() interface{} {
return nil
},
false),
newTestEvent(
"",
func() interface{} {
return nil
},
false),
events: []EventPusher{
newTestEvent(
"1",
"",
func() interface{} {
return nil
},
},
&testAggregate{
id: "2",
events: []EventPusher{
newTestEvent(
"",
func() interface{} {
return nil
},
true),
false),
newTestEvent(
"1",
"",
func() interface{} {
return nil
},
},
false),
newTestEvent(
"2",
"",
func() interface{} {
return nil
},
true),
},
},
fields: fields{
@@ -772,7 +749,7 @@ func TestEventstore_Push(t *testing.T) {
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
ResourceOwner: "caos",
Type: "test.event",
Version: "v1",
},
@@ -782,7 +759,7 @@ func TestEventstore_Push(t *testing.T) {
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
ResourceOwner: "caos",
Type: "test.event",
Version: "v1",
},
@@ -794,7 +771,7 @@ func TestEventstore_Push(t *testing.T) {
Data: []byte(nil),
EditorService: "editorService",
EditorUser: "editorUser",
ResourceOwner: "ro",
ResourceOwner: "caos",
Type: "test.event",
Version: "v1",
},
@@ -814,18 +791,14 @@ func TestEventstore_Push(t *testing.T) {
{
name: "push fails",
args: args{
aggregates: []Aggregater{
&testAggregate{
id: "1",
events: []EventPusher{
newTestEvent(
"",
func() interface{} {
return nil
},
false),
events: []EventPusher{
newTestEvent(
"1",
"",
func() interface{} {
return nil
},
},
false),
},
},
fields: fields{
@@ -841,18 +814,14 @@ func TestEventstore_Push(t *testing.T) {
{
name: "aggreagtes to events mapping fails",
args: args{
aggregates: []Aggregater{
&testAggregate{
id: "1",
events: []EventPusher{
newTestEvent(
"",
func() interface{} {
return `{"data":""`
},
false),
events: []EventPusher{
newTestEvent(
"1",
"",
func() interface{} {
return `{"data":""`
},
},
false),
},
},
fields: fields{
@@ -881,7 +850,7 @@ func TestEventstore_Push(t *testing.T) {
t.FailNow()
}
_, err := es.PushAggregates(context.Background(), tt.args.aggregates...)
_, err := es.PushEvents(context.Background(), tt.args.events...)
if (err != nil) != tt.res.wantErr {
t.Errorf("Eventstore.aggregatesToEvents() error = %v, wantErr %v", err, tt.res.wantErr)
}
@@ -1313,13 +1282,13 @@ func compareEvents(t *testing.T, want, got *repository.Event) {
t.Helper()
if want.AggregateID != got.AggregateID {
t.Errorf("wrong aggregateID got %q want %q", want.AggregateID, got.AggregateID)
t.Errorf("wrong aggregateID got %q want %q", got.AggregateID, want.AggregateID)
}
if want.AggregateType != got.AggregateType {
t.Errorf("wrong aggregateType got %q want %q", want.AggregateType, got.AggregateType)
t.Errorf("wrong aggregateType got %q want %q", got.AggregateType, want.AggregateType)
}
if !reflect.DeepEqual(want.Data, got.Data) {
t.Errorf("wrong data got %s want %s", string(want.Data), string(got.Data))
t.Errorf("wrong data got %s want %s", string(got.Data), string(want.Data))
}
if want.EditorService != got.EditorService {
t.Errorf("wrong editor service got %q want %q", got.EditorService, want.EditorService)

View File

@@ -7,6 +7,7 @@ import (
"testing"
"time"
"github.com/caos/zitadel/internal/api/authz"
"github.com/caos/zitadel/internal/eventstore/v2"
"github.com/caos/zitadel/internal/eventstore/v2/repository"
"github.com/caos/zitadel/internal/eventstore/v2/repository/sql"
@@ -15,34 +16,13 @@ import (
// ------------------------------------------------------------
// User aggregate start
// ------------------------------------------------------------
type UserAggregate struct {
eventstore.Aggregate
FirstName string
}
func NewUserAggregate(id string) *UserAggregate {
return &UserAggregate{
Aggregate: *eventstore.NewAggregate(
id,
"test.user",
"caos",
"v1",
),
}
}
func (rm *UserAggregate) Reduce() error {
for _, event := range rm.Aggregate.Events() {
switch e := event.(type) {
case *UserAddedEvent:
rm.FirstName = e.FirstName
case *UserFirstNameChangedEvent:
rm.FirstName = e.FirstName
}
}
return nil
func NewUserAggregate(id string) *eventstore.Aggregate {
return eventstore.NewAggregate(
authz.NewMockContext("caos", "adlerhurst"),
id,
"test.user",
"v1",
)
}
// ------------------------------------------------------------
@@ -55,14 +35,13 @@ type UserAddedEvent struct {
FirstName string `json:"firstName"`
}
func NewUserAddedEvent(firstName string) *UserAddedEvent {
func NewUserAddedEvent(id string, firstName string) *UserAddedEvent {
return &UserAddedEvent{
FirstName: firstName,
BaseEvent: eventstore.BaseEvent{
Service: "test.suite",
User: "adlerhurst",
EventType: "user.added",
},
BaseEvent: *eventstore.NewBaseEventForPush(
context.Background(),
NewUserAggregate(id),
"user.added"),
}
}
@@ -97,14 +76,13 @@ type UserFirstNameChangedEvent struct {
FirstName string `json:"firstName"`
}
func NewUserFirstNameChangedEvent(firstName string) *UserFirstNameChangedEvent {
func NewUserFirstNameChangedEvent(id, firstName string) *UserFirstNameChangedEvent {
return &UserFirstNameChangedEvent{
FirstName: firstName,
BaseEvent: eventstore.BaseEvent{
Service: "test.suite",
User: "adlerhurst",
EventType: "user.firstName.changed",
},
BaseEvent: *eventstore.NewBaseEventForPush(
context.Background(),
NewUserAggregate(id),
"user.firstname.changed"),
}
}
@@ -137,13 +115,12 @@ type UserPasswordCheckedEvent struct {
eventstore.BaseEvent `json:"-"`
}
func NewUserPasswordCheckedEvent() *UserPasswordCheckedEvent {
func NewUserPasswordCheckedEvent(id string) *UserPasswordCheckedEvent {
return &UserPasswordCheckedEvent{
BaseEvent: eventstore.BaseEvent{
Service: "test.suite",
User: "adlerhurst",
EventType: "user.password.checked",
},
BaseEvent: *eventstore.NewBaseEventForPush(
context.Background(),
NewUserAggregate(id),
"user.password.checked"),
}
}
@@ -171,13 +148,12 @@ type UserDeletedEvent struct {
eventstore.BaseEvent `json:"-"`
}
func NewUserDeletedEvent() *UserDeletedEvent {
func NewUserDeletedEvent(id string) *UserDeletedEvent {
return &UserDeletedEvent{
BaseEvent: eventstore.BaseEvent{
Service: "test.suite",
User: "adlerhurst",
EventType: "user.deleted",
},
BaseEvent: *eventstore.NewBaseEventForPush(
context.Background(),
NewUserAggregate(id),
"user.deleted"),
}
}
@@ -213,18 +189,18 @@ func (rm *UsersReadModel) AppendEvents(events ...eventstore.EventReader) {
switch e := event.(type) {
case *UserAddedEvent:
//insert
user := NewUserReadModel(e.AggregateID())
user := NewUserReadModel(e.Aggregate().ID)
rm.Users = append(rm.Users, user)
user.AppendEvents(e)
case *UserFirstNameChangedEvent, *UserPasswordCheckedEvent:
//update
_, user := rm.userByID(e.AggregateID())
_, user := rm.userByID(e.Aggregate().ID)
if user == nil {
return
}
user.AppendEvents(e)
case *UserDeletedEvent:
idx, _ := rm.userByID(e.AggregateID())
idx, _ := rm.userByID(e.Aggregate().ID)
if idx < 0 {
return
}
@@ -302,11 +278,14 @@ func TestUserReadModel(t *testing.T) {
RegisterFilterEventMapper(UserPasswordCheckedMapper()).
RegisterFilterEventMapper(UserDeletedMapper())
events, err := es.PushAggregates(context.Background(),
NewUserAggregate("1").PushEvents(NewUserAddedEvent("hodor")),
NewUserAggregate("2").PushEvents(NewUserAddedEvent("hodor"), NewUserPasswordCheckedEvent(), NewUserPasswordCheckedEvent(), NewUserFirstNameChangedEvent("ueli")),
NewUserAggregate("2").PushEvents(NewUserDeletedEvent()),
)
events, err := es.PushEvents(context.Background(),
NewUserAddedEvent("1", "hodor"),
NewUserAddedEvent("2", "hodor"),
NewUserPasswordCheckedEvent("2"),
NewUserPasswordCheckedEvent("2"),
NewUserFirstNameChangedEvent("2", "ueli"),
NewUserDeletedEvent("2"))
if err != nil {
t.Errorf("unexpected error on push aggregates: %v", err)
}

View File

@@ -2,7 +2,7 @@ package eventstore
import "time"
//MemberReadModel is the minimum representation of a View model.
//ReadModel is the minimum representation of a View model.
// It implements a basic reducer
// it might be saved in a database or in memory
type ReadModel struct {
@@ -29,10 +29,10 @@ func (rm *ReadModel) Reduce() error {
}
if rm.AggregateID == "" {
rm.AggregateID = rm.Events[0].AggregateID()
rm.AggregateID = rm.Events[0].Aggregate().ID
}
if rm.ResourceOwner == "" {
rm.ResourceOwner = rm.Events[0].ResourceOwner()
rm.ResourceOwner = rm.Events[0].Aggregate().ResourceOwner
}
if rm.CreationDate.IsZero() {

View File

@@ -93,7 +93,7 @@ func (factory *SearchQueryBuilder) build() (*repository.SearchQuery, error) {
if factory == nil ||
len(factory.aggregateTypes) < 1 ||
factory.columns.Validate() != nil {
return nil, errors.ThrowPreconditionFailed(nil, "MODEL-tGAD3", "factory invalid")
return nil, errors.ThrowPreconditionFailed(nil, "MODEL-4m9gs", "factory invalid")
}
filters := []*repository.Filter{
factory.aggregateTypeFilter(),

View File

@@ -488,7 +488,7 @@ func TestSearchQueryFactoryBuild(t *testing.T) {
}
if !reflect.DeepEqual(query, tt.res.query) {
t.Errorf("NewSearchQueryFactory() = %+v, want %+v", factory, tt.res)
t.Errorf("NewSearchQueryFactory() = %+v, want %+v", factory, tt.res.query)
}
})
}

View File

@@ -15,9 +15,8 @@ type WriteModel struct {
//AppendEvents adds all the events to the read model.
// The function doesn't compute the new state of the read model
func (rm *WriteModel) AppendEvents(events ...EventReader) *WriteModel {
func (rm *WriteModel) AppendEvents(events ...EventReader) {
rm.Events = append(rm.Events, events...)
return rm
}
//Reduce is the basic implementaion of reducer
@@ -28,10 +27,10 @@ func (wm *WriteModel) Reduce() error {
}
if wm.AggregateID == "" {
wm.AggregateID = wm.Events[0].AggregateID()
wm.AggregateID = wm.Events[0].Aggregate().ID
}
if wm.ResourceOwner == "" {
wm.ResourceOwner = wm.Events[0].ResourceOwner()
wm.ResourceOwner = wm.Events[0].Aggregate().ResourceOwner
}
wm.ProcessedSequence = wm.Events[len(wm.Events)-1].Sequence()