diff --git a/internal/admin/repository/eventsourcing/handler/iam_member.go b/internal/admin/repository/eventsourcing/handler/iam_member.go index f46a361650..aceef8b553 100644 --- a/internal/admin/repository/eventsourcing/handler/iam_member.go +++ b/internal/admin/repository/eventsourcing/handler/iam_member.go @@ -38,8 +38,10 @@ func newIAMMember(handler handler) *IAMMember { func (m *IAMMember) subscribe() { m.subscription = m.es.Subscribe(m.AggregateTypes()...) + go func() { for event := range m.subscription.Events { + query.ReduceEvent(m, event) } }() diff --git a/internal/admin/repository/eventsourcing/handler/user.go b/internal/admin/repository/eventsourcing/handler/user.go index 89d658ae86..b4b445448b 100644 --- a/internal/admin/repository/eventsourcing/handler/user.go +++ b/internal/admin/repository/eventsourcing/handler/user.go @@ -30,7 +30,6 @@ const ( type User struct { handler - eventstore v1.Eventstore systemDefaults systemdefaults.SystemDefaults subscription *v1.Subscription } @@ -266,7 +265,7 @@ func (u *User) getOrgByID(ctx context.Context, orgID string) (*org_model.Org, er AggregateID: orgID, }, } - err = es_sdk.Filter(ctx, u.eventstore.FilterEvents, esOrg.AppendEvents, query) + err = es_sdk.Filter(ctx, u.Eventstore().FilterEvents, esOrg.AppendEvents, query) if err != nil && !errors.IsNotFound(err) { return nil, err } @@ -287,7 +286,7 @@ func (u *User) getIAMByID(ctx context.Context) (*iam_model.IAM, error) { AggregateID: domain.IAMID, }, } - err = es_sdk.Filter(ctx, u.eventstore.FilterEvents, iam.AppendEvents, query) + err = es_sdk.Filter(ctx, u.Eventstore().FilterEvents, iam.AppendEvents, query) if err != nil && caos_errs.IsNotFound(err) && iam.Sequence == 0 { return nil, err } diff --git a/internal/eventstore/event.go b/internal/eventstore/event.go index 775a3cc97c..578c047f49 100644 --- a/internal/eventstore/event.go +++ b/internal/eventstore/event.go @@ -36,4 +36,6 @@ type EventReader interface { Sequence() uint64 CreationDate() time.Time + //DataAsBytes returns the payload of the event. It represent the changed fields by the event + DataAsBytes() []byte } diff --git a/internal/eventstore/event_base.go b/internal/eventstore/event_base.go index cc771a89b7..9889a7cb92 100644 --- a/internal/eventstore/event_base.go +++ b/internal/eventstore/event_base.go @@ -22,6 +22,7 @@ type BaseEvent struct { User string `json:"-"` //Service is the service which created the event Service string `json:"-"` + Data []byte `json:"-"` } // EditorService implements EventPusher @@ -54,6 +55,11 @@ func (e *BaseEvent) Aggregate() Aggregate { return e.aggregate } +//Data returns the payload of the event. It represent the changed fields by the event +func (e *BaseEvent) DataAsBytes() []byte { + return e.Data +} + //BaseEventFromRepo maps a stored event to a BaseEvent func BaseEventFromRepo(event *repository.Event) *BaseEvent { return &BaseEvent{ @@ -68,6 +74,7 @@ func BaseEventFromRepo(event *repository.Event) *BaseEvent { sequence: event.Sequence, Service: event.EditorService, User: event.EditorUser, + Data: event.Data, } } diff --git a/internal/eventstore/eventstore.go b/internal/eventstore/eventstore.go index 2a6c9c3256..00f81c3c7c 100644 --- a/internal/eventstore/eventstore.go +++ b/internal/eventstore/eventstore.go @@ -47,7 +47,14 @@ func (es *Eventstore) PushEvents(ctx context.Context, pushEvents ...EventPusher) if err != nil { return nil, err } - return es.mapEvents(events) + + eventReaders, err := es.mapEvents(events) + if err != nil { + return nil, err + } + + go notify(eventReaders) + return eventReaders, nil } func eventsToRepository(pushEvents []EventPusher) (events []*repository.Event, constraints []*repository.UniqueConstraint, err error) { diff --git a/internal/eventstore/subscription.go b/internal/eventstore/subscription.go new file mode 100644 index 0000000000..91d6d0b6be --- /dev/null +++ b/internal/eventstore/subscription.go @@ -0,0 +1,94 @@ +package eventstore + +import ( + v1 "github.com/caos/zitadel/internal/eventstore/v1" + "github.com/caos/zitadel/internal/eventstore/v1/models" + "sync" +) + +var ( + subscriptions = map[AggregateType][]*Subscription{} + subsMutext sync.Mutex +) + +type Subscription struct { + Events chan EventReader + aggregates []AggregateType +} + +func Subscribe(aggregates ...AggregateType) *Subscription { + events := make(chan EventReader, 100) + sub := &Subscription{ + Events: events, + aggregates: aggregates, + } + + subsMutext.Lock() + defer subsMutext.Unlock() + + for _, aggregate := range aggregates { + _, ok := subscriptions[aggregate] + if !ok { + subscriptions[aggregate] = make([]*Subscription, 0, 1) + } + subscriptions[aggregate] = append(subscriptions[aggregate], sub) + } + + return sub +} + +func notify(events []EventReader) { + go v1.Notify(MapEventsToV1Events(events)) + subsMutext.Lock() + defer subsMutext.Unlock() + for _, event := range events { + subs, ok := subscriptions[event.Aggregate().Typ] + if !ok { + continue + } + for _, sub := range subs { + sub.Events <- event + } + } +} + +func (s *Subscription) Unsubscribe() { + subsMutext.Lock() + defer subsMutext.Unlock() + for _, aggregate := range s.aggregates { + subs, ok := subscriptions[aggregate] + if !ok { + continue + } + for i := len(subs) - 1; i >= 0; i-- { + if subs[i] == s { + subs[i] = subs[len(subs)-1] + subs[len(subs)-1] = nil + subs = subs[:len(subs)-1] + } + } + } + close(s.Events) +} + +func MapEventsToV1Events(events []EventReader) []*models.Event { + v1Events := make([]*models.Event, len(events)) + for i, event := range events { + v1Events[i] = mapEventToV1Event(event) + } + return v1Events +} + +func mapEventToV1Event(event EventReader) *models.Event { + return &models.Event{ + Sequence: event.Sequence(), + CreationDate: event.CreationDate(), + Type: models.EventType(event.Type()), + AggregateType: models.AggregateType(event.Aggregate().Typ), + AggregateID: event.Aggregate().ID, + ResourceOwner: event.Aggregate().ResourceOwner, + EditorService: event.EditorService(), + EditorUser: event.EditorUser(), + Data: event.DataAsBytes(), + } +} diff --git a/internal/eventstore/v1/internal/repository/repository.go b/internal/eventstore/v1/internal/repository/repository.go index 2acc17f6e2..631a659956 100644 --- a/internal/eventstore/v1/internal/repository/repository.go +++ b/internal/eventstore/v1/internal/repository/repository.go @@ -9,9 +9,6 @@ import ( type Repository interface { Health(ctx context.Context) error - // PushEvents adds all events of the given aggregates to the eventstreams of the aggregates. - // This call is transaction save. The transaction will be rolled back if one event fails - PushAggregates(ctx context.Context, aggregates ...*models.Aggregate) error // Filter returns all events matching the given search query Filter(ctx context.Context, searchQuery *models.SearchQueryFactory) (events []*models.Event, err error) //LatestSequence returns the latests sequence found by the the search query diff --git a/internal/eventstore/v1/internal/repository/sql/filter_test.go b/internal/eventstore/v1/internal/repository/sql/filter_test.go index b831be2e5e..382a832f25 100644 --- a/internal/eventstore/v1/internal/repository/sql/filter_test.go +++ b/internal/eventstore/v1/internal/repository/sql/filter_test.go @@ -10,6 +10,11 @@ import ( es_models "github.com/caos/zitadel/internal/eventstore/v1/models" ) +type mockEvents struct { + events []*es_models.Event + t *testing.T +} + func TestSQL_Filter(t *testing.T) { type fields struct { client *dbMock diff --git a/internal/eventstore/v1/internal/repository/sql/push.go b/internal/eventstore/v1/internal/repository/sql/push.go deleted file mode 100644 index 27d978a6f0..0000000000 --- a/internal/eventstore/v1/internal/repository/sql/push.go +++ /dev/null @@ -1,90 +0,0 @@ -package sql - -import ( - "context" - "database/sql" - "errors" - - "github.com/caos/logging" - caos_errs "github.com/caos/zitadel/internal/errors" - "github.com/caos/zitadel/internal/eventstore/v1/models" - "github.com/caos/zitadel/internal/telemetry/tracing" - "github.com/cockroachdb/cockroach-go/v2/crdb" -) - -const ( - insertStmt = "INSERT INTO eventstore.events " + - "(event_type, aggregate_type, aggregate_id, aggregate_version, creation_date, event_data, editor_user, editor_service, resource_owner, previous_sequence) " + - "SELECT $1, $2, $3, $4, COALESCE($5, now()), $6, $7, $8, $9, $10 " + - "WHERE EXISTS (" + - "SELECT 1 FROM eventstore.events WHERE aggregate_type = $11 AND aggregate_id = $12 HAVING MAX(event_sequence) = $13 OR ($14::BIGINT IS NULL AND COUNT(*) = 0)) " + - "RETURNING event_sequence, creation_date" -) - -func (db *SQL) PushAggregates(ctx context.Context, aggregates ...*models.Aggregate) (err error) { - err = crdb.ExecuteTx(ctx, db.client, nil, func(tx *sql.Tx) error { - stmt, err := tx.Prepare(insertStmt) - if err != nil { - tx.Rollback() - logging.Log("SQL-9ctx5").WithError(err).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Warn("prepare failed") - return caos_errs.ThrowInternal(err, "SQL-juCgA", "prepare failed") - } - - for _, aggregate := range aggregates { - err = precondtion(tx, aggregate) - if err != nil { - tx.Rollback() - return err - } - err = insertEvents(stmt, Sequence(aggregate.PreviousSequence), aggregate.Events) - if err != nil { - tx.Rollback() - return err - } - } - return nil - }) - - if err != nil && !errors.Is(err, &caos_errs.CaosError{}) { - err = caos_errs.ThrowInternal(err, "SQL-DjgtG", "unable to store events") - } - - return err -} - -func precondtion(tx *sql.Tx, aggregate *models.Aggregate) error { - if aggregate.Precondition == nil { - return nil - } - events, err := filter(tx, models.FactoryFromSearchQuery(aggregate.Precondition.Query)) - if err != nil { - return caos_errs.ThrowPreconditionFailed(err, "SQL-oBPxB", "filter failed") - } - err = aggregate.Precondition.Validation(events...) - if err != nil { - return err - } - return nil -} - -func insertEvents(stmt *sql.Stmt, previousSequence Sequence, events []*models.Event) error { - for _, event := range events { - creationDate := sql.NullTime{Time: event.CreationDate, Valid: !event.CreationDate.IsZero()} - err := stmt.QueryRow(event.Type, event.AggregateType, event.AggregateID, event.AggregateVersion, creationDate, Data(event.Data), event.EditorUser, event.EditorService, event.ResourceOwner, previousSequence, - event.AggregateType, event.AggregateID, previousSequence, previousSequence).Scan(&previousSequence, &event.CreationDate) - - if err != nil { - logging.LogWithFields("SQL-5M0sd", - "aggregate", event.AggregateType, - "previousSequence", previousSequence, - "aggregateId", event.AggregateID, - "aggregateType", event.AggregateType, - "eventType", event.Type).WithError(err).Info("query failed") - return caos_errs.ThrowInternal(err, "SQL-SBP37", "unable to create event") - } - - event.Sequence = uint64(previousSequence) - } - - return nil -} diff --git a/internal/eventstore/v1/internal/repository/sql/push_test.go b/internal/eventstore/v1/internal/repository/sql/push_test.go deleted file mode 100644 index f2e78ba4cc..0000000000 --- a/internal/eventstore/v1/internal/repository/sql/push_test.go +++ /dev/null @@ -1,443 +0,0 @@ -package sql - -import ( - "context" - "database/sql" - "errors" - "reflect" - "runtime" - "testing" - - z_errors "github.com/caos/zitadel/internal/errors" - "github.com/caos/zitadel/internal/eventstore/v1/models" -) - -type mockEvents struct { - events []*models.Event - t *testing.T -} - -func TestSQL_PushAggregates(t *testing.T) { - type fields struct { - client *dbMock - } - type args struct { - aggregates []*models.Aggregate - } - tests := []struct { - name string - fields fields - args args - isError func(error) bool - shouldCheckEvents bool - }{ - { - name: "no aggregates", - fields: fields{ - client: mockDB(t). - expectBegin(nil). - expectSavepoint(). - expectPrepareInsert(nil). - expectReleaseSavepoint(nil). - expectCommit(nil), - }, - args: args{aggregates: []*models.Aggregate{}}, - shouldCheckEvents: false, - isError: noErr, - }, - { - name: "prepare fails", - fields: fields{ - client: mockDB(t). - expectBegin(nil). - expectSavepoint(). - expectPrepareInsert(sql.ErrConnDone). - expectReleaseSavepoint(nil). - expectCommit(nil), - }, - args: args{aggregates: []*models.Aggregate{}}, - shouldCheckEvents: false, - isError: func(err error) bool { return errors.Is(err, sql.ErrConnDone) }, - }, - { - name: "no aggregates release fails", - fields: fields{ - client: mockDB(t). - expectBegin(nil). - expectSavepoint(). - expectPrepareInsert(nil). - expectReleaseSavepoint(sql.ErrConnDone). - expectCommit(nil), - }, - - args: args{aggregates: []*models.Aggregate{}}, - isError: z_errors.IsInternal, - shouldCheckEvents: false, - }, - { - name: "aggregate precondtion fails", - fields: fields{ - client: mockDB(t). - expectBegin(nil). - expectSavepoint(). - expectPrepareInsert(nil). - expectFilterEventsError(z_errors.CreateCaosError(nil, "SQL-IzJOf", "err")). - expectRollback(nil), - }, - - args: args{aggregates: []*models.Aggregate{aggregateWithPrecondition(&models.Aggregate{}, models.NewSearchQuery().SetLimit(1), nil)}}, - isError: z_errors.IsPreconditionFailed, - shouldCheckEvents: false, - }, - { - name: "one aggregate two events success", - fields: fields{ - client: mockDB(t). - expectBegin(nil). - expectSavepoint(). - expectPrepareInsert(nil). - expectInsertEvent(&models.Event{ - AggregateID: "aggID", - AggregateType: "aggType", - EditorService: "svc", - EditorUser: "usr", - ResourceOwner: "ro", - PreviousSequence: 34, - Type: "eventTyp", - AggregateVersion: "v0.0.1", - }, 45). - expectInsertEvent(&models.Event{ - AggregateID: "aggID", - AggregateType: "aggType", - EditorService: "svc2", - EditorUser: "usr2", - ResourceOwner: "ro2", - PreviousSequence: 45, - Type: "eventTyp", - AggregateVersion: "v0.0.1", - }, 46). - expectReleaseSavepoint(nil). - expectCommit(nil), - }, - args: args{ - aggregates: []*models.Aggregate{ - { - PreviousSequence: 34, - Events: []*models.Event{ - { - AggregateID: "aggID", - AggregateType: "aggType", - AggregateVersion: "v0.0.1", - EditorService: "svc", - EditorUser: "usr", - ResourceOwner: "ro", - Type: "eventTyp", - }, - { - AggregateID: "aggID", - AggregateType: "aggType", - AggregateVersion: "v0.0.1", - EditorService: "svc2", - EditorUser: "usr2", - ResourceOwner: "ro2", - Type: "eventTyp", - }, - }, - }, - }, - }, - shouldCheckEvents: true, - isError: noErr, - }, - { - name: "two aggregates one event per aggregate success", - fields: fields{ - client: mockDB(t). - expectBegin(nil). - expectSavepoint(). - expectPrepareInsert(nil). - expectInsertEvent(&models.Event{ - AggregateID: "aggID", - AggregateType: "aggType", - EditorService: "svc", - EditorUser: "usr", - ResourceOwner: "ro", - PreviousSequence: 34, - Type: "eventTyp", - AggregateVersion: "v0.0.1", - }, 47). - expectInsertEvent(&models.Event{ - AggregateID: "aggID2", - AggregateType: "aggType2", - EditorService: "svc", - EditorUser: "usr", - ResourceOwner: "ro", - PreviousSequence: 40, - Type: "eventTyp", - AggregateVersion: "v0.0.1", - }, 48). - expectReleaseSavepoint(nil). - expectCommit(nil), - }, - args: args{ - aggregates: []*models.Aggregate{ - { - PreviousSequence: 34, - Events: []*models.Event{ - { - AggregateID: "aggID", - AggregateType: "aggType", - AggregateVersion: "v0.0.1", - EditorService: "svc", - EditorUser: "usr", - ResourceOwner: "ro", - Type: "eventTyp", - }, - }, - }, - { - PreviousSequence: 40, - Events: []*models.Event{ - { - AggregateID: "aggID2", - AggregateType: "aggType2", - AggregateVersion: "v0.0.1", - EditorService: "svc", - EditorUser: "usr", - ResourceOwner: "ro", - Type: "eventTyp", - }, - }, - }, - }, - }, - shouldCheckEvents: true, - isError: noErr, - }, - { - name: "first event fails no action with second event", - fields: fields{ - client: mockDB(t). - expectBegin(nil). - expectSavepoint(). - expectPrepareInsert(nil). - expectInsertEventError(&models.Event{ - AggregateID: "aggID", - AggregateType: "aggType", - EditorService: "svc", - EditorUser: "usr", - ResourceOwner: "ro", - PreviousSequence: 34, - Data: []byte("{}"), - Type: "eventTyp", - AggregateVersion: "v0.0.1", - }). - expectReleaseSavepoint(nil). - expectRollback(nil), - }, - args: args{ - aggregates: []*models.Aggregate{ - { - Events: []*models.Event{ - { - AggregateID: "aggID", - AggregateType: "aggType", - AggregateVersion: "v0.0.1", - EditorService: "svc", - EditorUser: "usr", - ResourceOwner: "ro", - Type: "eventTyp", - PreviousSequence: 34, - }, - { - AggregateID: "aggID", - AggregateType: "aggType", - AggregateVersion: "v0.0.1", - EditorService: "svc", - EditorUser: "usr", - ResourceOwner: "ro", - Type: "eventTyp", - PreviousSequence: 0, - }, - }, - }, - }, - }, - isError: z_errors.IsInternal, - shouldCheckEvents: false, - }, - { - name: "one event, release savepoint fails", - fields: fields{ - client: mockDB(t). - expectBegin(nil). - expectPrepareInsert(nil). - expectSavepoint(). - expectInsertEvent(&models.Event{ - AggregateID: "aggID", - AggregateType: "aggType", - EditorService: "svc", - EditorUser: "usr", - ResourceOwner: "ro", - PreviousSequence: 34, - Type: "eventTyp", - Data: []byte("{}"), - AggregateVersion: "v0.0.1", - }, 47). - expectReleaseSavepoint(sql.ErrConnDone). - expectCommit(nil). - expectRollback(nil), - }, - args: args{ - aggregates: []*models.Aggregate{ - { - Events: []*models.Event{ - { - AggregateID: "aggID", - AggregateType: "aggType", - AggregateVersion: "v0.0.1", - EditorService: "svc", - EditorUser: "usr", - ResourceOwner: "ro", - Type: "eventTyp", - PreviousSequence: 34, - }, - }, - }, - }, - }, - isError: z_errors.IsInternal, - shouldCheckEvents: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - sql := &SQL{ - client: tt.fields.client.sqlClient, - } - err := sql.PushAggregates(context.Background(), tt.args.aggregates...) - if err != nil && !tt.isError(err) { - t.Errorf("wrong error type = %v, errFunc %s", err, functionName(tt.isError)) - } - if !tt.shouldCheckEvents { - return - } - for _, aggregate := range tt.args.aggregates { - for _, event := range aggregate.Events { - if event.Sequence == 0 { - t.Error("sequence of returned event is not set") - } - if event.AggregateType == "" || event.AggregateID == "" { - t.Error("aggregate of event is not set") - } - } - } - if err := tt.fields.client.mock.ExpectationsWereMet(); err != nil { - t.Errorf("not all database expectaions met: %s", err) - } - }) - } -} - -func noErr(err error) bool { - return err == nil -} - -func functionName(i interface{}) string { - return runtime.FuncForPC(reflect.ValueOf(i).Pointer()).Name() -} - -func Test_precondtion(t *testing.T) { - type fields struct { - client *dbMock - } - type args struct { - aggregate *models.Aggregate - } - tests := []struct { - name string - fields fields - args args - isErr func(error) bool - }{ - { - name: "no precondition", - fields: fields{ - client: mockDB(t). - expectBegin(nil), - }, - args: args{ - aggregate: &models.Aggregate{}, - }, - }, - { - name: "precondition fails", - fields: fields{ - client: mockDB(t). - expectBegin(nil).expectFilterEventsLimit("test", 5, 0), - }, - args: args{ - aggregate: aggregateWithPrecondition(&models.Aggregate{}, models.NewSearchQuery().SetLimit(5).AggregateTypeFilter("test"), validationFunc(z_errors.ThrowPreconditionFailed(nil, "SQL-LBIKm", "err"))), - }, - isErr: z_errors.IsPreconditionFailed, - }, - { - name: "precondition with filter error", - fields: fields{ - client: mockDB(t). - expectBegin(nil).expectFilterEventsError(z_errors.ThrowInternal(nil, "SQL-ac9EW", "err")), - }, - args: args{ - aggregate: aggregateWithPrecondition(&models.Aggregate{}, models.NewSearchQuery().SetLimit(5).AggregateTypeFilter("test"), validationFunc(z_errors.CreateCaosError(nil, "SQL-LBIKm", "err"))), - }, - isErr: z_errors.IsPreconditionFailed, - }, - { - name: "precondition no events", - fields: fields{ - client: mockDB(t). - expectBegin(nil).expectFilterEventsLimit("test", 5, 0), - }, - args: args{ - aggregate: aggregateWithPrecondition(&models.Aggregate{}, models.NewSearchQuery().SetLimit(5).AggregateTypeFilter("test"), validationFunc(nil)), - }, - }, - { - name: "precondition with events", - fields: fields{ - client: mockDB(t). - expectBegin(nil).expectFilterEventsLimit("test", 5, 3), - }, - args: args{ - aggregate: aggregateWithPrecondition(&models.Aggregate{}, models.NewSearchQuery().SetLimit(5).AggregateTypeFilter("test"), validationFunc(nil)), - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - tx, err := tt.fields.client.sqlClient.Begin() - if err != nil { - t.Errorf("unable to start tx %v", err) - t.FailNow() - } - err = precondtion(tx, tt.args.aggregate) - if (err != nil) && (tt.isErr == nil) { - t.Errorf("no error expected got: %v", err) - } - if tt.isErr != nil && !tt.isErr(err) { - t.Errorf("precondtion() wrong error %T, %v", err, err) - } - }) - } -} - -func aggregateWithPrecondition(aggregate *models.Aggregate, query *models.SearchQuery, precondition func(...*models.Event) error) *models.Aggregate { - aggregate.SetPrecondition(query, precondition) - return aggregate -} - -func validationFunc(err error) func(events ...*models.Event) error { - return func(events ...*models.Event) error { - return err - } -} diff --git a/internal/eventstore/v1/sdk/sdk.go b/internal/eventstore/v1/sdk/sdk.go index 9414c385a8..4e23f69f4a 100644 --- a/internal/eventstore/v1/sdk/sdk.go +++ b/internal/eventstore/v1/sdk/sdk.go @@ -10,7 +10,6 @@ import ( type filterFunc func(context.Context, *es_models.SearchQuery) ([]*es_models.Event, error) type appendFunc func(...*es_models.Event) error type AggregateFunc func(context.Context) (*es_models.Aggregate, error) -type pushFunc func(context.Context, ...*es_models.Aggregate) error func Filter(ctx context.Context, filter filterFunc, appender appendFunc, query *es_models.SearchQuery) error { events, err := filter(ctx, query) @@ -26,62 +25,3 @@ func Filter(ctx context.Context, filter filterFunc, appender appendFunc, query * } return nil } - -// Push is Deprecated use PushAggregates -// Push creates the aggregates from aggregater -// and pushes the aggregates to the given pushFunc -// the given events are appended by the appender -func Push(ctx context.Context, push pushFunc, appender appendFunc, aggregaters ...AggregateFunc) (err error) { - if len(aggregaters) < 1 { - return errors.ThrowPreconditionFailed(nil, "SDK-q9wjp", "Errors.Internal") - } - - aggregates, err := makeAggregates(ctx, aggregaters) - if err != nil { - return err - } - - err = push(ctx, aggregates...) - if err != nil { - return err - } - - return appendAggregates(appender, aggregates) -} - -func PushAggregates(ctx context.Context, push pushFunc, appender appendFunc, aggregates ...*es_models.Aggregate) (err error) { - if len(aggregates) < 1 { - return errors.ThrowPreconditionFailed(nil, "SDK-q9wjp", "Errors.Internal") - } - - err = push(ctx, aggregates...) - if err != nil { - return err - } - if appender == nil { - return nil - } - - return appendAggregates(appender, aggregates) -} - -func appendAggregates(appender appendFunc, aggregates []*es_models.Aggregate) error { - for _, aggregate := range aggregates { - err := appender(aggregate.Events...) - if err != nil { - return ThrowAppendEventError(err, "SDK-o6kzK", "Errors.Internal") - } - } - return nil -} - -func makeAggregates(ctx context.Context, aggregaters []AggregateFunc) (aggregates []*es_models.Aggregate, err error) { - aggregates = make([]*es_models.Aggregate, len(aggregaters)) - for i, aggregater := range aggregaters { - aggregates[i], err = aggregater(ctx) - if err != nil { - return nil, err - } - } - return aggregates, nil -} diff --git a/internal/eventstore/v1/sdk/sdk_test.go b/internal/eventstore/v1/sdk/sdk_test.go index 9250659679..516cc381ca 100644 --- a/internal/eventstore/v1/sdk/sdk_test.go +++ b/internal/eventstore/v1/sdk/sdk_test.go @@ -75,119 +75,3 @@ func TestFilter(t *testing.T) { }) } } - -func TestPush(t *testing.T) { - type args struct { - push pushFunc - appender appendFunc - aggregaters []AggregateFunc - } - tests := []struct { - name string - args args - wantErr func(error) bool - }{ - { - name: "no aggregates", - args: args{ - push: nil, - appender: nil, - aggregaters: nil, - }, - wantErr: errors.IsPreconditionFailed, - }, - { - name: "aggregater fails", - args: args{ - push: nil, - appender: nil, - aggregaters: []AggregateFunc{ - func(context.Context) (*es_models.Aggregate, error) { - return nil, errors.ThrowInternal(nil, "SDK-Ec5x2", "test err") - }, - }, - }, - wantErr: errors.IsInternal, - }, - { - name: "push fails", - args: args{ - push: func(context.Context, ...*es_models.Aggregate) error { - return errors.ThrowInternal(nil, "SDK-0g4gW", "test error") - }, - appender: nil, - aggregaters: []AggregateFunc{ - func(context.Context) (*es_models.Aggregate, error) { - return &es_models.Aggregate{}, nil - }, - }, - }, - wantErr: errors.IsInternal, - }, - { - name: "append aggregates fails", - args: args{ - push: func(context.Context, ...*es_models.Aggregate) error { - return nil - }, - appender: func(...*es_models.Event) error { - return errors.ThrowInvalidArgument(nil, "SDK-BDhcT", "test err") - }, - aggregaters: []AggregateFunc{ - func(context.Context) (*es_models.Aggregate, error) { - return &es_models.Aggregate{Events: []*es_models.Event{&es_models.Event{}}}, nil - }, - }, - }, - wantErr: IsAppendEventError, - }, - { - name: "correct one aggregate", - args: args{ - push: func(context.Context, ...*es_models.Aggregate) error { - return nil - }, - appender: func(...*es_models.Event) error { - return nil - }, - aggregaters: []AggregateFunc{ - func(context.Context) (*es_models.Aggregate, error) { - return &es_models.Aggregate{Events: []*es_models.Event{&es_models.Event{}}}, nil - }, - }, - }, - wantErr: nil, - }, - { - name: "correct multiple aggregate", - args: args{ - push: func(context.Context, ...*es_models.Aggregate) error { - return nil - }, - appender: func(...*es_models.Event) error { - return nil - }, - aggregaters: []AggregateFunc{ - func(context.Context) (*es_models.Aggregate, error) { - return &es_models.Aggregate{Events: []*es_models.Event{&es_models.Event{}}}, nil - }, - func(context.Context) (*es_models.Aggregate, error) { - return &es_models.Aggregate{Events: []*es_models.Event{&es_models.Event{}}}, nil - }, - }, - }, - wantErr: nil, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - err := Push(context.Background(), tt.args.push, tt.args.appender, tt.args.aggregaters...) - if tt.wantErr == nil && err != nil { - t.Errorf("no error expected %v", err) - } - if tt.wantErr != nil && !tt.wantErr(err) { - t.Errorf("no error has wrong type %v", err) - } - }) - } -} diff --git a/internal/eventstore/v1/subscription.go b/internal/eventstore/v1/subscription.go index 093f47fb8f..ed5596ba7e 100644 --- a/internal/eventstore/v1/subscription.go +++ b/internal/eventstore/v1/subscription.go @@ -37,18 +37,16 @@ func (es *eventstore) Subscribe(aggregates ...models.AggregateType) *Subscriptio return sub } -func notify(aggregates []*models.Aggregate) { +func Notify(events []*models.Event) { subsMutext.Lock() defer subsMutext.Unlock() - for _, aggregate := range aggregates { - subs, ok := subscriptions[aggregate.Type()] + for _, event := range events { + subs, ok := subscriptions[event.AggregateType] if !ok { continue } for _, sub := range subs { - for _, event := range aggregate.Events { - sub.Events <- event - } + sub.Events <- event } } } diff --git a/internal/org/repository/eventsourcing/model/org.go b/internal/org/repository/eventsourcing/model/org.go index 86e89188a5..0880add1ce 100644 --- a/internal/org/repository/eventsourcing/model/org.go +++ b/internal/org/repository/eventsourcing/model/org.go @@ -127,7 +127,6 @@ func (o *Org) AppendEvents(events ...*es_models.Event) error { func (o *Org) AppendEvent(event *es_models.Event) (err error) { switch event.Type { case OrgAdded: - *o = Org{} err = o.setData(event) if err != nil { return err