fix: add aggregate type to subquery to utilize indexes (#9226)

# Which Problems Are Solved

The subquery of the notification requested and retry requested is
missing the aggregate_type filter that would allow it to utilize the
`es_projection` or `active_instances_events` on the eventstore.events2
table.

# How the Problems Are Solved

Add additional filter on subquery. Final query: 
```sql
SELECT <all the fields omitted> FROM eventstore.events2
WHERE
    instance_id = $1
    AND aggregate_type = $2
    AND event_type = $3
    AND created_at > $4
    AND aggregate_id NOT IN (
        SELECT aggregate_id
        FROM eventstore.events2
        WHERE
            aggregate_type = $5 <-- NB: previously missing
            AND event_type = ANY ($6)
            AND instance_id = $7
            AND created_at > $8
    )
ORDER BY "position", in_tx_order
LIMIT $9
FOR UPDATE SKIP LOCKED
```

# Additional Changes

# Additional Context

Co-authored-by: Livio Spring <livio.a@gmail.com>
This commit is contained in:
Zach Hirschtritt 2025-01-22 11:02:37 -05:00 committed by GitHub
parent c9aa5db2a5
commit e4bbfcccc8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -436,6 +436,7 @@ func (w *NotificationWorker) searchEvents(ctx context.Context, tx *sql.Tx, retry
Builder().
ExcludeAggregateIDs().
EventTypes(notification.RetryRequestedType, notification.CanceledType, notification.SentType).
AggregateTypes(notification.AggregateType).
Builder()
//nolint:staticcheck
return w.es.Filter(ctx, searchQuery)
@ -454,6 +455,7 @@ func (w *NotificationWorker) searchRetryEvents(ctx context.Context, tx *sql.Tx)
Builder().
ExcludeAggregateIDs().
EventTypes(notification.CanceledType, notification.SentType).
AggregateTypes(notification.AggregateType).
Builder()
//nolint:staticcheck
events, err := w.es.Filter(ctx, searchQuery)