mirror of
https://github.com/zitadel/zitadel.git
synced 2025-08-11 19:17:32 +00:00
fix(projections): handle every instance by default and randomize start (#7093)
This commit is contained in:
@@ -5,6 +5,7 @@ import (
|
||||
"database/sql"
|
||||
"errors"
|
||||
"math"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -111,11 +112,19 @@ func (h *Handler) Start(ctx context.Context) {
|
||||
}
|
||||
|
||||
func (h *Handler) schedule(ctx context.Context) {
|
||||
// if there was no run before trigger instantly
|
||||
t := time.NewTimer(0)
|
||||
// if there was no run before trigger within half a second
|
||||
start := randomizeStart(0, 0.5)
|
||||
t := time.NewTimer(start)
|
||||
didInitialize := h.didProjectionInitialize(ctx)
|
||||
if didInitialize {
|
||||
t.Reset(h.requeueEvery)
|
||||
if !t.Stop() {
|
||||
<-t.C
|
||||
}
|
||||
// if there was a trigger before, start the projection
|
||||
// after a second (should generally be after the not initialized projections)
|
||||
// and its configured `RequeueEvery`
|
||||
reset := randomizeStart(1, h.requeueEvery.Seconds())
|
||||
t.Reset(reset)
|
||||
}
|
||||
|
||||
for {
|
||||
@@ -157,6 +166,11 @@ func (h *Handler) schedule(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
func randomizeStart(min, maxSeconds float64) time.Duration {
|
||||
d := min + rand.Float64()*(maxSeconds-min)
|
||||
return time.Duration(d*1000) * time.Millisecond
|
||||
}
|
||||
|
||||
func (h *Handler) subscribe(ctx context.Context) {
|
||||
queue := make(chan eventstore.Event, 100)
|
||||
subscription := eventstore.SubscribeEventTypes(queue, h.eventTypes)
|
||||
@@ -213,7 +227,7 @@ func (h *Handler) queryInstances(ctx context.Context, didInitialize bool) ([]str
|
||||
AwaitOpenTransactions().
|
||||
AllowTimeTravel().
|
||||
ExcludedInstanceID("")
|
||||
if didInitialize {
|
||||
if didInitialize && h.handleActiveInstances > 0 {
|
||||
query = query.
|
||||
CreationDateAfter(h.now().Add(-1 * h.handleActiveInstances))
|
||||
}
|
||||
|
Reference in New Issue
Block a user