zitadel/cmd/setup/43.go

112 lines
2.8 KiB
Go

package setup
import (
"context"
_ "embed"
"encoding/json"
"fmt"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/query/projection"
"github.com/zitadel/zitadel/internal/repository/instance"
"github.com/zitadel/zitadel/internal/repository/owner"
"github.com/zitadel/zitadel/internal/repository/project"
)
var (
//go:embed 43.sql
correctProjectOwnerEvents string
)
type CorrectProjectOwners struct {
eventstore *eventstore.Eventstore
}
func (mig *CorrectProjectOwners) Execute(ctx context.Context, _ eventstore.Event) error {
instances, err := mig.eventstore.InstanceIDs(
ctx,
eventstore.NewSearchQueryBuilder(eventstore.ColumnsInstanceIDs).
OrderDesc().
AddQuery().
AggregateTypes("instance").
EventTypes(instance.InstanceAddedEventType).
Builder(),
)
if err != nil {
return err
}
ctx = authz.SetCtxData(ctx, authz.CtxData{UserID: "SETUP"})
for i, instance := range instances {
ctx = authz.WithInstanceID(ctx, instance)
logging.WithFields("instance_id", instance, "migration", mig.String(), "progress", fmt.Sprintf("%d/%d", i+1, len(instances))).Info("correct owners of projects")
didCorrect, err := mig.correctInstanceProjects(ctx, instance)
if err != nil {
return err
}
if !didCorrect {
continue
}
_, err = projection.ProjectGrantProjection.Trigger(ctx)
logging.OnError(err).Debug("failed triggering project grant projection to update owners")
}
return nil
}
func (mig *CorrectProjectOwners) correctInstanceProjects(ctx context.Context, instance string) (didCorrect bool, err error) {
var correctedOwners []eventstore.Command
tx, err := mig.eventstore.Client().BeginTx(ctx, nil)
if err != nil {
return false, err
}
defer func() {
if err != nil {
_ = tx.Rollback()
return
}
err = tx.Commit()
}()
rows, err := tx.QueryContext(ctx, correctProjectOwnerEvents, instance)
if err != nil {
return false, err
}
defer rows.Close()
for rows.Next() {
aggregate := &eventstore.Aggregate{
InstanceID: instance,
Type: project.AggregateType,
Version: project.AggregateVersion,
}
var payload json.RawMessage
err := rows.Scan(
&aggregate.ID,
&aggregate.ResourceOwner,
&payload,
)
if err != nil {
return false, err
}
previousOwners := make(map[uint32]string)
if err := json.Unmarshal(payload, &previousOwners); err != nil {
return false, err
}
correctedOwners = append(correctedOwners, owner.NewCorrected(ctx, aggregate, previousOwners))
}
if rows.Err() != nil {
return false, rows.Err()
}
_, err = mig.eventstore.PushWithClient(ctx, tx, correctedOwners...)
return len(correctedOwners) > 0, err
}
func (*CorrectProjectOwners) String() string {
return "43_correct_project_owners"
}