feat(cli): add setup cleanup sub command (#5770)

* feat(cli): add `setup cleanup` sub command

* chore: logging

* chore: logging
This commit is contained in:
Silvan
2023-04-28 13:55:35 +02:00
committed by GitHub
parent 86f4477ae1
commit c8c5cf3c5f
4 changed files with 106 additions and 11 deletions

51
cmd/setup/cleanup.go Normal file
View File

@@ -0,0 +1,51 @@
package setup
import (
"context"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/migration"
)
func NewCleanup() *cobra.Command {
return &cobra.Command{
Use: "cleanup",
Short: "cleans up migration if they got stuck",
Long: `cleans up migration if they got stuck`,
Run: func(cmd *cobra.Command, args []string) {
config := MustNewConfig(viper.GetViper())
Cleanup(config)
},
}
}
func Cleanup(config *Config) {
ctx := context.Background()
logging.Info("cleanup started")
dbClient, err := database.Connect(config.Database, false)
logging.OnError(err).Fatal("unable to connect to database")
es, err := eventstore.Start(&eventstore.Config{Client: dbClient})
logging.OnError(err).Fatal("unable to start eventstore")
migration.RegisterMappers(es)
step, err := migration.LatestStep(ctx, es)
logging.OnError(err).Fatal("unable to query latest migration")
if step.BaseEvent.EventType != migration.StartedType {
logging.Info("there is no stuck migration please run `zitadel setup`")
return
}
logging.WithFields("name", step.Name).Info("cleanup migration")
err = migration.CancelStep(ctx, es, step)
logging.OnError(err).Fatal("cleanup migration failed please retry")
}

View File

@@ -45,6 +45,8 @@ Requirements:
}, },
} }
cmd.AddCommand(NewCleanup())
Flags(cmd) Flags(cmd)
return cmd return cmd

View File

@@ -41,14 +41,14 @@ func setupStartedCmd(migration Migration) eventstore.Command {
BaseEvent: *eventstore.NewBaseEventForPush( BaseEvent: *eventstore.NewBaseEventForPush(
ctx, ctx,
eventstore.NewAggregate(ctx, aggregateID, aggregateType, "v1"), eventstore.NewAggregate(ctx, aggregateID, aggregateType, "v1"),
startedType), StartedType),
migration: migration, migration: migration,
Name: migration.String(), Name: migration.String(),
} }
} }
func setupDoneCmd(migration Migration, err error) eventstore.Command { func setupDoneCmd(ctx context.Context, migration Migration, err error) eventstore.Command {
ctx := authz.SetCtxData(service.WithService(context.Background(), "system"), authz.CtxData{UserID: "system", OrgID: "SYSTEM", ResourceOwner: "SYSTEM"}) ctx = authz.SetCtxData(service.WithService(ctx, "system"), authz.CtxData{UserID: "system", OrgID: "SYSTEM", ResourceOwner: "SYSTEM"})
typ := doneType typ := doneType
var lastRun interface{} var lastRun interface{}
if repeatable, ok := migration.(RepeatableMigration); ok { if repeatable, ok := migration.(RepeatableMigration); ok {
@@ -80,7 +80,7 @@ func (s *SetupStep) Data() interface{} {
func (s *SetupStep) UniqueConstraints() []*eventstore.EventUniqueConstraint { func (s *SetupStep) UniqueConstraints() []*eventstore.EventUniqueConstraint {
switch s.Type() { switch s.Type() {
case startedType: case StartedType:
return []*eventstore.EventUniqueConstraint{ return []*eventstore.EventUniqueConstraint{
eventstore.NewAddGlobalEventUniqueConstraint("migration_started", s.migration.String(), "Errors.Step.Started.AlreadyExists"), eventstore.NewAddGlobalEventUniqueConstraint("migration_started", s.migration.String(), "Errors.Step.Started.AlreadyExists"),
} }
@@ -97,7 +97,7 @@ func (s *SetupStep) UniqueConstraints() []*eventstore.EventUniqueConstraint {
} }
func RegisterMappers(es *eventstore.Eventstore) { func RegisterMappers(es *eventstore.Eventstore) {
es.RegisterFilterEventMapper(aggregateType, startedType, SetupMapper) es.RegisterFilterEventMapper(aggregateType, StartedType, SetupMapper)
es.RegisterFilterEventMapper(aggregateType, doneType, SetupMapper) es.RegisterFilterEventMapper(aggregateType, doneType, SetupMapper)
es.RegisterFilterEventMapper(aggregateType, failedType, SetupMapper) es.RegisterFilterEventMapper(aggregateType, failedType, SetupMapper)
es.RegisterFilterEventMapper(aggregateType, repeatableDoneType, SetupMapper) es.RegisterFilterEventMapper(aggregateType, repeatableDoneType, SetupMapper)

View File

@@ -12,7 +12,7 @@ import (
) )
const ( const (
startedType = eventstore.EventType("system.migration.started") StartedType = eventstore.EventType("system.migration.started")
doneType = eventstore.EventType("system.migration.done") doneType = eventstore.EventType("system.migration.done")
failedType = eventstore.EventType("system.migration.failed") failedType = eventstore.EventType("system.migration.failed")
repeatableDoneType = eventstore.EventType("system.migration.repeatable.done") repeatableDoneType = eventstore.EventType("system.migration.repeatable.done")
@@ -36,7 +36,7 @@ type RepeatableMigration interface {
} }
func Migrate(ctx context.Context, es *eventstore.Eventstore, migration Migration) (err error) { func Migrate(ctx context.Context, es *eventstore.Eventstore, migration Migration) (err error) {
logging.Infof("verify migration %s", migration.String()) logging.WithFields("name", migration.String()).Info("verify migration")
if should, err := checkExec(ctx, es, migration); !should || err != nil { if should, err := checkExec(ctx, es, migration); !should || err != nil {
return err return err
@@ -46,11 +46,11 @@ func Migrate(ctx context.Context, es *eventstore.Eventstore, migration Migration
return err return err
} }
logging.Infof("starting migration %s", migration.String()) logging.WithFields("name", migration.String()).Info("starting migration")
err = migration.Execute(ctx) err = migration.Execute(ctx)
logging.OnError(err).Error("migration failed") logging.OnError(err).Error("migration failed")
_, pushErr := es.Push(ctx, setupDoneCmd(migration, err)) _, pushErr := es.Push(ctx, setupDoneCmd(ctx, migration, err))
logging.OnError(pushErr).Error("migration failed") logging.OnError(pushErr).Error("migration failed")
if err != nil { if err != nil {
return err return err
@@ -58,6 +58,48 @@ func Migrate(ctx context.Context, es *eventstore.Eventstore, migration Migration
return pushErr return pushErr
} }
func LatestStep(ctx context.Context, es *eventstore.Eventstore) (*SetupStep, error) {
events, err := es.Filter(ctx, eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
OrderDesc().
Limit(1).
AddQuery().
AggregateTypes(aggregateType).
AggregateIDs(aggregateID).
EventTypes(StartedType, doneType, repeatableDoneType, failedType).
Builder())
if err != nil {
return nil, err
}
step, ok := events[0].(*SetupStep)
if !ok {
return nil, errors.ThrowInternal(nil, "MIGRA-hppLM", "setup step is malformed")
}
return step, nil
}
var _ Migration = (*cancelMigration)(nil)
type cancelMigration struct {
name string
}
// Execute implements Migration
func (*cancelMigration) Execute(context.Context) error {
return nil
}
// String implements Migration
func (m *cancelMigration) String() string {
return m.name
}
var errCancelStep = errors.ThrowError(nil, "MIGRA-zo86K", "migration canceled manually")
func CancelStep(ctx context.Context, es *eventstore.Eventstore, step *SetupStep) error {
_, err := es.Push(ctx, setupDoneCmd(ctx, &cancelMigration{name: step.Name}, errCancelStep))
return err
}
// checkExec ensures that only one setup step is done concurrently // checkExec ensures that only one setup step is done concurrently
// if a setup step is already started, it calls shouldExec after some time again // if a setup step is already started, it calls shouldExec after some time again
func checkExec(ctx context.Context, es *eventstore.Eventstore, migration Migration) (bool, error) { func checkExec(ctx context.Context, es *eventstore.Eventstore, migration Migration) (bool, error) {
@@ -88,7 +130,7 @@ func shouldExec(ctx context.Context, es *eventstore.Eventstore, migration Migrat
AddQuery(). AddQuery().
AggregateTypes(aggregateType). AggregateTypes(aggregateType).
AggregateIDs(aggregateID). AggregateIDs(aggregateID).
EventTypes(startedType, doneType, repeatableDoneType, failedType). EventTypes(StartedType, doneType, repeatableDoneType, failedType).
Builder()) Builder())
if err != nil { if err != nil {
return false, err return false, err
@@ -106,7 +148,7 @@ func shouldExec(ctx context.Context, es *eventstore.Eventstore, migration Migrat
} }
switch event.Type() { switch event.Type() {
case startedType, failedType: case StartedType, failedType:
isStarted = !isStarted isStarted = !isStarted
case doneType, case doneType,
repeatableDoneType: repeatableDoneType: