Mirror of https://github.com/zitadel/zitadel.git
fix(setup): init projections (#7194)
Even though this is a feature, it is released as a fix so that we can backport it to earlier revisions. As reported by multiple users, starting ZITADEL after an upgrade led to downtime and, in the worst case, rollbacks to the previously deployed version. The problem arises when there are too many events to process after ZITADEL starts. The root cause is changes to projections (database tables) which must be recomputed. This PR solves the problem by adding a new step to the setup phase which prefills the projections. The step can be enabled by adding the `--init-projections` flag to `setup`, `start-from-init` and `start-from-setup`. Setting this flag can make the setup phase take longer, but it reduces the risk of the problems mentioned above.
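For orientation, a minimal sketch of how the flag might be passed on the command line. Only the `--init-projections` flag and the `setup`, `start-from-init` and `start-from-setup` commands come from this change; the binary name and any additional flags (masterkey, config files) depend on the deployment and are assumptions:

    # prefill projections during the setup phase, then start ZITADEL separately
    zitadel setup --init-projections

    # the flag is also accepted by the combined commands
    zitadel start-from-init --init-projections
    zitadel start-from-setup --init-projections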
@@ -12,13 +12,7 @@ import (
 // Eventstore abstracts all functions needed to store valid events
 // and filters the stored events
 type Eventstore struct {
-	// TODO: get rid of this mutex,
-	// or if we scale to >4vCPU use a sync.Map
-	interceptorMutex  sync.RWMutex
-	eventInterceptors map[EventType]eventTypeInterceptors
-	eventTypes        []string
-	aggregateTypes    []string
-	PushTimeout       time.Duration
+	PushTimeout time.Duration

 	pusher  Pusher
 	querier Querier
@@ -28,14 +22,36 @@ type Eventstore struct {
 	instancesMu sync.Mutex
 }

+var (
+	eventInterceptors map[EventType]eventTypeInterceptors
+	eventTypes        []string
+	aggregateTypes    []string
+)
+
+// RegisterFilterEventMapper registers a function for mapping an eventstore event to an event
+func RegisterFilterEventMapper(aggregateType AggregateType, eventType EventType, mapper func(Event) (Event, error)) {
+	if mapper == nil || eventType == "" {
+		return
+	}
+
+	appendEventType(eventType)
+	appendAggregateType(aggregateType)
+
+	if eventInterceptors == nil {
+		eventInterceptors = make(map[EventType]eventTypeInterceptors)
+	}
+	interceptor := eventInterceptors[eventType]
+	interceptor.eventMapper = mapper
+	eventInterceptors[eventType] = interceptor
+}
+
 type eventTypeInterceptors struct {
 	eventMapper func(Event) (Event, error)
 }

 func NewEventstore(config *Config) *Eventstore {
 	return &Eventstore{
-		eventInterceptors: map[EventType]eventTypeInterceptors{},
-		PushTimeout:       config.PushTimeout,
+		PushTimeout: config.PushTimeout,

 		pusher:  config.Pusher,
 		querier: config.Querier,
@@ -75,11 +91,11 @@ func (es *Eventstore) Push(ctx context.Context, cmds ...Command) ([]Event, error
 }

 func (es *Eventstore) EventTypes() []string {
-	return es.eventTypes
+	return eventTypes
 }

 func (es *Eventstore) AggregateTypes() []string {
-	return es.aggregateTypes
+	return aggregateTypes
 }

 // Filter filters the stored events based on the searchQuery
@@ -105,28 +121,21 @@ func (es *Eventstore) Filter(ctx context.Context, searchQuery *SearchQueryBuilde

 func (es *Eventstore) mapEvents(events []Event) (mappedEvents []Event, err error) {
 	mappedEvents = make([]Event, len(events))

-	es.interceptorMutex.RLock()
-	defer es.interceptorMutex.RUnlock()
-
 	for i, event := range events {
 		mappedEvents[i], err = es.mapEventLocked(event)
 		if err != nil {
 			return nil, err
 		}
 	}

 	return mappedEvents, nil
 }

 func (es *Eventstore) mapEvent(event Event) (Event, error) {
-	es.interceptorMutex.RLock()
-	defer es.interceptorMutex.RUnlock()
 	return es.mapEventLocked(event)
 }

 func (es *Eventstore) mapEventLocked(event Event) (Event, error) {
-	interceptors, ok := es.eventInterceptors[event.Type()]
+	interceptors, ok := eventInterceptors[event.Type()]
 	if !ok || interceptors.eventMapper == nil {
 		return BaseEventFromRepo(event), nil
 	}
@@ -204,24 +213,6 @@ func (es *Eventstore) FilterToQueryReducer(ctx context.Context, r QueryReducer)
 	return es.FilterToReducer(ctx, r.Query(), r)
 }

-// RegisterFilterEventMapper registers a function for mapping an eventstore event to an event
-func (es *Eventstore) RegisterFilterEventMapper(aggregateType AggregateType, eventType EventType, mapper func(Event) (Event, error)) *Eventstore {
-	if mapper == nil || eventType == "" {
-		return es
-	}
-	es.interceptorMutex.Lock()
-	defer es.interceptorMutex.Unlock()
-
-	es.appendEventType(eventType)
-	es.appendAggregateType(aggregateType)
-
-	interceptor := es.eventInterceptors[eventType]
-	interceptor.eventMapper = mapper
-	es.eventInterceptors[eventType] = interceptor
-
-	return es
-}
-
 type Reducer func(event Event) error

 type Querier interface {
@@ -242,18 +233,18 @@ type Pusher interface {
 	Push(ctx context.Context, commands ...Command) (_ []Event, err error)
 }

-func (es *Eventstore) appendEventType(typ EventType) {
-	i := sort.SearchStrings(es.eventTypes, string(typ))
-	if i < len(es.eventTypes) && es.eventTypes[i] == string(typ) {
+func appendEventType(typ EventType) {
+	i := sort.SearchStrings(eventTypes, string(typ))
+	if i < len(eventTypes) && eventTypes[i] == string(typ) {
 		return
 	}
-	es.eventTypes = append(es.eventTypes[:i], append([]string{string(typ)}, es.eventTypes[i:]...)...)
+	eventTypes = append(eventTypes[:i], append([]string{string(typ)}, eventTypes[i:]...)...)
 }

-func (es *Eventstore) appendAggregateType(typ AggregateType) {
-	i := sort.SearchStrings(es.aggregateTypes, string(typ))
-	if len(es.aggregateTypes) > i && es.aggregateTypes[i] == string(typ) {
+func appendAggregateType(typ AggregateType) {
+	i := sort.SearchStrings(aggregateTypes, string(typ))
+	if len(aggregateTypes) > i && aggregateTypes[i] == string(typ) {
 		return
 	}
-	es.aggregateTypes = append(es.aggregateTypes[:i], append([]string{string(typ)}, es.aggregateTypes[i:]...)...)
+	aggregateTypes = append(aggregateTypes[:i], append([]string{string(typ)}, aggregateTypes[i:]...)...)
 }
@@ -139,12 +139,11 @@ func Test_eventstore_RegisterFilterEventMapper(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			es := &Eventstore{
-				eventInterceptors: tt.fields.eventMapper,
-			}
-			es = es.RegisterFilterEventMapper("test", tt.args.eventType, tt.args.mapper)
-			if len(es.eventInterceptors) != tt.res.mapperCount {
-				t.Errorf("unexpected mapper count: want %d, got %d", tt.res.mapperCount, len(es.eventInterceptors))
+			eventInterceptors = tt.fields.eventMapper
+			RegisterFilterEventMapper("test", tt.args.eventType, tt.args.mapper)
+			if len(eventInterceptors) != tt.res.mapperCount {
+				t.Errorf("unexpected mapper count: want %d, got %d", tt.res.mapperCount, len(eventInterceptors))
 				return
 			}

@@ -152,7 +151,7 @@ func Test_eventstore_RegisterFilterEventMapper(t *testing.T) {
 				return
 			}

-			mapper := es.eventInterceptors[tt.args.eventType]
+			mapper := eventInterceptors[tt.args.eventType]
 			event, err := mapper.eventMapper(nil)
 			if err != nil {
 				t.Errorf("unexpected error %v", err)
@@ -694,15 +693,15 @@ func TestEventstore_Push(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
+			eventInterceptors = map[EventType]eventTypeInterceptors{}
 			es := &Eventstore{
-				eventInterceptors: map[EventType]eventTypeInterceptors{},
-				pusher:            tt.fields.pusher,
+				pusher: tt.fields.pusher,
 			}
 			for eventType, mapper := range tt.fields.eventMapper {
-				es = es.RegisterFilterEventMapper("test", eventType, mapper)
+				RegisterFilterEventMapper("test", eventType, mapper)
 			}
-			if len(es.eventInterceptors) != len(tt.fields.eventMapper) {
-				t.Errorf("register event mapper failed expected mapper amount: %d, got: %d", len(tt.fields.eventMapper), len(es.eventInterceptors))
+			if len(eventInterceptors) != len(tt.fields.eventMapper) {
+				t.Errorf("register event mapper failed expected mapper amount: %d, got: %d", len(tt.fields.eventMapper), len(eventInterceptors))
 				t.FailNow()
 			}

@@ -825,16 +824,16 @@ func TestEventstore_FilterEvents(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
+			eventInterceptors = map[EventType]eventTypeInterceptors{}
 			es := &Eventstore{
-				eventInterceptors: map[EventType]eventTypeInterceptors{},
-				querier:           tt.fields.repo,
+				querier: tt.fields.repo,
 			}

			for eventType, mapper := range tt.fields.eventMapper {
-				es = es.RegisterFilterEventMapper("test", eventType, mapper)
+				RegisterFilterEventMapper("test", eventType, mapper)
 			}
-			if len(es.eventInterceptors) != len(tt.fields.eventMapper) {
-				t.Errorf("register event mapper failed expected mapper amount: %d, got: %d", len(tt.fields.eventMapper), len(es.eventInterceptors))
+			if len(eventInterceptors) != len(tt.fields.eventMapper) {
+				t.Errorf("register event mapper failed expected mapper amount: %d, got: %d", len(tt.fields.eventMapper), len(eventInterceptors))
 				t.FailNow()
 			}

@@ -1130,14 +1129,13 @@ func TestEventstore_FilterToReducer(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			es := &Eventstore{
-				eventInterceptors: map[EventType]eventTypeInterceptors{},
-				querier:           tt.fields.repo,
+				querier: tt.fields.repo,
 			}
 			for eventType, mapper := range tt.fields.eventMapper {
-				es = es.RegisterFilterEventMapper("test", eventType, mapper)
+				RegisterFilterEventMapper("test", eventType, mapper)
 			}
-			if len(es.eventInterceptors) != len(tt.fields.eventMapper) {
-				t.Errorf("register event mapper failed expected mapper amount: %d, got: %d", len(tt.fields.eventMapper), len(es.eventInterceptors))
+			if len(eventInterceptors) != len(tt.fields.eventMapper) {
+				t.Errorf("register event mapper failed expected mapper amount: %d, got: %d", len(tt.fields.eventMapper), len(eventInterceptors))
 				t.FailNow()
 			}

@@ -1246,14 +1244,12 @@ func TestEventstore_mapEvents(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			es := &Eventstore{
-				eventInterceptors: map[EventType]eventTypeInterceptors{},
-			}
+			es := &Eventstore{}
 			for eventType, mapper := range tt.fields.eventMapper {
-				es = es.RegisterFilterEventMapper("test", eventType, mapper)
+				RegisterFilterEventMapper("test", eventType, mapper)
 			}
-			if len(es.eventInterceptors) != len(tt.fields.eventMapper) {
-				t.Errorf("register event mapper failed expected mapper amount: %d, got: %d", len(tt.fields.eventMapper), len(es.eventInterceptors))
+			if len(eventInterceptors) != len(tt.fields.eventMapper) {
+				t.Errorf("register event mapper failed expected mapper amount: %d, got: %d", len(tt.fields.eventMapper), len(eventInterceptors))
 				t.FailNow()
 			}

@@ -294,10 +294,10 @@ func TestUserReadModel(t *testing.T) {
 		},
 	)

-	es.RegisterFilterEventMapper(UserAddedEventMapper()).
-		RegisterFilterEventMapper(UserFirstNameChangedMapper()).
-		RegisterFilterEventMapper(UserPasswordCheckedMapper()).
-		RegisterFilterEventMapper(UserDeletedMapper())
+	eventstore.RegisterFilterEventMapper(UserAddedEventMapper())
+	eventstore.RegisterFilterEventMapper(UserFirstNameChangedMapper())
+	eventstore.RegisterFilterEventMapper(UserPasswordCheckedMapper())
+	eventstore.RegisterFilterEventMapper(UserDeletedMapper())

 	events, err := es.Push(context.Background(),
 		NewUserAddedEvent("1", "hodor"),
@@ -1,52 +0,0 @@
-package handler
-
-import (
-	"context"
-
-	"github.com/zitadel/zitadel/internal/eventstore"
-)
-
-const (
-	schedulerSucceeded = eventstore.EventType("system.projections.scheduler.succeeded")
-	aggregateType      = eventstore.AggregateType("system")
-	aggregateID        = "SYSTEM"
-)
-
-func (h *Handler) didProjectionInitialize(ctx context.Context) bool {
-	events, err := h.es.Filter(ctx, eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
-		InstanceID("").
-		AddQuery().
-		AggregateTypes(aggregateType).
-		AggregateIDs(aggregateID).
-		EventTypes(schedulerSucceeded).
-		EventData(map[string]interface{}{
-			"name": h.projection.Name(),
-		}).
-		Builder(),
-	)
-	return len(events) > 0 && err == nil
-}
-
-func (h *Handler) setSucceededOnce(ctx context.Context) error {
-	_, err := h.es.Push(ctx, &ProjectionSucceededEvent{
-		BaseEvent: *eventstore.NewBaseEventForPush(ctx,
-			eventstore.NewAggregate(ctx, aggregateID, aggregateType, "v1"),
-			schedulerSucceeded,
-		),
-		Name: h.projection.Name(),
-	})
-	return err
-}
-
-type ProjectionSucceededEvent struct {
-	eventstore.BaseEvent `json:"-"`
-	Name                 string `json:"name"`
-}
-
-func (p *ProjectionSucceededEvent) Payload() interface{} {
-	return p
-}
-
-func (p *ProjectionSucceededEvent) UniqueConstraints() []*eventstore.UniqueConstraint {
-	return nil
-}
@@ -6,20 +6,26 @@ import (
 	"errors"
 	"math"
 	"math/rand"
+	"slices"
+	"sync"
 	"time"

 	"github.com/jackc/pgconn"

 	"github.com/zitadel/logging"

 	"github.com/zitadel/zitadel/internal/api/authz"
 	"github.com/zitadel/zitadel/internal/api/call"
 	"github.com/zitadel/zitadel/internal/database"
 	"github.com/zitadel/zitadel/internal/eventstore"
+	"github.com/zitadel/zitadel/internal/migration"
+	"github.com/zitadel/zitadel/internal/repository/instance"
 	"github.com/zitadel/zitadel/internal/repository/pseudo"
 )

 type EventStore interface {
 	InstanceIDs(ctx context.Context, maxAge time.Duration, forceLoad bool, query *eventstore.SearchQueryBuilder) ([]string, error)
+	FilterToQueryReducer(ctx context.Context, reducer eventstore.QueryReducer) error
 	Filter(ctx context.Context, queryFactory *eventstore.SearchQueryBuilder) ([]eventstore.Event, error)
 	Push(ctx context.Context, cmds ...eventstore.Command) ([]eventstore.Event, error)
 }
@@ -58,6 +64,70 @@ type Handler struct {
 	triggerWithoutEvents Reduce
 }

+var _ migration.Migration = (*Handler)(nil)
+
+// Execute implements migration.Migration.
+func (h *Handler) Execute(ctx context.Context, startedEvent eventstore.Event) error {
+	start := time.Now()
+	logging.WithFields("projection", h.ProjectionName()).Info("projection starts prefilling")
+	logTicker := time.NewTicker(30 * time.Second)
+	go func() {
+		for range logTicker.C {
+			logging.WithFields("projection", h.ProjectionName()).Info("projection is prefilling")
+		}
+	}()
+
+	instanceIDs, err := h.existingInstances(ctx)
+	if err != nil {
+		return err
+	}
+
+	// default amount of workers is 10
+	workerCount := 10
+
+	if h.client.DB.Stats().MaxOpenConnections > 0 {
+		workerCount = h.client.DB.Stats().MaxOpenConnections / 4
+	}
+	// ensure that at least one worker is active
+	if workerCount == 0 {
+		workerCount = 1
+	}
+	// spawn less workers if not all workers needed
+	if workerCount > len(instanceIDs) {
+		workerCount = len(instanceIDs)
+	}
+
+	instances := make(chan string, workerCount)
+	var wg sync.WaitGroup
+	wg.Add(workerCount)
+	for i := 0; i < workerCount; i++ {
+		go h.executeInstances(ctx, instances, startedEvent, &wg)
+	}
+
+	for _, instance := range instanceIDs {
+		instances <- instance
+	}
+
+	close(instances)
+	wg.Wait()
+
+	logTicker.Stop()
+	logging.WithFields("projection", h.ProjectionName(), "took", time.Since(start)).Info("projections ended prefilling")
+	return nil
+}
+
+func (h *Handler) executeInstances(ctx context.Context, instances <-chan string, startedEvent eventstore.Event, wg *sync.WaitGroup) {
+	for instance := range instances {
+		h.triggerInstances(ctx, []string{instance}, WithMaxPosition(startedEvent.Position()))
+	}
+	wg.Done()
+}
+
+// String implements migration.Migration.
+func (h *Handler) String() string {
+	return h.ProjectionName()
+}
+
 // nowFunc makes [time.Now] mockable
 type nowFunc func() time.Time

@@ -111,21 +181,56 @@ func (h *Handler) Start(ctx context.Context) {
 	go h.subscribe(ctx)
 }

-func (h *Handler) schedule(ctx context.Context) {
-	// if there was no run before trigger within half a second
-	start := randomizeStart(0, 0.5)
-	t := time.NewTimer(start)
-	didInitialize := h.didProjectionInitialize(ctx)
-	if didInitialize {
-		if !t.Stop() {
-			<-t.C
-		}
-		// if there was a trigger before, start the projection
-		// after a second (should generally be after the not initialized projections)
-		// and its configured `RequeueEvery`
-		reset := randomizeStart(1, h.requeueEvery.Seconds())
-		t.Reset(reset)
+type checkInit struct {
+	didInit        bool
+	projectionName string
+}
+
+// AppendEvents implements eventstore.QueryReducer.
+func (ci *checkInit) AppendEvents(...eventstore.Event) {
+	ci.didInit = true
+}
+
+// Query implements eventstore.QueryReducer.
+func (ci *checkInit) Query() *eventstore.SearchQueryBuilder {
+	return eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
+		Limit(1).
+		InstanceID("").
+		AddQuery().
+		AggregateTypes(migration.SystemAggregate).
+		AggregateIDs(migration.SystemAggregateID).
+		EventTypes(migration.DoneType).
+		EventData(map[string]interface{}{
+			"name": ci.projectionName,
+		}).
+		Builder()
+}
+
+// Reduce implements eventstore.QueryReducer.
+func (*checkInit) Reduce() error {
+	return nil
+}
+
+var _ eventstore.QueryReducer = (*checkInit)(nil)
+
+func (h *Handler) didInitialize(ctx context.Context) bool {
+	initiated := checkInit{
+		projectionName: h.ProjectionName(),
+	}
+	err := h.es.FilterToQueryReducer(ctx, &initiated)
+	if err != nil {
+		return false
+	}
+	return initiated.didInit
+}
+
+func (h *Handler) schedule(ctx context.Context) {
+	// start the projection and its configured `RequeueEvery`
+	reset := randomizeStart(0, h.requeueEvery.Seconds())
+	if !h.didInitialize(ctx) {
+		reset = randomizeStart(0, 0.5)
 	}
+	t := time.NewTimer(reset)

 	for {
 		select {
@@ -133,40 +238,34 @@ func (h *Handler) schedule(ctx context.Context) {
 			t.Stop()
 			return
 		case <-t.C:
-			instances, err := h.queryInstances(ctx, didInitialize)
+			instances, err := h.queryInstances(ctx)
 			h.log().OnError(err).Debug("unable to query instances")

-			var instanceFailed bool
-			scheduledCtx := call.WithTimestamp(ctx)
-			for _, instance := range instances {
-				instanceCtx := authz.WithInstanceID(scheduledCtx, instance)
-
-				// simple implementation of do while
-				_, err = h.Trigger(instanceCtx)
-				instanceFailed = instanceFailed || err != nil
-				h.log().WithField("instance", instance).OnError(err).Info("scheduled trigger failed")
-				time.Sleep(h.retryFailedAfter)
-				// retry if trigger failed
-				for ; err != nil; _, err = h.Trigger(instanceCtx) {
-					time.Sleep(h.retryFailedAfter)
-					instanceFailed = instanceFailed || err != nil
-					h.log().WithField("instance", instance).OnError(err).Info("scheduled trigger failed")
-					if err == nil {
-						break
-					}
-				}
-			}
-
-			if !didInitialize && !instanceFailed {
-				err = h.setSucceededOnce(ctx)
-				h.log().OnError(err).Debug("unable to set succeeded once")
-				didInitialize = err == nil
-			}
+			h.triggerInstances(call.WithTimestamp(ctx), instances)
 			t.Reset(h.requeueEvery)
 		}
 	}
 }

+func (h *Handler) triggerInstances(ctx context.Context, instances []string, triggerOpts ...TriggerOpt) {
+	for _, instance := range instances {
+		instanceCtx := authz.WithInstanceID(ctx, instance)
+
+		// simple implementation of do while
+		_, err := h.Trigger(instanceCtx, triggerOpts...)
+		h.log().WithField("instance", instance).OnError(err).Debug("trigger failed")
+		time.Sleep(h.retryFailedAfter)
+		// retry if trigger failed
+		for ; err != nil; _, err = h.Trigger(instanceCtx, triggerOpts...) {
+			time.Sleep(h.retryFailedAfter)
+			h.log().WithField("instance", instance).OnError(err).Debug("trigger failed")
+			if err == nil {
+				break
+			}
+		}
+	}
+}
+
 func randomizeStart(min, maxSeconds float64) time.Duration {
 	d := min + rand.Float64()*(maxSeconds-min)
 	return time.Duration(d*1000) * time.Millisecond
@@ -223,31 +322,84 @@ func checkAdditionalEvents(eventQueue chan eventstore.Event, event eventstore.Ev
 	}
 }

-func (h *Handler) queryInstances(ctx context.Context, didInitialize bool) ([]string, error) {
+type existingInstances []string
+
+// AppendEvents implements eventstore.QueryReducer.
+func (ai *existingInstances) AppendEvents(events ...eventstore.Event) {
+	for _, event := range events {
+		switch event.Type() {
+		case instance.InstanceAddedEventType:
+			*ai = append(*ai, event.Aggregate().InstanceID)
+		case instance.InstanceRemovedEventType:
+			slices.DeleteFunc(*ai, func(s string) bool {
+				return s == event.Aggregate().InstanceID
+			})
+		}
+	}
+}
+
+// Query implements eventstore.QueryReducer.
+func (*existingInstances) Query() *eventstore.SearchQueryBuilder {
+	return eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
+		AddQuery().
+		AggregateTypes(instance.AggregateType).
+		EventTypes(
+			instance.InstanceAddedEventType,
+			instance.InstanceRemovedEventType,
+		).
+		Builder()
+}
+
+// Reduce implements eventstore.QueryReducer.
+// reduce is not used as events are reduced during AppendEvents
+func (*existingInstances) Reduce() error {
+	return nil
+}
+
+var _ eventstore.QueryReducer = (*existingInstances)(nil)
+
+func (h *Handler) queryInstances(ctx context.Context) ([]string, error) {
+	if h.handleActiveInstances == 0 {
+		return h.existingInstances(ctx)
+	}
+
 	query := eventstore.NewSearchQueryBuilder(eventstore.ColumnsInstanceIDs).
 		AwaitOpenTransactions().
 		AllowTimeTravel().
-		ExcludedInstanceID("")
-	if didInitialize && h.handleActiveInstances > 0 {
-		query = query.
-			CreationDateAfter(h.now().Add(-1 * h.handleActiveInstances))
-	}
-	return h.es.InstanceIDs(ctx, h.requeueEvery, !didInitialize, query)
+		CreationDateAfter(h.now().Add(-1 * h.handleActiveInstances))
+
+	return h.es.InstanceIDs(ctx, h.requeueEvery, false, query)
+}
+
+func (h *Handler) existingInstances(ctx context.Context) ([]string, error) {
+	ai := existingInstances{}
+	if err := h.es.FilterToQueryReducer(ctx, &ai); err != nil {
+		return nil, err
+	}
+
+	return ai, nil
 }

 type triggerConfig struct {
 	awaitRunning bool
+	maxPosition  float64
 }

-type triggerOpt func(conf *triggerConfig)
+type TriggerOpt func(conf *triggerConfig)

-func WithAwaitRunning() triggerOpt {
+func WithAwaitRunning() TriggerOpt {
 	return func(conf *triggerConfig) {
 		conf.awaitRunning = true
 	}
 }

-func (h *Handler) Trigger(ctx context.Context, opts ...triggerOpt) (_ context.Context, err error) {
+func WithMaxPosition(position float64) TriggerOpt {
+	return func(conf *triggerConfig) {
+		conf.maxPosition = position
+	}
+}
+
+func (h *Handler) Trigger(ctx context.Context, opts ...TriggerOpt) (_ context.Context, err error) {
 	config := new(triggerConfig)
 	for _, opt := range opts {
 		opt(config)
@@ -261,7 +413,7 @@ func (h *Handler) Trigger(ctx context.Context, opts ...triggerOpt) (_ context.Co

 	for i := 0; ; i++ {
 		additionalIteration, err := h.processEvents(ctx, config)
-		h.log().OnError(err).Warn("process events failed")
+		h.log().OnError(err).Info("process events failed")
 		h.log().WithField("iteration", i).Debug("trigger iteration")
 		if !additionalIteration || err != nil {
 			return call.ResetTimestamp(ctx), err
@@ -350,6 +502,10 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add
 		}
 		return additionalIteration, err
 	}
+	// stop execution if currentState.eventTimestamp >= config.maxCreatedAt
+	if config.maxPosition != 0 && currentState.position >= config.maxPosition {
+		return false, nil
+	}

 	var statements []*Statement
 	statements, additionalIteration, err = h.generateStatements(ctx, tx, currentState)
@@ -362,6 +518,7 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add
 	}

 	lastProcessedIndex, err := h.executeStatements(ctx, tx, currentState, statements)
+	h.log().OnError(err).WithField("lastProcessedIndex", lastProcessedIndex).Debug("execution of statements failed")
 	if lastProcessedIndex < 0 {
 		return false, err
 	}
@@ -134,7 +134,6 @@ func QueryFromBuilder(builder *eventstore.SearchQueryBuilder) (*SearchQuery, err
 	for _, f := range []func(builder *eventstore.SearchQueryBuilder, query *SearchQuery) *Filter{
 		instanceIDFilter,
 		instanceIDsFilter,
-		excludedInstanceIDFilter,
 		editorUserFilter,
 		resourceOwnerFilter,
 		positionAfterFilter,
@@ -184,14 +183,6 @@ func eventSequenceGreaterFilter(builder *eventstore.SearchQueryBuilder, query *S
 	return query.Sequence
 }

-func excludedInstanceIDFilter(builder *eventstore.SearchQueryBuilder, query *SearchQuery) *Filter {
-	if len(builder.GetExcludedInstanceIDs()) == 0 {
-		return nil
-	}
-	query.ExcludedInstances = NewFilter(FieldInstanceID, database.TextArray[string](builder.GetExcludedInstanceIDs()), OperationNotIn)
-	return query.ExcludedInstances
-}
-
 func creationDateAfterFilter(builder *eventstore.SearchQueryBuilder, query *SearchQuery) *Filter {
 	if builder.GetCreationDateAfter().IsZero() {
 		return nil
@@ -19,7 +19,6 @@ type SearchQueryBuilder struct {
 	resourceOwner       string
 	instanceID          *string
 	instanceIDs         []string
-	excludedInstanceIDs []string
 	editorUser          string
 	queries             []*SearchQuery
 	tx                  *sql.Tx
@@ -83,10 +82,6 @@ func (b SearchQueryBuilder) GetAwaitOpenTransactions() bool {
 	return b.awaitOpenTransactions
 }

-func (q SearchQueryBuilder) GetExcludedInstanceIDs() []string {
-	return q.excludedInstanceIDs
-}
-
 func (q SearchQueryBuilder) GetEventSequenceGreater() uint64 {
 	return q.eventSequenceGreater
 }
@@ -289,12 +284,6 @@ func (builder *SearchQueryBuilder) SequenceGreater(sequence uint64) *SearchQuery
 	return builder
 }

-// ExcludedInstanceID filters for events not having the given instanceIDs
-func (builder *SearchQueryBuilder) ExcludedInstanceID(instanceIDs ...string) *SearchQueryBuilder {
-	builder.excludedInstanceIDs = instanceIDs
-	return builder
-}
-
 // CreationDateAfter filters for events which happened after the specified time
 func (builder *SearchQueryBuilder) CreationDateAfter(creationDate time.Time) *SearchQueryBuilder {
 	if creationDate.IsZero() || creationDate.Unix() == 0 {