Mirror of https://github.com/zitadel/zitadel.git
fix: scheduling (#3978)
* fix: improve scheduling
* build pre-release
* fix: locker
* fix: user handler and print stack in case of panic in reducer
* chore: remove sentry
* fix: improve handler projection and implement tests
* more tests
* fix: race condition in tests
* Update internal/eventstore/repository/sql/query.go
* fix: implemented suggested changes
* fix: lock statement

Co-authored-by: Silvan <silvan.reusser@gmail.com>
@@ -2,13 +2,13 @@ package handler
 
 import (
 	"context"
+	"errors"
 	"runtime/debug"
-	"sort"
-	"sync"
 	"time"
 
 	"github.com/zitadel/logging"
 
+	"github.com/zitadel/zitadel/internal/api/authz"
 	"github.com/zitadel/zitadel/internal/eventstore"
 )
 
@@ -16,241 +16,207 @@ const systemID = "system"
 
 type ProjectionHandlerConfig struct {
 	HandlerConfig
-	ProjectionName   string
-	RequeueEvery     time.Duration
-	RetryFailedAfter time.Duration
+	ProjectionName      string
+	RequeueEvery        time.Duration
+	RetryFailedAfter    time.Duration
+	Retries             uint
+	ConcurrentInstances uint
 }
 
 //Update updates the projection with the given statements
-type Update func(context.Context, []*Statement, Reduce) (unexecutedStmts []*Statement, err error)
+type Update func(context.Context, []*Statement, Reduce) (index int, err error)
 
 //Reduce reduces the given event to a statement
 //which is used to update the projection
 type Reduce func(eventstore.Event) (*Statement, error)
 
+//SearchQuery generates the search query to lookup for events
+type SearchQuery func(ctx context.Context, instanceIDs []string) (query *eventstore.SearchQueryBuilder, queryLimit uint64, err error)
+
 //Lock is used for mutex handling if needed on the projection
-type Lock func(context.Context, time.Duration, string) <-chan error
+type Lock func(context.Context, time.Duration, ...string) <-chan error
 
 //Unlock releases the mutex of the projection
-type Unlock func(string) error
-
-//SearchQuery generates the search query to lookup for events
-type SearchQuery func(ctx context.Context) (query *eventstore.SearchQueryBuilder, queryLimit uint64, err error)
+type Unlock func(...string) error
 
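The Lock callback now takes a variadic list of instance IDs and, as before, returns a receive-only error channel: the first value received reports whether the initial lock was acquired, and later values report renewal failures, which is what lets a running trigger be canceled when the lock is lost. A minimal in-memory sketch of a compatible locker, assuming a mutex-guarded map and a renewal ticker (ZITADEL's actual locker is database-backed; everything below is illustrative):

package locker

import (
	"context"
	"errors"
	"sync"
	"time"
)

// memLocker is a toy, process-local implementation of the Lock/Unlock
// signatures above; the real locker persists locks in a database table.
type memLocker struct {
	mu   sync.Mutex
	held map[string]time.Time
}

func newMemLocker() *memLocker {
	return &memLocker{held: map[string]time.Time{}}
}

// Lock matches: func(context.Context, time.Duration, ...string) <-chan error.
// The first send reports the initial lock attempt; afterwards the lock is
// renewed every lockDuration/2 until ctx is canceled.
func (l *memLocker) Lock(ctx context.Context, lockDuration time.Duration, instanceIDs ...string) <-chan error {
	errs := make(chan error, 1)
	l.mu.Lock()
	for _, id := range instanceIDs {
		if until, ok := l.held[id]; ok && time.Now().Before(until) {
			l.mu.Unlock()
			errs <- errors.New("already locked: " + id)
			close(errs)
			return errs
		}
	}
	for _, id := range instanceIDs {
		l.held[id] = time.Now().Add(lockDuration)
	}
	l.mu.Unlock()
	errs <- nil // initial lock succeeded

	go func() {
		defer close(errs) // a closed channel reads as (nil, false) for the consumer
		ticker := time.NewTicker(lockDuration / 2)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				l.mu.Lock()
				for _, id := range instanceIDs {
					l.held[id] = time.Now().Add(lockDuration) // renew
				}
				l.mu.Unlock()
			}
		}
	}()
	return errs
}

// Unlock matches: func(...string) error.
func (l *memLocker) Unlock(instanceIDs ...string) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	for _, id := range instanceIDs {
		delete(l.held, id)
	}
	return nil
}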
 type ProjectionHandler struct {
 	Handler
-
-	requeueAfter time.Duration
-	shouldBulk   *time.Timer
-	bulkMu       sync.Mutex
-	bulkLocked   bool
-	execBulk     executeBulk
-
-	retryFailedAfter time.Duration
-	shouldPush       *time.Timer
-	pushSet          bool
-
-	ProjectionName string
-
-	lockMu sync.Mutex
-	stmts  []*Statement
+	ProjectionName      string
+	reduce              Reduce
+	update              Update
+	searchQuery         SearchQuery
+	triggerProjection   *time.Timer
+	lock                Lock
+	unlock              Unlock
+	requeueAfter        time.Duration
+	retryFailedAfter    time.Duration
+	retries             int
+	concurrentInstances int
 }
 
 func NewProjectionHandler(
 	ctx context.Context,
 	config ProjectionHandlerConfig,
 	reduce Reduce,
 	update Update,
 	query SearchQuery,
 	lock Lock,
 	unlock Unlock,
 ) *ProjectionHandler {
+	concurrentInstances := int(config.ConcurrentInstances)
+	if concurrentInstances < 1 {
+		concurrentInstances = 1
+	}
 	h := &ProjectionHandler{
-		Handler:          NewHandler(config.HandlerConfig),
-		ProjectionName:   config.ProjectionName,
-		requeueAfter:     config.RequeueEvery,
-		// first bulk is instant on startup
-		shouldBulk:       time.NewTimer(0),
-		shouldPush:       time.NewTimer(0),
-		retryFailedAfter: config.RetryFailedAfter,
+		Handler:             NewHandler(config.HandlerConfig),
+		ProjectionName:      config.ProjectionName,
+		reduce:              reduce,
+		update:              update,
+		searchQuery:         query,
+		lock:                lock,
+		unlock:              unlock,
+		requeueAfter:        config.RequeueEvery,
+		triggerProjection:   time.NewTimer(0), // first trigger is instant on startup
+		retryFailedAfter:    config.RetryFailedAfter,
+		retries:             int(config.Retries),
+		concurrentInstances: concurrentInstances,
 	}
 
-	h.execBulk = h.prepareExecuteBulk(query, reduce, update)
 	go h.subscribe(ctx)
 
-	//unitialized timer
-	//https://github.com/golang/go/issues/12721
-	<-h.shouldPush.C
+	go h.schedule(ctx)
 
-	if config.RequeueEvery <= 0 {
-		if !h.shouldBulk.Stop() {
-			<-h.shouldBulk.C
-		}
-		logging.WithFields("projection", h.ProjectionName).Info("starting handler without requeue")
-		return h
-	} else if config.RequeueEvery < 500*time.Millisecond {
-		logging.WithFields("projection", h.ProjectionName).Fatal("requeue every must be greater 500ms or <= 0")
-	}
-	logging.WithFields("projection", h.ProjectionName).Info("starting handler")
 	return h
 }
 
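For callers, the constructor's signature is unchanged; only the two new config fields matter. A hedged wiring sketch (the projection name, intervals, and counts below are invented for the example, and the callbacks would normally come from the statement handler):

package example

import (
	"context"
	"time"

	"github.com/zitadel/zitadel/internal/eventstore/handler"
)

// Illustrative wiring only: the reduce/update/query/lock/unlock callbacks and
// the concrete values (name, intervals, retry counts) are assumptions.
func newExampleHandler(
	ctx context.Context,
	base handler.HandlerConfig,
	reduce handler.Reduce,
	update handler.Update,
	query handler.SearchQuery,
	lock handler.Lock,
	unlock handler.Unlock,
) *handler.ProjectionHandler {
	return handler.NewProjectionHandler(ctx, handler.ProjectionHandlerConfig{
		HandlerConfig:       base,
		ProjectionName:      "projections.example",
		RequeueEvery:        10 * time.Second,       // scheduled run interval
		RetryFailedAfter:    250 * time.Millisecond, // pause between statement retries
		Retries:             3,                      // re-run failed statements up to 3 times
		ConcurrentInstances: 10,                     // instances locked/triggered per batch
	}, reduce, update, query, lock, unlock)
}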
-func (h *ProjectionHandler) ResetShouldBulk() {
-	if h.requeueAfter > 0 {
-		h.shouldBulk.Reset(h.requeueAfter)
-	}
-}
-
-func (h *ProjectionHandler) triggerShouldPush(after time.Duration) {
-	if !h.pushSet {
-		h.pushSet = true
-		h.shouldPush.Reset(after)
-	}
-}
-
-//Process waits for several conditions:
-// if context is canceled the function gracefully shuts down
-// if an event occures it reduces the event
-// if the internal timer expires the handler will check
-// for unprocessed events on eventstore
-func (h *ProjectionHandler) Process(
-	ctx context.Context,
-	reduce Reduce,
-	update Update,
-	lock Lock,
-	unlock Unlock,
-	query SearchQuery,
-) {
-	//handle panic
-	defer func() {
-		cause := recover()
-		logging.WithFields("projection", h.ProjectionName, "cause", cause, "stack", string(debug.Stack())).Error("projection handler paniced")
-	}()
-
-	for {
-		select {
-		case <-ctx.Done():
-			if h.pushSet {
-				h.push(context.Background(), update, reduce)
-			}
-			h.shutdown()
-			return
-		case event := <-h.EventQueue:
-			if err := h.processEvent(ctx, event, reduce); err != nil {
-				logging.WithFields("projection", h.ProjectionName).WithError(err).Warn("process failed")
-				continue
-			}
-			h.triggerShouldPush(0)
-		case <-h.shouldBulk.C:
-			h.bulkMu.Lock()
-			h.bulkLocked = true
-			h.bulk(ctx, lock, unlock)
-			h.ResetShouldBulk()
-			h.bulkLocked = false
-			h.bulkMu.Unlock()
-		default:
-			//lower prio select with push
-			select {
-			case <-ctx.Done():
-				if h.pushSet {
-					h.push(context.Background(), update, reduce)
-				}
-				h.shutdown()
-				return
-			case event := <-h.EventQueue:
-				if err := h.processEvent(ctx, event, reduce); err != nil {
-					logging.WithFields("projection", h.ProjectionName).WithError(err).Warn("process failed")
-					continue
-				}
-				h.triggerShouldPush(0)
-			case <-h.shouldBulk.C:
-				h.bulkMu.Lock()
-				h.bulkLocked = true
-				h.bulk(ctx, lock, unlock)
-				h.ResetShouldBulk()
-				h.bulkLocked = false
-				h.bulkMu.Unlock()
-			case <-h.shouldPush.C:
-				h.push(ctx, update, reduce)
-				h.ResetShouldBulk()
-			}
-		}
-	}
-}
+//Trigger handles all events for the provided instances (or current instance from context if non specified)
+//by calling FetchEvents and Process until the amount of events is smaller than the BulkLimit
+func (h *ProjectionHandler) Trigger(ctx context.Context, instances ...string) error {
+	ids := []string{authz.GetInstance(ctx).InstanceID()}
+	if len(instances) > 0 {
+		ids = instances
+	}
+	for {
+		events, hasLimitExceeded, err := h.FetchEvents(ctx, ids...)
+		if err != nil {
+			return err
+		}
+		if len(events) == 0 {
+			return nil
+		}
+		_, err = h.Process(ctx, events...)
+		if err != nil {
+			return err
+		}
+		if !hasLimitExceeded {
+			return nil
+		}
+	}
+}
+
+//Process handles multiple events by reducing them to statements and updating the projection
+func (h *ProjectionHandler) Process(ctx context.Context, events ...eventstore.Event) (index int, err error) {
+	if len(events) == 0 {
+		return 0, nil
+	}
+	index = -1
+	statements := make([]*Statement, len(events))
+	for i, event := range events {
+		statements[i], err = h.reduce(event)
+		if err != nil {
+			return index, err
+		}
+	}
+	for retry := 0; retry <= h.retries; retry++ {
+		index, err = h.update(ctx, statements[index+1:], h.reduce)
+		if err != nil && !errors.Is(err, ErrSomeStmtsFailed) {
+			return index, err
+		}
+		if err == nil {
+			return index, nil
+		}
+		time.Sleep(h.retryFailedAfter)
+	}
+	return index, err
+}
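The retry loop leans on the new Update signature: instead of returning the unexecuted statements, update reports the index of the last successfully executed statement, and the next attempt passes only the tail statements[index+1:]. A self-contained illustration of that resume-after-last-success contract (the string statements and failure injection are invented for the example):

package main

import (
	"errors"
	"fmt"
)

var errSomeStmtsFailed = errors.New("some statements failed")

// apply mimics the Update contract: it executes statements in order and
// returns the index of the last statement that succeeded, plus
// errSomeStmtsFailed if it stopped early.
func apply(stmts []string, failOn string) (index int, err error) {
	index = -1
	for i, s := range stmts {
		if s == failOn {
			return index, errSomeStmtsFailed
		}
		fmt.Println("executed", s)
		index = i
	}
	return index, nil
}

func main() {
	stmts := []string{"a", "b", "c", "d"}
	index, err := apply(stmts, "c") // executes a and b, fails on c
	fmt.Println(index, err)         // 1 some statements failed

	// A retry resumes after the last success instead of re-running
	// statements that already went through.
	rest := stmts[index+1:] // ["c", "d"]
	index, err = apply(rest, "")
	fmt.Println(index, err) // 1 <nil> (both remaining statements ran)
}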
 
+//FetchEvents checks the current sequences and filters for newer events
+func (h *ProjectionHandler) FetchEvents(ctx context.Context, instances ...string) ([]eventstore.Event, bool, error) {
+	eventQuery, eventsLimit, err := h.searchQuery(ctx, instances)
+	if err != nil {
+		return nil, false, err
+	}
+	events, err := h.Eventstore.Filter(ctx, eventQuery)
+	if err != nil {
+		return nil, false, err
+	}
+	return events, int(eventsLimit) == len(events), err
+}
+
+func (h *ProjectionHandler) subscribe(ctx context.Context) {
+	ctx, cancel := context.WithCancel(ctx)
+	defer func() {
+		err := recover()
+		if err != nil {
+			h.Handler.Unsubscribe()
+			logging.WithFields("projection", h.ProjectionName).Errorf("subscription panicked: %v", err)
+		}
+		cancel()
+	}()
+	for firstEvent := range h.EventQueue {
+		events := checkAdditionalEvents(h.EventQueue, firstEvent)
+
+		index, err := h.Process(ctx, events...)
+		if err != nil || index < len(events)-1 {
+			logging.WithFields("projection", h.ProjectionName).WithError(err).Error("unable to process all events from subscription")
+		}
+	}
+}
 
+func (h *ProjectionHandler) schedule(ctx context.Context) {
+	ctx, cancel := context.WithCancel(ctx)
+	defer func() {
+		err := recover()
+		if err != nil {
+			logging.WithFields("projection", h.ProjectionName, "cause", err, "stack", string(debug.Stack())).Error("schedule panicked")
+		}
+		cancel()
+	}()
+	for range h.triggerProjection.C {
+		ids, err := h.Eventstore.InstanceIDs(ctx, eventstore.NewSearchQueryBuilder(eventstore.ColumnsInstanceIDs).AddQuery().ExcludedInstanceID("").Builder())
+		if err != nil {
+			logging.WithFields("projection", h.ProjectionName).WithError(err).Error("instance ids")
+			h.triggerProjection.Reset(h.requeueAfter)
+			continue
+		}
+		for i := 0; i < len(ids); i = i + h.concurrentInstances {
+			max := i + h.concurrentInstances
+			if max > len(ids) {
+				max = len(ids)
+			}
+			instances := ids[i:max]
+			lockCtx, cancelLock := context.WithCancel(ctx)
+			errs := h.lock(lockCtx, h.requeueAfter, instances...)
+			//wait until projection is locked
+			if err, ok := <-errs; err != nil || !ok {
+				cancelLock()
+				logging.WithFields("projection", h.ProjectionName).OnError(err).Warn("initial lock failed")
+				continue
+			}
+			go h.cancelOnErr(lockCtx, errs, cancelLock)
+			err = h.Trigger(lockCtx, instances...)
+			if err != nil {
+				logging.WithFields("projection", h.ProjectionName, "instanceIDs", instances).WithError(err).Error("trigger failed")
+			}
+
+			cancelLock()
+			unlockErr := h.unlock(instances...)
+			logging.WithFields("projection", h.ProjectionName).OnError(unlockErr).Warn("unable to unlock")
+		}
+		h.triggerProjection.Reset(h.requeueAfter)
+	}
+}
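schedule walks the full list of instance IDs in batches of concurrentInstances, locking, triggering, and unlocking each batch in turn. The clamped ids[i:max] slicing is the usual Go chunking idiom; extracted into a standalone sketch (the function and variable names here are ours, not the repository's):

package main

import "fmt"

// chunk splits ids into slices of at most size elements, mirroring the
// ids[i:max] loop in (*ProjectionHandler).schedule.
func chunk(ids []string, size int) [][]string {
	if size < 1 {
		size = 1 // NewProjectionHandler enforces the same minimum
	}
	var batches [][]string
	for i := 0; i < len(ids); i += size {
		max := i + size
		if max > len(ids) {
			max = len(ids)
		}
		batches = append(batches, ids[i:max])
	}
	return batches
}

func main() {
	ids := []string{"inst-1", "inst-2", "inst-3", "inst-4", "inst-5"}
	fmt.Println(chunk(ids, 2)) // [[inst-1 inst-2] [inst-3 inst-4] [inst-5]]
}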
 
-func (h *ProjectionHandler) processEvent(
-	ctx context.Context,
-	event eventstore.Event,
-	reduce Reduce,
-) error {
-	stmt, err := reduce(event)
-	if err != nil {
-		logging.New().WithError(err).Warn("unable to process event")
-		return err
-	}
-
-	h.lockMu.Lock()
-	defer h.lockMu.Unlock()
-
-	h.stmts = append(h.stmts, stmt)
-
-	return nil
-}
-
-func (h *ProjectionHandler) TriggerBulk(
-	ctx context.Context,
-	lock Lock,
-	unlock Unlock,
-) error {
-	if !h.shouldBulk.Stop() {
-		//make sure to flush shouldBulk chan
-		select {
-		case <-h.shouldBulk.C:
-		default:
-		}
-	}
-	defer h.ResetShouldBulk()
-
-	h.bulkMu.Lock()
-	if h.bulkLocked {
-		logging.WithFields("projection", h.ProjectionName).Debugf("waiting for existing bulk to finish")
-		h.bulkMu.Unlock()
-		return nil
-	}
-	h.bulkLocked = true
-	defer func() {
-		h.bulkLocked = false
-		h.bulkMu.Unlock()
-	}()
-
-	return h.bulk(ctx, lock, unlock)
-}
-
-func (h *ProjectionHandler) bulk(
-	ctx context.Context,
-	lock Lock,
-	unlock Unlock,
-) error {
-	ctx, cancel := context.WithCancel(ctx)
-	defer cancel()
-
-	errs := lock(ctx, h.requeueAfter, systemID)
-	//wait until projection is locked
-	if err, ok := <-errs; err != nil || !ok {
-		logging.WithFields("projection", h.ProjectionName).OnError(err).Warn("initial lock failed")
-		return err
-	}
-	go h.cancelOnErr(ctx, errs, cancel)
-
-	execErr := h.execBulk(ctx)
-	logging.WithFields("projection", h.ProjectionName).OnError(execErr).Warn("unable to execute")
-
-	unlockErr := unlock(systemID)
-	logging.WithFields("projection", h.ProjectionName).OnError(unlockErr).Warn("unable to unlock")
-
-	if execErr != nil {
-		return execErr
-	}
-
-	return unlockErr
-}
 
 func (h *ProjectionHandler) cancelOnErr(ctx context.Context, errs <-chan error, cancel func()) {
 	for {
 		select {
@@ -268,98 +234,15 @@ func (h *ProjectionHandler) cancelOnErr(ctx context.Context, errs <-chan error,
 		}
 	}
 }
 
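The hunk elides the middle of cancelOnErr. From its call sites above (it receives the lock's error channel and a cancel func), a plausible shape is a select loop that cancels once renewal reports an error or the context ends; the following is an assumption, not the verbatim source:

package sketch

import "context"

// Sketch only: assumed reconstruction of the elided cancelOnErr body.
// It watches the lock-renewal errors and cancels the trigger run when
// a renewal fails or the surrounding context is done.
func cancelOnErr(ctx context.Context, errs <-chan error, cancel func()) {
	for {
		select {
		case err := <-errs:
			if err != nil {
				cancel() // lock renewal failed: stop triggering for this batch
				return
			}
		case <-ctx.Done():
			return
		}
	}
}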
 
-type executeBulk func(ctx context.Context) error
-
-func (h *ProjectionHandler) prepareExecuteBulk(
-	query SearchQuery,
-	reduce Reduce,
-	update Update,
-) executeBulk {
-	return func(ctx context.Context) error {
-		for {
-			select {
-			case <-ctx.Done():
-				return nil
-			default:
-				hasLimitExeeded, err := h.fetchBulkStmts(ctx, query, reduce)
-				if err != nil || len(h.stmts) == 0 {
-					logging.WithFields("projection", h.ProjectionName).OnError(err).Warn("unable to fetch stmts")
-					return err
-				}
-
-				if err = h.push(ctx, update, reduce); err != nil {
-					return err
-				}
-
-				if !hasLimitExeeded {
-					return nil
-				}
-			}
-		}
-	}
-}
-
 func checkAdditionalEvents(eventQueue chan eventstore.Event, event eventstore.Event) []eventstore.Event {
 	events := make([]eventstore.Event, 1)
 	events[0] = event
 	for {
 		select {
 		case event := <-eventQueue:
 			events = append(events, event)
 		default:
 			return events
 		}
 	}
 }
 
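checkAdditionalEvents is the classic non-blocking drain: the default case returns as soon as the queue is empty, so one Process call batches every event that arrived while the previous batch ran. The same pattern in isolation (an int channel stands in for the event queue):

package main

import "fmt"

// drain collects the first value plus everything already buffered in ch,
// returning as soon as a receive would block (the default case).
func drain(ch chan int, first int) []int {
	out := []int{first}
	for {
		select {
		case v := <-ch:
			out = append(out, v)
		default:
			return out
		}
	}
}

func main() {
	ch := make(chan int, 8)
	ch <- 2
	ch <- 3
	fmt.Println(drain(ch, 1)) // [1 2 3]
}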
-func (h *ProjectionHandler) fetchBulkStmts(
-	ctx context.Context,
-	query SearchQuery,
-	reduce Reduce,
-) (limitExeeded bool, err error) {
-	eventQuery, eventsLimit, err := query(ctx)
-	if err != nil {
-		logging.WithFields("projection", h.ProjectionName).WithError(err).Warn("unable to create event query")
-		return false, err
-	}
-
-	events, err := h.Eventstore.Filter(ctx, eventQuery)
-	if err != nil {
-		logging.WithFields("projection", h.ProjectionName).WithError(err).Info("Unable to bulk fetch events")
-		return false, err
-	}
-
-	for _, event := range events {
-		if err = h.processEvent(ctx, event, reduce); err != nil {
-			logging.WithFields("projection", h.ProjectionName, "sequence", event.Sequence(), "instanceID", event.Aggregate().InstanceID).WithError(err).Warn("unable to process event in bulk")
-			return false, err
-		}
-	}
-
-	return len(events) == int(eventsLimit), nil
-}
-
-func (h *ProjectionHandler) push(
-	ctx context.Context,
-	update Update,
-	reduce Reduce,
-) (err error) {
-	h.lockMu.Lock()
-	defer h.lockMu.Unlock()
-
-	sort.Slice(h.stmts, func(i, j int) bool {
-		return h.stmts[i].Sequence < h.stmts[j].Sequence
-	})
-
-	h.stmts, err = update(ctx, h.stmts, reduce)
-	h.pushSet = len(h.stmts) > 0
-
-	if h.pushSet {
-		h.triggerShouldPush(h.retryFailedAfter)
-		return nil
-	}
-
-	h.shouldPush.Stop()
-
-	return err
-}
-
-func (h *ProjectionHandler) shutdown() {
-	h.lockMu.Lock()
-	defer h.lockMu.Unlock()
-	h.Sub.Unsubscribe()
-	if !h.shouldBulk.Stop() {
-		<-h.shouldBulk.C
-	}
-	if !h.shouldPush.Stop() {
-		<-h.shouldPush.C
-	}
-	logging.New().Info("stop processing")
-}