feat: pub sub (#1341)

* fix: pub sub

* fix: adaot config to commands (and queries)

* remove dependency on vv2 in v1

* fix: pub sub in new eventstore

* fix tests

Co-authored-by: Livio Amstutz <livio.a@gmail.com>
This commit is contained in:
Fabi 2021-02-24 13:24:33 +01:00 committed by GitHub
parent 8dcbbc87ca
commit c0f55e7209
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 124 additions and 723 deletions

View File

@ -38,8 +38,10 @@ func newIAMMember(handler handler) *IAMMember {
func (m *IAMMember) subscribe() {
m.subscription = m.es.Subscribe(m.AggregateTypes()...)
go func() {
for event := range m.subscription.Events {
query.ReduceEvent(m, event)
}
}()

View File

@ -30,7 +30,6 @@ const (
type User struct {
handler
eventstore v1.Eventstore
systemDefaults systemdefaults.SystemDefaults
subscription *v1.Subscription
}
@ -266,7 +265,7 @@ func (u *User) getOrgByID(ctx context.Context, orgID string) (*org_model.Org, er
AggregateID: orgID,
},
}
err = es_sdk.Filter(ctx, u.eventstore.FilterEvents, esOrg.AppendEvents, query)
err = es_sdk.Filter(ctx, u.Eventstore().FilterEvents, esOrg.AppendEvents, query)
if err != nil && !errors.IsNotFound(err) {
return nil, err
}
@ -287,7 +286,7 @@ func (u *User) getIAMByID(ctx context.Context) (*iam_model.IAM, error) {
AggregateID: domain.IAMID,
},
}
err = es_sdk.Filter(ctx, u.eventstore.FilterEvents, iam.AppendEvents, query)
err = es_sdk.Filter(ctx, u.Eventstore().FilterEvents, iam.AppendEvents, query)
if err != nil && caos_errs.IsNotFound(err) && iam.Sequence == 0 {
return nil, err
}

View File

@ -36,4 +36,6 @@ type EventReader interface {
Sequence() uint64
CreationDate() time.Time
//DataAsBytes returns the payload of the event. It represent the changed fields by the event
DataAsBytes() []byte
}

View File

@ -22,6 +22,7 @@ type BaseEvent struct {
User string `json:"-"`
//Service is the service which created the event
Service string `json:"-"`
Data []byte `json:"-"`
}
// EditorService implements EventPusher
@ -54,6 +55,11 @@ func (e *BaseEvent) Aggregate() Aggregate {
return e.aggregate
}
//Data returns the payload of the event. It represent the changed fields by the event
func (e *BaseEvent) DataAsBytes() []byte {
return e.Data
}
//BaseEventFromRepo maps a stored event to a BaseEvent
func BaseEventFromRepo(event *repository.Event) *BaseEvent {
return &BaseEvent{
@ -68,6 +74,7 @@ func BaseEventFromRepo(event *repository.Event) *BaseEvent {
sequence: event.Sequence,
Service: event.EditorService,
User: event.EditorUser,
Data: event.Data,
}
}

View File

@ -47,7 +47,14 @@ func (es *Eventstore) PushEvents(ctx context.Context, pushEvents ...EventPusher)
if err != nil {
return nil, err
}
return es.mapEvents(events)
eventReaders, err := es.mapEvents(events)
if err != nil {
return nil, err
}
go notify(eventReaders)
return eventReaders, nil
}
func eventsToRepository(pushEvents []EventPusher) (events []*repository.Event, constraints []*repository.UniqueConstraint, err error) {

View File

@ -0,0 +1,94 @@
package eventstore
import (
v1 "github.com/caos/zitadel/internal/eventstore/v1"
"github.com/caos/zitadel/internal/eventstore/v1/models"
"sync"
)
var (
subscriptions = map[AggregateType][]*Subscription{}
subsMutext sync.Mutex
)
type Subscription struct {
Events chan EventReader
aggregates []AggregateType
}
func Subscribe(aggregates ...AggregateType) *Subscription {
events := make(chan EventReader, 100)
sub := &Subscription{
Events: events,
aggregates: aggregates,
}
subsMutext.Lock()
defer subsMutext.Unlock()
for _, aggregate := range aggregates {
_, ok := subscriptions[aggregate]
if !ok {
subscriptions[aggregate] = make([]*Subscription, 0, 1)
}
subscriptions[aggregate] = append(subscriptions[aggregate], sub)
}
return sub
}
func notify(events []EventReader) {
go v1.Notify(MapEventsToV1Events(events))
subsMutext.Lock()
defer subsMutext.Unlock()
for _, event := range events {
subs, ok := subscriptions[event.Aggregate().Typ]
if !ok {
continue
}
for _, sub := range subs {
sub.Events <- event
}
}
}
func (s *Subscription) Unsubscribe() {
subsMutext.Lock()
defer subsMutext.Unlock()
for _, aggregate := range s.aggregates {
subs, ok := subscriptions[aggregate]
if !ok {
continue
}
for i := len(subs) - 1; i >= 0; i-- {
if subs[i] == s {
subs[i] = subs[len(subs)-1]
subs[len(subs)-1] = nil
subs = subs[:len(subs)-1]
}
}
}
close(s.Events)
}
func MapEventsToV1Events(events []EventReader) []*models.Event {
v1Events := make([]*models.Event, len(events))
for i, event := range events {
v1Events[i] = mapEventToV1Event(event)
}
return v1Events
}
func mapEventToV1Event(event EventReader) *models.Event {
return &models.Event{
Sequence: event.Sequence(),
CreationDate: event.CreationDate(),
Type: models.EventType(event.Type()),
AggregateType: models.AggregateType(event.Aggregate().Typ),
AggregateID: event.Aggregate().ID,
ResourceOwner: event.Aggregate().ResourceOwner,
EditorService: event.EditorService(),
EditorUser: event.EditorUser(),
Data: event.DataAsBytes(),
}
}

View File

@ -9,9 +9,6 @@ import (
type Repository interface {
Health(ctx context.Context) error
// PushEvents adds all events of the given aggregates to the eventstreams of the aggregates.
// This call is transaction save. The transaction will be rolled back if one event fails
PushAggregates(ctx context.Context, aggregates ...*models.Aggregate) error
// Filter returns all events matching the given search query
Filter(ctx context.Context, searchQuery *models.SearchQueryFactory) (events []*models.Event, err error)
//LatestSequence returns the latests sequence found by the the search query

View File

@ -10,6 +10,11 @@ import (
es_models "github.com/caos/zitadel/internal/eventstore/v1/models"
)
type mockEvents struct {
events []*es_models.Event
t *testing.T
}
func TestSQL_Filter(t *testing.T) {
type fields struct {
client *dbMock

View File

@ -1,90 +0,0 @@
package sql
import (
"context"
"database/sql"
"errors"
"github.com/caos/logging"
caos_errs "github.com/caos/zitadel/internal/errors"
"github.com/caos/zitadel/internal/eventstore/v1/models"
"github.com/caos/zitadel/internal/telemetry/tracing"
"github.com/cockroachdb/cockroach-go/v2/crdb"
)
const (
insertStmt = "INSERT INTO eventstore.events " +
"(event_type, aggregate_type, aggregate_id, aggregate_version, creation_date, event_data, editor_user, editor_service, resource_owner, previous_sequence) " +
"SELECT $1, $2, $3, $4, COALESCE($5, now()), $6, $7, $8, $9, $10 " +
"WHERE EXISTS (" +
"SELECT 1 FROM eventstore.events WHERE aggregate_type = $11 AND aggregate_id = $12 HAVING MAX(event_sequence) = $13 OR ($14::BIGINT IS NULL AND COUNT(*) = 0)) " +
"RETURNING event_sequence, creation_date"
)
func (db *SQL) PushAggregates(ctx context.Context, aggregates ...*models.Aggregate) (err error) {
err = crdb.ExecuteTx(ctx, db.client, nil, func(tx *sql.Tx) error {
stmt, err := tx.Prepare(insertStmt)
if err != nil {
tx.Rollback()
logging.Log("SQL-9ctx5").WithError(err).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Warn("prepare failed")
return caos_errs.ThrowInternal(err, "SQL-juCgA", "prepare failed")
}
for _, aggregate := range aggregates {
err = precondtion(tx, aggregate)
if err != nil {
tx.Rollback()
return err
}
err = insertEvents(stmt, Sequence(aggregate.PreviousSequence), aggregate.Events)
if err != nil {
tx.Rollback()
return err
}
}
return nil
})
if err != nil && !errors.Is(err, &caos_errs.CaosError{}) {
err = caos_errs.ThrowInternal(err, "SQL-DjgtG", "unable to store events")
}
return err
}
func precondtion(tx *sql.Tx, aggregate *models.Aggregate) error {
if aggregate.Precondition == nil {
return nil
}
events, err := filter(tx, models.FactoryFromSearchQuery(aggregate.Precondition.Query))
if err != nil {
return caos_errs.ThrowPreconditionFailed(err, "SQL-oBPxB", "filter failed")
}
err = aggregate.Precondition.Validation(events...)
if err != nil {
return err
}
return nil
}
func insertEvents(stmt *sql.Stmt, previousSequence Sequence, events []*models.Event) error {
for _, event := range events {
creationDate := sql.NullTime{Time: event.CreationDate, Valid: !event.CreationDate.IsZero()}
err := stmt.QueryRow(event.Type, event.AggregateType, event.AggregateID, event.AggregateVersion, creationDate, Data(event.Data), event.EditorUser, event.EditorService, event.ResourceOwner, previousSequence,
event.AggregateType, event.AggregateID, previousSequence, previousSequence).Scan(&previousSequence, &event.CreationDate)
if err != nil {
logging.LogWithFields("SQL-5M0sd",
"aggregate", event.AggregateType,
"previousSequence", previousSequence,
"aggregateId", event.AggregateID,
"aggregateType", event.AggregateType,
"eventType", event.Type).WithError(err).Info("query failed")
return caos_errs.ThrowInternal(err, "SQL-SBP37", "unable to create event")
}
event.Sequence = uint64(previousSequence)
}
return nil
}

View File

@ -1,443 +0,0 @@
package sql
import (
"context"
"database/sql"
"errors"
"reflect"
"runtime"
"testing"
z_errors "github.com/caos/zitadel/internal/errors"
"github.com/caos/zitadel/internal/eventstore/v1/models"
)
type mockEvents struct {
events []*models.Event
t *testing.T
}
func TestSQL_PushAggregates(t *testing.T) {
type fields struct {
client *dbMock
}
type args struct {
aggregates []*models.Aggregate
}
tests := []struct {
name string
fields fields
args args
isError func(error) bool
shouldCheckEvents bool
}{
{
name: "no aggregates",
fields: fields{
client: mockDB(t).
expectBegin(nil).
expectSavepoint().
expectPrepareInsert(nil).
expectReleaseSavepoint(nil).
expectCommit(nil),
},
args: args{aggregates: []*models.Aggregate{}},
shouldCheckEvents: false,
isError: noErr,
},
{
name: "prepare fails",
fields: fields{
client: mockDB(t).
expectBegin(nil).
expectSavepoint().
expectPrepareInsert(sql.ErrConnDone).
expectReleaseSavepoint(nil).
expectCommit(nil),
},
args: args{aggregates: []*models.Aggregate{}},
shouldCheckEvents: false,
isError: func(err error) bool { return errors.Is(err, sql.ErrConnDone) },
},
{
name: "no aggregates release fails",
fields: fields{
client: mockDB(t).
expectBegin(nil).
expectSavepoint().
expectPrepareInsert(nil).
expectReleaseSavepoint(sql.ErrConnDone).
expectCommit(nil),
},
args: args{aggregates: []*models.Aggregate{}},
isError: z_errors.IsInternal,
shouldCheckEvents: false,
},
{
name: "aggregate precondtion fails",
fields: fields{
client: mockDB(t).
expectBegin(nil).
expectSavepoint().
expectPrepareInsert(nil).
expectFilterEventsError(z_errors.CreateCaosError(nil, "SQL-IzJOf", "err")).
expectRollback(nil),
},
args: args{aggregates: []*models.Aggregate{aggregateWithPrecondition(&models.Aggregate{}, models.NewSearchQuery().SetLimit(1), nil)}},
isError: z_errors.IsPreconditionFailed,
shouldCheckEvents: false,
},
{
name: "one aggregate two events success",
fields: fields{
client: mockDB(t).
expectBegin(nil).
expectSavepoint().
expectPrepareInsert(nil).
expectInsertEvent(&models.Event{
AggregateID: "aggID",
AggregateType: "aggType",
EditorService: "svc",
EditorUser: "usr",
ResourceOwner: "ro",
PreviousSequence: 34,
Type: "eventTyp",
AggregateVersion: "v0.0.1",
}, 45).
expectInsertEvent(&models.Event{
AggregateID: "aggID",
AggregateType: "aggType",
EditorService: "svc2",
EditorUser: "usr2",
ResourceOwner: "ro2",
PreviousSequence: 45,
Type: "eventTyp",
AggregateVersion: "v0.0.1",
}, 46).
expectReleaseSavepoint(nil).
expectCommit(nil),
},
args: args{
aggregates: []*models.Aggregate{
{
PreviousSequence: 34,
Events: []*models.Event{
{
AggregateID: "aggID",
AggregateType: "aggType",
AggregateVersion: "v0.0.1",
EditorService: "svc",
EditorUser: "usr",
ResourceOwner: "ro",
Type: "eventTyp",
},
{
AggregateID: "aggID",
AggregateType: "aggType",
AggregateVersion: "v0.0.1",
EditorService: "svc2",
EditorUser: "usr2",
ResourceOwner: "ro2",
Type: "eventTyp",
},
},
},
},
},
shouldCheckEvents: true,
isError: noErr,
},
{
name: "two aggregates one event per aggregate success",
fields: fields{
client: mockDB(t).
expectBegin(nil).
expectSavepoint().
expectPrepareInsert(nil).
expectInsertEvent(&models.Event{
AggregateID: "aggID",
AggregateType: "aggType",
EditorService: "svc",
EditorUser: "usr",
ResourceOwner: "ro",
PreviousSequence: 34,
Type: "eventTyp",
AggregateVersion: "v0.0.1",
}, 47).
expectInsertEvent(&models.Event{
AggregateID: "aggID2",
AggregateType: "aggType2",
EditorService: "svc",
EditorUser: "usr",
ResourceOwner: "ro",
PreviousSequence: 40,
Type: "eventTyp",
AggregateVersion: "v0.0.1",
}, 48).
expectReleaseSavepoint(nil).
expectCommit(nil),
},
args: args{
aggregates: []*models.Aggregate{
{
PreviousSequence: 34,
Events: []*models.Event{
{
AggregateID: "aggID",
AggregateType: "aggType",
AggregateVersion: "v0.0.1",
EditorService: "svc",
EditorUser: "usr",
ResourceOwner: "ro",
Type: "eventTyp",
},
},
},
{
PreviousSequence: 40,
Events: []*models.Event{
{
AggregateID: "aggID2",
AggregateType: "aggType2",
AggregateVersion: "v0.0.1",
EditorService: "svc",
EditorUser: "usr",
ResourceOwner: "ro",
Type: "eventTyp",
},
},
},
},
},
shouldCheckEvents: true,
isError: noErr,
},
{
name: "first event fails no action with second event",
fields: fields{
client: mockDB(t).
expectBegin(nil).
expectSavepoint().
expectPrepareInsert(nil).
expectInsertEventError(&models.Event{
AggregateID: "aggID",
AggregateType: "aggType",
EditorService: "svc",
EditorUser: "usr",
ResourceOwner: "ro",
PreviousSequence: 34,
Data: []byte("{}"),
Type: "eventTyp",
AggregateVersion: "v0.0.1",
}).
expectReleaseSavepoint(nil).
expectRollback(nil),
},
args: args{
aggregates: []*models.Aggregate{
{
Events: []*models.Event{
{
AggregateID: "aggID",
AggregateType: "aggType",
AggregateVersion: "v0.0.1",
EditorService: "svc",
EditorUser: "usr",
ResourceOwner: "ro",
Type: "eventTyp",
PreviousSequence: 34,
},
{
AggregateID: "aggID",
AggregateType: "aggType",
AggregateVersion: "v0.0.1",
EditorService: "svc",
EditorUser: "usr",
ResourceOwner: "ro",
Type: "eventTyp",
PreviousSequence: 0,
},
},
},
},
},
isError: z_errors.IsInternal,
shouldCheckEvents: false,
},
{
name: "one event, release savepoint fails",
fields: fields{
client: mockDB(t).
expectBegin(nil).
expectPrepareInsert(nil).
expectSavepoint().
expectInsertEvent(&models.Event{
AggregateID: "aggID",
AggregateType: "aggType",
EditorService: "svc",
EditorUser: "usr",
ResourceOwner: "ro",
PreviousSequence: 34,
Type: "eventTyp",
Data: []byte("{}"),
AggregateVersion: "v0.0.1",
}, 47).
expectReleaseSavepoint(sql.ErrConnDone).
expectCommit(nil).
expectRollback(nil),
},
args: args{
aggregates: []*models.Aggregate{
{
Events: []*models.Event{
{
AggregateID: "aggID",
AggregateType: "aggType",
AggregateVersion: "v0.0.1",
EditorService: "svc",
EditorUser: "usr",
ResourceOwner: "ro",
Type: "eventTyp",
PreviousSequence: 34,
},
},
},
},
},
isError: z_errors.IsInternal,
shouldCheckEvents: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sql := &SQL{
client: tt.fields.client.sqlClient,
}
err := sql.PushAggregates(context.Background(), tt.args.aggregates...)
if err != nil && !tt.isError(err) {
t.Errorf("wrong error type = %v, errFunc %s", err, functionName(tt.isError))
}
if !tt.shouldCheckEvents {
return
}
for _, aggregate := range tt.args.aggregates {
for _, event := range aggregate.Events {
if event.Sequence == 0 {
t.Error("sequence of returned event is not set")
}
if event.AggregateType == "" || event.AggregateID == "" {
t.Error("aggregate of event is not set")
}
}
}
if err := tt.fields.client.mock.ExpectationsWereMet(); err != nil {
t.Errorf("not all database expectaions met: %s", err)
}
})
}
}
func noErr(err error) bool {
return err == nil
}
func functionName(i interface{}) string {
return runtime.FuncForPC(reflect.ValueOf(i).Pointer()).Name()
}
func Test_precondtion(t *testing.T) {
type fields struct {
client *dbMock
}
type args struct {
aggregate *models.Aggregate
}
tests := []struct {
name string
fields fields
args args
isErr func(error) bool
}{
{
name: "no precondition",
fields: fields{
client: mockDB(t).
expectBegin(nil),
},
args: args{
aggregate: &models.Aggregate{},
},
},
{
name: "precondition fails",
fields: fields{
client: mockDB(t).
expectBegin(nil).expectFilterEventsLimit("test", 5, 0),
},
args: args{
aggregate: aggregateWithPrecondition(&models.Aggregate{}, models.NewSearchQuery().SetLimit(5).AggregateTypeFilter("test"), validationFunc(z_errors.ThrowPreconditionFailed(nil, "SQL-LBIKm", "err"))),
},
isErr: z_errors.IsPreconditionFailed,
},
{
name: "precondition with filter error",
fields: fields{
client: mockDB(t).
expectBegin(nil).expectFilterEventsError(z_errors.ThrowInternal(nil, "SQL-ac9EW", "err")),
},
args: args{
aggregate: aggregateWithPrecondition(&models.Aggregate{}, models.NewSearchQuery().SetLimit(5).AggregateTypeFilter("test"), validationFunc(z_errors.CreateCaosError(nil, "SQL-LBIKm", "err"))),
},
isErr: z_errors.IsPreconditionFailed,
},
{
name: "precondition no events",
fields: fields{
client: mockDB(t).
expectBegin(nil).expectFilterEventsLimit("test", 5, 0),
},
args: args{
aggregate: aggregateWithPrecondition(&models.Aggregate{}, models.NewSearchQuery().SetLimit(5).AggregateTypeFilter("test"), validationFunc(nil)),
},
},
{
name: "precondition with events",
fields: fields{
client: mockDB(t).
expectBegin(nil).expectFilterEventsLimit("test", 5, 3),
},
args: args{
aggregate: aggregateWithPrecondition(&models.Aggregate{}, models.NewSearchQuery().SetLimit(5).AggregateTypeFilter("test"), validationFunc(nil)),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tx, err := tt.fields.client.sqlClient.Begin()
if err != nil {
t.Errorf("unable to start tx %v", err)
t.FailNow()
}
err = precondtion(tx, tt.args.aggregate)
if (err != nil) && (tt.isErr == nil) {
t.Errorf("no error expected got: %v", err)
}
if tt.isErr != nil && !tt.isErr(err) {
t.Errorf("precondtion() wrong error %T, %v", err, err)
}
})
}
}
func aggregateWithPrecondition(aggregate *models.Aggregate, query *models.SearchQuery, precondition func(...*models.Event) error) *models.Aggregate {
aggregate.SetPrecondition(query, precondition)
return aggregate
}
func validationFunc(err error) func(events ...*models.Event) error {
return func(events ...*models.Event) error {
return err
}
}

View File

@ -10,7 +10,6 @@ import (
type filterFunc func(context.Context, *es_models.SearchQuery) ([]*es_models.Event, error)
type appendFunc func(...*es_models.Event) error
type AggregateFunc func(context.Context) (*es_models.Aggregate, error)
type pushFunc func(context.Context, ...*es_models.Aggregate) error
func Filter(ctx context.Context, filter filterFunc, appender appendFunc, query *es_models.SearchQuery) error {
events, err := filter(ctx, query)
@ -26,62 +25,3 @@ func Filter(ctx context.Context, filter filterFunc, appender appendFunc, query *
}
return nil
}
// Push is Deprecated use PushAggregates
// Push creates the aggregates from aggregater
// and pushes the aggregates to the given pushFunc
// the given events are appended by the appender
func Push(ctx context.Context, push pushFunc, appender appendFunc, aggregaters ...AggregateFunc) (err error) {
if len(aggregaters) < 1 {
return errors.ThrowPreconditionFailed(nil, "SDK-q9wjp", "Errors.Internal")
}
aggregates, err := makeAggregates(ctx, aggregaters)
if err != nil {
return err
}
err = push(ctx, aggregates...)
if err != nil {
return err
}
return appendAggregates(appender, aggregates)
}
func PushAggregates(ctx context.Context, push pushFunc, appender appendFunc, aggregates ...*es_models.Aggregate) (err error) {
if len(aggregates) < 1 {
return errors.ThrowPreconditionFailed(nil, "SDK-q9wjp", "Errors.Internal")
}
err = push(ctx, aggregates...)
if err != nil {
return err
}
if appender == nil {
return nil
}
return appendAggregates(appender, aggregates)
}
func appendAggregates(appender appendFunc, aggregates []*es_models.Aggregate) error {
for _, aggregate := range aggregates {
err := appender(aggregate.Events...)
if err != nil {
return ThrowAppendEventError(err, "SDK-o6kzK", "Errors.Internal")
}
}
return nil
}
func makeAggregates(ctx context.Context, aggregaters []AggregateFunc) (aggregates []*es_models.Aggregate, err error) {
aggregates = make([]*es_models.Aggregate, len(aggregaters))
for i, aggregater := range aggregaters {
aggregates[i], err = aggregater(ctx)
if err != nil {
return nil, err
}
}
return aggregates, nil
}

View File

@ -75,119 +75,3 @@ func TestFilter(t *testing.T) {
})
}
}
func TestPush(t *testing.T) {
type args struct {
push pushFunc
appender appendFunc
aggregaters []AggregateFunc
}
tests := []struct {
name string
args args
wantErr func(error) bool
}{
{
name: "no aggregates",
args: args{
push: nil,
appender: nil,
aggregaters: nil,
},
wantErr: errors.IsPreconditionFailed,
},
{
name: "aggregater fails",
args: args{
push: nil,
appender: nil,
aggregaters: []AggregateFunc{
func(context.Context) (*es_models.Aggregate, error) {
return nil, errors.ThrowInternal(nil, "SDK-Ec5x2", "test err")
},
},
},
wantErr: errors.IsInternal,
},
{
name: "push fails",
args: args{
push: func(context.Context, ...*es_models.Aggregate) error {
return errors.ThrowInternal(nil, "SDK-0g4gW", "test error")
},
appender: nil,
aggregaters: []AggregateFunc{
func(context.Context) (*es_models.Aggregate, error) {
return &es_models.Aggregate{}, nil
},
},
},
wantErr: errors.IsInternal,
},
{
name: "append aggregates fails",
args: args{
push: func(context.Context, ...*es_models.Aggregate) error {
return nil
},
appender: func(...*es_models.Event) error {
return errors.ThrowInvalidArgument(nil, "SDK-BDhcT", "test err")
},
aggregaters: []AggregateFunc{
func(context.Context) (*es_models.Aggregate, error) {
return &es_models.Aggregate{Events: []*es_models.Event{&es_models.Event{}}}, nil
},
},
},
wantErr: IsAppendEventError,
},
{
name: "correct one aggregate",
args: args{
push: func(context.Context, ...*es_models.Aggregate) error {
return nil
},
appender: func(...*es_models.Event) error {
return nil
},
aggregaters: []AggregateFunc{
func(context.Context) (*es_models.Aggregate, error) {
return &es_models.Aggregate{Events: []*es_models.Event{&es_models.Event{}}}, nil
},
},
},
wantErr: nil,
},
{
name: "correct multiple aggregate",
args: args{
push: func(context.Context, ...*es_models.Aggregate) error {
return nil
},
appender: func(...*es_models.Event) error {
return nil
},
aggregaters: []AggregateFunc{
func(context.Context) (*es_models.Aggregate, error) {
return &es_models.Aggregate{Events: []*es_models.Event{&es_models.Event{}}}, nil
},
func(context.Context) (*es_models.Aggregate, error) {
return &es_models.Aggregate{Events: []*es_models.Event{&es_models.Event{}}}, nil
},
},
},
wantErr: nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := Push(context.Background(), tt.args.push, tt.args.appender, tt.args.aggregaters...)
if tt.wantErr == nil && err != nil {
t.Errorf("no error expected %v", err)
}
if tt.wantErr != nil && !tt.wantErr(err) {
t.Errorf("no error has wrong type %v", err)
}
})
}
}

View File

@ -37,18 +37,16 @@ func (es *eventstore) Subscribe(aggregates ...models.AggregateType) *Subscriptio
return sub
}
func notify(aggregates []*models.Aggregate) {
func Notify(events []*models.Event) {
subsMutext.Lock()
defer subsMutext.Unlock()
for _, aggregate := range aggregates {
subs, ok := subscriptions[aggregate.Type()]
for _, event := range events {
subs, ok := subscriptions[event.AggregateType]
if !ok {
continue
}
for _, sub := range subs {
for _, event := range aggregate.Events {
sub.Events <- event
}
sub.Events <- event
}
}
}

View File

@ -127,7 +127,6 @@ func (o *Org) AppendEvents(events ...*es_models.Event) error {
func (o *Org) AppendEvent(event *es_models.Event) (err error) {
switch event.Type {
case OrgAdded:
*o = Org{}
err = o.setData(event)
if err != nil {
return err