fix: improve performance (#4300)

## Note

This release requires a setup step to fully improve performance.
Be sure to start ZITADEL with an appropriate command (zitadel start-from-init / start-from-setup)

## Changes

- fix: only run projection scheduler on active instances
- fix: set default for concurrent instances of projections to 1 (for scheduling)
- fix: create more indexes on eventstore.events table
- fix: get current sequence for token check (improve reread performance)
This commit is contained in:
Livio Spring 2022-09-02 16:05:13 +02:00 committed by GitHub
parent f0250a3fdb
commit 5aa91ad105
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 136 additions and 7 deletions

View File

@ -143,7 +143,7 @@ Projections:
RequeueEvery: 60s RequeueEvery: 60s
RetryFailedAfter: 1s RetryFailedAfter: 1s
MaxFailureCount: 5 MaxFailureCount: 5
ConcurrentInstances: 10 ConcurrentInstances: 1
BulkLimit: 200 BulkLimit: 200
MaxIterators: 1 MaxIterators: 1
Customizations: Customizations:

36
cmd/setup/04.go Normal file
View File

@ -0,0 +1,36 @@
package setup
import (
"context"
"database/sql"
"embed"
)
// stmts embeds the per-database index-creation SQL so the migration can run
// without external files; the path selects the statement per database type.
var (
	//go:embed 04/cockroach/index.sql
	//go:embed 04/postgres/index.sql
	stmts embed.FS
)
// EventstoreIndexes is setup step 04: it creates additional indexes on the
// eventstore.events table by executing the embedded SQL for the configured
// database type.
type EventstoreIndexes struct {
	dbClient *sql.DB // connection used to execute the index DDL
	dbType string    // database type ("cockroach" or "postgres"); selects the embedded SQL file
}
// Execute runs the embedded index-creation statement matching the configured
// database type against the database. It returns any read or execution error.
func (mig *EventstoreIndexes) Execute(ctx context.Context) error {
	statement, readErr := readStmt(mig.dbType)
	if readErr != nil {
		return readErr
	}
	_, execErr := mig.dbClient.ExecContext(ctx, statement)
	return execErr
}
// String returns the unique name of this migration step, used by the
// migration framework to track whether the step already ran.
func (mig *EventstoreIndexes) String() string {
	const stepName = "04_eventstore_indexes"
	return stepName
}
// readStmt loads the embedded index-creation SQL for the given database type.
// It returns the empty string together with the error when the file is missing.
func readStmt(typ string) (string, error) {
	raw, err := stmts.ReadFile("04/" + typ + "/index.sql")
	if err != nil {
		return "", err
	}
	return string(raw), nil
}

View File

@ -0,0 +1,4 @@
-- Covering index for write-model reads: lookups filtered by instance, aggregate
-- and event type can be answered from the index alone thanks to STORING.
CREATE INDEX IF NOT EXISTS write_model ON eventstore.events (instance_id, aggregate_type, aggregate_id, event_type, resource_owner)
STORING (id, aggregate_version, previous_aggregate_sequence, creation_date, event_data, editor_user, editor_service, previous_aggregate_type_sequence);
-- Hash-sharded index (CockroachDB USING HASH) supporting the "active instances"
-- query: find instance_ids with events newer than a creation-date cutoff.
CREATE INDEX IF NOT EXISTS active_instances ON eventstore.events (creation_date desc, instance_id) USING HASH;

View File

@ -0,0 +1,3 @@
-- Index for write-model reads: lookups filtered by instance, aggregate and event type.
CREATE INDEX IF NOT EXISTS write_model ON eventstore.events (instance_id, aggregate_type, aggregate_id, event_type, resource_owner);
-- Index supporting the "active instances" query: find instance_ids with events
-- newer than a creation-date cutoff.
CREATE INDEX IF NOT EXISTS active_instances ON eventstore.events (creation_date, instance_id);

View File

@ -52,9 +52,10 @@ func MustNewConfig(v *viper.Viper) *Config {
} }
type Steps struct { type Steps struct {
s1ProjectionTable *ProjectionTable s1ProjectionTable *ProjectionTable
s2AssetsTable *AssetTable s2AssetsTable *AssetTable
FirstInstance *FirstInstance FirstInstance *FirstInstance
s4EventstoreIndexes *EventstoreIndexes
} }
type encryptionKeyConfig struct { type encryptionKeyConfig struct {

View File

@ -78,6 +78,8 @@ func Setup(config *Config, steps *Steps, masterKey string) {
steps.FirstInstance.externalSecure = config.ExternalSecure steps.FirstInstance.externalSecure = config.ExternalSecure
steps.FirstInstance.externalPort = config.ExternalPort steps.FirstInstance.externalPort = config.ExternalPort
steps.s4EventstoreIndexes = &EventstoreIndexes{dbClient: dbClient, dbType: config.Database.Type()}
repeatableSteps := []migration.RepeatableMigration{ repeatableSteps := []migration.RepeatableMigration{
&externalConfigChange{ &externalConfigChange{
es: eventstoreClient, es: eventstoreClient,
@ -94,6 +96,8 @@ func Setup(config *Config, steps *Steps, masterKey string) {
logging.OnError(err).Fatal("unable to migrate step 2") logging.OnError(err).Fatal("unable to migrate step 2")
err = migration.Migrate(ctx, eventstoreClient, steps.FirstInstance) err = migration.Migrate(ctx, eventstoreClient, steps.FirstInstance)
logging.OnError(err).Fatal("unable to migrate step 3") logging.OnError(err).Fatal("unable to migrate step 3")
err = migration.Migrate(ctx, eventstoreClient, steps.s4EventstoreIndexes)
logging.OnError(err).Fatal("unable to migrate step 4")
for _, repeatableStep := range repeatableSteps { for _, repeatableStep := range repeatableSteps {
err = migration.Migrate(ctx, eventstoreClient, repeatableStep) err = migration.Migrate(ctx, eventstoreClient, repeatableStep)

View File

@ -42,7 +42,14 @@ func (repo *TokenVerifierRepo) Health() error {
func (repo *TokenVerifierRepo) tokenByID(ctx context.Context, tokenID, userID string) (_ *usr_model.TokenView, err error) { func (repo *TokenVerifierRepo) tokenByID(ctx context.Context, tokenID, userID string) (_ *usr_model.TokenView, err error) {
ctx, span := tracing.NewSpan(ctx) ctx, span := tracing.NewSpan(ctx)
defer func() { span.EndWithError(err) }() defer func() { span.EndWithError(err) }()
token, viewErr := repo.View.TokenByID(tokenID, authz.GetInstance(ctx).InstanceID())
instanceID := authz.GetInstance(ctx).InstanceID()
sequence, err := repo.View.GetLatestTokenSequence(instanceID)
logging.WithFields("instanceID", instanceID, "userID", userID, "tokenID").
OnError(err).
Errorf("could not get current sequence for token check")
token, viewErr := repo.View.TokenByID(tokenID, instanceID)
if viewErr != nil && !caos_errs.IsNotFound(viewErr) { if viewErr != nil && !caos_errs.IsNotFound(viewErr) {
return nil, viewErr return nil, viewErr
} }
@ -50,9 +57,12 @@ func (repo *TokenVerifierRepo) tokenByID(ctx context.Context, tokenID, userID st
token = new(model.TokenView) token = new(model.TokenView)
token.ID = tokenID token.ID = tokenID
token.UserID = userID token.UserID = userID
if sequence != nil {
token.Sequence = sequence.CurrentSequence
}
} }
events, esErr := repo.getUserEvents(ctx, userID, token.InstanceID, token.Sequence) events, esErr := repo.getUserEvents(ctx, userID, instanceID, token.Sequence)
if caos_errs.IsNotFound(viewErr) && len(events) == 0 { if caos_errs.IsNotFound(viewErr) && len(events) == 0 {
return nil, caos_errs.ThrowNotFound(nil, "EVENT-4T90g", "Errors.Token.NotFound") return nil, caos_errs.ThrowNotFound(nil, "EVENT-4T90g", "Errors.Token.NotFound")
} }

View File

@ -182,13 +182,23 @@ func (h *ProjectionHandler) schedule(ctx context.Context) {
} }
cancel() cancel()
}() }()
// flag if projection has been successfully executed at least once since start
var succeededOnce bool
// get every instance id except empty (system)
query := eventstore.NewSearchQueryBuilder(eventstore.ColumnsInstanceIDs).AddQuery().ExcludedInstanceID("")
for range h.triggerProjection.C { for range h.triggerProjection.C {
ids, err := h.Eventstore.InstanceIDs(ctx, eventstore.NewSearchQueryBuilder(eventstore.ColumnsInstanceIDs).AddQuery().ExcludedInstanceID("").Builder()) if succeededOnce {
// since we have at least one successful run, we can restrict it to events not older than
// twice the requeue time (just to be sure not to miss an event)
query = query.CreationDateAfter(time.Now().Add(-2 * h.requeueAfter))
}
ids, err := h.Eventstore.InstanceIDs(ctx, query.Builder())
if err != nil { if err != nil {
logging.WithFields("projection", h.ProjectionName).WithError(err).Error("instance ids") logging.WithFields("projection", h.ProjectionName).WithError(err).Error("instance ids")
h.triggerProjection.Reset(h.requeueAfter) h.triggerProjection.Reset(h.requeueAfter)
continue continue
} }
var failed bool
for i := 0; i < len(ids); i = i + h.concurrentInstances { for i := 0; i < len(ids); i = i + h.concurrentInstances {
max := i + h.concurrentInstances max := i + h.concurrentInstances
if max > len(ids) { if max > len(ids) {
@ -201,18 +211,22 @@ func (h *ProjectionHandler) schedule(ctx context.Context) {
if err, ok := <-errs; err != nil || !ok { if err, ok := <-errs; err != nil || !ok {
cancelLock() cancelLock()
logging.WithFields("projection", h.ProjectionName).OnError(err).Warn("initial lock failed") logging.WithFields("projection", h.ProjectionName).OnError(err).Warn("initial lock failed")
failed = true
continue continue
} }
go h.cancelOnErr(lockCtx, errs, cancelLock) go h.cancelOnErr(lockCtx, errs, cancelLock)
err = h.Trigger(lockCtx, instances...) err = h.Trigger(lockCtx, instances...)
if err != nil { if err != nil {
logging.WithFields("projection", h.ProjectionName, "instanceIDs", instances).WithError(err).Error("trigger failed") logging.WithFields("projection", h.ProjectionName, "instanceIDs", instances).WithError(err).Error("trigger failed")
failed = true
} }
cancelLock() cancelLock()
unlockErr := h.unlock(instances...) unlockErr := h.unlock(instances...)
logging.WithFields("projection", h.ProjectionName).OnError(unlockErr).Warn("unable to unlock") logging.WithFields("projection", h.ProjectionName).OnError(unlockErr).Warn("unable to unlock")
} }
// it succeeded at least once if it has succeeded before or if it has succeeded now - not failed ;-)
succeededOnce = succeededOnce || !failed
h.triggerProjection.Reset(h.requeueAfter) h.triggerProjection.Reset(h.requeueAfter)
} }
} }

View File

@ -85,6 +85,8 @@ const (
FieldEventType FieldEventType
//FieldEventData represents the event data field //FieldEventData represents the event data field
FieldEventData FieldEventData
//FieldCreationDate represents the creation date field
FieldCreationDate
fieldCount fieldCount
) )

View File

@ -297,6 +297,8 @@ func (db *CRDB) columnName(col repository.Field) string {
return "event_type" return "event_type"
case repository.FieldEventData: case repository.FieldEventData:
return "event_data" return "event_data"
case repository.FieldCreationDate:
return "creation_date"
default: default:
return "" return ""
} }

View File

@ -2,6 +2,7 @@ package eventstore
import ( import (
"database/sql" "database/sql"
"time"
"github.com/zitadel/zitadel/internal/database" "github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/errors" "github.com/zitadel/zitadel/internal/errors"
@ -30,6 +31,7 @@ type SearchQuery struct {
eventSequenceLess uint64 eventSequenceLess uint64
eventTypes []EventType eventTypes []EventType
eventData map[string]interface{} eventData map[string]interface{}
creationDateAfter time.Time
} }
// Columns defines which fields of the event are needed for the query // Columns defines which fields of the event are needed for the query
@ -175,6 +177,12 @@ func (query *SearchQuery) ExcludedInstanceID(instanceIDs ...string) *SearchQuery
return query return query
} }
// CreationDateNewer filters for events which happened after the specified time
func (query *SearchQuery) CreationDateAfter(time time.Time) *SearchQuery {
query.creationDateAfter = time
return query
}
// EventTypes filters for events with the given event types // EventTypes filters for events with the given event types
func (query *SearchQuery) EventTypes(types ...EventType) *SearchQuery { func (query *SearchQuery) EventTypes(types ...EventType) *SearchQuery {
query.eventTypes = types query.eventTypes = types
@ -234,6 +242,7 @@ func (builder *SearchQueryBuilder) build(instanceID string) (*repository.SearchQ
query.eventSequenceLessFilter, query.eventSequenceLessFilter,
query.instanceIDFilter, query.instanceIDFilter,
query.excludedInstanceIDFilter, query.excludedInstanceIDFilter,
query.creationDateAfterFilter,
query.builder.resourceOwnerFilter, query.builder.resourceOwnerFilter,
query.builder.instanceIDFilter, query.builder.instanceIDFilter,
} { } {
@ -344,6 +353,13 @@ func (builder *SearchQueryBuilder) instanceIDFilter() *repository.Filter {
return repository.NewFilter(repository.FieldInstanceID, builder.instanceID, repository.OperationEquals) return repository.NewFilter(repository.FieldInstanceID, builder.instanceID, repository.OperationEquals)
} }
func (query *SearchQuery) creationDateAfterFilter() *repository.Filter {
if query.creationDateAfter.IsZero() {
return nil
}
return repository.NewFilter(repository.FieldCreationDate, query.creationDateAfter, repository.OperationGreater)
}
func (query *SearchQuery) eventDataFilter() *repository.Filter { func (query *SearchQuery) eventDataFilter() *repository.Filter {
if len(query.eventData) == 0 { if len(query.eventData) == 0 {
return nil return nil

View File

@ -4,6 +4,7 @@ import (
"math" "math"
"reflect" "reflect"
"testing" "testing"
"time"
"github.com/zitadel/zitadel/internal/database" "github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/errors" "github.com/zitadel/zitadel/internal/errors"
@ -86,6 +87,13 @@ func testSetResourceOwner(resourceOwner string) func(*SearchQueryBuilder) *Searc
} }
} }
func testSetCreationDateAfter(date time.Time) func(*SearchQuery) *SearchQuery {
return func(query *SearchQuery) *SearchQuery {
query = query.CreationDateAfter(date)
return query
}
}
func testSetSortOrder(asc bool) func(*SearchQueryBuilder) *SearchQueryBuilder { func testSetSortOrder(asc bool) func(*SearchQueryBuilder) *SearchQueryBuilder {
return func(query *SearchQueryBuilder) *SearchQueryBuilder { return func(query *SearchQueryBuilder) *SearchQueryBuilder {
if asc { if asc {
@ -224,6 +232,7 @@ func TestSearchQuerybuilderSetters(t *testing.T) {
} }
func TestSearchQuerybuilderBuild(t *testing.T) { func TestSearchQuerybuilderBuild(t *testing.T) {
testNow := time.Now()
type args struct { type args struct {
columns Columns columns Columns
setters []func(*SearchQueryBuilder) *SearchQueryBuilder setters []func(*SearchQueryBuilder) *SearchQueryBuilder
@ -648,6 +657,34 @@ func TestSearchQuerybuilderBuild(t *testing.T) {
}, },
}, },
}, },
{
name: "filter aggregate type, instanceID and creation date after",
args: args{
columns: ColumnsEvent,
setters: []func(*SearchQueryBuilder) *SearchQueryBuilder{
testAddQuery(
testSetAggregateTypes("user"),
testSetCreationDateAfter(testNow),
),
},
instanceID: "instanceID",
},
res: res{
isErr: nil,
query: &repository.SearchQuery{
Columns: repository.ColumnsEvent,
Desc: false,
Limit: 0,
Filters: [][]*repository.Filter{
{
repository.NewFilter(repository.FieldAggregateType, repository.AggregateType("user"), repository.OperationEquals),
repository.NewFilter(repository.FieldCreationDate, testNow, repository.OperationGreater),
repository.NewFilter(repository.FieldInstanceID, "instanceID", repository.OperationEquals),
},
},
},
},
},
{ {
name: "column invalid", name: "column invalid",
args: args{ args: args{