fix(eventstore): order by sequence if filter by aggregate id (#8019)

# Which Problems Are Solved

Queriying events by an aggregate id can produce high loads on the
database if the aggregate id contains many events (count > 1000000).

# How the Problems Are Solved

Instead of using the postion and in_tx_order columns we use the sequence
column which guarantees correct ordering in a single aggregate and uses
more optimised indexes.

# Additional Context

Closes https://github.com/zitadel/DevOps/issues/50

Co-authored-by: Livio Spring <livio.a@gmail.com>
This commit is contained in:
Silvan 2024-05-28 08:49:30 +02:00 committed by adlerhurst
parent 3e84020065
commit 43404d960c
2 changed files with 22 additions and 5 deletions

View File

@ -282,17 +282,23 @@ func (db *CRDB) db() *database.DB {
return db.DB return db.DB
} }
func (db *CRDB) orderByEventSequence(desc, useV1 bool) string { func (db *CRDB) orderByEventSequence(desc, shouldOrderBySequence, useV1 bool) string {
if useV1 { if useV1 {
if desc { if desc {
return ` ORDER BY event_sequence DESC` return ` ORDER BY event_sequence DESC`
} }
return ` ORDER BY event_sequence` return ` ORDER BY event_sequence`
} }
if shouldOrderBySequence {
if desc {
return ` ORDER BY "sequence" DESC`
}
return ` ORDER BY "sequence"`
}
if desc { if desc {
return ` ORDER BY "position" DESC, in_tx_order DESC` return ` ORDER BY "position" DESC, in_tx_order DESC`
} }
return ` ORDER BY "position", in_tx_order` return ` ORDER BY "position", in_tx_order`
} }

View File

@ -28,7 +28,7 @@ type querier interface {
maxSequenceQuery(useV1 bool) string maxSequenceQuery(useV1 bool) string
instanceIDsQuery(useV1 bool) string instanceIDsQuery(useV1 bool) string
db() *database.DB db() *database.DB
orderByEventSequence(desc, useV1 bool) string orderByEventSequence(desc, shouldOrderBySequence, useV1 bool) string
dialect.Database dialect.Database
} }
@ -59,6 +59,7 @@ func query(ctx context.Context, criteria querier, searchQuery *eventstore.Search
if err != nil { if err != nil {
return err return err
} }
query, rowScanner := prepareColumns(criteria, q.Columns, useV1) query, rowScanner := prepareColumns(criteria, q.Columns, useV1)
where, values := prepareConditions(criteria, q, useV1) where, values := prepareConditions(criteria, q, useV1)
if where == "" || query == "" { if where == "" || query == "" {
@ -78,10 +79,20 @@ func query(ctx context.Context, criteria querier, searchQuery *eventstore.Search
q.Desc = true q.Desc = true
} }
// if there is only one subquery we can optimize the query ordering by ordering by sequence
var shouldOrderBySequence bool
if len(q.SubQueries) == 1 {
for _, filter := range q.SubQueries[0] {
if filter.Field == repository.FieldAggregateID {
shouldOrderBySequence = filter.Operation == repository.OperationEquals
}
}
}
switch q.Columns { switch q.Columns {
case eventstore.ColumnsEvent, case eventstore.ColumnsEvent,
eventstore.ColumnsMaxSequence: eventstore.ColumnsMaxSequence:
query += criteria.orderByEventSequence(q.Desc, useV1) query += criteria.orderByEventSequence(q.Desc, shouldOrderBySequence, useV1)
} }
if q.Limit > 0 { if q.Limit > 0 {
@ -220,7 +231,7 @@ func eventsScanner(useV1 bool) func(scanner scan, dest interface{}) (err error)
} }
} }
func prepareConditions(criteria querier, query *repository.SearchQuery, useV1 bool) (string, []any) { func prepareConditions(criteria querier, query *repository.SearchQuery, useV1 bool) (_ string, args []any) {
clauses, args := prepareQuery(criteria, useV1, query.InstanceID, query.InstanceIDs, query.ExcludedInstances) clauses, args := prepareQuery(criteria, useV1, query.InstanceID, query.InstanceIDs, query.ExcludedInstances)
if clauses != "" && len(query.SubQueries) > 0 { if clauses != "" && len(query.SubQueries) > 0 {
clauses += " AND " clauses += " AND "