From a5d4b08a99636beec6c6e76b8db9991945624a0c Mon Sep 17 00:00:00 2001 From: Silvan Date: Fri, 5 Jan 2024 10:01:48 +0100 Subject: [PATCH] fix(cleanup): cleanup all stuck states (#7145) * fix(setup): unmarshal of failed step * fix(cleanup): cleanup all stuck states * use lastRun for repeatable steps * typo --------- Co-authored-by: Livio Spring --- cmd/setup/cleanup.go | 4 +- cmd/setup/config_change.go | 5 +- cmd/setup/projections.go | 12 ++--- internal/migration/command.go | 6 +-- internal/migration/migration.go | 82 +++++++++------------------- internal/migration/step.go | 85 +++++++++++++++++++++++++++++ internal/migration/step_test.go | 94 +++++++++++++++++++++++++++++++++ 7 files changed, 215 insertions(+), 73 deletions(-) create mode 100644 internal/migration/step.go create mode 100644 internal/migration/step_test.go diff --git a/cmd/setup/cleanup.go b/cmd/setup/cleanup.go index af93cdd1e2..267941a30a 100644 --- a/cmd/setup/cleanup.go +++ b/cmd/setup/cleanup.go @@ -42,10 +42,10 @@ func Cleanup(config *Config) { es := eventstore.NewEventstore(config.Eventstore) migration.RegisterMappers(es) - step, err := migration.LatestStep(ctx, es) + step, err := migration.LastStuckStep(ctx, es) logging.OnError(err).Fatal("unable to query latest migration") - if step.BaseEvent.EventType != migration.StartedType { + if step == nil { logging.Info("there is no stuck migration please run `zitadel setup`") return } diff --git a/cmd/setup/config_change.go b/cmd/setup/config_change.go index c6f107fcf6..3a1c0b6045 100644 --- a/cmd/setup/config_change.go +++ b/cmd/setup/config_change.go @@ -20,14 +20,11 @@ type externalConfigChange struct { defaults systemdefaults.SystemDefaults } -func (mig *externalConfigChange) SetLastExecution(lastRun map[string]interface{}) { +func (mig *externalConfigChange) Check(lastRun map[string]interface{}) bool { mig.currentExternalDomain, _ = lastRun["externalDomain"].(string) externalPort, _ := lastRun["externalPort"].(float64) mig.currentExternalPort = uint16(externalPort) mig.currentExternalSecure, _ = lastRun["externalSecure"].(bool) -} - -func (mig *externalConfigChange) Check() bool { return mig.currentExternalSecure != mig.ExternalSecure || mig.currentExternalPort != mig.ExternalPort || mig.currentExternalDomain != mig.ExternalDomain diff --git a/cmd/setup/projections.go b/cmd/setup/projections.go index 740fd47831..d8ffebb4a6 100644 --- a/cmd/setup/projections.go +++ b/cmd/setup/projections.go @@ -8,18 +8,14 @@ import ( ) type projectionTables struct { - es *eventstore.Eventstore - currentVersion string + es *eventstore.Eventstore Version string `json:"version"` } -func (mig *projectionTables) SetLastExecution(lastRun map[string]interface{}) { - mig.currentVersion, _ = lastRun["version"].(string) -} - -func (mig *projectionTables) Check() bool { - return mig.currentVersion != mig.Version +func (mig *projectionTables) Check(lastRun map[string]interface{}) bool { + currentVersion, _ := lastRun["version"].(string) + return currentVersion != mig.Version } func (mig *projectionTables) Execute(ctx context.Context) error { diff --git a/internal/migration/command.go b/internal/migration/command.go index 49aaebdf92..d48f9462dc 100644 --- a/internal/migration/command.go +++ b/internal/migration/command.go @@ -13,9 +13,9 @@ import ( type SetupStep struct { eventstore.BaseEvent `json:"-"` migration Migration - Name string `json:"name"` - Error string `json:"error,omitempty"` - LastRun interface{} `json:"lastRun,omitempty"` + Name string `json:"name"` + Error any `json:"error,omitempty"` + LastRun any `json:"lastRun,omitempty"` } func setupStartedCmd(ctx context.Context, migration Migration) eventstore.Command { diff --git a/internal/migration/migration.go b/internal/migration/migration.go index 3e94ea5bbd..5c03f28d55 100644 --- a/internal/migration/migration.go +++ b/internal/migration/migration.go @@ -36,8 +36,7 @@ type errCheckerMigration interface { type RepeatableMigration interface { Migration - SetLastExecution(lastRun map[string]interface{}) - Check() bool + Check(lastRun map[string]interface{}) bool } func Migrate(ctx context.Context, es *eventstore.Eventstore, migration Migration) (err error) { @@ -51,7 +50,6 @@ func Migrate(ctx context.Context, es *eventstore.Eventstore, migration Migration continueOnErr = errChecker.ContinueOnErr } - // if should, err := checkExec(ctx, es, migration); !should || err != nil { should, err := checkExec(ctx, es, migration) if err != nil && !continueOnErr(err) { return err @@ -76,23 +74,18 @@ func Migrate(ctx context.Context, es *eventstore.Eventstore, migration Migration return pushErr } -func LatestStep(ctx context.Context, es *eventstore.Eventstore) (*SetupStep, error) { - events, err := es.Filter(ctx, eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent). - OrderDesc(). - Limit(1). - AddQuery(). - AggregateTypes(aggregateType). - AggregateIDs(aggregateID). - EventTypes(StartedType, doneType, repeatableDoneType, failedType). - Builder()) +func LastStuckStep(ctx context.Context, es *eventstore.Eventstore) (*SetupStep, error) { + var states StepStates + err := es.FilterToQueryReducer(ctx, &states) if err != nil { return nil, err } - step, ok := events[0].(*SetupStep) - if !ok { - return nil, zerrors.ThrowInternal(nil, "MIGRA-hppLM", "setup step is malformed") + step := states.lastByState(StepStarted) + if step == nil { + return nil, nil } - return step, nil + + return step.SetupStep, nil } var _ Migration = (*cancelMigration)(nil) @@ -143,49 +136,26 @@ func checkExec(ctx context.Context, es *eventstore.Eventstore, migration Migrati } func shouldExec(ctx context.Context, es *eventstore.Eventstore, migration Migration) (should bool, err error) { - events, err := es.Filter(ctx, eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent). - OrderAsc(). - InstanceID(""). - AddQuery(). - AggregateTypes(aggregateType). - AggregateIDs(aggregateID). - EventTypes(StartedType, doneType, repeatableDoneType, failedType). - Builder()) + var states StepStates + err = es.FilterToQueryReducer(ctx, &states) if err != nil { return false, err } - - var isStarted bool - for _, event := range events { - e, ok := event.(*SetupStep) - if !ok { - return false, zerrors.ThrowInternal(nil, "MIGRA-IJY3D", "Errors.Internal") - } - - if e.Name != migration.String() { - continue - } - - switch event.Type() { - case StartedType, failedType: - isStarted = !isStarted - case doneType, - repeatableDoneType: - repeatable, ok := migration.(RepeatableMigration) - if !ok { - return false, nil - } - isStarted = false - repeatable.SetLastExecution(e.LastRun.(map[string]interface{})) - } - } - - if isStarted { - return false, errMigrationAlreadyStarted - } - repeatable, ok := migration.(RepeatableMigration) - if !ok { + step := states.byName(migration.String()) + if step == nil { return true, nil } - return repeatable.Check(), nil + if step.state == StepFailed { + return true, nil + } + if step.state == StepStarted { + return false, errMigrationAlreadyStarted + } + + repeatable, ok := migration.(RepeatableMigration) + if !ok { + return step.state != StepDone, nil + } + lastRun, _ := step.LastRun.(map[string]interface{}) + return repeatable.Check(lastRun), nil } diff --git a/internal/migration/step.go b/internal/migration/step.go new file mode 100644 index 0000000000..ed0ec52ad6 --- /dev/null +++ b/internal/migration/step.go @@ -0,0 +1,85 @@ +package migration + +import "github.com/zitadel/zitadel/internal/eventstore" + +var _ eventstore.QueryReducer = (*StepStates)(nil) + +type Step struct { + *SetupStep + + state StepState +} + +type StepStates struct { + eventstore.ReadModel + Steps []*Step +} + +// Query implements eventstore.QueryReducer. +func (*StepStates) Query() *eventstore.SearchQueryBuilder { + return eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent). + AddQuery(). + AggregateTypes(aggregateType). + AggregateIDs(aggregateID). + EventTypes(StartedType, doneType, repeatableDoneType, failedType). + Builder() +} + +// Reduce implements eventstore.QueryReducer. +func (s *StepStates) Reduce() error { + for _, event := range s.Events { + step := event.(*SetupStep) + state := s.byName(step.Name) + if state == nil { + state = new(Step) + s.Steps = append(s.Steps, state) + } + state.SetupStep = step + switch step.EventType { + case StartedType: + state.state = StepStarted + case doneType: + state.state = StepDone + case repeatableDoneType: + state.state = StepDone + case failedType: + state.state = StepFailed + } + } + return s.ReadModel.Reduce() +} + +func (s *StepStates) byName(name string) *Step { + for _, step := range s.Steps { + if step.Name == name { + return step + } + } + return nil +} + +func (s *StepStates) lastByState(stepState StepState) (step *Step) { + for _, state := range s.Steps { + if state.state != stepState { + continue + } + if step == nil { + step = state + continue + } + if step.CreatedAt().After(state.CreatedAt()) { + continue + } + + step = state + } + return step +} + +type StepState int32 + +const ( + StepStarted StepState = iota + StepDone + StepFailed +) diff --git a/internal/migration/step_test.go b/internal/migration/step_test.go new file mode 100644 index 0000000000..3df34c5a21 --- /dev/null +++ b/internal/migration/step_test.go @@ -0,0 +1,94 @@ +package migration + +import ( + "reflect" + "testing" + "time" + + "github.com/zitadel/zitadel/internal/eventstore" +) + +func TestStepStates_lastByState(t *testing.T) { + now := time.Now() + past := now.Add(-10 * time.Millisecond) + tests := []struct { + name string + fields *StepStates + arg StepState + want *Step + }{ + { + name: "no events reduced invalid state", + fields: &StepStates{}, + arg: -1, + }, + { + name: "no events reduced by valid state", + fields: &StepStates{}, + arg: StepDone, + }, + { + name: "no state found", + fields: &StepStates{ + Steps: []*Step{ + { + SetupStep: &SetupStep{ + Name: "done", + }, + state: StepDone, + }, + { + SetupStep: &SetupStep{ + Name: "failed", + }, + state: StepFailed, + }, + }, + }, + arg: StepStarted, + }, + { + name: "found", + fields: &StepStates{ + Steps: []*Step{ + { + SetupStep: &SetupStep{ + BaseEvent: eventstore.BaseEvent{ + Creation: past, + }, + }, + state: StepStarted, + }, + { + SetupStep: &SetupStep{ + BaseEvent: eventstore.BaseEvent{ + Creation: now, + }, + }, + state: StepStarted, + }, + }, + }, + arg: StepStarted, + want: &Step{ + state: StepStarted, + SetupStep: &SetupStep{ + BaseEvent: eventstore.BaseEvent{ + Creation: now, + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := &StepStates{ + ReadModel: tt.fields.ReadModel, + Steps: tt.fields.Steps, + } + if gotStep := s.lastByState(tt.arg); !reflect.DeepEqual(gotStep, tt.want) { + t.Errorf("StepStates.lastByState() = %v, want %v", *gotStep, *tt.want) + } + }) + } +}