diff --git a/cmd/setup/42.go b/cmd/setup/42.go new file mode 100644 index 0000000000..66f15d8c73 --- /dev/null +++ b/cmd/setup/42.go @@ -0,0 +1,111 @@ +package setup + +import ( + "context" + _ "embed" + "encoding/json" + "fmt" + + "github.com/zitadel/logging" + + "github.com/zitadel/zitadel/internal/api/authz" + "github.com/zitadel/zitadel/internal/eventstore" + "github.com/zitadel/zitadel/internal/query/projection" + "github.com/zitadel/zitadel/internal/repository/instance" + "github.com/zitadel/zitadel/internal/repository/owner" + "github.com/zitadel/zitadel/internal/repository/project" +) + +var ( + //go:embed 42.sql + correctProjectOwnerEvents string +) + +type CorrectProjectOwners struct { + eventstore *eventstore.Eventstore +} + +func (mig *CorrectProjectOwners) Execute(ctx context.Context, _ eventstore.Event) error { + instances, err := mig.eventstore.InstanceIDs( + ctx, + eventstore.NewSearchQueryBuilder(eventstore.ColumnsInstanceIDs). + OrderDesc(). + AddQuery(). + AggregateTypes("instance"). + EventTypes(instance.InstanceAddedEventType). + Builder(), + ) + if err != nil { + return err + } + + ctx = authz.SetCtxData(ctx, authz.CtxData{UserID: "SETUP"}) + for i, instance := range instances { + ctx = authz.WithInstanceID(ctx, instance) + logging.WithFields("instance_id", instance, "migration", mig.String(), "progress", fmt.Sprintf("%d/%d", i+1, len(instances))).Info("correct owners of projects") + didCorrect, err := mig.correctInstanceProjects(ctx, instance) + if err != nil { + return err + } + if !didCorrect { + continue + } + _, err = projection.ProjectGrantProjection.Trigger(ctx) + logging.OnError(err).Debug("failed triggering project grant projection to update owners") + } + return nil +} + +func (mig *CorrectProjectOwners) correctInstanceProjects(ctx context.Context, instance string) (didCorrect bool, err error) { + var correctedOwners []eventstore.Command + + tx, err := mig.eventstore.Client().BeginTx(ctx, nil) + if err != nil { + return false, err + } + defer func() { + if err != nil { + _ = tx.Rollback() + return + } + err = tx.Commit() + }() + + rows, err := tx.QueryContext(ctx, correctProjectOwnerEvents, instance) + if err != nil { + return false, err + } + defer rows.Close() + + for rows.Next() { + aggregate := &eventstore.Aggregate{ + InstanceID: instance, + Type: project.AggregateType, + Version: project.AggregateVersion, + } + var payload json.RawMessage + err := rows.Scan( + &aggregate.ID, + &aggregate.ResourceOwner, + &payload, + ) + if err != nil { + return false, err + } + previousOwners := make(map[uint32]string) + if err := json.Unmarshal(payload, &previousOwners); err != nil { + return false, err + } + correctedOwners = append(correctedOwners, owner.NewCorrected(ctx, aggregate, previousOwners)) + } + if rows.Err() != nil { + return false, rows.Err() + } + + _, err = mig.eventstore.PushWithClient(ctx, tx, correctedOwners...) + return len(correctedOwners) > 0, err +} + +func (*CorrectProjectOwners) String() string { + return "42_correct_project_owners4" +} diff --git a/cmd/setup/42.sql b/cmd/setup/42.sql new file mode 100644 index 0000000000..769feaed2f --- /dev/null +++ b/cmd/setup/42.sql @@ -0,0 +1,79 @@ +WITH corrupt_streams AS ( + select + e.instance_id + , e.aggregate_type + , e.aggregate_id + , min(e.sequence) as min_sequence + , count(distinct e.owner) as owner_count + from + eventstore.events2 e + where + e.instance_id = $1 + and aggregate_type = 'project' + group by + e.instance_id + , e.aggregate_type + , e.aggregate_id + having + count(distinct e.owner) > 1 +), correct_owners AS ( + select + e.instance_id + , e.aggregate_type + , e.aggregate_id + , e.owner + from + eventstore.events2 e + join + corrupt_streams cs + on + e.instance_id = cs.instance_id + and e.aggregate_type = cs.aggregate_type + and e.aggregate_id = cs.aggregate_id + and e.sequence = cs.min_sequence +), wrong_events AS ( + select + e.instance_id + , e.aggregate_type + , e.aggregate_id + , e.sequence + , e.owner wrong_owner + , co.owner correct_owner + from + eventstore.events2 e + join + correct_owners co + on + e.instance_id = co.instance_id + and e.aggregate_type = co.aggregate_type + and e.aggregate_id = co.aggregate_id + and e.owner <> co.owner +), updated_events AS ( + UPDATE eventstore.events2 e + SET owner = we.correct_owner + FROM + wrong_events we + WHERE + e.instance_id = we.instance_id + and e.aggregate_type = we.aggregate_type + and e.aggregate_id = we.aggregate_id + and e.sequence = we.sequence + RETURNING + we.aggregate_id + , we.correct_owner + , we.sequence + , we.wrong_owner +) +SELECT + ue.aggregate_id + , ue.correct_owner + , jsonb_object_agg( + ue.sequence + , ue.wrong_owner + ) payload +FROM + updated_events ue +GROUP BY + ue.aggregate_id + , ue.correct_owner +; diff --git a/cmd/setup/config.go b/cmd/setup/config.go index 966b5777a7..f7f34f8703 100644 --- a/cmd/setup/config.go +++ b/cmd/setup/config.go @@ -127,6 +127,7 @@ type Steps struct { s37Apps7OIDConfigsBackChannelLogoutURI *Apps7OIDConfigsBackChannelLogoutURI s38BackChannelLogoutNotificationStart *BackChannelLogoutNotificationStart s40InitPushFunc *InitPushFunc + s42CorrectProjectOwners *CorrectProjectOwners } func MustNewSteps(v *viper.Viper) *Steps { diff --git a/cmd/setup/setup.go b/cmd/setup/setup.go index 7c789c399a..b456665cb8 100644 --- a/cmd/setup/setup.go +++ b/cmd/setup/setup.go @@ -170,6 +170,7 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string) steps.s37Apps7OIDConfigsBackChannelLogoutURI = &Apps7OIDConfigsBackChannelLogoutURI{dbClient: esPusherDBClient} steps.s38BackChannelLogoutNotificationStart = &BackChannelLogoutNotificationStart{dbClient: esPusherDBClient, esClient: eventstoreClient} steps.s40InitPushFunc = &InitPushFunc{dbClient: esPusherDBClient} + steps.s42CorrectProjectOwners = &CorrectProjectOwners{eventstore: eventstoreClient} err = projection.Create(ctx, projectionDBClient, eventstoreClient, config.Projections, nil, nil, nil) logging.OnError(err).Fatal("unable to start projections") @@ -223,6 +224,7 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string) steps.s35AddPositionToIndexEsWm, steps.s36FillV2Milestones, steps.s38BackChannelLogoutNotificationStart, + steps.s42CorrectProjectOwners, } { mustExecuteMigration(ctx, eventstoreClient, step, "migration failed") } diff --git a/internal/eventstore/v3/sequence.go b/internal/eventstore/v3/sequence.go index 7d97e1080d..1976af4093 100644 --- a/internal/eventstore/v3/sequence.go +++ b/internal/eventstore/v3/sequence.go @@ -125,7 +125,18 @@ func scanToSequence(rows *sql.Rows, sequences []*latestSequence) error { return nil } sequence.sequence = currentSequence - if sequence.aggregate.ResourceOwner == "" { + if resourceOwner != "" && sequence.aggregate.ResourceOwner != "" && sequence.aggregate.ResourceOwner != resourceOwner { + logging.WithFields( + "current_sequence", sequence.sequence, + "instance_id", sequence.aggregate.InstanceID, + "agg_type", sequence.aggregate.Type, + "agg_id", sequence.aggregate.ID, + "current_owner", resourceOwner, + "provided_owner", sequence.aggregate.ResourceOwner, + ).Info("would have set wrong resource owner") + } + // set resource owner from previous events + if resourceOwner != "" { sequence.aggregate.ResourceOwner = resourceOwner } diff --git a/internal/query/projection/project_grant.go b/internal/query/projection/project_grant.go index d6fbde8556..621623e061 100644 --- a/internal/query/projection/project_grant.go +++ b/internal/query/projection/project_grant.go @@ -93,6 +93,10 @@ func (p *projectGrantProjection) Reducers() []handler.AggregateReducer { Event: project.ProjectRemovedType, Reduce: p.reduceProjectRemoved, }, + { + Event: project.ProjectOwnerCorrected, + Reduce: p.reduceOwnerCorrected, + }, }, }, { @@ -269,3 +273,16 @@ func (p *projectGrantProjection) reduceOwnerRemoved(event eventstore.Event) (*ha ), ), nil } + +func (p *projectGrantProjection) reduceOwnerCorrected(event eventstore.Event) (*handler.Statement, error) { + return handler.NewUpdateStatement( + event, + []handler.Column{ + handler.NewCol(ProjectGrantColumnResourceOwner, event.Aggregate().ResourceOwner), + }, + []handler.Condition{ + handler.NewCond(ProjectGrantColumnInstanceID, event.Aggregate().InstanceID), + handler.NewCond(ProjectGrantColumnProjectID, event.Aggregate().ID), + }, + ), nil +} diff --git a/internal/repository/owner/owner_corrected.go b/internal/repository/owner/owner_corrected.go new file mode 100644 index 0000000000..29bb4842d4 --- /dev/null +++ b/internal/repository/owner/owner_corrected.go @@ -0,0 +1,40 @@ +package owner + +import ( + "context" + + "github.com/zitadel/zitadel/internal/eventstore" +) + +const OwnerCorrectedType = ".owner.corrected" + +type Corrected struct { + eventstore.BaseEvent `json:"-"` + + PreviousOwners map[uint32]string `json:"previousOwners,omitempty"` +} + +var _ eventstore.Command = (*Corrected)(nil) + +func (e *Corrected) Payload() interface{} { + return e +} + +func (e *Corrected) UniqueConstraints() []*eventstore.UniqueConstraint { + return nil +} + +func NewCorrected( + ctx context.Context, + aggregate *eventstore.Aggregate, + previousOwners map[uint32]string, +) *Corrected { + return &Corrected{ + BaseEvent: *eventstore.NewBaseEventForPush( + ctx, + aggregate, + eventstore.EventType(aggregate.Type+OwnerCorrectedType), + ), + PreviousOwners: previousOwners, + } +} diff --git a/internal/repository/project/project.go b/internal/repository/project/project.go index 6147a632eb..44f882b3e1 100644 --- a/internal/repository/project/project.go +++ b/internal/repository/project/project.go @@ -16,6 +16,7 @@ const ( ProjectDeactivatedType = projectEventTypePrefix + "deactivated" ProjectReactivatedType = projectEventTypePrefix + "reactivated" ProjectRemovedType = projectEventTypePrefix + "removed" + ProjectOwnerCorrected = projectEventTypePrefix + "owner.corrected" ProjectSearchType = "project" ProjectObjectRevision = uint8(1)