fix: use fixed active instances duration (#5567)

* fix: use fixed active instances duration

* fix active instances tests

* fix syntax error

* run pipeline

---------

Co-authored-by: Silvan <silvan.reusser@gmail.com>
This commit is contained in:
Elio Bischof
2023-03-30 13:01:27 +02:00
committed by GitHub
parent 9b30d6ad83
commit 887e2f474d
7 changed files with 107 additions and 101 deletions

View File

@@ -20,12 +20,12 @@ const (
type ProjectionHandlerConfig struct {
HandlerConfig
ProjectionName string
RequeueEvery time.Duration
RetryFailedAfter time.Duration
Retries uint
ConcurrentInstances uint
HandleInactiveInstances bool
ProjectionName string
RequeueEvery time.Duration
RetryFailedAfter time.Duration
Retries uint
ConcurrentInstances uint
HandleActiveInstances time.Duration
}
// Update updates the projection with the given statements
@@ -49,19 +49,19 @@ type NowFunc func() time.Time
type ProjectionHandler struct {
Handler
ProjectionName string
reduce Reduce
update Update
searchQuery SearchQuery
triggerProjection *time.Timer
lock Lock
unlock Unlock
requeueAfter time.Duration
retryFailedAfter time.Duration
retries int
concurrentInstances int
handleInactiveInstances bool
nowFunc NowFunc
ProjectionName string
reduce Reduce
update Update
searchQuery SearchQuery
triggerProjection *time.Timer
lock Lock
unlock Unlock
requeueAfter time.Duration
retryFailedAfter time.Duration
retries int
concurrentInstances int
handleActiveInstances time.Duration
nowFunc NowFunc
}
func NewProjectionHandler(
@@ -79,20 +79,20 @@ func NewProjectionHandler(
concurrentInstances = 1
}
h := &ProjectionHandler{
Handler: NewHandler(config.HandlerConfig),
ProjectionName: config.ProjectionName,
reduce: reduce,
update: update,
searchQuery: query,
lock: lock,
unlock: unlock,
requeueAfter: config.RequeueEvery,
triggerProjection: time.NewTimer(0), // first trigger is instant on startup
retryFailedAfter: config.RetryFailedAfter,
retries: int(config.Retries),
concurrentInstances: concurrentInstances,
handleInactiveInstances: config.HandleInactiveInstances,
nowFunc: time.Now,
Handler: NewHandler(config.HandlerConfig),
ProjectionName: config.ProjectionName,
reduce: reduce,
update: update,
searchQuery: query,
lock: lock,
unlock: unlock,
requeueAfter: config.RequeueEvery,
triggerProjection: time.NewTimer(0), // first trigger is instant on startup
retryFailedAfter: config.RetryFailedAfter,
retries: int(config.Retries),
concurrentInstances: concurrentInstances,
handleActiveInstances: config.HandleActiveInstances,
nowFunc: time.Now,
}
go func() {
@@ -229,11 +229,11 @@ func (h *ProjectionHandler) schedule(ctx context.Context) {
}
go h.cancelOnErr(lockCtx, errs, cancelLock)
}
if succeededOnce && !h.handleInactiveInstances {
if succeededOnce {
// since we have at least one successful run, we can restrict it to events not older than
// twice the requeue time (just to be sure not to miss an event)
// h.handleActiveInstances (just to be sure not to miss an event)
// This ensures that only instances with recent events on the handler are projected
query = query.CreationDateAfter(h.nowFunc().Add(-2 * h.requeueAfter))
query = query.CreationDateAfter(h.nowFunc().Add(-1 * h.handleActiveInstances))
}
ids, err := h.Eventstore.InstanceIDs(ctx, query.Builder())
if err != nil {

View File

@@ -668,13 +668,13 @@ func TestProjection_schedule(t *testing.T) {
ctx context.Context
}
type fields struct {
reduce Reduce
update Update
eventstore func(t *testing.T) *eventstore.Eventstore
lock *lockMock
unlock *unlockMock
query SearchQuery
handleInactiveInstances bool
reduce Reduce
update Update
eventstore func(t *testing.T) *eventstore.Eventstore
lock *lockMock
unlock *unlockMock
query SearchQuery
handleActiveInstances time.Duration
}
type want struct {
locksCount int
@@ -711,7 +711,7 @@ func TestProjection_schedule(t *testing.T) {
),
)
},
handleInactiveInstances: false,
handleActiveInstances: 2 * time.Minute,
},
want{
locksCount: 0,
@@ -739,7 +739,7 @@ func TestProjection_schedule(t *testing.T) {
),
)
},
handleInactiveInstances: false,
handleActiveInstances: 2 * time.Minute,
},
want{
locksCount: 0,
@@ -771,7 +771,7 @@ func TestProjection_schedule(t *testing.T) {
firstErr: ErrLock,
canceled: make(chan bool, 1),
},
handleInactiveInstances: false,
handleActiveInstances: 2 * time.Minute,
},
want{
locksCount: 1,
@@ -803,9 +803,9 @@ func TestProjection_schedule(t *testing.T) {
firstErr: nil,
errWait: 100 * time.Millisecond,
},
unlock: &unlockMock{},
query: testQuery(nil, 0, ErrQuery),
handleInactiveInstances: false,
unlock: &unlockMock{},
query: testQuery(nil, 0, ErrQuery),
handleActiveInstances: 2 * time.Minute,
},
want{
locksCount: 1,
@@ -837,7 +837,7 @@ func TestProjection_schedule(t *testing.T) {
}, {
Field: repository.FieldCreationDate,
Operation: repository.OperationGreater,
Value: now().Add(-2 * time.Hour),
Value: now().Add(-2 * time.Minute),
}},
"206626268110651755",
).
@@ -855,10 +855,10 @@ func TestProjection_schedule(t *testing.T) {
firstErr: nil,
errWait: 100 * time.Millisecond,
},
unlock: &unlockMock{},
handleInactiveInstances: false,
reduce: testReduce(newTestStatement("aggregate1", 1, 0)),
update: testUpdate(t, 1, 1, nil),
unlock: &unlockMock{},
handleActiveInstances: 2 * time.Minute,
reduce: testReduce(newTestStatement("aggregate1", 1, 0)),
update: testUpdate(t, 1, 1, nil),
query: testQuery(
eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
AddQuery().
@@ -894,6 +894,10 @@ func TestProjection_schedule(t *testing.T) {
Field: repository.FieldInstanceID,
Operation: repository.OperationNotIn,
Value: database.StringArray{""},
}, {
Field: repository.FieldCreationDate,
Operation: repository.OperationGreater,
Value: now().Add(-45 * time.Hour),
}}, "206626268110651755").
ExpectFilterEvents(&repository.Event{
AggregateType: "quota",
@@ -909,10 +913,10 @@ func TestProjection_schedule(t *testing.T) {
firstErr: nil,
errWait: 100 * time.Millisecond,
},
unlock: &unlockMock{},
handleInactiveInstances: true,
reduce: testReduce(newTestStatement("aggregate1", 1, 0)),
update: testUpdate(t, 1, 1, nil),
unlock: &unlockMock{},
handleActiveInstances: 45 * time.Hour,
reduce: testReduce(newTestStatement("aggregate1", 1, 0)),
update: testUpdate(t, 1, 1, nil),
query: testQuery(
eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
AddQuery().
@@ -936,17 +940,17 @@ func TestProjection_schedule(t *testing.T) {
EventQueue: make(chan eventstore.Event, 10),
Eventstore: tt.fields.eventstore(t),
},
reduce: tt.fields.reduce,
update: tt.fields.update,
searchQuery: tt.fields.query,
lock: tt.fields.lock.lock(),
unlock: tt.fields.unlock.unlock(),
triggerProjection: time.NewTimer(0), // immediately run an iteration
requeueAfter: time.Hour, // run only one iteration
concurrentInstances: 1,
handleInactiveInstances: tt.fields.handleInactiveInstances,
retries: 0,
nowFunc: now,
reduce: tt.fields.reduce,
update: tt.fields.update,
searchQuery: tt.fields.query,
lock: tt.fields.lock.lock(),
unlock: tt.fields.unlock.unlock(),
triggerProjection: time.NewTimer(0), // immediately run an iteration
requeueAfter: time.Hour, // run only one iteration
concurrentInstances: 1,
handleActiveInstances: tt.fields.handleActiveInstances,
retries: 0,
nowFunc: now,
}
ctx, cancel := context.WithCancel(tt.args.ctx)
go func() {