zitadel/internal/eventstore/search_query.go
Tim Möhlmann a84b259e8c
perf(oidc): nest position clause for session terminated query (#8738)
# Which Problems Are Solved

Optimize the query that checks for terminated sessions in the access
token verifier. The verifier is used in auth middleware, userinfo and
introspection.


# How the Problems Are Solved

The previous implementation built a query for certain events and then
appended a single `PositionAfter` clause. This caused the postgreSQL
planner to use indexes only for the instance ID, aggregate IDs,
aggregate types and event types. Followed by an expensive sequential
scan for the position. This resulting in internal over-fetching of rows
before the final filter was applied.


![Screenshot_20241007_105803](https://github.com/user-attachments/assets/f2d91976-be87-428b-b604-a211399b821c)

Furthermore, the query was searching for events which are not always
applicable. For example, there was always a session ID search and if
there was a user ID, we would also search for a browser fingerprint in
event payload (expensive). Even if those argument string would be empty.

This PR changes:

1. Nest the position query, so that a full `instance_id, aggregate_id,
aggregate_type, event_type, "position"` index can be matched.
2. Redefine the `es_wm` index to include the `position` column.
3. Only search for events for the IDs that actually have a value. Do not
search (noop) if none of session ID, user ID or fingerpint ID are set.

New query plan:


![Screenshot_20241007_110648](https://github.com/user-attachments/assets/c3234c33-1b76-4b33-a4a9-796f69f3d775)


# Additional Changes

- cleanup how we load multi-statement migrations and make that a bit
more reusable.

# Additional Context

- Related to https://github.com/zitadel/zitadel/issues/7639
2024-10-07 12:49:55 +00:00

374 lines
10 KiB
Go

package eventstore
import (
"context"
"database/sql"
"time"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/zerrors"
)
// SearchQueryBuilder represents the builder for your filter
// if invalid data are set the filter will fail
type SearchQueryBuilder struct {
columns Columns
limit uint64
offset uint32
desc bool
resourceOwner string
instanceID *string
instanceIDs []string
editorUser string
queries []*SearchQuery
tx *sql.Tx
allowTimeTravel bool
positionAfter float64
awaitOpenTransactions bool
creationDateAfter time.Time
creationDateBefore time.Time
eventSequenceGreater uint64
}
func (b *SearchQueryBuilder) GetColumns() Columns {
return b.columns
}
func (b *SearchQueryBuilder) GetLimit() uint64 {
return b.limit
}
func (b *SearchQueryBuilder) GetOffset() uint32 {
return b.offset
}
func (b *SearchQueryBuilder) GetDesc() bool {
return b.desc
}
func (b *SearchQueryBuilder) GetResourceOwner() string {
return b.resourceOwner
}
func (b *SearchQueryBuilder) GetInstanceID() *string {
return b.instanceID
}
func (b *SearchQueryBuilder) GetInstanceIDs() []string {
return b.instanceIDs
}
func (b *SearchQueryBuilder) GetEditorUser() string {
return b.editorUser
}
func (b *SearchQueryBuilder) GetQueries() []*SearchQuery {
return b.queries
}
func (b *SearchQueryBuilder) GetTx() *sql.Tx {
return b.tx
}
func (b *SearchQueryBuilder) GetAllowTimeTravel() bool {
return b.allowTimeTravel
}
func (b SearchQueryBuilder) GetPositionAfter() float64 {
return b.positionAfter
}
func (b SearchQueryBuilder) GetAwaitOpenTransactions() bool {
return b.awaitOpenTransactions
}
func (q SearchQueryBuilder) GetEventSequenceGreater() uint64 {
return q.eventSequenceGreater
}
func (q SearchQueryBuilder) GetCreationDateAfter() time.Time {
return q.creationDateAfter
}
func (q SearchQueryBuilder) GetCreationDateBefore() time.Time {
return q.creationDateBefore
}
// ensureInstanceID makes sure that the instance id is always set
func (b *SearchQueryBuilder) ensureInstanceID(ctx context.Context) {
if b.instanceID == nil && len(b.instanceIDs) == 0 && authz.GetInstance(ctx).InstanceID() != "" {
b.InstanceID(authz.GetInstance(ctx).InstanceID())
}
}
type SearchQuery struct {
builder *SearchQueryBuilder
aggregateTypes []AggregateType
aggregateIDs []string
eventTypes []EventType
eventData map[string]interface{}
positionAfter float64
}
func (q SearchQuery) GetAggregateTypes() []AggregateType {
return q.aggregateTypes
}
func (q SearchQuery) GetAggregateIDs() []string {
return q.aggregateIDs
}
func (q SearchQuery) GetEventTypes() []EventType {
return q.eventTypes
}
func (q SearchQuery) GetEventData() map[string]interface{} {
return q.eventData
}
func (q SearchQuery) GetPositionAfter() float64 {
return q.positionAfter
}
// Columns defines which fields of the event are needed for the query
type Columns int8
const (
//ColumnsEvent represents all fields of an event
ColumnsEvent = iota + 1
// ColumnsMaxSequence represents the latest sequence of the filtered events
ColumnsMaxSequence
// ColumnsInstanceIDs represents the instance ids of the filtered events
ColumnsInstanceIDs
columnsCount
)
func (c Columns) Validate() error {
if c <= 0 || c >= columnsCount {
return zerrors.ThrowPreconditionFailed(nil, "REPOS-x8R35", "column out of range")
}
return nil
}
// NewSearchQueryBuilder creates a new builder for event filters
// aggregateTypes must contain at least one aggregate type
func NewSearchQueryBuilder(columns Columns) *SearchQueryBuilder {
return &SearchQueryBuilder{
columns: columns,
}
}
func (builder *SearchQueryBuilder) Matches(commands ...Command) []Command {
matches := make([]Command, 0, len(commands))
for i, command := range commands {
if builder.limit > 0 && builder.limit <= uint64(len(matches)) {
break
}
if builder.offset > 0 && uint32(i) < builder.offset {
continue
}
if builder.matchCommand(command) {
matches = append(matches, command)
}
}
return matches
}
type sequencer interface {
Sequence() uint64
}
func (builder *SearchQueryBuilder) matchCommand(command Command) bool {
if builder.resourceOwner != "" && command.Aggregate().ResourceOwner != builder.resourceOwner {
return false
}
if command.Aggregate().InstanceID != "" && builder.instanceID != nil && *builder.instanceID != "" && command.Aggregate().InstanceID != *builder.instanceID {
return false
}
if seq, ok := command.(sequencer); ok {
if builder.eventSequenceGreater > 0 && seq.Sequence() <= builder.eventSequenceGreater {
return false
}
}
if len(builder.queries) == 0 {
return true
}
for _, query := range builder.queries {
if query.matches(command) {
return true
}
}
return false
}
// Columns defines which fields are set
func (builder *SearchQueryBuilder) Columns(columns Columns) *SearchQueryBuilder {
builder.columns = columns
return builder
}
// Limit defines how many events are returned maximally.
func (builder *SearchQueryBuilder) Limit(limit uint64) *SearchQueryBuilder {
builder.limit = limit
return builder
}
// Limit defines how many events are returned maximally.
func (builder *SearchQueryBuilder) Offset(offset uint32) *SearchQueryBuilder {
builder.offset = offset
return builder
}
// ResourceOwner defines the resource owner (org or instance) of the events
func (builder *SearchQueryBuilder) ResourceOwner(resourceOwner string) *SearchQueryBuilder {
builder.resourceOwner = resourceOwner
return builder
}
// InstanceID defines the instanceID (system) of the events
func (builder *SearchQueryBuilder) InstanceID(instanceID string) *SearchQueryBuilder {
builder.instanceID = &instanceID
return builder
}
// InstanceIDs defines the instanceIDs (system) of the events
func (builder *SearchQueryBuilder) InstanceIDs(instanceIDs []string) *SearchQueryBuilder {
builder.instanceIDs = instanceIDs
return builder
}
// OrderDesc changes the sorting order of the returned events to descending
func (builder *SearchQueryBuilder) OrderDesc() *SearchQueryBuilder {
builder.desc = true
return builder
}
// OrderAsc changes the sorting order of the returned events to ascending
func (builder *SearchQueryBuilder) OrderAsc() *SearchQueryBuilder {
builder.desc = false
return builder
}
// SetTx ensures that the eventstore library uses the existing transaction
func (builder *SearchQueryBuilder) SetTx(tx *sql.Tx) *SearchQueryBuilder {
builder.tx = tx
return builder
}
func (builder *SearchQueryBuilder) EditorUser(id string) *SearchQueryBuilder {
builder.editorUser = id
return builder
}
// AllowTimeTravel activates the time travel feature of the database if supported
// The queries will be made based on the call time
func (builder *SearchQueryBuilder) AllowTimeTravel() *SearchQueryBuilder {
builder.allowTimeTravel = true
return builder
}
// PositionAfter filters for events which happened after the specified time
func (builder *SearchQueryBuilder) PositionAfter(position float64) *SearchQueryBuilder {
builder.positionAfter = position
return builder
}
// AwaitOpenTransactions filters for events which are older than the oldest transaction of the database
func (builder *SearchQueryBuilder) AwaitOpenTransactions() *SearchQueryBuilder {
builder.awaitOpenTransactions = true
return builder
}
// SequenceGreater filters for events with sequence greater the requested sequence
func (builder *SearchQueryBuilder) SequenceGreater(sequence uint64) *SearchQueryBuilder {
builder.eventSequenceGreater = sequence
return builder
}
// CreationDateAfter filters for events which happened after the specified time
func (builder *SearchQueryBuilder) CreationDateAfter(creationDate time.Time) *SearchQueryBuilder {
if creationDate.IsZero() || creationDate.Unix() == 0 {
return builder
}
builder.creationDateAfter = creationDate
return builder
}
// CreationDateBefore filters for events which happened before the specified time
func (builder *SearchQueryBuilder) CreationDateBefore(creationDate time.Time) *SearchQueryBuilder {
if creationDate.IsZero() || creationDate.Unix() == 0 {
return builder
}
builder.creationDateBefore = creationDate
return builder
}
// AddQuery creates a new sub query.
// All fields in the sub query are AND-connected in the storage request.
// Multiple sub queries are OR-connected in the storage request.
func (builder *SearchQueryBuilder) AddQuery() *SearchQuery {
query := &SearchQuery{
builder: builder,
}
builder.queries = append(builder.queries, query)
return query
}
// Or creates a new sub query on the search query builder
func (query SearchQuery) Or() *SearchQuery {
return query.builder.AddQuery()
}
// AggregateTypes filters for events with the given aggregate types
func (query *SearchQuery) AggregateTypes(types ...AggregateType) *SearchQuery {
query.aggregateTypes = types
return query
}
// AggregateIDs filters for events with the given aggregate id's
func (query *SearchQuery) AggregateIDs(ids ...string) *SearchQuery {
query.aggregateIDs = ids
return query
}
// EventTypes filters for events with the given event types
func (query *SearchQuery) EventTypes(types ...EventType) *SearchQuery {
query.eventTypes = types
return query
}
// EventData filters for events with the given event data.
// Use this call with care as it will be slower than the other filters.
func (query *SearchQuery) EventData(data map[string]interface{}) *SearchQuery {
query.eventData = data
return query
}
func (query *SearchQuery) PositionAfter(position float64) *SearchQuery {
query.positionAfter = position
return query
}
// Builder returns the SearchQueryBuilder of the sub query
func (query *SearchQuery) Builder() *SearchQueryBuilder {
return query.builder
}
func (query *SearchQuery) matches(command Command) bool {
if ok := isAggregateTypes(command.Aggregate(), query.aggregateTypes...); len(query.aggregateTypes) > 0 && !ok {
return false
}
if ok := isAggregateIDs(command.Aggregate(), query.aggregateIDs...); len(query.aggregateIDs) > 0 && !ok {
return false
}
if ok := isEventTypes(command, query.eventTypes...); len(query.eventTypes) > 0 && !ok {
return false
}
return true
}