diff --git a/cmd/defaults.yaml b/cmd/defaults.yaml index 3b2265d10e..625a03dafc 100644 --- a/cmd/defaults.yaml +++ b/cmd/defaults.yaml @@ -159,10 +159,10 @@ Projections: ConcurrentInstances: 1 # Limit of returned events per query BulkLimit: 200 - # If HandleInactiveInstances this is false, only instances are projected, - # for which at least a projection relevant event exists withing the timeframe - # from twice the RequeueEvery time in the past until the projections current time - HandleInactiveInstances: false + # Only instance are projected, for which at least a projection relevant event exists withing the timeframe + # from HandleActiveInstances duration in the past until the projections current time + # Defaults to twice the RequeueEvery duration + HandleActiveInstances: 120s # In the Customizations section, all settings from above can be overwritten for each specific projection Customizations: Projects: @@ -174,7 +174,8 @@ Projections: # The NotificationsQuotas projection is used for calling quota webhooks NotificationsQuotas: # Delivery guarantee requirements are probably higher for quota webhooks - HandleInactiveInstances: true + # Defaults to 45 days + HandleActiveInstances: 1080h # As quota notification projections don't result in database statements, retries don't have an effect MaxFailureCount: 0 # Quota notifications are not so time critical. Setting RequeueEvery every five minutes doesn't annoy the db too much. diff --git a/docs/docs/self-hosting/manage/production.md b/docs/docs/self-hosting/manage/production.md index 5fbd7465ef..8e2aae0be7 100644 --- a/docs/docs/self-hosting/manage/production.md +++ b/docs/docs/self-hosting/manage/production.md @@ -84,14 +84,14 @@ Projections: RetryFailedAfter: 1s # Retried execution number of database statements resulting from projected events MaxFailureCount: 5 - # Number of concurrent projection routines + # Number of concurrent projection routines. Values of 0 and below are overwritten to 1 ConcurrentInstances: 1 # Limit of returned events per query BulkLimit: 200 - # If HandleInactiveInstances this is false, only instances are projected, - # for which at least a projection relevant event exists withing the timeframe - # from twice the RequeueEvery time in the past until the projections current time - HandleInactiveInstances: false + # Only instance are projected, for which at least a projection relevant event exists withing the timeframe + # from HandleActiveInstances duration in the past until the projections current time + # Defaults to twice the RequeueEvery duration + HandleActiveInstances: 120s # In the Customizations section, all settings from above can be overwritten for each specific projection Customizations: Projects: @@ -103,7 +103,8 @@ Projections: # The NotificationsQuotas projection is used for calling quota webhooks NotificationsQuotas: # Delivery guarantee requirements are probably higher for quota webhooks - HandleInactiveInstances: true + # Defaults to 45 days + HandleActiveInstances: 1080h # As quota notification projections don't result in database statements, retries don't have an effect MaxFailureCount: 0 # Quota notifications are not so time critical. Setting RequeueEvery every five minutes doesn't annoy the db too much. diff --git a/internal/eventstore/handler/handler_projection.go b/internal/eventstore/handler/handler_projection.go index d98ee2d166..2b4160586f 100644 --- a/internal/eventstore/handler/handler_projection.go +++ b/internal/eventstore/handler/handler_projection.go @@ -20,12 +20,12 @@ const ( type ProjectionHandlerConfig struct { HandlerConfig - ProjectionName string - RequeueEvery time.Duration - RetryFailedAfter time.Duration - Retries uint - ConcurrentInstances uint - HandleInactiveInstances bool + ProjectionName string + RequeueEvery time.Duration + RetryFailedAfter time.Duration + Retries uint + ConcurrentInstances uint + HandleActiveInstances time.Duration } // Update updates the projection with the given statements @@ -49,19 +49,19 @@ type NowFunc func() time.Time type ProjectionHandler struct { Handler - ProjectionName string - reduce Reduce - update Update - searchQuery SearchQuery - triggerProjection *time.Timer - lock Lock - unlock Unlock - requeueAfter time.Duration - retryFailedAfter time.Duration - retries int - concurrentInstances int - handleInactiveInstances bool - nowFunc NowFunc + ProjectionName string + reduce Reduce + update Update + searchQuery SearchQuery + triggerProjection *time.Timer + lock Lock + unlock Unlock + requeueAfter time.Duration + retryFailedAfter time.Duration + retries int + concurrentInstances int + handleActiveInstances time.Duration + nowFunc NowFunc } func NewProjectionHandler( @@ -79,20 +79,20 @@ func NewProjectionHandler( concurrentInstances = 1 } h := &ProjectionHandler{ - Handler: NewHandler(config.HandlerConfig), - ProjectionName: config.ProjectionName, - reduce: reduce, - update: update, - searchQuery: query, - lock: lock, - unlock: unlock, - requeueAfter: config.RequeueEvery, - triggerProjection: time.NewTimer(0), // first trigger is instant on startup - retryFailedAfter: config.RetryFailedAfter, - retries: int(config.Retries), - concurrentInstances: concurrentInstances, - handleInactiveInstances: config.HandleInactiveInstances, - nowFunc: time.Now, + Handler: NewHandler(config.HandlerConfig), + ProjectionName: config.ProjectionName, + reduce: reduce, + update: update, + searchQuery: query, + lock: lock, + unlock: unlock, + requeueAfter: config.RequeueEvery, + triggerProjection: time.NewTimer(0), // first trigger is instant on startup + retryFailedAfter: config.RetryFailedAfter, + retries: int(config.Retries), + concurrentInstances: concurrentInstances, + handleActiveInstances: config.HandleActiveInstances, + nowFunc: time.Now, } go func() { @@ -229,11 +229,11 @@ func (h *ProjectionHandler) schedule(ctx context.Context) { } go h.cancelOnErr(lockCtx, errs, cancelLock) } - if succeededOnce && !h.handleInactiveInstances { + if succeededOnce { // since we have at least one successful run, we can restrict it to events not older than - // twice the requeue time (just to be sure not to miss an event) + // h.handleActiveInstances (just to be sure not to miss an event) // This ensures that only instances with recent events on the handler are projected - query = query.CreationDateAfter(h.nowFunc().Add(-2 * h.requeueAfter)) + query = query.CreationDateAfter(h.nowFunc().Add(-1 * h.handleActiveInstances)) } ids, err := h.Eventstore.InstanceIDs(ctx, query.Builder()) if err != nil { diff --git a/internal/eventstore/handler/handler_projection_test.go b/internal/eventstore/handler/handler_projection_test.go index 1131d98f92..3ab88ef4a1 100644 --- a/internal/eventstore/handler/handler_projection_test.go +++ b/internal/eventstore/handler/handler_projection_test.go @@ -668,13 +668,13 @@ func TestProjection_schedule(t *testing.T) { ctx context.Context } type fields struct { - reduce Reduce - update Update - eventstore func(t *testing.T) *eventstore.Eventstore - lock *lockMock - unlock *unlockMock - query SearchQuery - handleInactiveInstances bool + reduce Reduce + update Update + eventstore func(t *testing.T) *eventstore.Eventstore + lock *lockMock + unlock *unlockMock + query SearchQuery + handleActiveInstances time.Duration } type want struct { locksCount int @@ -711,7 +711,7 @@ func TestProjection_schedule(t *testing.T) { ), ) }, - handleInactiveInstances: false, + handleActiveInstances: 2 * time.Minute, }, want{ locksCount: 0, @@ -739,7 +739,7 @@ func TestProjection_schedule(t *testing.T) { ), ) }, - handleInactiveInstances: false, + handleActiveInstances: 2 * time.Minute, }, want{ locksCount: 0, @@ -771,7 +771,7 @@ func TestProjection_schedule(t *testing.T) { firstErr: ErrLock, canceled: make(chan bool, 1), }, - handleInactiveInstances: false, + handleActiveInstances: 2 * time.Minute, }, want{ locksCount: 1, @@ -803,9 +803,9 @@ func TestProjection_schedule(t *testing.T) { firstErr: nil, errWait: 100 * time.Millisecond, }, - unlock: &unlockMock{}, - query: testQuery(nil, 0, ErrQuery), - handleInactiveInstances: false, + unlock: &unlockMock{}, + query: testQuery(nil, 0, ErrQuery), + handleActiveInstances: 2 * time.Minute, }, want{ locksCount: 1, @@ -837,7 +837,7 @@ func TestProjection_schedule(t *testing.T) { }, { Field: repository.FieldCreationDate, Operation: repository.OperationGreater, - Value: now().Add(-2 * time.Hour), + Value: now().Add(-2 * time.Minute), }}, "206626268110651755", ). @@ -855,10 +855,10 @@ func TestProjection_schedule(t *testing.T) { firstErr: nil, errWait: 100 * time.Millisecond, }, - unlock: &unlockMock{}, - handleInactiveInstances: false, - reduce: testReduce(newTestStatement("aggregate1", 1, 0)), - update: testUpdate(t, 1, 1, nil), + unlock: &unlockMock{}, + handleActiveInstances: 2 * time.Minute, + reduce: testReduce(newTestStatement("aggregate1", 1, 0)), + update: testUpdate(t, 1, 1, nil), query: testQuery( eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent). AddQuery(). @@ -894,6 +894,10 @@ func TestProjection_schedule(t *testing.T) { Field: repository.FieldInstanceID, Operation: repository.OperationNotIn, Value: database.StringArray{""}, + }, { + Field: repository.FieldCreationDate, + Operation: repository.OperationGreater, + Value: now().Add(-45 * time.Hour), }}, "206626268110651755"). ExpectFilterEvents(&repository.Event{ AggregateType: "quota", @@ -909,10 +913,10 @@ func TestProjection_schedule(t *testing.T) { firstErr: nil, errWait: 100 * time.Millisecond, }, - unlock: &unlockMock{}, - handleInactiveInstances: true, - reduce: testReduce(newTestStatement("aggregate1", 1, 0)), - update: testUpdate(t, 1, 1, nil), + unlock: &unlockMock{}, + handleActiveInstances: 45 * time.Hour, + reduce: testReduce(newTestStatement("aggregate1", 1, 0)), + update: testUpdate(t, 1, 1, nil), query: testQuery( eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent). AddQuery(). @@ -936,17 +940,17 @@ func TestProjection_schedule(t *testing.T) { EventQueue: make(chan eventstore.Event, 10), Eventstore: tt.fields.eventstore(t), }, - reduce: tt.fields.reduce, - update: tt.fields.update, - searchQuery: tt.fields.query, - lock: tt.fields.lock.lock(), - unlock: tt.fields.unlock.unlock(), - triggerProjection: time.NewTimer(0), // immediately run an iteration - requeueAfter: time.Hour, // run only one iteration - concurrentInstances: 1, - handleInactiveInstances: tt.fields.handleInactiveInstances, - retries: 0, - nowFunc: now, + reduce: tt.fields.reduce, + update: tt.fields.update, + searchQuery: tt.fields.query, + lock: tt.fields.lock.lock(), + unlock: tt.fields.unlock.unlock(), + triggerProjection: time.NewTimer(0), // immediately run an iteration + requeueAfter: time.Hour, // run only one iteration + concurrentInstances: 1, + handleActiveInstances: tt.fields.handleActiveInstances, + retries: 0, + nowFunc: now, } ctx, cancel := context.WithCancel(tt.args.ctx) go func() { diff --git a/internal/logstore/service_test.go b/internal/logstore/service_test.go index ecf8d287af..ef6fcd128e 100644 --- a/internal/logstore/service_test.go +++ b/internal/logstore/service_test.go @@ -252,7 +252,7 @@ func given(t *testing.T, args args, want want) (context.Context, *clock.Mock, *e svc := logstore.New( quotaqueriermock.NewNoopQuerier(&args.config, periodStart), - logstore.UsageReporterFunc(func(context.Context, []*quota.NotifiedEvent) error { return nil }), + logstore.UsageReporterFunc(func(context.Context, []*quota.NotificationDueEvent) error { return nil }), mainEmitter, secondaryEmitter) diff --git a/internal/query/projection/config.go b/internal/query/projection/config.go index 9039c8ba97..355913639f 100644 --- a/internal/query/projection/config.go +++ b/internal/query/projection/config.go @@ -5,20 +5,20 @@ import ( ) type Config struct { - RequeueEvery time.Duration - RetryFailedAfter time.Duration - MaxFailureCount uint - ConcurrentInstances uint - BulkLimit uint64 - Customizations map[string]CustomConfig - HandleInactiveInstances bool + RequeueEvery time.Duration + RetryFailedAfter time.Duration + MaxFailureCount uint + ConcurrentInstances uint + BulkLimit uint64 + Customizations map[string]CustomConfig + HandleActiveInstances time.Duration } type CustomConfig struct { - RequeueEvery *time.Duration - RetryFailedAfter *time.Duration - MaxFailureCount *uint - ConcurrentInstances *uint - BulkLimit *uint64 - HandleInactiveInstances *bool + RequeueEvery *time.Duration + RetryFailedAfter *time.Duration + MaxFailureCount *uint + ConcurrentInstances *uint + BulkLimit *uint64 + HandleActiveInstances *time.Duration } diff --git a/internal/query/projection/projection.go b/internal/query/projection/projection.go index 049c6a80df..e3c5cd4f71 100644 --- a/internal/query/projection/projection.go +++ b/internal/query/projection/projection.go @@ -81,11 +81,11 @@ func Create(ctx context.Context, sqlClient *database.DB, es *eventstore.Eventsto HandlerConfig: handler.HandlerConfig{ Eventstore: es, }, - RequeueEvery: config.RequeueEvery, - RetryFailedAfter: config.RetryFailedAfter, - Retries: config.MaxFailureCount, - ConcurrentInstances: config.ConcurrentInstances, - HandleInactiveInstances: config.HandleInactiveInstances, + RequeueEvery: config.RequeueEvery, + RetryFailedAfter: config.RetryFailedAfter, + Retries: config.MaxFailureCount, + ConcurrentInstances: config.ConcurrentInstances, + HandleActiveInstances: config.HandleActiveInstances, }, Client: sqlClient, SequenceTable: CurrentSeqTable, @@ -175,8 +175,8 @@ func applyCustomConfig(config crdb.StatementHandlerConfig, customConfig CustomCo if customConfig.RetryFailedAfter != nil { config.RetryFailedAfter = *customConfig.RetryFailedAfter } - if customConfig.HandleInactiveInstances != nil { - config.HandleInactiveInstances = *customConfig.HandleInactiveInstances + if customConfig.HandleActiveInstances != nil { + config.HandleActiveInstances = *customConfig.HandleActiveInstances } return config