From e48621c1f33dd1b991738fa579a217b29ac47cef Mon Sep 17 00:00:00 2001 From: adlerhurst Date: Fri, 27 Nov 2020 13:29:35 +0100 Subject: [PATCH] idp command side done --- .../eventsourcing/eventstore/iam.go | 10 ++- internal/eventstore/v2/abstraction.go | 18 ++++ internal/eventstore/v2/aggregate.go | 20 +++++ internal/eventstore/v2/eventstore.go | 32 +++---- internal/v2/business/iam/converter.go | 11 ++- internal/v2/business/iam/idp_config.go | 86 ++++++++++++------- internal/v2/business/iam/idp_oidc_config.go | 2 +- internal/v2/repository/iam/idp_oidc_config.go | 19 ++-- 8 files changed, 138 insertions(+), 60 deletions(-) create mode 100644 internal/eventstore/v2/abstraction.go diff --git a/internal/admin/repository/eventsourcing/eventstore/iam.go b/internal/admin/repository/eventsourcing/eventstore/iam.go index d48e53d9f5..252451406c 100644 --- a/internal/admin/repository/eventsourcing/eventstore/iam.go +++ b/internal/admin/repository/eventsourcing/eventstore/iam.go @@ -98,7 +98,6 @@ func (repo *IAMRepository) GetIAMMemberRoles() []string { func (repo *IAMRepository) IDPConfigByID(ctx context.Context, idpConfigID string) (*iam_model.IDPConfigView, error) { if repo.IAMV2 != nil { - return repo.IAMV2.IDPConfigByID(ctx, repo.SystemDefaults.IamID, idpConfigID) } @@ -126,14 +125,23 @@ func (repo *IAMRepository) ChangeIDPConfig(ctx context.Context, idp *iam_model.I } func (repo *IAMRepository) DeactivateIDPConfig(ctx context.Context, idpConfigID string) (*iam_model.IDPConfig, error) { + if repo.IAMV2 != nil { + return repo.IAMV2.DeactivateIDPConfig(ctx, repo.SystemDefaults.IamID, idpConfigID) + } return repo.IAMEventstore.DeactivateIDPConfig(ctx, repo.SystemDefaults.IamID, idpConfigID) } func (repo *IAMRepository) ReactivateIDPConfig(ctx context.Context, idpConfigID string) (*iam_model.IDPConfig, error) { + if repo.IAMV2 != nil { + return repo.IAMV2.ReactivateIDPConfig(ctx, repo.SystemDefaults.IamID, idpConfigID) + } return repo.IAMEventstore.ReactivateIDPConfig(ctx, repo.SystemDefaults.IamID, idpConfigID) } func (repo *IAMRepository) RemoveIDPConfig(ctx context.Context, idpConfigID string) error { + // if repo.IAMV2 != nil { + // return repo.IAMV2. + // } aggregates := make([]*es_models.Aggregate, 0) idp := iam_model.NewIDPConfig(repo.SystemDefaults.IamID, idpConfigID) _, agg, err := repo.IAMEventstore.PrepareRemoveIDPConfig(ctx, idp) diff --git a/internal/eventstore/v2/abstraction.go b/internal/eventstore/v2/abstraction.go new file mode 100644 index 0000000000..0716490f57 --- /dev/null +++ b/internal/eventstore/v2/abstraction.go @@ -0,0 +1,18 @@ +package eventstore + +import "context" + +func PushAggregate(ctx context.Context, es *Eventstore, writeModel queryReducer, aggregate *Aggregate) error { + err := es.FilterToQueryReducer(ctx, writeModel) + if err != nil { + return err + } + + events, err := es.PushAggregates(ctx, aggregate) + if err != nil { + return err + } + + writeModel.AppendEvents(events...) + return writeModel.Reduce() +} diff --git a/internal/eventstore/v2/aggregate.go b/internal/eventstore/v2/aggregate.go index ab7c70d205..914387488f 100644 --- a/internal/eventstore/v2/aggregate.go +++ b/internal/eventstore/v2/aggregate.go @@ -1,5 +1,25 @@ package eventstore +type aggregater interface { + //ID returns the aggreagte id + ID() string + //Type returns the aggregate type + Type() AggregateType + //Events returns the events which will be pushed + Events() []EventPusher + //ResourceOwner returns the organisation id which manages this aggregate + // resource owner is only on the inital push needed + // afterwards the resource owner of the previous event is taken + ResourceOwner() string + //Version represents the semantic version of the aggregate + Version() Version + //PreviouseSequence should return the sequence of the latest event of this aggregate + // stored in the eventstore + // it's set to the first event of this push transaction, + // later events consume the sequence of the previously pushed event of the aggregate + PreviousSequence() uint64 +} + func NewAggregate( id string, typ AggregateType, diff --git a/internal/eventstore/v2/eventstore.go b/internal/eventstore/v2/eventstore.go index 0951fbb60b..b28382235e 100644 --- a/internal/eventstore/v2/eventstore.go +++ b/internal/eventstore/v2/eventstore.go @@ -36,24 +36,20 @@ func (es *Eventstore) Health(ctx context.Context) error { return es.repo.Health(ctx) } -type aggregater interface { - //ID returns the aggreagte id - ID() string - //Type returns the aggregate type - Type() AggregateType - //Events returns the events which will be pushed - Events() []EventPusher - //ResourceOwner returns the organisation id which manages this aggregate - // resource owner is only on the inital push needed - // afterwards the resource owner of the previous event is taken - ResourceOwner() string - //Version represents the semantic version of the aggregate - Version() Version - //PreviouseSequence should return the sequence of the latest event of this aggregate - // stored in the eventstore - // it's set to the first event of this push transaction, - // later events consume the sequence of the previously pushed event of the aggregate - PreviousSequence() uint64 +//PushAggregate pushes the aggregate and reduces the new events on the aggregate +func (es *Eventstore) PushAggregate(ctx context.Context, writeModel queryReducer, aggregate aggregater) error { + err := es.FilterToQueryReducer(ctx, writeModel) + if err != nil { + return err + } + + events, err := es.PushAggregates(ctx, aggregate) + if err != nil { + return err + } + + writeModel.AppendEvents(events...) + return writeModel.Reduce() } //PushAggregates maps the events of all aggregates to an eventstore event diff --git a/internal/v2/business/iam/converter.go b/internal/v2/business/iam/converter.go index b71e5311f5..545051a151 100644 --- a/internal/v2/business/iam/converter.go +++ b/internal/v2/business/iam/converter.go @@ -109,13 +109,12 @@ func readModelToObjectRoot(readModel eventstore.ReadModel) models.ObjectRoot { } } -func writeModelToObjectRoot(readModel eventstore.WriteModel) models.ObjectRoot { +func writeModelToObjectRoot(writeModel eventstore.WriteModel) models.ObjectRoot { return models.ObjectRoot{ - AggregateID: readModel.AggregateID, - // ChangeDate: readModel.ChangeDate, - // CreationDate: readModel.CreationDate, - ResourceOwner: readModel.ResourceOwner, - Sequence: readModel.ProcessedSequence, + AggregateID: writeModel.AggregateID, + ChangeDate: writeModel.ChangeDate, + ResourceOwner: writeModel.ResourceOwner, + Sequence: writeModel.ProcessedSequence, } } diff --git a/internal/v2/business/iam/idp_config.go b/internal/v2/business/iam/idp_config.go index eb6c7fdef8..42a5f6de1b 100644 --- a/internal/v2/business/iam/idp_config.go +++ b/internal/v2/business/iam/idp_config.go @@ -30,20 +30,14 @@ func (r *Repository) AddIDPConfig(ctx context.Context, config *iam_model.IDPConf if err != nil { return nil, err } - wm := iam.NewIDPConfigWriteModel(config.AggregateID, idpConfigID) - err = r.eventstore.FilterToQueryReducer(ctx, wm) - if err != nil { - return nil, err - } clientSecret, err := crypto.Crypt([]byte(config.OIDCConfig.ClientSecretString), r.secretCrypto) if err != nil { return nil, err } - aggregate := iam.AggregateFromWriteModel(&wm.WriteModel). - PushIDPConfigAdded(ctx, idpConfigID, config.Name, idp.ConfigType(config.Type), idp.StylingType(config.StylingType)). - PushIDPOIDCConfigAdded( + writeModel, err := r.pushIDPWriteModel(ctx, config.AggregateID, idpConfigID, func(a *iam.Aggregate, _ *iam.IDPConfigWriteModel) *iam.Aggregate { + return a.PushIDPOIDCConfigAdded( ctx, config.OIDCConfig.ClientID, idpConfigID, @@ -52,43 +46,77 @@ func (r *Repository) AddIDPConfig(ctx context.Context, config *iam_model.IDPConf oidc.MappingField(config.OIDCConfig.IDPDisplayNameMapping), oidc.MappingField(config.OIDCConfig.UsernameMapping), config.OIDCConfig.Scopes...) - - events, err := r.eventstore.PushAggregates(ctx, aggregate) + }) if err != nil { return nil, err } - if err = wm.AppendAndReduce(events...); err != nil { - return nil, err - } - - return writeModelToIDPConfig(wm), nil + return writeModelToIDPConfig(writeModel), nil } func (r *Repository) ChangeIDPConfig(ctx context.Context, config *iam_model.IDPConfig) (*iam_model.IDPConfig, error) { - writeModel := iam.NewIDPConfigWriteModel(config.AggregateID, config.IDPConfigID) - err := r.eventstore.FilterToQueryReducer(ctx, writeModel) - if err != nil { - return nil, err - } - - aggregate := iam.AggregateFromWriteModel(&writeModel.WriteModel). - PushIDPConfigChanged( + writeModel, err := r.pushIDPWriteModel(ctx, config.AggregateID, config.IDPConfigID, func(a *iam.Aggregate, writeModel *iam.IDPConfigWriteModel) *iam.Aggregate { + return a.PushIDPConfigChanged( ctx, writeModel, config.IDPConfigID, config.Name, idp.ConfigType(config.Type), idp.StylingType(config.StylingType)) - - events, err := r.eventstore.PushAggregates(ctx, aggregate) + }) if err != nil { return nil, err } - if err = writeModel.AppendAndReduce(events...); err != nil { - return nil, err - } - return writeModelToIDPConfig(writeModel), nil } + +func (r *Repository) DeactivateIDPConfig(ctx context.Context, iamID, idpID string) (*iam_model.IDPConfig, error) { + writeModel, err := r.pushIDPWriteModel(ctx, iamID, idpID, func(a *iam.Aggregate, _ *iam.IDPConfigWriteModel) *iam.Aggregate { + return a.PushIDPConfigDeactivated(ctx, idpID) + }) + if err != nil { + return nil, err + } + + return writeModelToIDPConfig(writeModel), nil +} + +func (r *Repository) ReactivateIDPConfig(ctx context.Context, iamID, idpID string) (*iam_model.IDPConfig, error) { + writeModel, err := r.pushIDPWriteModel(ctx, iamID, idpID, func(a *iam.Aggregate, _ *iam.IDPConfigWriteModel) *iam.Aggregate { + return a.PushIDPConfigReactivated(ctx, idpID) + }) + if err != nil { + return nil, err + } + + return writeModelToIDPConfig(writeModel), nil +} + +func (r *Repository) RemoveIDPConfig(ctx context.Context, iamID, idpID string) (*iam_model.IDPConfig, error) { + writeModel, err := r.pushIDPWriteModel(ctx, iamID, idpID, func(a *iam.Aggregate, _ *iam.IDPConfigWriteModel) *iam.Aggregate { + return a.PushIDPConfigRemoved(ctx, idpID) + }) + if err != nil { + return nil, err + } + + return writeModelToIDPConfig(writeModel), nil +} + +func (r *Repository) pushIDPWriteModel(ctx context.Context, iamID, idpID string, eventSetter func(*iam.Aggregate, *iam.IDPConfigWriteModel) *iam.Aggregate) (*iam.IDPConfigWriteModel, error) { + writeModel := iam.NewIDPConfigWriteModel(iamID, idpID) + err := r.eventstore.FilterToQueryReducer(ctx, writeModel) + if err != nil { + return nil, err + } + + aggregate := eventSetter(iam.AggregateFromWriteModel(&writeModel.WriteModel), writeModel) + + err = r.eventstore.PushAggregate(ctx, writeModel, aggregate) + if err != nil { + return nil, err + } + + return writeModel, nil +} diff --git a/internal/v2/business/iam/idp_oidc_config.go b/internal/v2/business/iam/idp_oidc_config.go index 0a2f983eb8..2e4db7be13 100644 --- a/internal/v2/business/iam/idp_oidc_config.go +++ b/internal/v2/business/iam/idp_oidc_config.go @@ -26,7 +26,7 @@ func (r *Repository) ChangeIDPOIDCConfig(ctx context.Context, config *iam_model. } } - aggregate := iam.AggregateFromWriteModel(&writeModel.ConfigWriteModel.WriteModel). + aggregate := iam.AggregateFromWriteModel(&writeModel.WriteModel). PushIDPOIDCConfigChanged( ctx, writeModel, diff --git a/internal/v2/repository/iam/idp_oidc_config.go b/internal/v2/repository/iam/idp_oidc_config.go index 659af88c92..9e76c057da 100644 --- a/internal/v2/repository/iam/idp_oidc_config.go +++ b/internal/v2/repository/iam/idp_oidc_config.go @@ -15,6 +15,7 @@ const ( ) type IDPOIDCConfigWriteModel struct { + eventstore.WriteModel oidc.ConfigWriteModel iamID string @@ -28,12 +29,8 @@ func NewIDPOIDCConfigWriteModel(iamID, idpConfigID string) *IDPOIDCConfigWriteMo } } -func (wm *IDPOIDCConfigWriteModel) Query() *eventstore.SearchQueryFactory { - return eventstore.NewSearchQueryFactory(eventstore.ColumnsEvent, AggregateType). - AggregateIDs(wm.iamID) -} - func (wm *IDPOIDCConfigWriteModel) AppendEvents(events ...eventstore.EventReader) { + wm.WriteModel.AppendEvents(events...) for _, event := range events { switch e := event.(type) { case *IDPOIDCConfigAddedEvent: @@ -52,6 +49,18 @@ func (wm *IDPOIDCConfigWriteModel) AppendEvents(events ...eventstore.EventReader } } +func (wm *IDPOIDCConfigWriteModel) Reduce() error { + if err := wm.ConfigWriteModel.Reduce(); err != nil { + return err + } + return wm.WriteModel.Reduce() +} + +func (wm *IDPOIDCConfigWriteModel) Query() *eventstore.SearchQueryFactory { + return eventstore.NewSearchQueryFactory(eventstore.ColumnsEvent, AggregateType). + AggregateIDs(wm.iamID) +} + type IDPOIDCConfigAddedEvent struct { oidc.ConfigAddedEvent }