diff --git a/internal/migration/migration.go b/internal/migration/migration.go index 2f27a3715c..3608332a8a 100644 --- a/internal/migration/migration.go +++ b/internal/migration/migration.go @@ -2,6 +2,8 @@ package migration import ( "context" + errs "errors" + "time" "github.com/zitadel/logging" @@ -18,6 +20,10 @@ const ( aggregateID = "SYSTEM" ) +var ( + errMigrationAlreadyStarted = errs.New("already started") +) + type Migration interface { String() string Execute(context.Context) error @@ -32,7 +38,7 @@ type RepeatableMigration interface { func Migrate(ctx context.Context, es *eventstore.Eventstore, migration Migration) (err error) { logging.Infof("verify migration %s", migration.String()) - if should, err := shouldExec(ctx, es, migration); !should || err != nil { + if should, err := checkExec(ctx, es, migration); !should || err != nil { return err } @@ -52,6 +58,30 @@ func Migrate(ctx context.Context, es *eventstore.Eventstore, migration Migration return pushErr } +// checkExec ensures that only one setup step is done concurrently +// if a setup step is already started, it calls shouldExec after some time again +func checkExec(ctx context.Context, es *eventstore.Eventstore, migration Migration) (bool, error) { + timer := time.NewTimer(0) + for { + select { + case <-ctx.Done(): + return false, errors.ThrowInternal(nil, "MIGR-as3f7", "Errors.Internal") + case <-timer.C: + should, err := shouldExec(ctx, es, migration) + if err != nil { + if !errs.Is(err, errMigrationAlreadyStarted) { + return false, err + } + logging.WithFields("migration step", migration.String()). + Warn("migration already started, will check again in 5 seconds") + timer.Reset(5 * time.Second) + break + } + return should, nil + } + } +} + func shouldExec(ctx context.Context, es *eventstore.Eventstore, migration Migration) (should bool, err error) { events, err := es.Filter(ctx, eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent). OrderAsc(). @@ -90,7 +120,7 @@ func shouldExec(ctx context.Context, es *eventstore.Eventstore, migration Migrat } if isStarted { - return false, nil + return false, errMigrationAlreadyStarted } repeatable, ok := migration.(RepeatableMigration) if !ok {