fix: set correct owner on project grants

This commit is contained in:
adlerhurst 2024-12-19 11:50:27 +01:00
parent b5e92a6144
commit c979f221e8
8 changed files with 263 additions and 1 deletions

111
cmd/setup/42.go Normal file
View File

@ -0,0 +1,111 @@
package setup
import (
"context"
_ "embed"
"encoding/json"
"fmt"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/query/projection"
"github.com/zitadel/zitadel/internal/repository/instance"
"github.com/zitadel/zitadel/internal/repository/owner"
"github.com/zitadel/zitadel/internal/repository/project"
)
var (
//go:embed 42.sql
correctProjectOwnerEvents string
)
type CorrectProjectOwners struct {
eventstore *eventstore.Eventstore
}
func (mig *CorrectProjectOwners) Execute(ctx context.Context, _ eventstore.Event) error {
instances, err := mig.eventstore.InstanceIDs(
ctx,
eventstore.NewSearchQueryBuilder(eventstore.ColumnsInstanceIDs).
OrderDesc().
AddQuery().
AggregateTypes("instance").
EventTypes(instance.InstanceAddedEventType).
Builder(),
)
if err != nil {
return err
}
ctx = authz.SetCtxData(ctx, authz.CtxData{UserID: "SETUP"})
for i, instance := range instances {
ctx = authz.WithInstanceID(ctx, instance)
logging.WithFields("instance_id", instance, "migration", mig.String(), "progress", fmt.Sprintf("%d/%d", i+1, len(instances))).Info("correct owners of projects")
didCorrect, err := mig.correctInstanceProjects(ctx, instance)
if err != nil {
return err
}
if !didCorrect {
continue
}
_, err = projection.ProjectGrantProjection.Trigger(ctx)
logging.OnError(err).Debug("failed triggering project grant projection to update owners")
}
return nil
}
func (mig *CorrectProjectOwners) correctInstanceProjects(ctx context.Context, instance string) (didCorrect bool, err error) {
var correctedOwners []eventstore.Command
tx, err := mig.eventstore.Client().BeginTx(ctx, nil)
if err != nil {
return false, err
}
defer func() {
if err != nil {
_ = tx.Rollback()
return
}
err = tx.Commit()
}()
rows, err := tx.QueryContext(ctx, correctProjectOwnerEvents, instance)
if err != nil {
return false, err
}
defer rows.Close()
for rows.Next() {
aggregate := &eventstore.Aggregate{
InstanceID: instance,
Type: project.AggregateType,
Version: project.AggregateVersion,
}
var payload json.RawMessage
err := rows.Scan(
&aggregate.ID,
&aggregate.ResourceOwner,
&payload,
)
if err != nil {
return false, err
}
previousOwners := make(map[uint32]string)
if err := json.Unmarshal(payload, &previousOwners); err != nil {
return false, err
}
correctedOwners = append(correctedOwners, owner.NewCorrected(ctx, aggregate, previousOwners))
}
if rows.Err() != nil {
return false, rows.Err()
}
_, err = mig.eventstore.PushWithClient(ctx, tx, correctedOwners...)
return len(correctedOwners) > 0, err
}
func (*CorrectProjectOwners) String() string {
return "42_correct_project_owners4"
}

79
cmd/setup/42.sql Normal file
View File

@ -0,0 +1,79 @@
WITH corrupt_streams AS (
select
e.instance_id
, e.aggregate_type
, e.aggregate_id
, min(e.sequence) as min_sequence
, count(distinct e.owner) as owner_count
from
eventstore.events2 e
where
e.instance_id = $1
and aggregate_type = 'project'
group by
e.instance_id
, e.aggregate_type
, e.aggregate_id
having
count(distinct e.owner) > 1
), correct_owners AS (
select
e.instance_id
, e.aggregate_type
, e.aggregate_id
, e.owner
from
eventstore.events2 e
join
corrupt_streams cs
on
e.instance_id = cs.instance_id
and e.aggregate_type = cs.aggregate_type
and e.aggregate_id = cs.aggregate_id
and e.sequence = cs.min_sequence
), wrong_events AS (
select
e.instance_id
, e.aggregate_type
, e.aggregate_id
, e.sequence
, e.owner wrong_owner
, co.owner correct_owner
from
eventstore.events2 e
join
correct_owners co
on
e.instance_id = co.instance_id
and e.aggregate_type = co.aggregate_type
and e.aggregate_id = co.aggregate_id
and e.owner <> co.owner
), updated_events AS (
UPDATE eventstore.events2 e
SET owner = we.correct_owner
FROM
wrong_events we
WHERE
e.instance_id = we.instance_id
and e.aggregate_type = we.aggregate_type
and e.aggregate_id = we.aggregate_id
and e.sequence = we.sequence
RETURNING
we.aggregate_id
, we.correct_owner
, we.sequence
, we.wrong_owner
)
SELECT
ue.aggregate_id
, ue.correct_owner
, jsonb_object_agg(
ue.sequence
, ue.wrong_owner
) payload
FROM
updated_events ue
GROUP BY
ue.aggregate_id
, ue.correct_owner
;

View File

@ -127,6 +127,7 @@ type Steps struct {
s37Apps7OIDConfigsBackChannelLogoutURI *Apps7OIDConfigsBackChannelLogoutURI
s38BackChannelLogoutNotificationStart *BackChannelLogoutNotificationStart
s40InitPushFunc *InitPushFunc
s42CorrectProjectOwners *CorrectProjectOwners
}
func MustNewSteps(v *viper.Viper) *Steps {

View File

@ -170,6 +170,7 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string)
steps.s37Apps7OIDConfigsBackChannelLogoutURI = &Apps7OIDConfigsBackChannelLogoutURI{dbClient: esPusherDBClient}
steps.s38BackChannelLogoutNotificationStart = &BackChannelLogoutNotificationStart{dbClient: esPusherDBClient, esClient: eventstoreClient}
steps.s40InitPushFunc = &InitPushFunc{dbClient: esPusherDBClient}
steps.s42CorrectProjectOwners = &CorrectProjectOwners{eventstore: eventstoreClient}
err = projection.Create(ctx, projectionDBClient, eventstoreClient, config.Projections, nil, nil, nil)
logging.OnError(err).Fatal("unable to start projections")
@ -223,6 +224,7 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string)
steps.s35AddPositionToIndexEsWm,
steps.s36FillV2Milestones,
steps.s38BackChannelLogoutNotificationStart,
steps.s42CorrectProjectOwners,
} {
mustExecuteMigration(ctx, eventstoreClient, step, "migration failed")
}

View File

@ -125,7 +125,18 @@ func scanToSequence(rows *sql.Rows, sequences []*latestSequence) error {
return nil
}
sequence.sequence = currentSequence
if sequence.aggregate.ResourceOwner == "" {
if resourceOwner != "" && sequence.aggregate.ResourceOwner != "" && sequence.aggregate.ResourceOwner != resourceOwner {
logging.WithFields(
"current_sequence", sequence.sequence,
"instance_id", sequence.aggregate.InstanceID,
"agg_type", sequence.aggregate.Type,
"agg_id", sequence.aggregate.ID,
"current_owner", resourceOwner,
"provided_owner", sequence.aggregate.ResourceOwner,
).Info("would have set wrong resource owner")
}
// set resource owner from previous events
if resourceOwner != "" {
sequence.aggregate.ResourceOwner = resourceOwner
}

View File

@ -93,6 +93,10 @@ func (p *projectGrantProjection) Reducers() []handler.AggregateReducer {
Event: project.ProjectRemovedType,
Reduce: p.reduceProjectRemoved,
},
{
Event: project.ProjectOwnerCorrected,
Reduce: p.reduceOwnerCorrected,
},
},
},
{
@ -269,3 +273,16 @@ func (p *projectGrantProjection) reduceOwnerRemoved(event eventstore.Event) (*ha
),
), nil
}
func (p *projectGrantProjection) reduceOwnerCorrected(event eventstore.Event) (*handler.Statement, error) {
return handler.NewUpdateStatement(
event,
[]handler.Column{
handler.NewCol(ProjectGrantColumnResourceOwner, event.Aggregate().ResourceOwner),
},
[]handler.Condition{
handler.NewCond(ProjectGrantColumnInstanceID, event.Aggregate().InstanceID),
handler.NewCond(ProjectGrantColumnProjectID, event.Aggregate().ID),
},
), nil
}

View File

@ -0,0 +1,40 @@
package owner
import (
"context"
"github.com/zitadel/zitadel/internal/eventstore"
)
const OwnerCorrectedType = ".owner.corrected"
type Corrected struct {
eventstore.BaseEvent `json:"-"`
PreviousOwners map[uint32]string `json:"previousOwners,omitempty"`
}
var _ eventstore.Command = (*Corrected)(nil)
func (e *Corrected) Payload() interface{} {
return e
}
func (e *Corrected) UniqueConstraints() []*eventstore.UniqueConstraint {
return nil
}
func NewCorrected(
ctx context.Context,
aggregate *eventstore.Aggregate,
previousOwners map[uint32]string,
) *Corrected {
return &Corrected{
BaseEvent: *eventstore.NewBaseEventForPush(
ctx,
aggregate,
eventstore.EventType(aggregate.Type+OwnerCorrectedType),
),
PreviousOwners: previousOwners,
}
}

View File

@ -16,6 +16,7 @@ const (
ProjectDeactivatedType = projectEventTypePrefix + "deactivated"
ProjectReactivatedType = projectEventTypePrefix + "reactivated"
ProjectRemovedType = projectEventTypePrefix + "removed"
ProjectOwnerCorrected = projectEventTypePrefix + "owner.corrected"
ProjectSearchType = "project"
ProjectObjectRevision = uint8(1)