diff --git a/internal/eventstore/spooler/spooler.go b/internal/eventstore/spooler/spooler.go index 755551a9e3..ec3b5109df 100644 --- a/internal/eventstore/spooler/spooler.go +++ b/internal/eventstore/spooler/spooler.go @@ -3,6 +3,7 @@ package spooler import ( "context" "strconv" + "sync" "github.com/caos/logging" "github.com/caos/zitadel/internal/eventstore" @@ -43,12 +44,7 @@ func (s *Spooler) Start() { go func(workerIdx int) { workerID := s.lockID + "--" + strconv.Itoa(workerIdx) for task := range s.queue { - go func(handler *spooledHandler, queue chan<- *spooledHandler) { - time.Sleep(handler.MinimumCycleDuration() - time.Since(handler.queuedAt)) - handler.queuedAt = time.Now() - queue <- handler - }(task, s.queue) - + go requeueTask(task, s.queue) task.load(workerID) } }(i) @@ -59,23 +55,20 @@ func (s *Spooler) Start() { } } +func requeueTask(task *spooledHandler, queue chan<- *spooledHandler) { + time.Sleep(task.MinimumCycleDuration() - time.Since(task.queuedAt)) + task.queuedAt = time.Now() + queue <- task +} + func (s *spooledHandler) load(workerID string) { errs := make(chan error) defer close(errs) ctx, cancel := context.WithCancel(context.Background()) go s.awaitError(cancel, errs, workerID) hasLocked := s.lock(ctx, errs, workerID) - defer close(hasLocked) if <-hasLocked { - go func() { - for l := range hasLocked { - if !l { - // we only need to break. An error is already written by the lock-routine to the errs channel - break - } - } - }() events, err := s.query(ctx) if err != nil { errs <- err @@ -110,26 +103,6 @@ func (s *spooledHandler) process(ctx context.Context, events []*models.Event, wo return nil } -func HandleError(event *models.Event, failedErr error, - latestFailedEvent func(sequence uint64) (*repository.FailedEvent, error), - processFailedEvent func(*repository.FailedEvent) error, - processSequence func(uint64) error, errorCountUntilSkip uint64) error { - failedEvent, err := latestFailedEvent(event.Sequence) - if err != nil { - return err - } - failedEvent.FailureCount++ - failedEvent.ErrMsg = failedErr.Error() - err = processFailedEvent(failedEvent) - if err != nil { - return err - } - if errorCountUntilSkip <= failedEvent.FailureCount { - return processSequence(event.Sequence) - } - return nil -} - func (s *spooledHandler) query(ctx context.Context) ([]*models.Event, error) { query, err := s.EventQuery() if err != nil { @@ -152,33 +125,32 @@ func (s *spooledHandler) query(ctx context.Context) ([]*models.Event, error) { return s.eventstore.FilterEvents(ctx, query) } +//lock ensures the lock on the database. +// the returned channel will be closed if ctx is done or an error occured durring lock func (s *spooledHandler) lock(ctx context.Context, errs chan<- error, workerID string) chan bool { renewTimer := time.After(0) - renewDuration := s.MinimumCycleDuration() locked := make(chan bool) go func(locked chan bool) { + var firstLock sync.Once + defer close(locked) for { select { case <-ctx.Done(): return case <-renewTimer: - logging.Log("SPOOL-K2lst").WithField("view", s.ViewModel()).WithField("worker", workerID).Debug("renew") err := s.locker.Renew(workerID, s.ViewModel(), s.MinimumCycleDuration()*2) - logging.Log("SPOOL-u4j6k").WithField("view", s.ViewModel()).WithField("worker", workerID).WithError(err).Debug("renew done") + firstLock.Do(func() { + locked <- err == nil + }) if err == nil { - if ctx.Err() == nil { - locked <- true - renewTimer = time.After(renewDuration) - } + renewTimer = time.After(s.MinimumCycleDuration()) continue } if ctx.Err() == nil { errs <- err } - - locked <- false return } } @@ -186,3 +158,23 @@ func (s *spooledHandler) lock(ctx context.Context, errs chan<- error, workerID s return locked } + +func HandleError(event *models.Event, failedErr error, + latestFailedEvent func(sequence uint64) (*repository.FailedEvent, error), + processFailedEvent func(*repository.FailedEvent) error, + processSequence func(uint64) error, errorCountUntilSkip uint64) error { + failedEvent, err := latestFailedEvent(event.Sequence) + if err != nil { + return err + } + failedEvent.FailureCount++ + failedEvent.ErrMsg = failedErr.Error() + err = processFailedEvent(failedEvent) + if err != nil { + return err + } + if errorCountUntilSkip <= failedEvent.FailureCount { + return processSequence(event.Sequence) + } + return nil +} diff --git a/internal/eventstore/spooler/spooler_test.go b/internal/eventstore/spooler/spooler_test.go index eaf4e0b060..7a852ed871 100644 --- a/internal/eventstore/spooler/spooler_test.go +++ b/internal/eventstore/spooler/spooler_test.go @@ -293,15 +293,20 @@ func TestSpooler_lock(t *testing.T) { } errs := make(chan error, 1) + defer close(errs) ctx, _ := context.WithDeadline(context.Background(), tt.args.deadline) locked := s.lock(ctx, errs, "test-worker") if tt.fields.expectsErr { + lock := <-locked err := <-errs if err == nil { t.Error("No error in error queue") } + if lock { + t.Error("lock should have failed") + } } else { lock := <-locked if !lock {