diff --git a/cmd/setup/54.go b/cmd/setup/54.go index 9d65264941..e4a2e43862 100644 --- a/cmd/setup/54.go +++ b/cmd/setup/54.go @@ -23,5 +23,5 @@ func (mig *InstancePositionIndex) Execute(ctx context.Context, _ eventstore.Even } func (mig *InstancePositionIndex) String() string { - return "54_instance_position_index_remove" + return "54_instance_position_index_again" } diff --git a/cmd/setup/54.sql b/cmd/setup/54.sql index 927bd2aa9b..1dca8c7575 100644 --- a/cmd/setup/54.sql +++ b/cmd/setup/54.sql @@ -1 +1 @@ -DROP INDEX IF EXISTS eventstore.es_instance_position; +CREATE INDEX CONCURRENTLY IF NOT EXISTS es_instance_position ON eventstore.events2 (instance_id, position); diff --git a/internal/eventstore/handler/v2/handler.go b/internal/eventstore/handler/v2/handler.go index 43c3e58b3b..fb696ad090 100644 --- a/internal/eventstore/handler/v2/handler.go +++ b/internal/eventstore/handler/v2/handler.go @@ -60,6 +60,7 @@ type Handler struct { requeueEvery time.Duration txDuration time.Duration now nowFunc + queryGlobal bool triggeredInstancesSync sync.Map @@ -143,6 +144,11 @@ type Projection interface { Reducers() []AggregateReducer } +type GlobalProjection interface { + Projection + FilterGlobalEvents() +} + func NewHandler( ctx context.Context, config *Config, @@ -185,6 +191,10 @@ func NewHandler( metrics: metrics, } + if _, ok := projection.(GlobalProjection); ok { + handler.queryGlobal = true + } + return handler } @@ -676,6 +686,10 @@ func (h *Handler) eventQuery(currentState *state) *eventstore.SearchQueryBuilder } } + if h.queryGlobal { + return builder + } + aggregateTypes := make([]eventstore.AggregateType, 0, len(h.eventTypes)) eventTypes := make([]eventstore.EventType, 0, len(h.eventTypes)) diff --git a/internal/execution/handlers.go b/internal/execution/handlers.go index 7ffb4cc6ff..030e6d5186 100644 --- a/internal/execution/handlers.go +++ b/internal/execution/handlers.go @@ -84,6 +84,9 @@ func (u *eventHandler) Reducers() []handler.AggregateReducer { return aggReducers } +// FilterGlobalEvents implements [handler.GlobalProjection] +func (u *eventHandler) FilterGlobalEvents() {} + func groupsFromEventType(s string) []string { parts := strings.Split(s, ".") groups := make([]string, len(parts))