mirror of
https://github.com/zitadel/zitadel.git
synced 2025-03-01 01:57:23 +00:00
fix(spooler): improve check for failure count (#326)
* fix(spooler): improve check for failure count * fix(spooler): add tests for HandleError * fix(spooler): correct test
This commit is contained in:
parent
fcdf27c683
commit
682d623343
@ -2,6 +2,7 @@ package spooler
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"github.com/caos/logging"
|
"github.com/caos/logging"
|
||||||
"github.com/caos/zitadel/internal/eventstore"
|
"github.com/caos/zitadel/internal/eventstore"
|
||||||
"github.com/caos/zitadel/internal/eventstore/models"
|
"github.com/caos/zitadel/internal/eventstore/models"
|
||||||
@ -117,7 +118,7 @@ func HandleError(event *models.Event, failedErr error,
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if errorCountUntilSkip == failedEvent.FailureCount {
|
if errorCountUntilSkip <= failedEvent.FailureCount {
|
||||||
return processSequence(event.Sequence)
|
return processSequence(event.Sequence)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -3,12 +3,14 @@ package spooler
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/caos/zitadel/internal/eventstore"
|
|
||||||
"github.com/caos/zitadel/internal/eventstore/models"
|
|
||||||
"github.com/caos/zitadel/internal/eventstore/spooler/mock"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/caos/zitadel/internal/errors"
|
||||||
|
"github.com/caos/zitadel/internal/eventstore"
|
||||||
|
"github.com/caos/zitadel/internal/eventstore/models"
|
||||||
|
"github.com/caos/zitadel/internal/eventstore/spooler/mock"
|
||||||
|
"github.com/caos/zitadel/internal/view/repository"
|
||||||
"github.com/golang/mock/gomock"
|
"github.com/golang/mock/gomock"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -333,3 +335,103 @@ func (l *testLocker) expectRenew(t *testing.T, err error, waitTime time.Duration
|
|||||||
func (l *testLocker) finish() {
|
func (l *testLocker) finish() {
|
||||||
l.ctrl.Finish()
|
l.ctrl.Finish()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestHandleError(t *testing.T) {
|
||||||
|
type args struct {
|
||||||
|
event *models.Event
|
||||||
|
failedErr error
|
||||||
|
latestFailedEvent func(sequence uint64) (*repository.FailedEvent, error)
|
||||||
|
errorCountUntilSkip uint64
|
||||||
|
}
|
||||||
|
type res struct {
|
||||||
|
wantErr bool
|
||||||
|
shouldProcessSequence bool
|
||||||
|
}
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
args args
|
||||||
|
res res
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "should process sequence already too high",
|
||||||
|
args: args{
|
||||||
|
event: &models.Event{Sequence: 30000000},
|
||||||
|
failedErr: errors.ThrowInternal(nil, "SPOOL-Wk53B", "this was wrong"),
|
||||||
|
latestFailedEvent: func(s uint64) (*repository.FailedEvent, error) {
|
||||||
|
return &repository.FailedEvent{
|
||||||
|
ErrMsg: "blub",
|
||||||
|
FailedSequence: s - 1,
|
||||||
|
FailureCount: 6,
|
||||||
|
ViewName: "super.table",
|
||||||
|
}, nil
|
||||||
|
},
|
||||||
|
errorCountUntilSkip: 5,
|
||||||
|
},
|
||||||
|
res: res{
|
||||||
|
shouldProcessSequence: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "should process sequence after this event too high",
|
||||||
|
args: args{
|
||||||
|
event: &models.Event{Sequence: 30000000},
|
||||||
|
failedErr: errors.ThrowInternal(nil, "SPOOL-Wk53B", "this was wrong"),
|
||||||
|
latestFailedEvent: func(s uint64) (*repository.FailedEvent, error) {
|
||||||
|
return &repository.FailedEvent{
|
||||||
|
ErrMsg: "blub",
|
||||||
|
FailedSequence: s - 1,
|
||||||
|
FailureCount: 5,
|
||||||
|
ViewName: "super.table",
|
||||||
|
}, nil
|
||||||
|
},
|
||||||
|
errorCountUntilSkip: 6,
|
||||||
|
},
|
||||||
|
res: res{
|
||||||
|
shouldProcessSequence: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "should not process sequence",
|
||||||
|
args: args{
|
||||||
|
event: &models.Event{Sequence: 30000000},
|
||||||
|
failedErr: errors.ThrowInternal(nil, "SPOOL-Wk53B", "this was wrong"),
|
||||||
|
latestFailedEvent: func(s uint64) (*repository.FailedEvent, error) {
|
||||||
|
return &repository.FailedEvent{
|
||||||
|
ErrMsg: "blub",
|
||||||
|
FailedSequence: s - 1,
|
||||||
|
FailureCount: 3,
|
||||||
|
ViewName: "super.table",
|
||||||
|
}, nil
|
||||||
|
},
|
||||||
|
errorCountUntilSkip: 5,
|
||||||
|
},
|
||||||
|
res: res{
|
||||||
|
shouldProcessSequence: false,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
processedSequence := false
|
||||||
|
err := HandleError(
|
||||||
|
tt.args.event,
|
||||||
|
tt.args.failedErr,
|
||||||
|
tt.args.latestFailedEvent,
|
||||||
|
func(*repository.FailedEvent) error {
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
func(uint64) error {
|
||||||
|
processedSequence = true
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
tt.args.errorCountUntilSkip)
|
||||||
|
|
||||||
|
if (err != nil) != tt.res.wantErr {
|
||||||
|
t.Errorf("HandleError() error = %v, wantErr %v", err, tt.res.wantErr)
|
||||||
|
}
|
||||||
|
if tt.res.shouldProcessSequence != processedSequence {
|
||||||
|
t.Error("should not process sequence")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user