mirror of
https://github.com/zitadel/zitadel.git
synced 2025-08-12 00:57:33 +00:00
feat(eventstore): exclude aggregate IDs when event_type occurred (#8940)
# Which Problems Are Solved For truly event-based notification handler, we need to be able to filter out events of aggregates which are already handled. For example when an event like `notify.success` or `notify.failed` was created on an aggregate, we no longer require events from that aggregate ID. # How the Problems Are Solved Extend the query builder to use a `NOT IN` clause which excludes aggregate IDs when they have certain events for a certain aggregate type. For optimization and proper index usages, certain filters are inherited from the parent query, such as: - Instance ID - Instance IDs - Position offset This is a prettified query as used by the unit tests: ```sql SELECT created_at, event_type, "sequence", "position", payload, creator, "owner", instance_id, aggregate_type, aggregate_id, revision FROM eventstore.events2 WHERE instance_id = $1 AND aggregate_type = $2 AND event_type = $3 AND "position" > $4 AND aggregate_id NOT IN ( SELECT aggregate_id FROM eventstore.events2 WHERE aggregate_type = $5 AND event_type = ANY($6) AND instance_id = $7 AND "position" > $8 ) ORDER BY "position" DESC, in_tx_order DESC LIMIT $9 ``` I used this query to run it against the `oidc_session` aggregate looking for added events, excluding aggregates where a token was revoked, against a recent position. It fully used index scans: <details> ```json [ { "Plan": { "Node Type": "Index Scan", "Parallel Aware": false, "Async Capable": false, "Scan Direction": "Forward", "Index Name": "es_projection", "Relation Name": "events2", "Alias": "events2", "Actual Rows": 2, "Actual Loops": 1, "Index Cond": "((instance_id = '286399006995644420'::text) AND (aggregate_type = 'oidc_session'::text) AND (event_type = 'oidc_session.added'::text) AND (\"position\" > 1731582100.784168))", "Rows Removed by Index Recheck": 0, "Filter": "(NOT (hashed SubPlan 1))", "Rows Removed by Filter": 1, "Plans": [ { "Node Type": "Index Scan", "Parent Relationship": "SubPlan", "Subplan Name": "SubPlan 1", "Parallel Aware": false, "Async Capable": false, "Scan Direction": "Forward", "Index Name": "es_projection", "Relation Name": "events2", "Alias": "events2_1", "Actual Rows": 1, "Actual Loops": 1, "Index Cond": "((instance_id = '286399006995644420'::text) AND (aggregate_type = 'oidc_session'::text) AND (event_type = 'oidc_session.access_token.revoked'::text) AND (\"position\" > 1731582100.784168))", "Rows Removed by Index Recheck": 0 } ] }, "Triggers": [ ] } ] ``` </details> # Additional Changes - None # Additional Context - Related to https://github.com/zitadel/zitadel/issues/8931 --------- Co-authored-by: adlerhurst <silvan.reusser@gmail.com>
This commit is contained in:
@@ -22,15 +22,16 @@ type SearchQuery struct {
|
||||
Offset uint32
|
||||
Desc bool
|
||||
|
||||
InstanceID *Filter
|
||||
InstanceIDs *Filter
|
||||
ExcludedInstances *Filter
|
||||
Creator *Filter
|
||||
Owner *Filter
|
||||
Position *Filter
|
||||
Sequence *Filter
|
||||
CreatedAfter *Filter
|
||||
CreatedBefore *Filter
|
||||
InstanceID *Filter
|
||||
InstanceIDs *Filter
|
||||
ExcludedInstances *Filter
|
||||
Creator *Filter
|
||||
Owner *Filter
|
||||
Position *Filter
|
||||
Sequence *Filter
|
||||
CreatedAfter *Filter
|
||||
CreatedBefore *Filter
|
||||
ExcludeAggregateIDs []*Filter
|
||||
}
|
||||
|
||||
// Filter represents all fields needed to compare a field of an event with a value
|
||||
@@ -171,6 +172,21 @@ func QueryFromBuilder(builder *eventstore.SearchQueryBuilder) (*SearchQuery, err
|
||||
query.SubQueries[i] = append(query.SubQueries[i], filter)
|
||||
}
|
||||
}
|
||||
if excludeAggregateIDs := builder.GetExcludeAggregateIDs(); excludeAggregateIDs != nil {
|
||||
for _, f := range []func(query *eventstore.ExclusionQuery) *Filter{
|
||||
excludeAggregateTypeFilter,
|
||||
excludeEventTypeFilter,
|
||||
} {
|
||||
filter := f(excludeAggregateIDs)
|
||||
if filter == nil {
|
||||
continue
|
||||
}
|
||||
if err := filter.Validate(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
query.ExcludeAggregateIDs = append(query.ExcludeAggregateIDs, filter)
|
||||
}
|
||||
}
|
||||
|
||||
return query, nil
|
||||
}
|
||||
@@ -286,3 +302,23 @@ func eventPositionAfterFilter(query *eventstore.SearchQuery) *Filter {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func excludeEventTypeFilter(query *eventstore.ExclusionQuery) *Filter {
|
||||
if len(query.GetEventTypes()) < 1 {
|
||||
return nil
|
||||
}
|
||||
if len(query.GetEventTypes()) == 1 {
|
||||
return NewFilter(FieldEventType, query.GetEventTypes()[0], OperationEquals)
|
||||
}
|
||||
return NewFilter(FieldEventType, database.TextArray[eventstore.EventType](query.GetEventTypes()), OperationIn)
|
||||
}
|
||||
|
||||
func excludeAggregateTypeFilter(query *eventstore.ExclusionQuery) *Filter {
|
||||
if len(query.GetAggregateTypes()) < 1 {
|
||||
return nil
|
||||
}
|
||||
if len(query.GetAggregateTypes()) == 1 {
|
||||
return NewFilter(FieldAggregateType, query.GetAggregateTypes()[0], OperationEquals)
|
||||
}
|
||||
return NewFilter(FieldAggregateType, database.TextArray[eventstore.AggregateType](query.GetAggregateTypes()), OperationIn)
|
||||
}
|
||||
|
Reference in New Issue
Block a user