fix(setup): init projections (#7194)

Even though this is a feature it's released as fix so that we can back port to earlier revisions.

As reported by multiple users startup of ZITADEL after leaded to downtime and worst case rollbacks to the previously deployed version.

The problem starts rising when there are too many events to process after the start of ZITADEL. The root cause are changes on projections (database tables) which must be recomputed. This PR solves this problem by adding a new step to the setup phase which prefills the projections. The step can be enabled by adding the `--init-projections`-flag to `setup`, `start-from-init` and `start-from-setup`. Setting this flag results in potentially longer duration of the setup phase but reduces the risk of the problems mentioned in the paragraph above.

(cherry picked from commit 17953e9040)
This commit is contained in:
Silvan
2024-01-25 17:28:20 +01:00
committed by Livio Spring
parent 50faf37921
commit d3bb9c9b3b
80 changed files with 1296 additions and 962 deletions

View File

@@ -9,6 +9,13 @@ import (
"github.com/zitadel/zitadel/internal/zerrors"
)
func init() {
eventstore.RegisterFilterEventMapper(SystemAggregate, StartedType, SetupMapper)
eventstore.RegisterFilterEventMapper(SystemAggregate, DoneType, SetupMapper)
eventstore.RegisterFilterEventMapper(SystemAggregate, failedType, SetupMapper)
eventstore.RegisterFilterEventMapper(SystemAggregate, repeatableDoneType, SetupMapper)
}
// SetupStep is the command pushed on the eventstore
type SetupStep struct {
eventstore.BaseEvent `json:"-"`
@@ -23,7 +30,7 @@ func setupStartedCmd(ctx context.Context, migration Migration) eventstore.Comman
return &SetupStep{
BaseEvent: *eventstore.NewBaseEventForPush(
ctx,
eventstore.NewAggregate(ctx, aggregateID, aggregateType, "v1"),
eventstore.NewAggregate(ctx, SystemAggregateID, SystemAggregate, "v1"),
StartedType),
migration: migration,
Name: migration.String(),
@@ -32,7 +39,7 @@ func setupStartedCmd(ctx context.Context, migration Migration) eventstore.Comman
func setupDoneCmd(ctx context.Context, migration Migration, err error) eventstore.Command {
ctx = authz.SetCtxData(service.WithService(ctx, "system"), authz.CtxData{UserID: "system", OrgID: "SYSTEM", ResourceOwner: "SYSTEM"})
typ := doneType
typ := DoneType
var lastRun interface{}
if repeatable, ok := migration.(RepeatableMigration); ok {
typ = repeatableDoneType
@@ -51,7 +58,7 @@ func setupDoneCmd(ctx context.Context, migration Migration, err error) eventstor
s.BaseEvent = *eventstore.NewBaseEventForPush(
ctx,
eventstore.NewAggregate(ctx, aggregateID, aggregateType, "v1"),
eventstore.NewAggregate(ctx, SystemAggregateID, SystemAggregate, "v1"),
typ)
return s
@@ -79,13 +86,6 @@ func (s *SetupStep) UniqueConstraints() []*eventstore.UniqueConstraint {
}
}
func RegisterMappers(es *eventstore.Eventstore) {
es.RegisterFilterEventMapper(aggregateType, StartedType, SetupMapper)
es.RegisterFilterEventMapper(aggregateType, doneType, SetupMapper)
es.RegisterFilterEventMapper(aggregateType, failedType, SetupMapper)
es.RegisterFilterEventMapper(aggregateType, repeatableDoneType, SetupMapper)
}
func SetupMapper(event eventstore.Event) (eventstore.Event, error) {
step := &SetupStep{
BaseEvent: *eventstore.BaseEventFromRepo(event),

View File

@@ -13,11 +13,11 @@ import (
const (
StartedType = eventstore.EventType("system.migration.started")
doneType = eventstore.EventType("system.migration.done")
DoneType = eventstore.EventType("system.migration.done")
failedType = eventstore.EventType("system.migration.failed")
repeatableDoneType = eventstore.EventType("system.migration.repeatable.done")
aggregateType = eventstore.AggregateType("system")
aggregateID = "SYSTEM"
SystemAggregate = eventstore.AggregateType("system")
SystemAggregateID = "SYSTEM"
)
var (
@@ -26,7 +26,7 @@ var (
type Migration interface {
String() string
Execute(context.Context) error
Execute(ctx context.Context, startedEvent eventstore.Event) error
}
type errCheckerMigration interface {
@@ -58,12 +58,13 @@ func Migrate(ctx context.Context, es *eventstore.Eventstore, migration Migration
return nil
}
if _, err = es.Push(ctx, setupStartedCmd(ctx, migration)); err != nil && !continueOnErr(err) {
startedEvent, err := es.Push(ctx, setupStartedCmd(ctx, migration))
if err != nil && !continueOnErr(err) {
return err
}
logging.WithFields("name", migration.String()).Info("starting migration")
err = migration.Execute(ctx)
err = migration.Execute(ctx, startedEvent[0])
logging.WithFields("name", migration.String()).OnError(err).Error("migration failed")
_, pushErr := es.Push(ctx, setupDoneCmd(ctx, migration, err))
@@ -95,7 +96,7 @@ type cancelMigration struct {
}
// Execute implements Migration
func (*cancelMigration) Execute(context.Context) error {
func (*cancelMigration) Execute(context.Context, eventstore.Event) error {
return nil
}

View File

@@ -19,9 +19,9 @@ type StepStates struct {
func (*StepStates) Query() *eventstore.SearchQueryBuilder {
return eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
AddQuery().
AggregateTypes(aggregateType).
AggregateIDs(aggregateID).
EventTypes(StartedType, doneType, repeatableDoneType, failedType).
AggregateTypes(SystemAggregate).
AggregateIDs(SystemAggregateID).
EventTypes(StartedType, DoneType, repeatableDoneType, failedType).
Builder()
}
@@ -38,7 +38,7 @@ func (s *StepStates) Reduce() error {
switch step.EventType {
case StartedType:
state.state = StepStarted
case doneType:
case DoneType:
state.state = StepDone
case repeatableDoneType:
state.state = StepDone