From f72560291e9f0910d3e4d98dd6645fbec9377793 Mon Sep 17 00:00:00 2001 From: Silvan Date: Wed, 6 Sep 2023 16:34:07 +0200 Subject: [PATCH] fix(eventstore): cache instances (#6501) * fix(eventstore): cache instances * fix: consider succeeded once during instance ids query * fix(eventstore): return correct instances (cherry picked from commit 0f06e84f4079150658e0808451cea2b36665c91c) --- internal/eventstore/eventstore.go | 36 +++++++++++++++++-- .../eventstore/handler/handler_projection.go | 2 +- 2 files changed, 34 insertions(+), 4 deletions(-) diff --git a/internal/eventstore/eventstore.go b/internal/eventstore/eventstore.go index 67ab235910..f602a53f12 100644 --- a/internal/eventstore/eventstore.go +++ b/internal/eventstore/eventstore.go @@ -23,6 +23,10 @@ type Eventstore struct { eventTypes []string aggregateTypes []string PushTimeout time.Duration + + instancesMu sync.Mutex + instances []string + lastInstancesQuery time.Time } type eventTypeInterceptors struct { @@ -73,7 +77,16 @@ func (es *Eventstore) Push(ctx context.Context, cmds ...Command) ([]Event, error } func (es *Eventstore) NewInstance(ctx context.Context, instanceID string) error { - return es.repo.CreateInstance(ctx, instanceID) + err := es.repo.CreateInstance(ctx, instanceID) + if err != nil { + return err + } + + es.instancesMu.Lock() + es.instances = append(es.instances, instanceID) + es.instancesMu.Unlock() + + return nil } func (es *Eventstore) EventTypes() []string { @@ -208,12 +221,29 @@ func (es *Eventstore) LatestSequence(ctx context.Context, queryFactory *SearchQu } // InstanceIDs returns the instance ids found by the search query -func (es *Eventstore) InstanceIDs(ctx context.Context, queryFactory *SearchQueryBuilder) ([]string, error) { +// forceDBCall forces to query the database, the instance ids are not cached +func (es *Eventstore) InstanceIDs(ctx context.Context, maxAge time.Duration, forceDBCall bool, queryFactory *SearchQueryBuilder) ([]string, error) { + es.instancesMu.Lock() + defer es.instancesMu.Unlock() + + if !forceDBCall && time.Since(es.lastInstancesQuery) <= maxAge { + return es.instances, nil + } + query, err := queryFactory.build(authz.GetInstance(ctx).InstanceID()) if err != nil { return nil, err } - return es.repo.InstanceIDs(ctx, query) + instances, err := es.repo.InstanceIDs(ctx, query) + if err != nil { + return nil, err + } + if !forceDBCall { + es.instances = instances + es.lastInstancesQuery = time.Now() + } + + return instances, nil } type QueryReducer interface { diff --git a/internal/eventstore/handler/handler_projection.go b/internal/eventstore/handler/handler_projection.go index dcd24b3d7b..2dbcfc9aa0 100644 --- a/internal/eventstore/handler/handler_projection.go +++ b/internal/eventstore/handler/handler_projection.go @@ -279,7 +279,7 @@ func (h *ProjectionHandler) schedule(ctx context.Context) { // This ensures that only instances with recent events on the handler are projected query = query.CreationDateAfter(h.nowFunc().Add(-1 * h.handleActiveInstances)) } - ids, err := h.Eventstore.InstanceIDs(ctx, query.Builder()) + ids, err := h.Eventstore.InstanceIDs(ctx, h.requeueAfter, !succeededOnce, query.Builder()) if err != nil { logging.WithFields("projection", h.ProjectionName).WithError(err).Error("instance ids") h.triggerProjection.Reset(h.requeueAfter)