mirror of
https://github.com/zitadel/zitadel.git
synced 2025-05-22 12:18:19 +00:00
fix(eventstore): increase performance on push (#7125)
This commit is contained in:
parent
6d3ce8d5ab
commit
cc2dd8b20b
26
cmd/setup/19.go
Normal file
26
cmd/setup/19.go
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
package setup
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
_ "embed"
|
||||||
|
|
||||||
|
"github.com/zitadel/zitadel/internal/database"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
//go:embed 19.sql
|
||||||
|
addCurrentSequencesIndex string
|
||||||
|
)
|
||||||
|
|
||||||
|
type AddCurrentSequencesIndex struct {
|
||||||
|
dbClient *database.DB
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mig *AddCurrentSequencesIndex) Execute(ctx context.Context) error {
|
||||||
|
_, err := mig.dbClient.ExecContext(ctx, addCurrentSequencesIndex)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mig *AddCurrentSequencesIndex) String() string {
|
||||||
|
return "19_add_current_sequences_index"
|
||||||
|
}
|
1
cmd/setup/19.sql
Normal file
1
cmd/setup/19.sql
Normal file
@ -0,0 +1 @@
|
|||||||
|
CREATE INDEX CONCURRENTLY IF NOT EXISTS events2_current_sequence ON eventstore.events2 ("sequence" DESC, aggregate_id, aggregate_type, instance_id);
|
@ -76,6 +76,7 @@ type Steps struct {
|
|||||||
s16UniqueConstraintsLower *UniqueConstraintToLower
|
s16UniqueConstraintsLower *UniqueConstraintToLower
|
||||||
s17AddOffsetToUniqueConstraints *AddOffsetToCurrentStates
|
s17AddOffsetToUniqueConstraints *AddOffsetToCurrentStates
|
||||||
s18AddLowerFieldsToLoginNames *AddLowerFieldsToLoginNames
|
s18AddLowerFieldsToLoginNames *AddLowerFieldsToLoginNames
|
||||||
|
s19AddCurrentStatesIndex *AddCurrentSequencesIndex
|
||||||
}
|
}
|
||||||
|
|
||||||
type encryptionKeyConfig struct {
|
type encryptionKeyConfig struct {
|
||||||
|
@ -109,6 +109,7 @@ func Setup(config *Config, steps *Steps, masterKey string) {
|
|||||||
steps.s16UniqueConstraintsLower = &UniqueConstraintToLower{dbClient: queryDBClient}
|
steps.s16UniqueConstraintsLower = &UniqueConstraintToLower{dbClient: queryDBClient}
|
||||||
steps.s17AddOffsetToUniqueConstraints = &AddOffsetToCurrentStates{dbClient: queryDBClient}
|
steps.s17AddOffsetToUniqueConstraints = &AddOffsetToCurrentStates{dbClient: queryDBClient}
|
||||||
steps.s18AddLowerFieldsToLoginNames = &AddLowerFieldsToLoginNames{dbClient: queryDBClient}
|
steps.s18AddLowerFieldsToLoginNames = &AddLowerFieldsToLoginNames{dbClient: queryDBClient}
|
||||||
|
steps.s19AddCurrentStatesIndex = &AddCurrentSequencesIndex{dbClient: queryDBClient}
|
||||||
|
|
||||||
err = projection.Create(ctx, projectionDBClient, eventstoreClient, config.Projections, nil, nil, nil)
|
err = projection.Create(ctx, projectionDBClient, eventstoreClient, config.Projections, nil, nil, nil)
|
||||||
logging.OnError(err).Fatal("unable to start projections")
|
logging.OnError(err).Fatal("unable to start projections")
|
||||||
@ -153,6 +154,8 @@ func Setup(config *Config, steps *Steps, masterKey string) {
|
|||||||
logging.WithFields("name", steps.s16UniqueConstraintsLower.String()).OnError(err).Fatal("migration failed")
|
logging.WithFields("name", steps.s16UniqueConstraintsLower.String()).OnError(err).Fatal("migration failed")
|
||||||
err = migration.Migrate(ctx, eventstoreClient, steps.s17AddOffsetToUniqueConstraints)
|
err = migration.Migrate(ctx, eventstoreClient, steps.s17AddOffsetToUniqueConstraints)
|
||||||
logging.WithFields("name", steps.s17AddOffsetToUniqueConstraints.String()).OnError(err).Fatal("migration failed")
|
logging.WithFields("name", steps.s17AddOffsetToUniqueConstraints.String()).OnError(err).Fatal("migration failed")
|
||||||
|
err = migration.Migrate(ctx, eventstoreClient, steps.s19AddCurrentStatesIndex)
|
||||||
|
logging.WithFields("name", steps.s19AddCurrentStatesIndex.String()).OnError(err).Fatal("migration failed")
|
||||||
|
|
||||||
for _, repeatableStep := range repeatableSteps {
|
for _, repeatableStep := range repeatableSteps {
|
||||||
err = migration.Migrate(ctx, eventstoreClient, repeatableStep)
|
err = migration.Migrate(ctx, eventstoreClient, repeatableStep)
|
||||||
|
@ -26,7 +26,7 @@ func latestSequences(ctx context.Context, tx *sql.Tx, commands []eventstore.Comm
|
|||||||
sequences := commandsToSequences(ctx, commands)
|
sequences := commandsToSequences(ctx, commands)
|
||||||
|
|
||||||
conditions, args := sequencesToSql(sequences)
|
conditions, args := sequencesToSql(sequences)
|
||||||
rows, err := tx.QueryContext(ctx, fmt.Sprintf(latestSequencesStmt, strings.Join(conditions, " OR ")), args...)
|
rows, err := tx.QueryContext(ctx, fmt.Sprintf(latestSequencesStmt, strings.Join(conditions, " UNION ALL ")), args...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, zerrors.ThrowInternal(err, "V3-5jU5z", "Errors.Internal")
|
return nil, zerrors.ThrowInternal(err, "V3-5jU5z", "Errors.Internal")
|
||||||
}
|
}
|
||||||
@ -92,7 +92,7 @@ func sequencesToSql(sequences []*latestSequence) (conditions []string, args []an
|
|||||||
conditions = make([]string, len(sequences))
|
conditions = make([]string, len(sequences))
|
||||||
|
|
||||||
for i, sequence := range sequences {
|
for i, sequence := range sequences {
|
||||||
conditions[i] = fmt.Sprintf("(instance_id = $%d AND aggregate_type = $%d AND aggregate_id = $%d)",
|
conditions[i] = fmt.Sprintf(`(SELECT instance_id, aggregate_type, aggregate_id, "sequence" FROM eventstore.events2 WHERE instance_id = $%d AND aggregate_type = $%d AND aggregate_id = $%d ORDER BY "sequence" DESC LIMIT 1)`,
|
||||||
i*argsPerCondition+1,
|
i*argsPerCondition+1,
|
||||||
i*argsPerCondition+2,
|
i*argsPerCondition+2,
|
||||||
i*argsPerCondition+3,
|
i*argsPerCondition+3,
|
||||||
|
@ -247,7 +247,7 @@ func Test_sequencesToSql(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
wantConditions: []string{
|
wantConditions: []string{
|
||||||
"(instance_id = $1 AND aggregate_type = $2 AND aggregate_id = $3)",
|
`(SELECT instance_id, aggregate_type, aggregate_id, "sequence" FROM eventstore.events2 WHERE instance_id = $1 AND aggregate_type = $2 AND aggregate_id = $3 ORDER BY "sequence" DESC LIMIT 1)`,
|
||||||
},
|
},
|
||||||
wantArgs: []any{
|
wantArgs: []any{
|
||||||
"instance",
|
"instance",
|
||||||
@ -266,8 +266,8 @@ func Test_sequencesToSql(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
wantConditions: []string{
|
wantConditions: []string{
|
||||||
"(instance_id = $1 AND aggregate_type = $2 AND aggregate_id = $3)",
|
`(SELECT instance_id, aggregate_type, aggregate_id, "sequence" FROM eventstore.events2 WHERE instance_id = $1 AND aggregate_type = $2 AND aggregate_id = $3 ORDER BY "sequence" DESC LIMIT 1)`,
|
||||||
"(instance_id = $4 AND aggregate_type = $5 AND aggregate_id = $6)",
|
`(SELECT instance_id, aggregate_type, aggregate_id, "sequence" FROM eventstore.events2 WHERE instance_id = $4 AND aggregate_type = $5 AND aggregate_id = $6 ORDER BY "sequence" DESC LIMIT 1)`,
|
||||||
},
|
},
|
||||||
wantArgs: []any{
|
wantArgs: []any{
|
||||||
"instance",
|
"instance",
|
||||||
|
@ -1,17 +1,5 @@
|
|||||||
with existing as (
|
WITH existing AS (
|
||||||
SELECT
|
%s
|
||||||
instance_id
|
|
||||||
, aggregate_type
|
|
||||||
, aggregate_id
|
|
||||||
, MAX("sequence") "sequence"
|
|
||||||
FROM
|
|
||||||
eventstore.events2 existing
|
|
||||||
WHERE
|
|
||||||
%s
|
|
||||||
GROUP BY
|
|
||||||
instance_id
|
|
||||||
, aggregate_type
|
|
||||||
, aggregate_id
|
|
||||||
) SELECT
|
) SELECT
|
||||||
e.instance_id
|
e.instance_id
|
||||||
, e.owner
|
, e.owner
|
||||||
@ -23,8 +11,8 @@ FROM
|
|||||||
JOIN
|
JOIN
|
||||||
existing
|
existing
|
||||||
ON
|
ON
|
||||||
e.instance_id = existing.instance_id
|
e.instance_id = existing.instance_id
|
||||||
AND e.aggregate_type = existing.aggregate_type
|
AND e.aggregate_type = existing.aggregate_type
|
||||||
AND e.aggregate_id = existing.aggregate_id
|
AND e.aggregate_id = existing.aggregate_id
|
||||||
AND e.sequence = existing.sequence
|
AND e.sequence = existing.sequence
|
||||||
FOR UPDATE;
|
FOR UPDATE;
|
Loading…
x
Reference in New Issue
Block a user