diff --git a/cmd/defaults.yaml b/cmd/defaults.yaml index e171c6e611..eaf88cc8be 100644 --- a/cmd/defaults.yaml +++ b/cmd/defaults.yaml @@ -143,7 +143,7 @@ Projections: RequeueEvery: 60s RetryFailedAfter: 1s MaxFailureCount: 5 - ConcurrentInstances: 10 + ConcurrentInstances: 1 BulkLimit: 200 MaxIterators: 1 Customizations: diff --git a/cmd/setup/04.go b/cmd/setup/04.go new file mode 100644 index 0000000000..818663e0ee --- /dev/null +++ b/cmd/setup/04.go @@ -0,0 +1,36 @@ +package setup + +import ( + "context" + "database/sql" + "embed" +) + +var ( + //go:embed 04/cockroach/index.sql + //go:embed 04/postgres/index.sql + stmts embed.FS +) + +type EventstoreIndexes struct { + dbClient *sql.DB + dbType string +} + +func (mig *EventstoreIndexes) Execute(ctx context.Context) error { + stmt, err := readStmt(mig.dbType) + if err != nil { + return err + } + _, err = mig.dbClient.ExecContext(ctx, stmt) + return err +} + +func (mig *EventstoreIndexes) String() string { + return "04_eventstore_indexes" +} + +func readStmt(typ string) (string, error) { + stmt, err := stmts.ReadFile("04/" + typ + "/index.sql") + return string(stmt), err +} diff --git a/cmd/setup/04/cockroach/index.sql b/cmd/setup/04/cockroach/index.sql new file mode 100644 index 0000000000..f68ed948a5 --- /dev/null +++ b/cmd/setup/04/cockroach/index.sql @@ -0,0 +1,4 @@ +CREATE INDEX IF NOT EXISTS write_model ON eventstore.events (instance_id, aggregate_type, aggregate_id, event_type, resource_owner) + STORING (id, aggregate_version, previous_aggregate_sequence, creation_date, event_data, editor_user, editor_service, previous_aggregate_type_sequence); + +CREATE INDEX IF NOT EXISTS active_instances ON eventstore.events (creation_date desc, instance_id) USING HASH; diff --git a/cmd/setup/04/postgres/index.sql b/cmd/setup/04/postgres/index.sql new file mode 100644 index 0000000000..d4c933ece8 --- /dev/null +++ b/cmd/setup/04/postgres/index.sql @@ -0,0 +1,3 @@ +CREATE INDEX IF NOT EXISTS write_model ON eventstore.events (instance_id, aggregate_type, aggregate_id, event_type, resource_owner); + +CREATE INDEX IF NOT EXISTS active_instances ON eventstore.events (creation_date, instance_id); diff --git a/cmd/setup/config.go b/cmd/setup/config.go index 2977eeb48d..86d2e85f29 100644 --- a/cmd/setup/config.go +++ b/cmd/setup/config.go @@ -52,9 +52,10 @@ func MustNewConfig(v *viper.Viper) *Config { } type Steps struct { - s1ProjectionTable *ProjectionTable - s2AssetsTable *AssetTable - FirstInstance *FirstInstance + s1ProjectionTable *ProjectionTable + s2AssetsTable *AssetTable + FirstInstance *FirstInstance + s4EventstoreIndexes *EventstoreIndexes } type encryptionKeyConfig struct { diff --git a/cmd/setup/setup.go b/cmd/setup/setup.go index 8e7b81ba8c..0010773524 100644 --- a/cmd/setup/setup.go +++ b/cmd/setup/setup.go @@ -78,6 +78,8 @@ func Setup(config *Config, steps *Steps, masterKey string) { steps.FirstInstance.externalSecure = config.ExternalSecure steps.FirstInstance.externalPort = config.ExternalPort + steps.s4EventstoreIndexes = &EventstoreIndexes{dbClient: dbClient, dbType: config.Database.Type()} + repeatableSteps := []migration.RepeatableMigration{ &externalConfigChange{ es: eventstoreClient, @@ -94,6 +96,8 @@ func Setup(config *Config, steps *Steps, masterKey string) { logging.OnError(err).Fatal("unable to migrate step 2") err = migration.Migrate(ctx, eventstoreClient, steps.FirstInstance) logging.OnError(err).Fatal("unable to migrate step 3") + err = migration.Migrate(ctx, eventstoreClient, steps.s4EventstoreIndexes) + logging.OnError(err).Fatal("unable to migrate step 4") for _, repeatableStep := range repeatableSteps { err = migration.Migrate(ctx, eventstoreClient, repeatableStep) diff --git a/internal/authz/repository/eventsourcing/eventstore/token_verifier.go b/internal/authz/repository/eventsourcing/eventstore/token_verifier.go index 6c856640ef..721b0401e8 100644 --- a/internal/authz/repository/eventsourcing/eventstore/token_verifier.go +++ b/internal/authz/repository/eventsourcing/eventstore/token_verifier.go @@ -42,7 +42,14 @@ func (repo *TokenVerifierRepo) Health() error { func (repo *TokenVerifierRepo) tokenByID(ctx context.Context, tokenID, userID string) (_ *usr_model.TokenView, err error) { ctx, span := tracing.NewSpan(ctx) defer func() { span.EndWithError(err) }() - token, viewErr := repo.View.TokenByID(tokenID, authz.GetInstance(ctx).InstanceID()) + + instanceID := authz.GetInstance(ctx).InstanceID() + sequence, err := repo.View.GetLatestTokenSequence(instanceID) + logging.WithFields("instanceID", instanceID, "userID", userID, "tokenID"). + OnError(err). + Errorf("could not get current sequence for token check") + + token, viewErr := repo.View.TokenByID(tokenID, instanceID) if viewErr != nil && !caos_errs.IsNotFound(viewErr) { return nil, viewErr } @@ -50,9 +57,12 @@ func (repo *TokenVerifierRepo) tokenByID(ctx context.Context, tokenID, userID st token = new(model.TokenView) token.ID = tokenID token.UserID = userID + if sequence != nil { + token.Sequence = sequence.CurrentSequence + } } - events, esErr := repo.getUserEvents(ctx, userID, token.InstanceID, token.Sequence) + events, esErr := repo.getUserEvents(ctx, userID, instanceID, token.Sequence) if caos_errs.IsNotFound(viewErr) && len(events) == 0 { return nil, caos_errs.ThrowNotFound(nil, "EVENT-4T90g", "Errors.Token.NotFound") } diff --git a/internal/eventstore/handler/handler_projection.go b/internal/eventstore/handler/handler_projection.go index ae2c4c2c80..856cf3042d 100644 --- a/internal/eventstore/handler/handler_projection.go +++ b/internal/eventstore/handler/handler_projection.go @@ -182,13 +182,23 @@ func (h *ProjectionHandler) schedule(ctx context.Context) { } cancel() }() + // flag if projection has been successfully executed at least once since start + var succeededOnce bool + // get every instance id except empty (system) + query := eventstore.NewSearchQueryBuilder(eventstore.ColumnsInstanceIDs).AddQuery().ExcludedInstanceID("") for range h.triggerProjection.C { - ids, err := h.Eventstore.InstanceIDs(ctx, eventstore.NewSearchQueryBuilder(eventstore.ColumnsInstanceIDs).AddQuery().ExcludedInstanceID("").Builder()) + if succeededOnce { + // since we have at least one successful run, we can restrict it to events not older than + // twice the requeue time (just to be sure not to miss an event) + query = query.CreationDateAfter(time.Now().Add(-2 * h.requeueAfter)) + } + ids, err := h.Eventstore.InstanceIDs(ctx, query.Builder()) if err != nil { logging.WithFields("projection", h.ProjectionName).WithError(err).Error("instance ids") h.triggerProjection.Reset(h.requeueAfter) continue } + var failed bool for i := 0; i < len(ids); i = i + h.concurrentInstances { max := i + h.concurrentInstances if max > len(ids) { @@ -201,18 +211,22 @@ func (h *ProjectionHandler) schedule(ctx context.Context) { if err, ok := <-errs; err != nil || !ok { cancelLock() logging.WithFields("projection", h.ProjectionName).OnError(err).Warn("initial lock failed") + failed = true continue } go h.cancelOnErr(lockCtx, errs, cancelLock) err = h.Trigger(lockCtx, instances...) if err != nil { logging.WithFields("projection", h.ProjectionName, "instanceIDs", instances).WithError(err).Error("trigger failed") + failed = true } cancelLock() unlockErr := h.unlock(instances...) logging.WithFields("projection", h.ProjectionName).OnError(unlockErr).Warn("unable to unlock") } + // it succeeded at least once if it has succeeded before or if it has succeeded now - not failed ;-) + succeededOnce = succeededOnce || !failed h.triggerProjection.Reset(h.requeueAfter) } } diff --git a/internal/eventstore/repository/search_query.go b/internal/eventstore/repository/search_query.go index d517e7e59e..08a04bd7ce 100644 --- a/internal/eventstore/repository/search_query.go +++ b/internal/eventstore/repository/search_query.go @@ -85,6 +85,8 @@ const ( FieldEventType //FieldEventData represents the event data field FieldEventData + //FieldCreationDate represents the creation date field + FieldCreationDate fieldCount ) diff --git a/internal/eventstore/repository/sql/crdb.go b/internal/eventstore/repository/sql/crdb.go index cc42611385..afd475dde3 100644 --- a/internal/eventstore/repository/sql/crdb.go +++ b/internal/eventstore/repository/sql/crdb.go @@ -297,6 +297,8 @@ func (db *CRDB) columnName(col repository.Field) string { return "event_type" case repository.FieldEventData: return "event_data" + case repository.FieldCreationDate: + return "creation_date" default: return "" } diff --git a/internal/eventstore/search_query.go b/internal/eventstore/search_query.go index 38013580b9..e82e46c29b 100644 --- a/internal/eventstore/search_query.go +++ b/internal/eventstore/search_query.go @@ -2,6 +2,7 @@ package eventstore import ( "database/sql" + "time" "github.com/zitadel/zitadel/internal/database" "github.com/zitadel/zitadel/internal/errors" @@ -30,6 +31,7 @@ type SearchQuery struct { eventSequenceLess uint64 eventTypes []EventType eventData map[string]interface{} + creationDateAfter time.Time } // Columns defines which fields of the event are needed for the query @@ -175,6 +177,12 @@ func (query *SearchQuery) ExcludedInstanceID(instanceIDs ...string) *SearchQuery return query } +// CreationDateNewer filters for events which happened after the specified time +func (query *SearchQuery) CreationDateAfter(time time.Time) *SearchQuery { + query.creationDateAfter = time + return query +} + // EventTypes filters for events with the given event types func (query *SearchQuery) EventTypes(types ...EventType) *SearchQuery { query.eventTypes = types @@ -234,6 +242,7 @@ func (builder *SearchQueryBuilder) build(instanceID string) (*repository.SearchQ query.eventSequenceLessFilter, query.instanceIDFilter, query.excludedInstanceIDFilter, + query.creationDateAfterFilter, query.builder.resourceOwnerFilter, query.builder.instanceIDFilter, } { @@ -344,6 +353,13 @@ func (builder *SearchQueryBuilder) instanceIDFilter() *repository.Filter { return repository.NewFilter(repository.FieldInstanceID, builder.instanceID, repository.OperationEquals) } +func (query *SearchQuery) creationDateAfterFilter() *repository.Filter { + if query.creationDateAfter.IsZero() { + return nil + } + return repository.NewFilter(repository.FieldCreationDate, query.creationDateAfter, repository.OperationGreater) +} + func (query *SearchQuery) eventDataFilter() *repository.Filter { if len(query.eventData) == 0 { return nil diff --git a/internal/eventstore/search_query_test.go b/internal/eventstore/search_query_test.go index c5e295c0d7..62153161ba 100644 --- a/internal/eventstore/search_query_test.go +++ b/internal/eventstore/search_query_test.go @@ -4,6 +4,7 @@ import ( "math" "reflect" "testing" + "time" "github.com/zitadel/zitadel/internal/database" "github.com/zitadel/zitadel/internal/errors" @@ -86,6 +87,13 @@ func testSetResourceOwner(resourceOwner string) func(*SearchQueryBuilder) *Searc } } +func testSetCreationDateAfter(date time.Time) func(*SearchQuery) *SearchQuery { + return func(query *SearchQuery) *SearchQuery { + query = query.CreationDateAfter(date) + return query + } +} + func testSetSortOrder(asc bool) func(*SearchQueryBuilder) *SearchQueryBuilder { return func(query *SearchQueryBuilder) *SearchQueryBuilder { if asc { @@ -224,6 +232,7 @@ func TestSearchQuerybuilderSetters(t *testing.T) { } func TestSearchQuerybuilderBuild(t *testing.T) { + testNow := time.Now() type args struct { columns Columns setters []func(*SearchQueryBuilder) *SearchQueryBuilder @@ -648,6 +657,34 @@ func TestSearchQuerybuilderBuild(t *testing.T) { }, }, }, + { + name: "filter aggregate type, instanceID and creation date after", + args: args{ + columns: ColumnsEvent, + setters: []func(*SearchQueryBuilder) *SearchQueryBuilder{ + testAddQuery( + testSetAggregateTypes("user"), + testSetCreationDateAfter(testNow), + ), + }, + instanceID: "instanceID", + }, + res: res{ + isErr: nil, + query: &repository.SearchQuery{ + Columns: repository.ColumnsEvent, + Desc: false, + Limit: 0, + Filters: [][]*repository.Filter{ + { + repository.NewFilter(repository.FieldAggregateType, repository.AggregateType("user"), repository.OperationEquals), + repository.NewFilter(repository.FieldCreationDate, testNow, repository.OperationGreater), + repository.NewFilter(repository.FieldInstanceID, "instanceID", repository.OperationEquals), + }, + }, + }, + }, + }, { name: "column invalid", args: args{