diff --git a/internal/eventstore/spooler/spooler.go b/internal/eventstore/spooler/spooler.go index ee7882f064..9cfa1663f0 100644 --- a/internal/eventstore/spooler/spooler.go +++ b/internal/eventstore/spooler/spooler.go @@ -2,6 +2,7 @@ package spooler import ( "context" + "github.com/caos/logging" "github.com/caos/zitadel/internal/eventstore" "github.com/caos/zitadel/internal/eventstore/models" @@ -117,7 +118,7 @@ func HandleError(event *models.Event, failedErr error, if err != nil { return err } - if errorCountUntilSkip == failedEvent.FailureCount { + if errorCountUntilSkip <= failedEvent.FailureCount { return processSequence(event.Sequence) } return nil diff --git a/internal/eventstore/spooler/spooler_test.go b/internal/eventstore/spooler/spooler_test.go index 962b9e3cd9..d652dbd772 100644 --- a/internal/eventstore/spooler/spooler_test.go +++ b/internal/eventstore/spooler/spooler_test.go @@ -3,12 +3,14 @@ package spooler import ( "context" "fmt" - "github.com/caos/zitadel/internal/eventstore" - "github.com/caos/zitadel/internal/eventstore/models" - "github.com/caos/zitadel/internal/eventstore/spooler/mock" "testing" "time" + "github.com/caos/zitadel/internal/errors" + "github.com/caos/zitadel/internal/eventstore" + "github.com/caos/zitadel/internal/eventstore/models" + "github.com/caos/zitadel/internal/eventstore/spooler/mock" + "github.com/caos/zitadel/internal/view/repository" "github.com/golang/mock/gomock" ) @@ -333,3 +335,103 @@ func (l *testLocker) expectRenew(t *testing.T, err error, waitTime time.Duration func (l *testLocker) finish() { l.ctrl.Finish() } + +func TestHandleError(t *testing.T) { + type args struct { + event *models.Event + failedErr error + latestFailedEvent func(sequence uint64) (*repository.FailedEvent, error) + errorCountUntilSkip uint64 + } + type res struct { + wantErr bool + shouldProcessSequence bool + } + tests := []struct { + name string + args args + res res + }{ + { + name: "should process sequence already too high", + args: args{ + event: &models.Event{Sequence: 30000000}, + failedErr: errors.ThrowInternal(nil, "SPOOL-Wk53B", "this was wrong"), + latestFailedEvent: func(s uint64) (*repository.FailedEvent, error) { + return &repository.FailedEvent{ + ErrMsg: "blub", + FailedSequence: s - 1, + FailureCount: 6, + ViewName: "super.table", + }, nil + }, + errorCountUntilSkip: 5, + }, + res: res{ + shouldProcessSequence: true, + }, + }, + { + name: "should process sequence after this event too high", + args: args{ + event: &models.Event{Sequence: 30000000}, + failedErr: errors.ThrowInternal(nil, "SPOOL-Wk53B", "this was wrong"), + latestFailedEvent: func(s uint64) (*repository.FailedEvent, error) { + return &repository.FailedEvent{ + ErrMsg: "blub", + FailedSequence: s - 1, + FailureCount: 5, + ViewName: "super.table", + }, nil + }, + errorCountUntilSkip: 6, + }, + res: res{ + shouldProcessSequence: true, + }, + }, + { + name: "should not process sequence", + args: args{ + event: &models.Event{Sequence: 30000000}, + failedErr: errors.ThrowInternal(nil, "SPOOL-Wk53B", "this was wrong"), + latestFailedEvent: func(s uint64) (*repository.FailedEvent, error) { + return &repository.FailedEvent{ + ErrMsg: "blub", + FailedSequence: s - 1, + FailureCount: 3, + ViewName: "super.table", + }, nil + }, + errorCountUntilSkip: 5, + }, + res: res{ + shouldProcessSequence: false, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + processedSequence := false + err := HandleError( + tt.args.event, + tt.args.failedErr, + tt.args.latestFailedEvent, + func(*repository.FailedEvent) error { + return nil + }, + func(uint64) error { + processedSequence = true + return nil + }, + tt.args.errorCountUntilSkip) + + if (err != nil) != tt.res.wantErr { + t.Errorf("HandleError() error = %v, wantErr %v", err, tt.res.wantErr) + } + if tt.res.shouldProcessSequence != processedSequence { + t.Error("should not process sequence") + } + }) + } +}