From c8c5cf3c5f825ce9d619cd7914c74117f5023d11 Mon Sep 17 00:00:00 2001 From: Silvan Date: Fri, 28 Apr 2023 13:55:35 +0200 Subject: [PATCH] feat(cli): add `setup cleanup` sub command (#5770) * feat(cli): add `setup cleanup` sub command * chore: logging * chore: logging --- cmd/setup/cleanup.go | 51 +++++++++++++++++++++++++++++++ cmd/setup/setup.go | 2 ++ internal/migration/command.go | 10 +++--- internal/migration/migration.go | 54 +++++++++++++++++++++++++++++---- 4 files changed, 106 insertions(+), 11 deletions(-) create mode 100644 cmd/setup/cleanup.go diff --git a/cmd/setup/cleanup.go b/cmd/setup/cleanup.go new file mode 100644 index 0000000000..7139b67d35 --- /dev/null +++ b/cmd/setup/cleanup.go @@ -0,0 +1,51 @@ +package setup + +import ( + "context" + + "github.com/spf13/cobra" + "github.com/spf13/viper" + "github.com/zitadel/logging" + + "github.com/zitadel/zitadel/internal/database" + "github.com/zitadel/zitadel/internal/eventstore" + "github.com/zitadel/zitadel/internal/migration" +) + +func NewCleanup() *cobra.Command { + return &cobra.Command{ + Use: "cleanup", + Short: "cleans up migration if they got stuck", + Long: `cleans up migration if they got stuck`, + Run: func(cmd *cobra.Command, args []string) { + config := MustNewConfig(viper.GetViper()) + Cleanup(config) + }, + } +} + +func Cleanup(config *Config) { + ctx := context.Background() + + logging.Info("cleanup started") + + dbClient, err := database.Connect(config.Database, false) + logging.OnError(err).Fatal("unable to connect to database") + + es, err := eventstore.Start(&eventstore.Config{Client: dbClient}) + logging.OnError(err).Fatal("unable to start eventstore") + migration.RegisterMappers(es) + + step, err := migration.LatestStep(ctx, es) + logging.OnError(err).Fatal("unable to query latest migration") + + if step.BaseEvent.EventType != migration.StartedType { + logging.Info("there is no stuck migration please run `zitadel setup`") + return + } + + logging.WithFields("name", step.Name).Info("cleanup migration") + + err = migration.CancelStep(ctx, es, step) + logging.OnError(err).Fatal("cleanup migration failed please retry") +} diff --git a/cmd/setup/setup.go b/cmd/setup/setup.go index e90ad85f94..918f08dc2d 100644 --- a/cmd/setup/setup.go +++ b/cmd/setup/setup.go @@ -45,6 +45,8 @@ Requirements: }, } + cmd.AddCommand(NewCleanup()) + Flags(cmd) return cmd diff --git a/internal/migration/command.go b/internal/migration/command.go index 9d04e3cbe7..6552ef6ed4 100644 --- a/internal/migration/command.go +++ b/internal/migration/command.go @@ -41,14 +41,14 @@ func setupStartedCmd(migration Migration) eventstore.Command { BaseEvent: *eventstore.NewBaseEventForPush( ctx, eventstore.NewAggregate(ctx, aggregateID, aggregateType, "v1"), - startedType), + StartedType), migration: migration, Name: migration.String(), } } -func setupDoneCmd(migration Migration, err error) eventstore.Command { - ctx := authz.SetCtxData(service.WithService(context.Background(), "system"), authz.CtxData{UserID: "system", OrgID: "SYSTEM", ResourceOwner: "SYSTEM"}) +func setupDoneCmd(ctx context.Context, migration Migration, err error) eventstore.Command { + ctx = authz.SetCtxData(service.WithService(ctx, "system"), authz.CtxData{UserID: "system", OrgID: "SYSTEM", ResourceOwner: "SYSTEM"}) typ := doneType var lastRun interface{} if repeatable, ok := migration.(RepeatableMigration); ok { @@ -80,7 +80,7 @@ func (s *SetupStep) Data() interface{} { func (s *SetupStep) UniqueConstraints() []*eventstore.EventUniqueConstraint { switch s.Type() { - case startedType: + case StartedType: return []*eventstore.EventUniqueConstraint{ eventstore.NewAddGlobalEventUniqueConstraint("migration_started", s.migration.String(), "Errors.Step.Started.AlreadyExists"), } @@ -97,7 +97,7 @@ func (s *SetupStep) UniqueConstraints() []*eventstore.EventUniqueConstraint { } func RegisterMappers(es *eventstore.Eventstore) { - es.RegisterFilterEventMapper(aggregateType, startedType, SetupMapper) + es.RegisterFilterEventMapper(aggregateType, StartedType, SetupMapper) es.RegisterFilterEventMapper(aggregateType, doneType, SetupMapper) es.RegisterFilterEventMapper(aggregateType, failedType, SetupMapper) es.RegisterFilterEventMapper(aggregateType, repeatableDoneType, SetupMapper) diff --git a/internal/migration/migration.go b/internal/migration/migration.go index 3608332a8a..63a6cb7b7a 100644 --- a/internal/migration/migration.go +++ b/internal/migration/migration.go @@ -12,7 +12,7 @@ import ( ) const ( - startedType = eventstore.EventType("system.migration.started") + StartedType = eventstore.EventType("system.migration.started") doneType = eventstore.EventType("system.migration.done") failedType = eventstore.EventType("system.migration.failed") repeatableDoneType = eventstore.EventType("system.migration.repeatable.done") @@ -36,7 +36,7 @@ type RepeatableMigration interface { } func Migrate(ctx context.Context, es *eventstore.Eventstore, migration Migration) (err error) { - logging.Infof("verify migration %s", migration.String()) + logging.WithFields("name", migration.String()).Info("verify migration") if should, err := checkExec(ctx, es, migration); !should || err != nil { return err @@ -46,11 +46,11 @@ func Migrate(ctx context.Context, es *eventstore.Eventstore, migration Migration return err } - logging.Infof("starting migration %s", migration.String()) + logging.WithFields("name", migration.String()).Info("starting migration") err = migration.Execute(ctx) logging.OnError(err).Error("migration failed") - _, pushErr := es.Push(ctx, setupDoneCmd(migration, err)) + _, pushErr := es.Push(ctx, setupDoneCmd(ctx, migration, err)) logging.OnError(pushErr).Error("migration failed") if err != nil { return err @@ -58,6 +58,48 @@ func Migrate(ctx context.Context, es *eventstore.Eventstore, migration Migration return pushErr } +func LatestStep(ctx context.Context, es *eventstore.Eventstore) (*SetupStep, error) { + events, err := es.Filter(ctx, eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent). + OrderDesc(). + Limit(1). + AddQuery(). + AggregateTypes(aggregateType). + AggregateIDs(aggregateID). + EventTypes(StartedType, doneType, repeatableDoneType, failedType). + Builder()) + if err != nil { + return nil, err + } + step, ok := events[0].(*SetupStep) + if !ok { + return nil, errors.ThrowInternal(nil, "MIGRA-hppLM", "setup step is malformed") + } + return step, nil +} + +var _ Migration = (*cancelMigration)(nil) + +type cancelMigration struct { + name string +} + +// Execute implements Migration +func (*cancelMigration) Execute(context.Context) error { + return nil +} + +// String implements Migration +func (m *cancelMigration) String() string { + return m.name +} + +var errCancelStep = errors.ThrowError(nil, "MIGRA-zo86K", "migration canceled manually") + +func CancelStep(ctx context.Context, es *eventstore.Eventstore, step *SetupStep) error { + _, err := es.Push(ctx, setupDoneCmd(ctx, &cancelMigration{name: step.Name}, errCancelStep)) + return err +} + // checkExec ensures that only one setup step is done concurrently // if a setup step is already started, it calls shouldExec after some time again func checkExec(ctx context.Context, es *eventstore.Eventstore, migration Migration) (bool, error) { @@ -88,7 +130,7 @@ func shouldExec(ctx context.Context, es *eventstore.Eventstore, migration Migrat AddQuery(). AggregateTypes(aggregateType). AggregateIDs(aggregateID). - EventTypes(startedType, doneType, repeatableDoneType, failedType). + EventTypes(StartedType, doneType, repeatableDoneType, failedType). Builder()) if err != nil { return false, err @@ -106,7 +148,7 @@ func shouldExec(ctx context.Context, es *eventstore.Eventstore, migration Migrat } switch event.Type() { - case startedType, failedType: + case StartedType, failedType: isStarted = !isStarted case doneType, repeatableDoneType: