fix(eventstore): cache instances (#6501)

* fix(eventstore): cache instances

* fix: consider succeeded once during instance ids query

* fix(eventstore): return correct instances
This commit is contained in:
Silvan 2023-09-06 16:34:07 +02:00 committed by GitHub
parent f7e7af0083
commit 0f06e84f40
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 34 additions and 4 deletions

View File

@ -23,6 +23,10 @@ type Eventstore struct {
eventTypes []string eventTypes []string
aggregateTypes []string aggregateTypes []string
PushTimeout time.Duration PushTimeout time.Duration
instancesMu sync.Mutex
instances []string
lastInstancesQuery time.Time
} }
type eventTypeInterceptors struct { type eventTypeInterceptors struct {
@ -73,7 +77,16 @@ func (es *Eventstore) Push(ctx context.Context, cmds ...Command) ([]Event, error
} }
func (es *Eventstore) NewInstance(ctx context.Context, instanceID string) error { func (es *Eventstore) NewInstance(ctx context.Context, instanceID string) error {
return es.repo.CreateInstance(ctx, instanceID) err := es.repo.CreateInstance(ctx, instanceID)
if err != nil {
return err
}
es.instancesMu.Lock()
es.instances = append(es.instances, instanceID)
es.instancesMu.Unlock()
return nil
} }
func (es *Eventstore) EventTypes() []string { func (es *Eventstore) EventTypes() []string {
@ -208,12 +221,29 @@ func (es *Eventstore) LatestSequence(ctx context.Context, queryFactory *SearchQu
} }
// InstanceIDs returns the instance ids found by the search query // InstanceIDs returns the instance ids found by the search query
func (es *Eventstore) InstanceIDs(ctx context.Context, queryFactory *SearchQueryBuilder) ([]string, error) { // forceDBCall forces to query the database, the instance ids are not cached
func (es *Eventstore) InstanceIDs(ctx context.Context, maxAge time.Duration, forceDBCall bool, queryFactory *SearchQueryBuilder) ([]string, error) {
es.instancesMu.Lock()
defer es.instancesMu.Unlock()
if !forceDBCall && time.Since(es.lastInstancesQuery) <= maxAge {
return es.instances, nil
}
query, err := queryFactory.build(authz.GetInstance(ctx).InstanceID()) query, err := queryFactory.build(authz.GetInstance(ctx).InstanceID())
if err != nil { if err != nil {
return nil, err return nil, err
} }
return es.repo.InstanceIDs(ctx, query) instances, err := es.repo.InstanceIDs(ctx, query)
if err != nil {
return nil, err
}
if !forceDBCall {
es.instances = instances
es.lastInstancesQuery = time.Now()
}
return instances, nil
} }
type QueryReducer interface { type QueryReducer interface {

View File

@ -279,7 +279,7 @@ func (h *ProjectionHandler) schedule(ctx context.Context) {
// This ensures that only instances with recent events on the handler are projected // This ensures that only instances with recent events on the handler are projected
query = query.CreationDateAfter(h.nowFunc().Add(-1 * h.handleActiveInstances)) query = query.CreationDateAfter(h.nowFunc().Add(-1 * h.handleActiveInstances))
} }
ids, err := h.Eventstore.InstanceIDs(ctx, query.Builder()) ids, err := h.Eventstore.InstanceIDs(ctx, h.requeueAfter, !succeededOnce, query.Builder())
if err != nil { if err != nil {
logging.WithFields("projection", h.ProjectionName).WithError(err).Error("instance ids") logging.WithFields("projection", h.ProjectionName).WithError(err).Error("instance ids")
h.triggerProjection.Reset(h.requeueAfter) h.triggerProjection.Reset(h.requeueAfter)