From c791f6de583ef9037a14c8b7b5d7cd617a83cf6e Mon Sep 17 00:00:00 2001 From: Livio Spring Date: Fri, 4 Nov 2022 10:21:58 +0100 Subject: [PATCH] fix: improve startup times by initializing projection tables during setup (#4642) * fix: improve startup times by initializing projections table during setup * add missing file --- .releaserc.js | 3 +- cmd/build/info.go | 2 +- cmd/setup/config.go | 2 + cmd/setup/projections.go | 31 +++++++ cmd/setup/setup.go | 11 ++- .../eventstore/handler/crdb/handler_stmt.go | 22 +++-- internal/eventstore/handler/crdb/init.go | 44 ++++----- .../eventstore/handler/handler_projection.go | 93 +++++++++++++++++-- .../handler/handler_projection_test.go | 47 +++++++++- internal/notification/projection.go | 2 + internal/query/projection/projection.go | 91 ++++++++++++++---- internal/query/query.go | 3 +- 12 files changed, 285 insertions(+), 66 deletions(-) create mode 100644 cmd/setup/projections.go diff --git a/.releaserc.js b/.releaserc.js index 250d6d71db..f0238e0dc9 100644 --- a/.releaserc.js +++ b/.releaserc.js @@ -1,7 +1,8 @@ module.exports = { branches: [ {name: 'main'}, - {name: '1.87.x', range: '1.87.x', channel: '1.87.x'} + {name: '1.87.x', range: '1.87.x', channel: '1.87.x'}, + {name: 'startup-times', prerelease: 'beta'} ], plugins: [ "@semantic-release/commit-analyzer" diff --git a/cmd/build/info.go b/cmd/build/info.go index aa91a25fb4..1bf5b18ded 100644 --- a/cmd/build/info.go +++ b/cmd/build/info.go @@ -3,7 +3,7 @@ package build import "time" var ( - version = "" + version = time.Now().Format(time.RFC3339) commit = "" date = "" dateTime time.Time diff --git a/cmd/setup/config.go b/cmd/setup/config.go index 86d2e85f29..fe6341b452 100644 --- a/cmd/setup/config.go +++ b/cmd/setup/config.go @@ -15,6 +15,7 @@ import ( "github.com/zitadel/zitadel/internal/crypto" "github.com/zitadel/zitadel/internal/database" "github.com/zitadel/zitadel/internal/id" + "github.com/zitadel/zitadel/internal/query/projection" ) type Config struct { @@ -28,6 +29,7 @@ type Config struct { EncryptionKeys *encryptionKeyConfig DefaultInstance command.InstanceSetup Machine *id.Config + Projections projection.Config } func MustNewConfig(v *viper.Viper) *Config { diff --git a/cmd/setup/projections.go b/cmd/setup/projections.go new file mode 100644 index 0000000000..740fd47831 --- /dev/null +++ b/cmd/setup/projections.go @@ -0,0 +1,31 @@ +package setup + +import ( + "context" + + "github.com/zitadel/zitadel/internal/eventstore" + "github.com/zitadel/zitadel/internal/query/projection" +) + +type projectionTables struct { + es *eventstore.Eventstore + currentVersion string + + Version string `json:"version"` +} + +func (mig *projectionTables) SetLastExecution(lastRun map[string]interface{}) { + mig.currentVersion, _ = lastRun["version"].(string) +} + +func (mig *projectionTables) Check() bool { + return mig.currentVersion != mig.Version +} + +func (mig *projectionTables) Execute(ctx context.Context) error { + return projection.Init(ctx) +} + +func (mig *projectionTables) String() string { + return "projection_tables" +} diff --git a/cmd/setup/setup.go b/cmd/setup/setup.go index 0010773524..5de3f619f7 100644 --- a/cmd/setup/setup.go +++ b/cmd/setup/setup.go @@ -8,11 +8,13 @@ import ( "github.com/spf13/viper" "github.com/zitadel/logging" + "github.com/zitadel/zitadel/cmd/build" "github.com/zitadel/zitadel/cmd/key" "github.com/zitadel/zitadel/cmd/tls" "github.com/zitadel/zitadel/internal/database" "github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/migration" + "github.com/zitadel/zitadel/internal/query/projection" ) var ( @@ -54,6 +56,7 @@ func Flags(cmd *cobra.Command) { } func Setup(config *Config, steps *Steps, masterKey string) { + ctx := context.Background() logging.Info("setup started") dbClient, err := database.Connect(config.Database, false) @@ -80,6 +83,9 @@ func Setup(config *Config, steps *Steps, masterKey string) { steps.s4EventstoreIndexes = &EventstoreIndexes{dbClient: dbClient, dbType: config.Database.Type()} + err = projection.Create(ctx, dbClient, eventstoreClient, config.Projections, nil, nil) + logging.OnError(err).Fatal("unable to start projections") + repeatableSteps := []migration.RepeatableMigration{ &externalConfigChange{ es: eventstoreClient, @@ -87,9 +93,12 @@ func Setup(config *Config, steps *Steps, masterKey string) { ExternalPort: config.ExternalPort, ExternalSecure: config.ExternalSecure, }, + &projectionTables{ + es: eventstoreClient, + Version: build.Version(), + }, } - ctx := context.Background() err = migration.Migrate(ctx, eventstoreClient, steps.s1ProjectionTable) logging.OnError(err).Fatal("unable to migrate step 1") err = migration.Migrate(ctx, eventstoreClient, steps.s2AssetsTable) diff --git a/internal/eventstore/handler/crdb/handler_stmt.go b/internal/eventstore/handler/crdb/handler_stmt.go index fb1e165307..83d844a966 100644 --- a/internal/eventstore/handler/crdb/handler_stmt.go +++ b/internal/eventstore/handler/crdb/handler_stmt.go @@ -42,8 +42,10 @@ type StatementHandler struct { failureCountStmt string setFailureCountStmt string - aggregates []eventstore.AggregateType - reduces map[eventstore.EventType]handler.Reduce + aggregates []eventstore.AggregateType + reduces map[eventstore.EventType]handler.Reduce + initCheck *handler.Check + initialized chan bool bulkLimit uint64 } @@ -73,19 +75,21 @@ func NewStatementHandler( reduces: reduces, bulkLimit: config.BulkLimit, Locker: NewLocker(config.Client, config.LockTable, config.ProjectionName), + initCheck: config.InitCheck, + initialized: make(chan bool), } - initialized := make(chan bool) - h.ProjectionHandler = handler.NewProjectionHandler(ctx, config.ProjectionHandlerConfig, h.reduce, h.Update, h.SearchQuery, h.Lock, h.Unlock, initialized) - - err := h.Init(ctx, initialized, config.InitCheck) - logging.OnError(err).WithField("projection", config.ProjectionName).Fatal("unable to initialize projections") - - h.Subscribe(h.aggregates...) + h.ProjectionHandler = handler.NewProjectionHandler(ctx, config.ProjectionHandlerConfig, h.reduce, h.Update, h.SearchQuery, h.Lock, h.Unlock, h.initialized) return h } +func (h *StatementHandler) Start() { + h.initialized <- true + close(h.initialized) + h.Subscribe(h.aggregates...) +} + func (h *StatementHandler) SearchQuery(ctx context.Context, instanceIDs []string) (*eventstore.SearchQueryBuilder, uint64, error) { sequences, err := h.currentSequences(ctx, h.client.QueryContext, instanceIDs) if err != nil { diff --git a/internal/eventstore/handler/crdb/init.go b/internal/eventstore/handler/crdb/init.go index 4c76e9f288..a57add8939 100644 --- a/internal/eventstore/handler/crdb/init.go +++ b/internal/eventstore/handler/crdb/init.go @@ -186,36 +186,28 @@ type ForeignKey struct { } // Init implements handler.Init -func (h *StatementHandler) Init(ctx context.Context, initialized chan<- bool, checks ...*handler.Check) error { - for _, check := range checks { - if check == nil || check.IsNoop() { - initialized <- true - close(initialized) - return nil - } - tx, err := h.client.BeginTx(ctx, nil) +func (h *StatementHandler) Init(ctx context.Context) error { + check := h.initCheck + if check == nil || check.IsNoop() { + return nil + } + tx, err := h.client.BeginTx(ctx, nil) + if err != nil { + return caos_errs.ThrowInternal(err, "CRDB-SAdf2", "begin failed") + } + for i, execute := range check.Executes { + logging.WithFields("projection", h.ProjectionName, "execute", i).Debug("executing check") + next, err := execute(h.client, h.ProjectionName) if err != nil { - return caos_errs.ThrowInternal(err, "CRDB-SAdf2", "begin failed") - } - for i, execute := range check.Executes { - logging.WithFields("projection", h.ProjectionName, "execute", i).Debug("executing check") - next, err := execute(h.client, h.ProjectionName) - if err != nil { - tx.Rollback() - return err - } - if !next { - logging.WithFields("projection", h.ProjectionName, "execute", i).Debug("skipping next check") - break - } - } - if err := tx.Commit(); err != nil { + tx.Rollback() return err } + if !next { + logging.WithFields("projection", h.ProjectionName, "execute", i).Debug("skipping next check") + break + } } - initialized <- true - close(initialized) - return nil + return tx.Commit() } func NewTableCheck(table *Table, opts ...execOption) *handler.Check { diff --git a/internal/eventstore/handler/handler_projection.go b/internal/eventstore/handler/handler_projection.go index b8084a26b1..c8c8dda7a5 100644 --- a/internal/eventstore/handler/handler_projection.go +++ b/internal/eventstore/handler/handler_projection.go @@ -12,7 +12,11 @@ import ( "github.com/zitadel/zitadel/internal/eventstore" ) -const systemID = "system" +const ( + schedulerSucceeded = eventstore.EventType("system.projections.scheduler.succeeded") + aggregateType = eventstore.AggregateType("system") + aggregateID = "SYSTEM" +) type ProjectionHandlerConfig struct { HandlerConfig @@ -188,9 +192,35 @@ func (h *ProjectionHandler) schedule(ctx context.Context) { }() // flag if projection has been successfully executed at least once since start var succeededOnce bool + var err error // get every instance id except empty (system) query := eventstore.NewSearchQueryBuilder(eventstore.ColumnsInstanceIDs).AddQuery().ExcludedInstanceID("") for range h.triggerProjection.C { + if !succeededOnce { + // (re)check if it has succeeded in the meantime + succeededOnce, err = h.hasSucceededOnce(ctx) + if err != nil { + logging.WithFields("projection", h.ProjectionName, "err", err). + Error("schedule could not check if projection has already succeeded once") + h.triggerProjection.Reset(h.requeueAfter) + continue + } + } + lockCtx := ctx + var cancelLock context.CancelFunc + // if it still has not succeeded, lock the projection for the system + // so that only a single scheduler does a first schedule (of every instance) + if !succeededOnce { + lockCtx, cancelLock = context.WithCancel(ctx) + errs := h.lock(lockCtx, h.requeueAfter, "system") + if err, ok := <-errs; err != nil || !ok { + cancelLock() + logging.WithFields("projection", h.ProjectionName).OnError(err).Warn("initial lock failed for first schedule") + h.triggerProjection.Reset(h.requeueAfter) + continue + } + go h.cancelOnErr(lockCtx, errs, cancelLock) + } if succeededOnce { // since we have at least one successful run, we can restrict it to events not older than // twice the requeue time (just to be sure not to miss an event) @@ -209,32 +239,80 @@ func (h *ProjectionHandler) schedule(ctx context.Context) { max = len(ids) } instances := ids[i:max] - lockCtx, cancelLock := context.WithCancel(ctx) - errs := h.lock(lockCtx, h.requeueAfter, instances...) + lockInstanceCtx, cancelInstanceLock := context.WithCancel(lockCtx) + errs := h.lock(lockInstanceCtx, h.requeueAfter, instances...) //wait until projection is locked if err, ok := <-errs; err != nil || !ok { - cancelLock() + cancelInstanceLock() logging.WithFields("projection", h.ProjectionName).OnError(err).Warn("initial lock failed") failed = true continue } - go h.cancelOnErr(lockCtx, errs, cancelLock) - err = h.Trigger(lockCtx, instances...) + go h.cancelOnErr(lockInstanceCtx, errs, cancelInstanceLock) + err = h.Trigger(lockInstanceCtx, instances...) if err != nil { logging.WithFields("projection", h.ProjectionName, "instanceIDs", instances).WithError(err).Error("trigger failed") failed = true } - cancelLock() + cancelInstanceLock() unlockErr := h.unlock(instances...) logging.WithFields("projection", h.ProjectionName).OnError(unlockErr).Warn("unable to unlock") } + // if the first schedule did not fail, store that in the eventstore, so we can check on later starts + if !succeededOnce { + if !failed { + err = h.setSucceededOnce(ctx) + logging.WithFields("projection", h.ProjectionName).OnError(err).Warn("unable to push first schedule succeeded") + } + cancelLock() + unlockErr := h.unlock("system") + logging.WithFields("projection", h.ProjectionName).OnError(unlockErr).Warn("unable to unlock first schedule") + } // it succeeded at least once if it has succeeded before or if it has succeeded now - not failed ;-) succeededOnce = succeededOnce || !failed h.triggerProjection.Reset(h.requeueAfter) } } +func (h *ProjectionHandler) hasSucceededOnce(ctx context.Context) (bool, error) { + events, err := h.Eventstore.Filter(ctx, eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent). + AddQuery(). + AggregateTypes(aggregateType). + AggregateIDs(aggregateID). + EventTypes(schedulerSucceeded). + EventData(map[string]interface{}{ + "name": h.ProjectionName, + }). + Builder(), + ) + return len(events) > 0 && err == nil, err +} + +func (h *ProjectionHandler) setSucceededOnce(ctx context.Context) error { + _, err := h.Eventstore.Push(ctx, &ProjectionSucceededEvent{ + BaseEvent: *eventstore.NewBaseEventForPush(ctx, + eventstore.NewAggregate(ctx, aggregateID, aggregateType, "v1"), + schedulerSucceeded, + ), + Name: h.ProjectionName, + }) + return err +} + +type ProjectionSucceededEvent struct { + eventstore.BaseEvent `json:"-"` + Name string `json:"name"` +} + +func (p *ProjectionSucceededEvent) Data() interface{} { + return p +} + +func (p *ProjectionSucceededEvent) UniqueConstraints() []*eventstore.EventUniqueConstraint { + return nil +} + func (h *ProjectionHandler) cancelOnErr(ctx context.Context, errs <-chan error, cancel func()) { for { select { @@ -248,7 +326,6 @@ func (h *ProjectionHandler) cancelOnErr(ctx context.Context, errs <-chan error, cancel() return } - } } diff --git a/internal/eventstore/handler/handler_projection_test.go b/internal/eventstore/handler/handler_projection_test.go index 7242dc9eba..c504a9dbd4 100644 --- a/internal/eventstore/handler/handler_projection_test.go +++ b/internal/eventstore/handler/handler_projection_test.go @@ -687,6 +687,25 @@ func TestProjection_schedule(t *testing.T) { }, want{}, }, + { + "filter succeeded once error", + args{ + ctx: context.Background(), + }, + fields{ + eventstore: func(t *testing.T) *eventstore.Eventstore { + return eventstore.NewEventstore( + es_repo_mock.NewRepo(t).ExpectFilterEventsError(ErrFilter), + ) + }, + triggerProjection: time.NewTimer(0), + }, + want{ + locksCount: 0, + lockCanceled: false, + unlockCount: 0, + }, + }, { "filter instance ids error", args{ @@ -695,7 +714,15 @@ func TestProjection_schedule(t *testing.T) { fields{ eventstore: func(t *testing.T) *eventstore.Eventstore { return eventstore.NewEventstore( - es_repo_mock.NewRepo(t).ExpectInstanceIDsError(ErrFilter), + es_repo_mock.NewRepo(t).ExpectFilterEvents( + &repository.Event{ + AggregateType: "system", + Sequence: 6, + PreviousAggregateSequence: 5, + InstanceID: "", + Type: "system.projections.scheduler.succeeded", + }). + ExpectInstanceIDsError(ErrFilter), ) }, triggerProjection: time.NewTimer(0), @@ -714,7 +741,14 @@ func TestProjection_schedule(t *testing.T) { fields{ eventstore: func(t *testing.T) *eventstore.Eventstore { return eventstore.NewEventstore( - es_repo_mock.NewRepo(t).ExpectInstanceIDs("instanceID1"), + es_repo_mock.NewRepo(t).ExpectFilterEvents( + &repository.Event{ + AggregateType: "system", + Sequence: 6, + PreviousAggregateSequence: 5, + InstanceID: "", + Type: "system.projections.scheduler.succeeded", + }).ExpectInstanceIDs("instanceID1"), ) }, triggerProjection: time.NewTimer(0), @@ -738,7 +772,14 @@ func TestProjection_schedule(t *testing.T) { fields{ eventstore: func(t *testing.T) *eventstore.Eventstore { return eventstore.NewEventstore( - es_repo_mock.NewRepo(t).ExpectInstanceIDs("instanceID1"), + es_repo_mock.NewRepo(t).ExpectFilterEvents( + &repository.Event{ + AggregateType: "system", + Sequence: 6, + PreviousAggregateSequence: 5, + InstanceID: "", + Type: "system.projections.scheduler.succeeded", + }).ExpectInstanceIDs("instanceID1"), ) }, triggerProjection: time.NewTimer(0), diff --git a/internal/notification/projection.go b/internal/notification/projection.go index 9f0aefe5ad..cbdd9de682 100644 --- a/internal/notification/projection.go +++ b/internal/notification/projection.go @@ -87,6 +87,8 @@ func newNotificationsProjection( p.fileSystemPath = fileSystemPath p.statikDir = statikDir + // needs to be started here as it is not part of the projection.projections / projection.newProjectionsList() + p.Start() return p } diff --git a/internal/query/projection/projection.go b/internal/query/projection/projection.go index 803298390b..27c1bc1c19 100644 --- a/internal/query/projection/projection.go +++ b/internal/query/projection/projection.go @@ -3,7 +3,6 @@ package projection import ( "context" "database/sql" - "time" "github.com/zitadel/zitadel/internal/crypto" "github.com/zitadel/zitadel/internal/eventstore" @@ -63,7 +62,16 @@ var ( NotificationsProjection interface{} ) -func Start(ctx context.Context, sqlClient *sql.DB, es *eventstore.Eventstore, config Config, keyEncryptionAlgorithm crypto.EncryptionAlgorithm, certEncryptionAlgorithm crypto.EncryptionAlgorithm) error { +type projection interface { + Start() + Init(ctx context.Context) error +} + +var ( + projections []projection +) + +func Create(ctx context.Context, sqlClient *sql.DB, es *eventstore.Eventstore, config Config, keyEncryptionAlgorithm crypto.EncryptionAlgorithm, certEncryptionAlgorithm crypto.EncryptionAlgorithm) error { projectionConfig = crdb.StatementHandlerConfig{ ProjectionHandlerConfig: handler.ProjectionHandlerConfig{ HandlerConfig: handler.HandlerConfig{ @@ -123,9 +131,25 @@ func Start(ctx context.Context, sqlClient *sql.DB, es *eventstore.Eventstore, co OIDCSettingsProjection = newOIDCSettingsProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["oidc_settings"])) DebugNotificationProviderProjection = newDebugNotificationProviderProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["debug_notification_provider"])) KeyProjection = newKeyProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["keys"]), keyEncryptionAlgorithm, certEncryptionAlgorithm) + newProjectionsList() return nil } +func Init(ctx context.Context) error { + for _, p := range projections { + if err := p.Init(ctx); err != nil { + return err + } + } + return nil +} + +func Start() { + for _, projection := range projections { + projection.Start() + } +} + func ApplyCustomConfig(customConfig CustomConfig) crdb.StatementHandlerConfig { return applyCustomConfig(projectionConfig, customConfig) @@ -148,19 +172,54 @@ func applyCustomConfig(config crdb.StatementHandlerConfig, customConfig CustomCo return config } -func iteratorPool(workerCount int) chan func() { - if workerCount <= 0 { - return nil +// we know this is ugly, but we need to have a singleton slice of all projections +// and are only able to initialize it after all projections are created +// as setup and start currently create them individually, we make sure we get the right one +// will be refactored when changing to new id based projections +// +// NotificationsProjection is not added here, because it does not statement based / has no proprietary projection table +func newProjectionsList() { + projections = []projection{ + OrgProjection, + OrgMetadataProjection, + ActionProjection, + FlowProjection, + ProjectProjection, + PasswordComplexityProjection, + PasswordAgeProjection, + LockoutPolicyProjection, + PrivacyPolicyProjection, + DomainPolicyProjection, + LabelPolicyProjection, + ProjectGrantProjection, + ProjectRoleProjection, + OrgDomainProjection, + LoginPolicyProjection, + IDPProjection, + AppProjection, + IDPUserLinkProjection, + IDPLoginPolicyLinkProjection, + MailTemplateProjection, + MessageTextProjection, + CustomTextProjection, + UserProjection, + LoginNameProjection, + OrgMemberProjection, + InstanceDomainProjection, + InstanceMemberProjection, + ProjectMemberProjection, + ProjectGrantMemberProjection, + AuthNKeyProjection, + PersonalAccessTokenProjection, + UserGrantProjection, + UserMetadataProjection, + UserAuthMethodProjection, + InstanceProjection, + SecretGeneratorProjection, + SMTPConfigProjection, + SMSConfigProjection, + OIDCSettingsProjection, + DebugNotificationProviderProjection, + KeyProjection, } - - queue := make(chan func()) - for i := 0; i < workerCount; i++ { - go func() { - for iteration := range queue { - iteration() - time.Sleep(2 * time.Second) - } - }() - } - return queue } diff --git a/internal/query/query.go b/internal/query/query.go index 6e13d3527a..65907bd808 100644 --- a/internal/query/query.go +++ b/internal/query/query.go @@ -79,10 +79,11 @@ func StartQueries(ctx context.Context, es *eventstore.Eventstore, sqlClient *sql }, } - err = projection.Start(ctx, sqlClient, es, projections, keyEncryptionAlgorithm, certEncryptionAlgorithm) + err = projection.Create(ctx, sqlClient, es, projections, keyEncryptionAlgorithm, certEncryptionAlgorithm) if err != nil { return nil, err } + projection.Start() return repo, nil }