From 60ce32ca4fbd818a64524294653923e0ffa81688 Mon Sep 17 00:00:00 2001 From: Silvan <27845747+adlerhurst@users.noreply.github.com> Date: Thu, 8 May 2025 17:13:57 +0200 Subject: [PATCH] fix(setup): reenable index creation (#9868) # Which Problems Are Solved We saw high CPU usage if many events were created on the database. This was caused by the new actions which query for all event types and aggregate types. # How the Problems Are Solved - the handler of action execution does not filter for aggregate and event types. - the index for `instance_id` and `position` is reenabled. # Additional Changes none # Additional Context none --- cmd/setup/54.go | 2 +- cmd/setup/54.sql | 2 +- internal/eventstore/handler/v2/handler.go | 14 ++++++++++++++ internal/execution/handlers.go | 3 +++ 4 files changed, 19 insertions(+), 2 deletions(-) diff --git a/cmd/setup/54.go b/cmd/setup/54.go index 9d65264941..e4a2e43862 100644 --- a/cmd/setup/54.go +++ b/cmd/setup/54.go @@ -23,5 +23,5 @@ func (mig *InstancePositionIndex) Execute(ctx context.Context, _ eventstore.Even } func (mig *InstancePositionIndex) String() string { - return "54_instance_position_index_remove" + return "54_instance_position_index_again" } diff --git a/cmd/setup/54.sql b/cmd/setup/54.sql index 927bd2aa9b..1dca8c7575 100644 --- a/cmd/setup/54.sql +++ b/cmd/setup/54.sql @@ -1 +1 @@ -DROP INDEX IF EXISTS eventstore.es_instance_position; +CREATE INDEX CONCURRENTLY IF NOT EXISTS es_instance_position ON eventstore.events2 (instance_id, position); diff --git a/internal/eventstore/handler/v2/handler.go b/internal/eventstore/handler/v2/handler.go index 43c3e58b3b..fb696ad090 100644 --- a/internal/eventstore/handler/v2/handler.go +++ b/internal/eventstore/handler/v2/handler.go @@ -60,6 +60,7 @@ type Handler struct { requeueEvery time.Duration txDuration time.Duration now nowFunc + queryGlobal bool triggeredInstancesSync sync.Map @@ -143,6 +144,11 @@ type Projection interface { Reducers() []AggregateReducer } +type GlobalProjection interface { + Projection + FilterGlobalEvents() +} + func NewHandler( ctx context.Context, config *Config, @@ -185,6 +191,10 @@ func NewHandler( metrics: metrics, } + if _, ok := projection.(GlobalProjection); ok { + handler.queryGlobal = true + } + return handler } @@ -676,6 +686,10 @@ func (h *Handler) eventQuery(currentState *state) *eventstore.SearchQueryBuilder } } + if h.queryGlobal { + return builder + } + aggregateTypes := make([]eventstore.AggregateType, 0, len(h.eventTypes)) eventTypes := make([]eventstore.EventType, 0, len(h.eventTypes)) diff --git a/internal/execution/handlers.go b/internal/execution/handlers.go index 7ffb4cc6ff..030e6d5186 100644 --- a/internal/execution/handlers.go +++ b/internal/execution/handlers.go @@ -84,6 +84,9 @@ func (u *eventHandler) Reducers() []handler.AggregateReducer { return aggReducers } +// FilterGlobalEvents implements [handler.GlobalProjection] +func (u *eventHandler) FilterGlobalEvents() {} + func groupsFromEventType(s string) []string { parts := strings.Split(s, ".") groups := make([]string, len(parts))