mirror of
https://github.com/zitadel/zitadel.git
synced 2025-01-12 19:03:40 +00:00
fix: ensure setup steps are done in order and one at a time (#4749)
* fix: ensure setup steps are done in order and one at a time * fix err handling
This commit is contained in:
parent
c64a29a0a0
commit
f86d057a92
@ -2,6 +2,8 @@ package migration
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
errs "errors"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/zitadel/logging"
|
"github.com/zitadel/logging"
|
||||||
|
|
||||||
@ -18,6 +20,10 @@ const (
|
|||||||
aggregateID = "SYSTEM"
|
aggregateID = "SYSTEM"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
errMigrationAlreadyStarted = errs.New("already started")
|
||||||
|
)
|
||||||
|
|
||||||
type Migration interface {
|
type Migration interface {
|
||||||
String() string
|
String() string
|
||||||
Execute(context.Context) error
|
Execute(context.Context) error
|
||||||
@ -32,7 +38,7 @@ type RepeatableMigration interface {
|
|||||||
func Migrate(ctx context.Context, es *eventstore.Eventstore, migration Migration) (err error) {
|
func Migrate(ctx context.Context, es *eventstore.Eventstore, migration Migration) (err error) {
|
||||||
logging.Infof("verify migration %s", migration.String())
|
logging.Infof("verify migration %s", migration.String())
|
||||||
|
|
||||||
if should, err := shouldExec(ctx, es, migration); !should || err != nil {
|
if should, err := checkExec(ctx, es, migration); !should || err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -52,6 +58,30 @@ func Migrate(ctx context.Context, es *eventstore.Eventstore, migration Migration
|
|||||||
return pushErr
|
return pushErr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// checkExec ensures that only one setup step is done concurrently
|
||||||
|
// if a setup step is already started, it calls shouldExec after some time again
|
||||||
|
func checkExec(ctx context.Context, es *eventstore.Eventstore, migration Migration) (bool, error) {
|
||||||
|
timer := time.NewTimer(0)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return false, errors.ThrowInternal(nil, "MIGR-as3f7", "Errors.Internal")
|
||||||
|
case <-timer.C:
|
||||||
|
should, err := shouldExec(ctx, es, migration)
|
||||||
|
if err != nil {
|
||||||
|
if !errs.Is(err, errMigrationAlreadyStarted) {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
logging.WithFields("migration step", migration.String()).
|
||||||
|
Warn("migration already started, will check again in 5 seconds")
|
||||||
|
timer.Reset(5 * time.Second)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
return should, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func shouldExec(ctx context.Context, es *eventstore.Eventstore, migration Migration) (should bool, err error) {
|
func shouldExec(ctx context.Context, es *eventstore.Eventstore, migration Migration) (should bool, err error) {
|
||||||
events, err := es.Filter(ctx, eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
|
events, err := es.Filter(ctx, eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
|
||||||
OrderAsc().
|
OrderAsc().
|
||||||
@ -90,7 +120,7 @@ func shouldExec(ctx context.Context, es *eventstore.Eventstore, migration Migrat
|
|||||||
}
|
}
|
||||||
|
|
||||||
if isStarted {
|
if isStarted {
|
||||||
return false, nil
|
return false, errMigrationAlreadyStarted
|
||||||
}
|
}
|
||||||
repeatable, ok := migration.(RepeatableMigration)
|
repeatable, ok := migration.(RepeatableMigration)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user