fix: Auto cleanup failed Setup steps if process is killed (#9736)
# Which Problems Are Solved

When running a long-running Zitadel Setup, Kubernetes might decide to move a pod to a new node automatically. Currently, this leaves any in-flight migrations in a broken state that an operator must fix by manually running the "cleanup" command - assuming they catch the error. The only very long-running commands are typically projection pre-fill operations, which, depending on the size of the event table for that projection, can take many hours - plenty of time for Kubernetes to make unexpected decisions, especially in a busy cluster.

# How the Problems Are Solved

This change listens on `os.Interrupt` and `syscall.SIGTERM`, cancels the current Setup context, and runs the `Cleanup` command. The logs then look something like this:

```shell
...
INFO[0000] verify migration caller="/Users/zach/src/zitadel/internal/migration/migration.go:43" name=repeatable_delete_stale_org_fields
INFO[0000] starting migration caller="/Users/zach/src/zitadel/internal/migration/migration.go:66" name=repeatable_delete_stale_org_fields
INFO[0000] execute delete query caller="/Users/zach/src/zitadel/cmd/setup/39.go:37" instance_id=281297936179003398 migration=repeatable_delete_stale_org_fields progress=1/1
INFO[0000] verify migration caller="/Users/zach/src/zitadel/internal/migration/migration.go:43" name=repeatable_fill_fields_for_instance_domains
INFO[0000] starting migration caller="/Users/zach/src/zitadel/internal/migration/migration.go:66" name=repeatable_fill_fields_for_instance_domains
----- SIGTERM signal issued -----
INFO[0000] received interrupt signal, shutting down: interrupt caller="/Users/zach/src/zitadel/cmd/setup/setup.go:121"
INFO[0000] query failed caller="/Users/zach/src/zitadel/internal/eventstore/repository/sql/query.go:135" error="timeout: context already done: context canceled"
DEBU[0000] filter eventstore failed caller="/Users/zach/src/zitadel/internal/eventstore/handler/v2/field_handler.go:155" error="ID=SQL-KyeAx Message=unable to filter events Parent=(timeout: context already done: context canceled)" projection=instance_domain_fields
DEBU[0000] unable to rollback tx caller="/Users/zach/src/zitadel/internal/eventstore/handler/v2/field_handler.go:110" error="sql: transaction has already been committed or rolled back" projection=instance_domain_fields
INFO[0000] process events failed caller="/Users/zach/src/zitadel/internal/eventstore/handler/v2/field_handler.go:72" error="ID=SQL-KyeAx Message=unable to filter events Parent=(timeout: context already done: context canceled)" projection=instance_domain_fields
DEBU[0000] trigger iteration caller="/Users/zach/src/zitadel/internal/eventstore/handler/v2/field_handler.go:73" iteration=0 projection=instance_domain_fields
ERRO[0000] migration failed caller="/Users/zach/src/zitadel/internal/migration/migration.go:68" error="ID=SQL-KyeAx Message=unable to filter events Parent=(timeout: context already done: context canceled)" name=repeatable_fill_fields_for_instance_domains
ERRO[0000] migration finish failed caller="/Users/zach/src/zitadel/internal/migration/migration.go:71" error="context canceled" name=repeatable_fill_fields_for_instance_domains
----- Cleanup before exiting -----
INFO[0000] cleanup started caller="/Users/zach/src/zitadel/cmd/setup/cleanup.go:30"
INFO[0000] cleanup migration caller="/Users/zach/src/zitadel/cmd/setup/cleanup.go:47" name=repeatable_fill_fields_for_instance_domains
```

# Additional Changes

* `mustExecuteMigration` -> `executeMigration`: **must**Execute previously logged a Fatal error, which calls `os.Exit`, so no cleanup was possible. Instead, this PR returns the error and assigns it to a shared error variable in the Setup closure that the deferred function can check.
* `initProjections` now returns an error instead of exiting

# Additional Context

This behavior might be unwelcome, or at least unexpected, in some cases. Putting it behind a feature flag or config setting is likely a good followup.

---------

Co-authored-by: Silvan <27845747+adlerhurst@users.noreply.github.com>
parent 658ca3606b
commit aa9ef8b49e
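Before diving into the diff, here is a minimal, self-contained sketch of the shutdown pattern described above. It is an illustration only: `runMigrations` and `cleanup` are hypothetical stand-ins, not Zitadel functions, and the real implementation below differs in detail.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// runMigrations stands in for a long-running projection pre-fill.
func runMigrations(ctx context.Context) error {
	select {
	case <-time.After(10 * time.Minute):
		return nil
	case <-ctx.Done():
		return ctx.Err() // context.Canceled after SIGINT/SIGTERM
	}
}

// cleanup stands in for the Cleanup command that resets stuck steps.
func cleanup(ctx context.Context) {
	fmt.Println("cleanup started")
}

func main() {
	// Cancel ctx when SIGINT or SIGTERM arrives.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	err := runMigrations(ctx)
	if err == nil {
		fmt.Println("setup completed")
		return
	}
	if !errors.Is(err, context.Canceled) {
		// A genuine failure rather than a shutdown: don't mask it with cleanup.
		fmt.Fprintln(os.Stderr, "setup failed, skipping cleanup:", err)
		os.Exit(1)
	}

	// ctx is already canceled, so detach from its cancellation but give
	// cleanup its own deadline; Kubernetes sends SIGKILL roughly 30s after
	// SIGTERM, so the budget must stay well under that.
	cleanupCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 10*time.Second)
	defer cancel()
	cleanup(cleanupCtx)
}
```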
cmd/setup/cleanup.go

```diff
@@ -21,14 +21,12 @@ func NewCleanup() *cobra.Command {
 		Long: `cleans up migration if they got stuck`,
 		Run: func(cmd *cobra.Command, args []string) {
 			config := MustNewConfig(viper.GetViper())
-			Cleanup(config)
+			Cleanup(cmd.Context(), config)
 		},
 	}
 }
 
-func Cleanup(config *Config) {
-	ctx := context.Background()
-
+func Cleanup(ctx context.Context, config *Config) {
 	logging.Info("cleanup started")
 
 	dbClient, err := database.Connect(config.Database, false)
```
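Passing `cmd.Context()` only helps if the root command is executed with a context; with plain `Execute()`, cobra falls back to `context.Background()`. A hedged sketch of that wiring (the `zitadel`/`cleanup` command definitions here are placeholders, not the real command tree):

```go
package main

import (
	"context"
	"fmt"

	"github.com/spf13/cobra"
)

func main() {
	cleanupCmd := &cobra.Command{
		Use: "cleanup",
		Run: func(cmd *cobra.Command, args []string) {
			// cmd.Context() is the ctx passed to ExecuteContext below;
			// with plain Execute() it would be context.Background().
			fmt.Println("ctx err:", cmd.Context().Err())
		},
	}

	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate an already-canceled process context

	root := &cobra.Command{Use: "zitadel"}
	root.AddCommand(cleanupCmd)
	root.SetArgs([]string{"cleanup"})
	_ = root.ExecuteContext(ctx) // prints "ctx err: context canceled"
}
```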
cmd/setup/setup.go

```diff
@@ -5,8 +5,13 @@ import (
 	"embed"
 	_ "embed"
 	"errors"
+	"fmt"
 	"net/http"
+	"os"
+	"os/signal"
 	"path"
+	"syscall"
+	"time"
 
 	"github.com/jackc/pgx/v5/pgconn"
 	"github.com/spf13/cobra"
@@ -102,8 +107,35 @@ func bindForMirror(cmd *cobra.Command) error {
 func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string) {
 	logging.Info("setup started")
 
-	i18n.MustLoadSupportedLanguagesFromDir()
-
+	var setupErr error
+
+	ctx, stop := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM)
+	defer func() {
+		stop()
+
+		if setupErr == nil {
+			logging.Info("setup completed")
+			return
+		}
+
+		if setupErr != nil && !errors.Is(setupErr, context.Canceled) {
+			// If Setup failed for some other reason than the context being cancelled,
+			// then this could be a fatal error we should not retry
+			logging.WithFields("error", setupErr).Fatal("setup failed, skipping cleanup")
+			return
+		}
+
+		// if we're in the middle of long-running setup, run cleanup before exiting
+		// so if/when we're restarted we can pick up where we left off rather than
+		// booting into a broken state that requires manual intervention
+		// kubernetes will typically kill the pod after 30 seconds if the container does not exit
+		cleanupCtx, cleanupCancel := context.WithTimeout(context.WithoutCancel(ctx), 10*time.Second)
+		defer cleanupCancel()
+
+		Cleanup(cleanupCtx, config)
+	}()
+
+	i18n.MustLoadSupportedLanguagesFromDir()
 	dbClient, err := database.Connect(config.Database, false)
 	logging.OnError(err).Fatal("unable to connect to database")
 
```
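The subtle part of the deferred cleanup above is the context plumbing: by the time the `defer` runs, `ctx` has already been canceled by the signal handler, so an ordinary child of `ctx` would be dead on arrival. `context.WithoutCancel` (Go 1.21+) detaches from the parent's cancellation while keeping its values. A small sketch of that behavior:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate the SIGTERM handler having canceled Setup's context

	// A regular child context inherits the cancellation: unusable for cleanup.
	child, childCancel := context.WithTimeout(ctx, 10*time.Second)
	defer childCancel()
	fmt.Println(child.Err()) // context canceled

	// WithoutCancel detaches from the parent's cancellation (keeping its
	// values), so cleanup gets its own fresh 10-second budget.
	detached, detachedCancel := context.WithTimeout(context.WithoutCancel(ctx), 10*time.Second)
	defer detachedCancel()
	fmt.Println(detached.Err()) // <nil>
}
```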
```diff
@@ -223,7 +255,10 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string)
 		steps.s52IDPTemplate6LDAP2,
 		steps.s53InitPermittedOrgsFunction,
 	} {
-		mustExecuteMigration(ctx, eventstoreClient, step, "migration failed")
+		setupErr = executeMigration(ctx, eventstoreClient, step, "migration failed")
+		if setupErr != nil {
+			return
+		}
 	}
 
 	commands, _, _, _ := startCommandsQueries(ctx, eventstoreClient, eventstoreV4, dbClient, masterKey, config)
@@ -257,7 +292,10 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string)
 	}
 
 	for _, repeatableStep := range repeatableSteps {
-		mustExecuteMigration(ctx, eventstoreClient, repeatableStep, "unable to migrate repeatable step")
+		setupErr = executeMigration(ctx, eventstoreClient, repeatableStep, "unable to migrate repeatable step")
+		if setupErr != nil {
+			return
+		}
 	}
 
 	// These steps are executed after the repeatable steps because they add fields projections
@@ -273,22 +311,25 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string)
 		steps.s43CreateFieldsDomainIndex,
 		steps.s48Apps7SAMLConfigsLoginVersion,
 	} {
-		mustExecuteMigration(ctx, eventstoreClient, step, "migration failed")
+		setupErr = executeMigration(ctx, eventstoreClient, step, "migration failed")
+		if setupErr != nil {
+			return
+		}
 	}
 
 	// projection initialization must be done last, since the steps above might add required columns to the projections
 	if !config.ForMirror && config.InitProjections.Enabled {
-		initProjections(
-			ctx,
-			eventstoreClient,
-		)
+		setupErr = initProjections(ctx, eventstoreClient)
+		if setupErr != nil {
+			return
+		}
 	}
 }
 
-func mustExecuteMigration(ctx context.Context, eventstoreClient *eventstore.Eventstore, step migration.Migration, errorMsg string) {
+func executeMigration(ctx context.Context, eventstoreClient *eventstore.Eventstore, step migration.Migration, errorMsg string) error {
 	err := migration.Migrate(ctx, eventstoreClient, step)
 	if err == nil {
-		return
+		return nil
 	}
 	logFields := []any{
 		"name", step.String(),
```
```diff
@@ -303,7 +344,8 @@ func mustExecuteMigration(ctx context.Context, eventstoreClient *eventstore.Even
 			"hint", pgErr.Hint,
 		)
 	}
-	logging.WithFields(logFields...).WithError(err).Fatal(errorMsg)
+	logging.WithFields(logFields...).WithError(err).Error(errorMsg)
+	return fmt.Errorf("%s: %w", errorMsg, err)
 }
 
 // readStmt reads a single file from the embedded FS,
```
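The `%w` verb in the new return matters: it keeps the original error in the chain, so the `errors.Is(setupErr, context.Canceled)` check in Setup's deferred function can still see through the added message, assuming the underlying error chain preserves `context.Canceled`. A small illustration:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

func main() {
	// %w wraps: errors.Is can unwrap through the message, so the
	// cancellation is recognized and the cleanup path is taken.
	wrapped := fmt.Errorf("migration failed: %w", context.Canceled)
	fmt.Println(errors.Is(wrapped, context.Canceled)) // true

	// %v flattens to a string: the chain is lost and the error would
	// be treated as fatal, skipping cleanup.
	flattened := fmt.Errorf("migration failed: %v", context.Canceled)
	fmt.Println(errors.Is(flattened, context.Canceled)) // false
}
```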
```diff
@@ -508,26 +550,36 @@ func startCommandsQueries(
 func initProjections(
 	ctx context.Context,
 	eventstoreClient *eventstore.Eventstore,
-) {
+) error {
 	logging.Info("init-projections is currently in beta")
 
 	for _, p := range projection.Projections() {
-		err := migration.Migrate(ctx, eventstoreClient, p)
-		logging.WithFields("name", p.String()).OnError(err).Fatal("migration failed")
+		if err := migration.Migrate(ctx, eventstoreClient, p); err != nil {
+			logging.WithFields("name", p.String()).OnError(err).Error("projection migration failed")
+			return err
+		}
 	}
 
 	for _, p := range admin_handler.Projections() {
-		err := migration.Migrate(ctx, eventstoreClient, p)
-		logging.WithFields("name", p.String()).OnError(err).Fatal("migration failed")
+		if err := migration.Migrate(ctx, eventstoreClient, p); err != nil {
+			logging.WithFields("name", p.String()).OnError(err).Error("admin schema migration failed")
+			return err
+		}
 	}
 
 	for _, p := range auth_handler.Projections() {
-		err := migration.Migrate(ctx, eventstoreClient, p)
-		logging.WithFields("name", p.String()).OnError(err).Fatal("migration failed")
+		if err := migration.Migrate(ctx, eventstoreClient, p); err != nil {
+			logging.WithFields("name", p.String()).OnError(err).Error("auth schema migration failed")
+			return err
+		}
 	}
 
 	for _, p := range notify_handler.Projections() {
-		err := migration.Migrate(ctx, eventstoreClient, p)
-		logging.WithFields("name", p.String()).OnError(err).Fatal("migration failed")
+		if err := migration.Migrate(ctx, eventstoreClient, p); err != nil {
+			logging.WithFields("name", p.String()).OnError(err).Error("notification migration failed")
+			return err
+		}
 	}
 
+	return nil
 }
```