idp command side done

This commit is contained in:
adlerhurst 2020-11-27 13:29:35 +01:00
parent 9487e8bdeb
commit e48621c1f3
8 changed files with 138 additions and 60 deletions

View File

@ -98,7 +98,6 @@ func (repo *IAMRepository) GetIAMMemberRoles() []string {
func (repo *IAMRepository) IDPConfigByID(ctx context.Context, idpConfigID string) (*iam_model.IDPConfigView, error) {
if repo.IAMV2 != nil {
return repo.IAMV2.IDPConfigByID(ctx, repo.SystemDefaults.IamID, idpConfigID)
}
@ -126,14 +125,23 @@ func (repo *IAMRepository) ChangeIDPConfig(ctx context.Context, idp *iam_model.I
}
func (repo *IAMRepository) DeactivateIDPConfig(ctx context.Context, idpConfigID string) (*iam_model.IDPConfig, error) {
if repo.IAMV2 != nil {
return repo.IAMV2.DeactivateIDPConfig(ctx, repo.SystemDefaults.IamID, idpConfigID)
}
return repo.IAMEventstore.DeactivateIDPConfig(ctx, repo.SystemDefaults.IamID, idpConfigID)
}
func (repo *IAMRepository) ReactivateIDPConfig(ctx context.Context, idpConfigID string) (*iam_model.IDPConfig, error) {
if repo.IAMV2 != nil {
return repo.IAMV2.ReactivateIDPConfig(ctx, repo.SystemDefaults.IamID, idpConfigID)
}
return repo.IAMEventstore.ReactivateIDPConfig(ctx, repo.SystemDefaults.IamID, idpConfigID)
}
func (repo *IAMRepository) RemoveIDPConfig(ctx context.Context, idpConfigID string) error {
// if repo.IAMV2 != nil {
// return repo.IAMV2.
// }
aggregates := make([]*es_models.Aggregate, 0)
idp := iam_model.NewIDPConfig(repo.SystemDefaults.IamID, idpConfigID)
_, agg, err := repo.IAMEventstore.PrepareRemoveIDPConfig(ctx, idp)

View File

@ -0,0 +1,18 @@
package eventstore
import "context"
func PushAggregate(ctx context.Context, es *Eventstore, writeModel queryReducer, aggregate *Aggregate) error {
err := es.FilterToQueryReducer(ctx, writeModel)
if err != nil {
return err
}
events, err := es.PushAggregates(ctx, aggregate)
if err != nil {
return err
}
writeModel.AppendEvents(events...)
return writeModel.Reduce()
}

View File

@ -1,5 +1,25 @@
package eventstore
type aggregater interface {
//ID returns the aggreagte id
ID() string
//Type returns the aggregate type
Type() AggregateType
//Events returns the events which will be pushed
Events() []EventPusher
//ResourceOwner returns the organisation id which manages this aggregate
// resource owner is only on the inital push needed
// afterwards the resource owner of the previous event is taken
ResourceOwner() string
//Version represents the semantic version of the aggregate
Version() Version
//PreviouseSequence should return the sequence of the latest event of this aggregate
// stored in the eventstore
// it's set to the first event of this push transaction,
// later events consume the sequence of the previously pushed event of the aggregate
PreviousSequence() uint64
}
func NewAggregate(
id string,
typ AggregateType,

View File

@ -36,24 +36,20 @@ func (es *Eventstore) Health(ctx context.Context) error {
return es.repo.Health(ctx)
}
type aggregater interface {
//ID returns the aggreagte id
ID() string
//Type returns the aggregate type
Type() AggregateType
//Events returns the events which will be pushed
Events() []EventPusher
//ResourceOwner returns the organisation id which manages this aggregate
// resource owner is only on the inital push needed
// afterwards the resource owner of the previous event is taken
ResourceOwner() string
//Version represents the semantic version of the aggregate
Version() Version
//PreviouseSequence should return the sequence of the latest event of this aggregate
// stored in the eventstore
// it's set to the first event of this push transaction,
// later events consume the sequence of the previously pushed event of the aggregate
PreviousSequence() uint64
//PushAggregate pushes the aggregate and reduces the new events on the aggregate
func (es *Eventstore) PushAggregate(ctx context.Context, writeModel queryReducer, aggregate aggregater) error {
err := es.FilterToQueryReducer(ctx, writeModel)
if err != nil {
return err
}
events, err := es.PushAggregates(ctx, aggregate)
if err != nil {
return err
}
writeModel.AppendEvents(events...)
return writeModel.Reduce()
}
//PushAggregates maps the events of all aggregates to an eventstore event

View File

@ -109,13 +109,12 @@ func readModelToObjectRoot(readModel eventstore.ReadModel) models.ObjectRoot {
}
}
func writeModelToObjectRoot(readModel eventstore.WriteModel) models.ObjectRoot {
func writeModelToObjectRoot(writeModel eventstore.WriteModel) models.ObjectRoot {
return models.ObjectRoot{
AggregateID: readModel.AggregateID,
// ChangeDate: readModel.ChangeDate,
// CreationDate: readModel.CreationDate,
ResourceOwner: readModel.ResourceOwner,
Sequence: readModel.ProcessedSequence,
AggregateID: writeModel.AggregateID,
ChangeDate: writeModel.ChangeDate,
ResourceOwner: writeModel.ResourceOwner,
Sequence: writeModel.ProcessedSequence,
}
}

View File

@ -30,20 +30,14 @@ func (r *Repository) AddIDPConfig(ctx context.Context, config *iam_model.IDPConf
if err != nil {
return nil, err
}
wm := iam.NewIDPConfigWriteModel(config.AggregateID, idpConfigID)
err = r.eventstore.FilterToQueryReducer(ctx, wm)
if err != nil {
return nil, err
}
clientSecret, err := crypto.Crypt([]byte(config.OIDCConfig.ClientSecretString), r.secretCrypto)
if err != nil {
return nil, err
}
aggregate := iam.AggregateFromWriteModel(&wm.WriteModel).
PushIDPConfigAdded(ctx, idpConfigID, config.Name, idp.ConfigType(config.Type), idp.StylingType(config.StylingType)).
PushIDPOIDCConfigAdded(
writeModel, err := r.pushIDPWriteModel(ctx, config.AggregateID, idpConfigID, func(a *iam.Aggregate, _ *iam.IDPConfigWriteModel) *iam.Aggregate {
return a.PushIDPOIDCConfigAdded(
ctx,
config.OIDCConfig.ClientID,
idpConfigID,
@ -52,43 +46,77 @@ func (r *Repository) AddIDPConfig(ctx context.Context, config *iam_model.IDPConf
oidc.MappingField(config.OIDCConfig.IDPDisplayNameMapping),
oidc.MappingField(config.OIDCConfig.UsernameMapping),
config.OIDCConfig.Scopes...)
events, err := r.eventstore.PushAggregates(ctx, aggregate)
})
if err != nil {
return nil, err
}
if err = wm.AppendAndReduce(events...); err != nil {
return nil, err
}
return writeModelToIDPConfig(wm), nil
return writeModelToIDPConfig(writeModel), nil
}
func (r *Repository) ChangeIDPConfig(ctx context.Context, config *iam_model.IDPConfig) (*iam_model.IDPConfig, error) {
writeModel := iam.NewIDPConfigWriteModel(config.AggregateID, config.IDPConfigID)
err := r.eventstore.FilterToQueryReducer(ctx, writeModel)
if err != nil {
return nil, err
}
aggregate := iam.AggregateFromWriteModel(&writeModel.WriteModel).
PushIDPConfigChanged(
writeModel, err := r.pushIDPWriteModel(ctx, config.AggregateID, config.IDPConfigID, func(a *iam.Aggregate, writeModel *iam.IDPConfigWriteModel) *iam.Aggregate {
return a.PushIDPConfigChanged(
ctx,
writeModel,
config.IDPConfigID,
config.Name,
idp.ConfigType(config.Type),
idp.StylingType(config.StylingType))
events, err := r.eventstore.PushAggregates(ctx, aggregate)
})
if err != nil {
return nil, err
}
if err = writeModel.AppendAndReduce(events...); err != nil {
return nil, err
}
return writeModelToIDPConfig(writeModel), nil
}
func (r *Repository) DeactivateIDPConfig(ctx context.Context, iamID, idpID string) (*iam_model.IDPConfig, error) {
writeModel, err := r.pushIDPWriteModel(ctx, iamID, idpID, func(a *iam.Aggregate, _ *iam.IDPConfigWriteModel) *iam.Aggregate {
return a.PushIDPConfigDeactivated(ctx, idpID)
})
if err != nil {
return nil, err
}
return writeModelToIDPConfig(writeModel), nil
}
func (r *Repository) ReactivateIDPConfig(ctx context.Context, iamID, idpID string) (*iam_model.IDPConfig, error) {
writeModel, err := r.pushIDPWriteModel(ctx, iamID, idpID, func(a *iam.Aggregate, _ *iam.IDPConfigWriteModel) *iam.Aggregate {
return a.PushIDPConfigReactivated(ctx, idpID)
})
if err != nil {
return nil, err
}
return writeModelToIDPConfig(writeModel), nil
}
func (r *Repository) RemoveIDPConfig(ctx context.Context, iamID, idpID string) (*iam_model.IDPConfig, error) {
writeModel, err := r.pushIDPWriteModel(ctx, iamID, idpID, func(a *iam.Aggregate, _ *iam.IDPConfigWriteModel) *iam.Aggregate {
return a.PushIDPConfigRemoved(ctx, idpID)
})
if err != nil {
return nil, err
}
return writeModelToIDPConfig(writeModel), nil
}
func (r *Repository) pushIDPWriteModel(ctx context.Context, iamID, idpID string, eventSetter func(*iam.Aggregate, *iam.IDPConfigWriteModel) *iam.Aggregate) (*iam.IDPConfigWriteModel, error) {
writeModel := iam.NewIDPConfigWriteModel(iamID, idpID)
err := r.eventstore.FilterToQueryReducer(ctx, writeModel)
if err != nil {
return nil, err
}
aggregate := eventSetter(iam.AggregateFromWriteModel(&writeModel.WriteModel), writeModel)
err = r.eventstore.PushAggregate(ctx, writeModel, aggregate)
if err != nil {
return nil, err
}
return writeModel, nil
}

View File

@ -26,7 +26,7 @@ func (r *Repository) ChangeIDPOIDCConfig(ctx context.Context, config *iam_model.
}
}
aggregate := iam.AggregateFromWriteModel(&writeModel.ConfigWriteModel.WriteModel).
aggregate := iam.AggregateFromWriteModel(&writeModel.WriteModel).
PushIDPOIDCConfigChanged(
ctx,
writeModel,

View File

@ -15,6 +15,7 @@ const (
)
type IDPOIDCConfigWriteModel struct {
eventstore.WriteModel
oidc.ConfigWriteModel
iamID string
@ -28,12 +29,8 @@ func NewIDPOIDCConfigWriteModel(iamID, idpConfigID string) *IDPOIDCConfigWriteMo
}
}
func (wm *IDPOIDCConfigWriteModel) Query() *eventstore.SearchQueryFactory {
return eventstore.NewSearchQueryFactory(eventstore.ColumnsEvent, AggregateType).
AggregateIDs(wm.iamID)
}
func (wm *IDPOIDCConfigWriteModel) AppendEvents(events ...eventstore.EventReader) {
wm.WriteModel.AppendEvents(events...)
for _, event := range events {
switch e := event.(type) {
case *IDPOIDCConfigAddedEvent:
@ -52,6 +49,18 @@ func (wm *IDPOIDCConfigWriteModel) AppendEvents(events ...eventstore.EventReader
}
}
func (wm *IDPOIDCConfigWriteModel) Reduce() error {
if err := wm.ConfigWriteModel.Reduce(); err != nil {
return err
}
return wm.WriteModel.Reduce()
}
func (wm *IDPOIDCConfigWriteModel) Query() *eventstore.SearchQueryFactory {
return eventstore.NewSearchQueryFactory(eventstore.ColumnsEvent, AggregateType).
AggregateIDs(wm.iamID)
}
type IDPOIDCConfigAddedEvent struct {
oidc.ConfigAddedEvent
}