mirror of
https://github.com/zitadel/zitadel.git
synced 2025-02-28 21:17:23 +00:00
fix(cleanup): cleanup all stuck states (#7145)
* fix(setup): unmarshal of failed step * fix(cleanup): cleanup all stuck states * use lastRun for repeatable steps * typo --------- Co-authored-by: Livio Spring <livio.a@gmail.com>
This commit is contained in:
parent
41215bdc0a
commit
a5d4b08a99
@ -42,10 +42,10 @@ func Cleanup(config *Config) {
|
||||
es := eventstore.NewEventstore(config.Eventstore)
|
||||
migration.RegisterMappers(es)
|
||||
|
||||
step, err := migration.LatestStep(ctx, es)
|
||||
step, err := migration.LastStuckStep(ctx, es)
|
||||
logging.OnError(err).Fatal("unable to query latest migration")
|
||||
|
||||
if step.BaseEvent.EventType != migration.StartedType {
|
||||
if step == nil {
|
||||
logging.Info("there is no stuck migration please run `zitadel setup`")
|
||||
return
|
||||
}
|
||||
|
@ -20,14 +20,11 @@ type externalConfigChange struct {
|
||||
defaults systemdefaults.SystemDefaults
|
||||
}
|
||||
|
||||
func (mig *externalConfigChange) SetLastExecution(lastRun map[string]interface{}) {
|
||||
func (mig *externalConfigChange) Check(lastRun map[string]interface{}) bool {
|
||||
mig.currentExternalDomain, _ = lastRun["externalDomain"].(string)
|
||||
externalPort, _ := lastRun["externalPort"].(float64)
|
||||
mig.currentExternalPort = uint16(externalPort)
|
||||
mig.currentExternalSecure, _ = lastRun["externalSecure"].(bool)
|
||||
}
|
||||
|
||||
func (mig *externalConfigChange) Check() bool {
|
||||
return mig.currentExternalSecure != mig.ExternalSecure ||
|
||||
mig.currentExternalPort != mig.ExternalPort ||
|
||||
mig.currentExternalDomain != mig.ExternalDomain
|
||||
|
@ -8,18 +8,14 @@ import (
|
||||
)
|
||||
|
||||
type projectionTables struct {
|
||||
es *eventstore.Eventstore
|
||||
currentVersion string
|
||||
es *eventstore.Eventstore
|
||||
|
||||
Version string `json:"version"`
|
||||
}
|
||||
|
||||
func (mig *projectionTables) SetLastExecution(lastRun map[string]interface{}) {
|
||||
mig.currentVersion, _ = lastRun["version"].(string)
|
||||
}
|
||||
|
||||
func (mig *projectionTables) Check() bool {
|
||||
return mig.currentVersion != mig.Version
|
||||
func (mig *projectionTables) Check(lastRun map[string]interface{}) bool {
|
||||
currentVersion, _ := lastRun["version"].(string)
|
||||
return currentVersion != mig.Version
|
||||
}
|
||||
|
||||
func (mig *projectionTables) Execute(ctx context.Context) error {
|
||||
|
@ -13,9 +13,9 @@ import (
|
||||
type SetupStep struct {
|
||||
eventstore.BaseEvent `json:"-"`
|
||||
migration Migration
|
||||
Name string `json:"name"`
|
||||
Error string `json:"error,omitempty"`
|
||||
LastRun interface{} `json:"lastRun,omitempty"`
|
||||
Name string `json:"name"`
|
||||
Error any `json:"error,omitempty"`
|
||||
LastRun any `json:"lastRun,omitempty"`
|
||||
}
|
||||
|
||||
func setupStartedCmd(ctx context.Context, migration Migration) eventstore.Command {
|
||||
|
@ -36,8 +36,7 @@ type errCheckerMigration interface {
|
||||
|
||||
type RepeatableMigration interface {
|
||||
Migration
|
||||
SetLastExecution(lastRun map[string]interface{})
|
||||
Check() bool
|
||||
Check(lastRun map[string]interface{}) bool
|
||||
}
|
||||
|
||||
func Migrate(ctx context.Context, es *eventstore.Eventstore, migration Migration) (err error) {
|
||||
@ -51,7 +50,6 @@ func Migrate(ctx context.Context, es *eventstore.Eventstore, migration Migration
|
||||
continueOnErr = errChecker.ContinueOnErr
|
||||
}
|
||||
|
||||
// if should, err := checkExec(ctx, es, migration); !should || err != nil {
|
||||
should, err := checkExec(ctx, es, migration)
|
||||
if err != nil && !continueOnErr(err) {
|
||||
return err
|
||||
@ -76,23 +74,18 @@ func Migrate(ctx context.Context, es *eventstore.Eventstore, migration Migration
|
||||
return pushErr
|
||||
}
|
||||
|
||||
func LatestStep(ctx context.Context, es *eventstore.Eventstore) (*SetupStep, error) {
|
||||
events, err := es.Filter(ctx, eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
|
||||
OrderDesc().
|
||||
Limit(1).
|
||||
AddQuery().
|
||||
AggregateTypes(aggregateType).
|
||||
AggregateIDs(aggregateID).
|
||||
EventTypes(StartedType, doneType, repeatableDoneType, failedType).
|
||||
Builder())
|
||||
func LastStuckStep(ctx context.Context, es *eventstore.Eventstore) (*SetupStep, error) {
|
||||
var states StepStates
|
||||
err := es.FilterToQueryReducer(ctx, &states)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
step, ok := events[0].(*SetupStep)
|
||||
if !ok {
|
||||
return nil, zerrors.ThrowInternal(nil, "MIGRA-hppLM", "setup step is malformed")
|
||||
step := states.lastByState(StepStarted)
|
||||
if step == nil {
|
||||
return nil, nil
|
||||
}
|
||||
return step, nil
|
||||
|
||||
return step.SetupStep, nil
|
||||
}
|
||||
|
||||
var _ Migration = (*cancelMigration)(nil)
|
||||
@ -143,49 +136,26 @@ func checkExec(ctx context.Context, es *eventstore.Eventstore, migration Migrati
|
||||
}
|
||||
|
||||
func shouldExec(ctx context.Context, es *eventstore.Eventstore, migration Migration) (should bool, err error) {
|
||||
events, err := es.Filter(ctx, eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
|
||||
OrderAsc().
|
||||
InstanceID("").
|
||||
AddQuery().
|
||||
AggregateTypes(aggregateType).
|
||||
AggregateIDs(aggregateID).
|
||||
EventTypes(StartedType, doneType, repeatableDoneType, failedType).
|
||||
Builder())
|
||||
var states StepStates
|
||||
err = es.FilterToQueryReducer(ctx, &states)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
var isStarted bool
|
||||
for _, event := range events {
|
||||
e, ok := event.(*SetupStep)
|
||||
if !ok {
|
||||
return false, zerrors.ThrowInternal(nil, "MIGRA-IJY3D", "Errors.Internal")
|
||||
}
|
||||
|
||||
if e.Name != migration.String() {
|
||||
continue
|
||||
}
|
||||
|
||||
switch event.Type() {
|
||||
case StartedType, failedType:
|
||||
isStarted = !isStarted
|
||||
case doneType,
|
||||
repeatableDoneType:
|
||||
repeatable, ok := migration.(RepeatableMigration)
|
||||
if !ok {
|
||||
return false, nil
|
||||
}
|
||||
isStarted = false
|
||||
repeatable.SetLastExecution(e.LastRun.(map[string]interface{}))
|
||||
}
|
||||
}
|
||||
|
||||
if isStarted {
|
||||
return false, errMigrationAlreadyStarted
|
||||
}
|
||||
repeatable, ok := migration.(RepeatableMigration)
|
||||
if !ok {
|
||||
step := states.byName(migration.String())
|
||||
if step == nil {
|
||||
return true, nil
|
||||
}
|
||||
return repeatable.Check(), nil
|
||||
if step.state == StepFailed {
|
||||
return true, nil
|
||||
}
|
||||
if step.state == StepStarted {
|
||||
return false, errMigrationAlreadyStarted
|
||||
}
|
||||
|
||||
repeatable, ok := migration.(RepeatableMigration)
|
||||
if !ok {
|
||||
return step.state != StepDone, nil
|
||||
}
|
||||
lastRun, _ := step.LastRun.(map[string]interface{})
|
||||
return repeatable.Check(lastRun), nil
|
||||
}
|
||||
|
85
internal/migration/step.go
Normal file
85
internal/migration/step.go
Normal file
@ -0,0 +1,85 @@
|
||||
package migration
|
||||
|
||||
import "github.com/zitadel/zitadel/internal/eventstore"
|
||||
|
||||
var _ eventstore.QueryReducer = (*StepStates)(nil)
|
||||
|
||||
type Step struct {
|
||||
*SetupStep
|
||||
|
||||
state StepState
|
||||
}
|
||||
|
||||
type StepStates struct {
|
||||
eventstore.ReadModel
|
||||
Steps []*Step
|
||||
}
|
||||
|
||||
// Query implements eventstore.QueryReducer.
|
||||
func (*StepStates) Query() *eventstore.SearchQueryBuilder {
|
||||
return eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
|
||||
AddQuery().
|
||||
AggregateTypes(aggregateType).
|
||||
AggregateIDs(aggregateID).
|
||||
EventTypes(StartedType, doneType, repeatableDoneType, failedType).
|
||||
Builder()
|
||||
}
|
||||
|
||||
// Reduce implements eventstore.QueryReducer.
|
||||
func (s *StepStates) Reduce() error {
|
||||
for _, event := range s.Events {
|
||||
step := event.(*SetupStep)
|
||||
state := s.byName(step.Name)
|
||||
if state == nil {
|
||||
state = new(Step)
|
||||
s.Steps = append(s.Steps, state)
|
||||
}
|
||||
state.SetupStep = step
|
||||
switch step.EventType {
|
||||
case StartedType:
|
||||
state.state = StepStarted
|
||||
case doneType:
|
||||
state.state = StepDone
|
||||
case repeatableDoneType:
|
||||
state.state = StepDone
|
||||
case failedType:
|
||||
state.state = StepFailed
|
||||
}
|
||||
}
|
||||
return s.ReadModel.Reduce()
|
||||
}
|
||||
|
||||
func (s *StepStates) byName(name string) *Step {
|
||||
for _, step := range s.Steps {
|
||||
if step.Name == name {
|
||||
return step
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *StepStates) lastByState(stepState StepState) (step *Step) {
|
||||
for _, state := range s.Steps {
|
||||
if state.state != stepState {
|
||||
continue
|
||||
}
|
||||
if step == nil {
|
||||
step = state
|
||||
continue
|
||||
}
|
||||
if step.CreatedAt().After(state.CreatedAt()) {
|
||||
continue
|
||||
}
|
||||
|
||||
step = state
|
||||
}
|
||||
return step
|
||||
}
|
||||
|
||||
type StepState int32
|
||||
|
||||
const (
|
||||
StepStarted StepState = iota
|
||||
StepDone
|
||||
StepFailed
|
||||
)
|
94
internal/migration/step_test.go
Normal file
94
internal/migration/step_test.go
Normal file
@ -0,0 +1,94 @@
|
||||
package migration
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/zitadel/zitadel/internal/eventstore"
|
||||
)
|
||||
|
||||
func TestStepStates_lastByState(t *testing.T) {
|
||||
now := time.Now()
|
||||
past := now.Add(-10 * time.Millisecond)
|
||||
tests := []struct {
|
||||
name string
|
||||
fields *StepStates
|
||||
arg StepState
|
||||
want *Step
|
||||
}{
|
||||
{
|
||||
name: "no events reduced invalid state",
|
||||
fields: &StepStates{},
|
||||
arg: -1,
|
||||
},
|
||||
{
|
||||
name: "no events reduced by valid state",
|
||||
fields: &StepStates{},
|
||||
arg: StepDone,
|
||||
},
|
||||
{
|
||||
name: "no state found",
|
||||
fields: &StepStates{
|
||||
Steps: []*Step{
|
||||
{
|
||||
SetupStep: &SetupStep{
|
||||
Name: "done",
|
||||
},
|
||||
state: StepDone,
|
||||
},
|
||||
{
|
||||
SetupStep: &SetupStep{
|
||||
Name: "failed",
|
||||
},
|
||||
state: StepFailed,
|
||||
},
|
||||
},
|
||||
},
|
||||
arg: StepStarted,
|
||||
},
|
||||
{
|
||||
name: "found",
|
||||
fields: &StepStates{
|
||||
Steps: []*Step{
|
||||
{
|
||||
SetupStep: &SetupStep{
|
||||
BaseEvent: eventstore.BaseEvent{
|
||||
Creation: past,
|
||||
},
|
||||
},
|
||||
state: StepStarted,
|
||||
},
|
||||
{
|
||||
SetupStep: &SetupStep{
|
||||
BaseEvent: eventstore.BaseEvent{
|
||||
Creation: now,
|
||||
},
|
||||
},
|
||||
state: StepStarted,
|
||||
},
|
||||
},
|
||||
},
|
||||
arg: StepStarted,
|
||||
want: &Step{
|
||||
state: StepStarted,
|
||||
SetupStep: &SetupStep{
|
||||
BaseEvent: eventstore.BaseEvent{
|
||||
Creation: now,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
s := &StepStates{
|
||||
ReadModel: tt.fields.ReadModel,
|
||||
Steps: tt.fields.Steps,
|
||||
}
|
||||
if gotStep := s.lastByState(tt.arg); !reflect.DeepEqual(gotStep, tt.want) {
|
||||
t.Errorf("StepStates.lastByState() = %v, want %v", *gotStep, *tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user