Tim Möhlmann ff70ede7c7
feat(eventstore): exclude aggregate IDs when event_type occurred (#8940)
# Which Problems Are Solved

For a truly event-based notification handler, we need to be able to filter
out events of aggregates which are already handled. For example, when an
event like `notify.success` or `notify.failed` was created on an
aggregate, we no longer require events from that aggregate ID.

# How the Problems Are Solved

Extend the query builder to use a `NOT IN` clause which excludes
aggregate IDs when they have certain events for a certain aggregate
type. For optimization and proper index usage, certain filters are
inherited from the parent query, such as:

- Instance ID
- Instance IDs
- Position offset

This is a prettified query as used by the unit tests:

```sql
SELECT created_at, event_type, "sequence", "position", payload, creator, "owner", instance_id, aggregate_type, aggregate_id, revision
FROM eventstore.events2
WHERE instance_id = $1
    AND aggregate_type = $2 
    AND event_type = $3
    AND "position" > $4
    AND aggregate_id NOT IN (
        SELECT aggregate_id
        FROM eventstore.events2
        WHERE aggregate_type = $5
        AND event_type = ANY($6)
        AND instance_id = $7
        AND "position" > $8
    )
ORDER BY "position" DESC, in_tx_order DESC
LIMIT $9
```

I used this query to run it against the `oidc_session` aggregate looking
for added events, excluding aggregates where a token was revoked,
against a recent position. It fully used index scans:

<details>

```json
[
  {
    "Plan": {
      "Node Type": "Index Scan",
      "Parallel Aware": false,
      "Async Capable": false,
      "Scan Direction": "Forward",
      "Index Name": "es_projection",
      "Relation Name": "events2",
      "Alias": "events2",
      "Actual Rows": 2,
      "Actual Loops": 1,
      "Index Cond": "((instance_id = '286399006995644420'::text) AND (aggregate_type = 'oidc_session'::text) AND (event_type = 'oidc_session.added'::text) AND (\"position\" > 1731582100.784168))",
      "Rows Removed by Index Recheck": 0,
      "Filter": "(NOT (hashed SubPlan 1))",
      "Rows Removed by Filter": 1,
      "Plans": [
        {
          "Node Type": "Index Scan",
          "Parent Relationship": "SubPlan",
          "Subplan Name": "SubPlan 1",
          "Parallel Aware": false,
          "Async Capable": false,
          "Scan Direction": "Forward",
          "Index Name": "es_projection",
          "Relation Name": "events2",
          "Alias": "events2_1",
          "Actual Rows": 1,
          "Actual Loops": 1,
          "Index Cond": "((instance_id = '286399006995644420'::text) AND (aggregate_type = 'oidc_session'::text) AND (event_type = 'oidc_session.access_token.revoked'::text) AND (\"position\" > 1731582100.784168))",
          "Rows Removed by Index Recheck": 0
        }
      ]
    },
    "Triggers": [
    ]
  }
]
```

</details>

# Additional Changes

- None

# Additional Context

- Related to https://github.com/zitadel/zitadel/issues/8931

---------

Co-authored-by: adlerhurst <silvan.reusser@gmail.com>
2024-11-25 15:25:11 +00:00

325 lines
9.8 KiB
Go

package repository
import (
"database/sql"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/zerrors"
)
// SearchQuery defines which data are queried and how.
// It is assembled from an eventstore.SearchQueryBuilder by QueryFromBuilder.
type SearchQuery struct {
// Columns is copied from the builder's GetColumns.
Columns eventstore.Columns
// SubQueries holds one list of validated filters per query of the builder,
// in the same order as the builder's GetQueries.
SubQueries [][]*Filter
// Tx is copied from the builder's GetTx.
Tx *sql.Tx
// LockRows and LockOption are both taken from the builder's GetLockRows.
LockRows bool
LockOption eventstore.LockOption
// AllowTimeTravel is copied from the builder's GetAllowTimeTravel.
AllowTimeTravel bool
// AwaitOpenTransactions is copied from the builder's GetAwaitOpenTransactions.
AwaitOpenTransactions bool
// Limit is copied from the builder's GetLimit.
Limit uint64
// Offset is copied from the builder's GetOffset.
Offset uint32
// Desc is copied from the builder's GetDesc.
// NOTE(review): presumably selects descending result order — confirm in the SQL builder.
Desc bool
// The filters below apply to the query as a whole. Each one stays nil
// when the builder does not carry the corresponding value.

// InstanceID is an equality filter on FieldInstanceID.
InstanceID *Filter
// InstanceIDs is an OperationIn filter on FieldInstanceID.
InstanceIDs *Filter
// ExcludedInstances is not set by QueryFromBuilder as visible in this file.
// NOTE(review): confirm where it is populated.
ExcludedInstances *Filter
// Creator is an equality filter on FieldEditorUser.
Creator *Filter
// Owner is an equality filter on FieldResourceOwner.
Owner *Filter
// Position is an OperationGreater filter on FieldPosition.
Position *Filter
// Sequence is a filter on FieldSequence; its operation is OperationLess
// when the builder's GetDesc is true and OperationGreater otherwise.
Sequence *Filter
// CreatedAfter is an OperationGreater filter on FieldCreationDate.
CreatedAfter *Filter
// CreatedBefore is an OperationLess filter on FieldCreationDate.
CreatedBefore *Filter
// ExcludeAggregateIDs holds the aggregate type and event type filters built
// from the builder's GetExcludeAggregateIDs.
// NOTE(review): presumably rendered as an `aggregate_id NOT IN (...)`
// sub-select — confirm in the SQL builder.
ExcludeAggregateIDs []*Filter
}
// Filter represents all fields needed to compare a field of an event with a value.
// Use Validate to check that all three parts are set to defined values.
type Filter struct {
// Field selects the event field to compare; Validate requires it to be
// one of the Field constants.
Field Field
// Value is the value to compare against; Validate rejects a nil Value.
Value interface{}
// Operation selects the comparison; Validate requires it to be one of
// the Operation constants.
Operation Operation
}
// Operation defines how fields are compared.
type Operation int32
const (
// OperationEquals compares two values for equality.
// Numbering starts at 1, so the zero value of Operation is rejected by
// Filter.Validate.
OperationEquals Operation = iota + 1
// OperationGreater compares the stored value and the given value with a
// greater-than relation.
// NOTE(review): the filters in this file use it for "after" semantics
// (stored value greater than the given one) — confirm the operand order
// in the SQL builder.
OperationGreater
// OperationLess compares the stored value and the given value with a
// less-than relation.
// NOTE(review): the filters in this file use it for "before" semantics
// (stored value less than the given one) — confirm the operand order in
// the SQL builder.
OperationLess
// OperationIn checks if a stored value matches one of the values in the passed list.
OperationIn
// OperationJSONContains checks if a stored value matches the given JSON.
OperationJSONContains
// OperationNotIn checks if a stored value does not match any of the values in the passed list.
OperationNotIn
// operationCount is the exclusive upper bound Filter.Validate uses to
// range-check an Operation; it must remain the last entry.
operationCount
)
// Field is the representation of a field from the event.
type Field int32
const (
// FieldAggregateType represents the aggregate type field.
// Numbering starts at 1, so the zero value of Field is rejected by
// Filter.Validate.
FieldAggregateType Field = iota + 1
// FieldAggregateID represents the aggregate id field.
FieldAggregateID
// FieldSequence represents the sequence field.
FieldSequence
// FieldResourceOwner represents the resource owner field.
FieldResourceOwner
// FieldInstanceID represents the instance id field.
FieldInstanceID
// FieldEditorService represents the editor service field.
FieldEditorService
// FieldEditorUser represents the editor user field.
FieldEditorUser
// FieldEventType represents the event type field.
FieldEventType
// FieldEventData represents the event data field.
FieldEventData
// FieldCreationDate represents the creation date field.
FieldCreationDate
// FieldPosition represents the field of the global sequence.
FieldPosition
// fieldCount is the exclusive upper bound Filter.Validate uses to
// range-check a Field; it must remain the last entry.
fieldCount
)
// NewFilter assembles a Filter from its three parts without validating them.
// It is used in tests. Use searchQuery.*Filter() instead.
func NewFilter(field Field, value interface{}, operation Operation) *Filter {
	filter := new(Filter)
	filter.Field = field
	filter.Value = value
	filter.Operation = operation
	return filter
}
// Validate checks that the filter is usable.
//
// It returns the error created by zerrors.ThrowPreconditionFailed for the
// first failing check, in this order: nil receiver, Field outside the defined
// constants, nil Value, Operation outside the defined constants. It returns
// nil when every check passes.
func (f *Filter) Validate() error {
	// Cases are evaluated top to bottom, so the nil check guards the field
	// accesses in the cases below it.
	switch {
	case f == nil:
		return zerrors.ThrowPreconditionFailed(nil, "REPO-z6KcG", "filter is nil")
	case f.Field <= 0 || f.Field >= fieldCount:
		return zerrors.ThrowPreconditionFailed(nil, "REPO-zw62U", "field not definded")
	case f.Value == nil:
		return zerrors.ThrowPreconditionFailed(nil, "REPO-GJ9ct", "no value definded")
	case f.Operation <= 0 || f.Operation >= operationCount:
		return zerrors.ThrowPreconditionFailed(nil, "REPO-RrQTy", "operation not definded")
	}
	return nil
}
// QueryFromBuilder converts an eventstore.SearchQueryBuilder into a SearchQuery.
//
// A nil builder, or a builder whose columns fail validation, yields a
// precondition-failed error. Otherwise the scalar settings are copied, the
// query-wide filters are built and validated, one validated filter list is
// built per query of the builder, and, if the builder carries an exclusion
// query, its filters are collected in ExcludeAggregateIDs. The first filter
// that fails validation aborts the conversion and its error is returned with
// a nil query.
func QueryFromBuilder(builder *eventstore.SearchQueryBuilder) (*SearchQuery, error) {
	if builder == nil || builder.GetColumns().Validate() != nil {
		return nil, zerrors.ThrowPreconditionFailed(nil, "MODEL-4m9gs", "builder invalid")
	}

	query := &SearchQuery{
		Columns:               builder.GetColumns(),
		Limit:                 builder.GetLimit(),
		Offset:                builder.GetOffset(),
		Desc:                  builder.GetDesc(),
		Tx:                    builder.GetTx(),
		AllowTimeTravel:       builder.GetAllowTimeTravel(),
		AwaitOpenTransactions: builder.GetAwaitOpenTransactions(),
		SubQueries:            make([][]*Filter, len(builder.GetQueries())),
	}
	query.LockRows, query.LockOption = builder.GetLockRows()

	// Query-wide filters: each constructor stores its filter on query itself,
	// so only validation is left to do here.
	globalFilters := []func(*eventstore.SearchQueryBuilder, *SearchQuery) *Filter{
		instanceIDFilter,
		instanceIDsFilter,
		editorUserFilter,
		resourceOwnerFilter,
		positionAfterFilter,
		eventSequenceGreaterFilter,
		creationDateAfterFilter,
		creationDateBeforeFilter,
	}
	for _, build := range globalFilters {
		if filter := build(builder, query); filter != nil {
			if err := filter.Validate(); err != nil {
				return nil, err
			}
		}
	}

	// Per-query filters: collected in the slot matching the query's index.
	subQueryFilters := []func(*eventstore.SearchQuery) *Filter{
		aggregateTypeFilter,
		aggregateIDFilter,
		eventTypeFilter,
		eventDataFilter,
		eventPositionAfterFilter,
	}
	for idx, subQuery := range builder.GetQueries() {
		for _, build := range subQueryFilters {
			filter := build(subQuery)
			if filter == nil {
				continue
			}
			if err := filter.Validate(); err != nil {
				return nil, err
			}
			query.SubQueries[idx] = append(query.SubQueries[idx], filter)
		}
	}

	exclusion := builder.GetExcludeAggregateIDs()
	if exclusion == nil {
		return query, nil
	}

	exclusionFilters := []func(*eventstore.ExclusionQuery) *Filter{
		excludeAggregateTypeFilter,
		excludeEventTypeFilter,
	}
	for _, build := range exclusionFilters {
		filter := build(exclusion)
		if filter == nil {
			continue
		}
		if err := filter.Validate(); err != nil {
			return nil, err
		}
		query.ExcludeAggregateIDs = append(query.ExcludeAggregateIDs, filter)
	}
	return query, nil
}
// eventSequenceGreaterFilter stores a FieldSequence filter in query.Sequence
// and returns it. It returns nil and leaves query untouched when the builder's
// sequence threshold is zero. The operation is OperationLess when the builder
// reports descending order and OperationGreater otherwise.
func eventSequenceGreaterFilter(builder *eventstore.SearchQueryBuilder, query *SearchQuery) *Filter {
	sequence := builder.GetEventSequenceGreater()
	if sequence == 0 {
		return nil
	}

	operation := OperationGreater
	if builder.GetDesc() {
		operation = OperationLess
	}

	filter := NewFilter(FieldSequence, sequence, operation)
	query.Sequence = filter
	return filter
}
// creationDateAfterFilter stores an OperationGreater filter on
// FieldCreationDate in query.CreatedAfter and returns it. It returns nil and
// leaves query untouched when the builder's "created after" time is zero.
func creationDateAfterFilter(builder *eventstore.SearchQueryBuilder, query *SearchQuery) *Filter {
	after := builder.GetCreationDateAfter()
	if after.IsZero() {
		return nil
	}

	filter := NewFilter(FieldCreationDate, after, OperationGreater)
	query.CreatedAfter = filter
	return filter
}
// creationDateBeforeFilter stores an OperationLess filter on
// FieldCreationDate in query.CreatedBefore and returns it. It returns nil and
// leaves query untouched when the builder's "created before" time is zero.
func creationDateBeforeFilter(builder *eventstore.SearchQueryBuilder, query *SearchQuery) *Filter {
	before := builder.GetCreationDateBefore()
	if before.IsZero() {
		return nil
	}

	filter := NewFilter(FieldCreationDate, before, OperationLess)
	query.CreatedBefore = filter
	return filter
}
// resourceOwnerFilter stores an equality filter on FieldResourceOwner in
// query.Owner and returns it. It returns nil and leaves query untouched when
// the builder's resource owner is empty.
func resourceOwnerFilter(builder *eventstore.SearchQueryBuilder, query *SearchQuery) *Filter {
	owner := builder.GetResourceOwner()
	if owner == "" {
		return nil
	}

	filter := NewFilter(FieldResourceOwner, owner, OperationEquals)
	query.Owner = filter
	return filter
}
// editorUserFilter stores an equality filter on FieldEditorUser in
// query.Creator and returns it. It returns nil and leaves query untouched when
// the builder's editor user is empty.
func editorUserFilter(builder *eventstore.SearchQueryBuilder, query *SearchQuery) *Filter {
	editor := builder.GetEditorUser()
	if editor == "" {
		return nil
	}

	filter := NewFilter(FieldEditorUser, editor, OperationEquals)
	query.Creator = filter
	return filter
}
// instanceIDFilter stores an equality filter on FieldInstanceID in
// query.InstanceID and returns it. The filter value is the dereferenced
// instance ID. It returns nil and leaves query untouched when the builder's
// instance ID pointer is nil.
func instanceIDFilter(builder *eventstore.SearchQueryBuilder, query *SearchQuery) *Filter {
	instanceID := builder.GetInstanceID()
	if instanceID == nil {
		return nil
	}

	filter := NewFilter(FieldInstanceID, *instanceID, OperationEquals)
	query.InstanceID = filter
	return filter
}
// instanceIDsFilter stores an OperationIn filter on FieldInstanceID in
// query.InstanceIDs and returns it. It returns nil and leaves query untouched
// only when the builder's instance ID list is nil; an empty, non-nil list
// still produces a filter.
func instanceIDsFilter(builder *eventstore.SearchQueryBuilder, query *SearchQuery) *Filter {
	instanceIDs := builder.GetInstanceIDs()
	if instanceIDs == nil {
		return nil
	}

	filter := NewFilter(FieldInstanceID, database.TextArray[string](instanceIDs), OperationIn)
	query.InstanceIDs = filter
	return filter
}
// positionAfterFilter stores an OperationGreater filter on FieldPosition in
// query.Position and returns it. It returns nil and leaves query untouched
// when the builder's position threshold is zero.
func positionAfterFilter(builder *eventstore.SearchQueryBuilder, query *SearchQuery) *Filter {
	position := builder.GetPositionAfter()
	if position == 0 {
		return nil
	}

	filter := NewFilter(FieldPosition, position, OperationGreater)
	query.Position = filter
	return filter
}
// aggregateIDFilter builds the FieldAggregateID filter of a sub query:
// nil for no aggregate IDs, an equality filter for exactly one ID, and an
// OperationIn filter over all IDs otherwise.
func aggregateIDFilter(query *eventstore.SearchQuery) *Filter {
	switch ids := query.GetAggregateIDs(); len(ids) {
	case 0:
		return nil
	case 1:
		return NewFilter(FieldAggregateID, ids[0], OperationEquals)
	default:
		return NewFilter(FieldAggregateID, database.TextArray[string](ids), OperationIn)
	}
}
// eventTypeFilter builds the FieldEventType filter of a sub query:
// nil for no event types, an equality filter for exactly one type, and an
// OperationIn filter over all types otherwise.
func eventTypeFilter(query *eventstore.SearchQuery) *Filter {
	switch types := query.GetEventTypes(); len(types) {
	case 0:
		return nil
	case 1:
		return NewFilter(FieldEventType, types[0], OperationEquals)
	default:
		return NewFilter(FieldEventType, database.TextArray[eventstore.EventType](types), OperationIn)
	}
}
// aggregateTypeFilter builds the FieldAggregateType filter of a sub query:
// nil for no aggregate types, an equality filter for exactly one type, and an
// OperationIn filter over all types otherwise.
func aggregateTypeFilter(query *eventstore.SearchQuery) *Filter {
	switch types := query.GetAggregateTypes(); len(types) {
	case 0:
		return nil
	case 1:
		return NewFilter(FieldAggregateType, types[0], OperationEquals)
	default:
		return NewFilter(FieldAggregateType, database.TextArray[eventstore.AggregateType](types), OperationIn)
	}
}
// eventDataFilter builds an OperationJSONContains filter on FieldEventData
// from the sub query's event data, or returns nil when that data is empty.
func eventDataFilter(query *eventstore.SearchQuery) *Filter {
	data := query.GetEventData()
	if len(data) > 0 {
		return NewFilter(FieldEventData, data, OperationJSONContains)
	}
	return nil
}
// eventPositionAfterFilter builds an OperationGreater filter on FieldPosition
// from the sub query's position threshold, or returns nil when it is zero.
func eventPositionAfterFilter(query *eventstore.SearchQuery) *Filter {
	position := query.GetPositionAfter()
	if position == 0 {
		return nil
	}
	return NewFilter(FieldPosition, position, OperationGreater)
}
// excludeEventTypeFilter builds the FieldEventType filter of an exclusion
// query: nil for no event types, an equality filter for exactly one type, and
// an OperationIn filter over all types otherwise.
func excludeEventTypeFilter(query *eventstore.ExclusionQuery) *Filter {
	types := query.GetEventTypes()
	if len(types) == 0 {
		return nil
	}

	var value interface{} = database.TextArray[eventstore.EventType](types)
	operation := OperationIn
	if len(types) == 1 {
		// A single type is compared directly instead of via a one-element list.
		value, operation = types[0], OperationEquals
	}
	return NewFilter(FieldEventType, value, operation)
}
// excludeAggregateTypeFilter builds the FieldAggregateType filter of an
// exclusion query: nil for no aggregate types, an equality filter for exactly
// one type, and an OperationIn filter over all types otherwise.
func excludeAggregateTypeFilter(query *eventstore.ExclusionQuery) *Filter {
	types := query.GetAggregateTypes()
	if len(types) == 0 {
		return nil
	}

	var value interface{} = database.TextArray[eventstore.AggregateType](types)
	operation := OperationIn
	if len(types) == 1 {
		// A single type is compared directly instead of via a one-element list.
		value, operation = types[0], OperationEquals
	}
	return NewFilter(FieldAggregateType, value, operation)
}