mirror of
https://github.com/zitadel/zitadel.git
synced 2025-01-07 15:07:40 +00:00
682d623343
* fix(spooler): improve check for failure count * fix(spooler): add tests for HandleError * fix(spooler): correct test
438 lines
11 KiB
Go
438 lines
11 KiB
Go
package spooler
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/caos/zitadel/internal/errors"
|
|
"github.com/caos/zitadel/internal/eventstore"
|
|
"github.com/caos/zitadel/internal/eventstore/models"
|
|
"github.com/caos/zitadel/internal/eventstore/spooler/mock"
|
|
"github.com/caos/zitadel/internal/view/repository"
|
|
"github.com/golang/mock/gomock"
|
|
)
|
|
|
|
// testHandler is a configurable stand-in used as the Handler in the spooler
// tests. Every field is simply handed back by one of its methods.
type testHandler struct {
	// cycleDuration is reported by MinimumCycleDuration;
	// processSleep is how long Reduce blocks before it returns.
	cycleDuration, processSleep time.Duration

	// processError is returned by Reduce, queryError by EventQuery.
	processError, queryError error

	// viewModel is returned by ViewModel.
	viewModel string
}
|
|
|
|
func (h *testHandler) ViewModel() string {
|
|
return h.viewModel
|
|
}
|
|
func (h *testHandler) EventQuery() (*models.SearchQuery, error) {
|
|
return nil, h.queryError
|
|
}
|
|
func (h *testHandler) Reduce(*models.Event) error {
|
|
<-time.After(h.processSleep)
|
|
return h.processError
|
|
}
|
|
func (h *testHandler) OnError(event *models.Event, err error) error {
|
|
return err
|
|
}
|
|
// MinimumCycleDuration reports the cycle duration configured on the test
// handler.
func (h *testHandler) MinimumCycleDuration() time.Duration {
	return h.cycleDuration
}
|
|
|
|
type eventstoreStub struct {
|
|
events []*models.Event
|
|
err error
|
|
}
|
|
|
|
func (es *eventstoreStub) Health(ctx context.Context) error {
|
|
return nil
|
|
}
|
|
|
|
func (es *eventstoreStub) AggregateCreator() *models.AggregateCreator {
|
|
return nil
|
|
}
|
|
|
|
func (es *eventstoreStub) FilterEvents(ctx context.Context, in *models.SearchQuery) ([]*models.Event, error) {
|
|
if es.err != nil {
|
|
return nil, es.err
|
|
}
|
|
return es.events, nil
|
|
}
|
|
func (es *eventstoreStub) PushAggregates(ctx context.Context, in ...*models.Aggregate) error {
|
|
return nil
|
|
}
|
|
|
|
func TestSpooler_process(t *testing.T) {
|
|
type fields struct {
|
|
currentHandler Handler
|
|
}
|
|
type args struct {
|
|
timeout time.Duration
|
|
events []*models.Event
|
|
}
|
|
tests := []struct {
|
|
name string
|
|
fields fields
|
|
args args
|
|
wantErr bool
|
|
}{
|
|
{
|
|
name: "process all events",
|
|
fields: fields{
|
|
currentHandler: &testHandler{},
|
|
},
|
|
args: args{
|
|
timeout: 0,
|
|
events: []*models.Event{&models.Event{}, &models.Event{}},
|
|
},
|
|
wantErr: false,
|
|
},
|
|
{
|
|
name: "deadline exeeded",
|
|
fields: fields{
|
|
currentHandler: &testHandler{processSleep: 501 * time.Millisecond},
|
|
},
|
|
args: args{
|
|
timeout: 1 * time.Second,
|
|
events: []*models.Event{&models.Event{}, &models.Event{}, &models.Event{}, &models.Event{}},
|
|
},
|
|
wantErr: false,
|
|
},
|
|
{
|
|
name: "process error",
|
|
fields: fields{
|
|
currentHandler: &testHandler{processSleep: 1 * time.Second, processError: fmt.Errorf("i am an error")},
|
|
},
|
|
args: args{
|
|
events: []*models.Event{&models.Event{}, &models.Event{}},
|
|
},
|
|
wantErr: true,
|
|
},
|
|
}
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
s := &spooledHandler{
|
|
Handler: tt.fields.currentHandler,
|
|
}
|
|
|
|
ctx := context.Background()
|
|
var start time.Time
|
|
if tt.args.timeout > 0 {
|
|
ctx, _ = context.WithTimeout(ctx, tt.args.timeout)
|
|
start = time.Now()
|
|
}
|
|
|
|
if err := s.process(ctx, tt.args.events); (err != nil) != tt.wantErr {
|
|
t.Errorf("Spooler.process() error = %v, wantErr %v", err, tt.wantErr)
|
|
}
|
|
|
|
elapsed := time.Since(start).Round(1 * time.Second)
|
|
if tt.args.timeout != 0 && elapsed != tt.args.timeout {
|
|
t.Errorf("wrong timeout wanted %v elapsed %v since %v", tt.args.timeout, elapsed, time.Since(start))
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestSpooler_awaitError(t *testing.T) {
|
|
type fields struct {
|
|
currentHandler Handler
|
|
err error
|
|
canceled bool
|
|
}
|
|
tests := []struct {
|
|
name string
|
|
fields fields
|
|
}{
|
|
{
|
|
"no error",
|
|
fields{
|
|
err: nil,
|
|
currentHandler: &testHandler{processSleep: 500 * time.Millisecond},
|
|
canceled: false,
|
|
},
|
|
},
|
|
{
|
|
"with error",
|
|
fields{
|
|
err: fmt.Errorf("hodor"),
|
|
currentHandler: &testHandler{processSleep: 500 * time.Millisecond},
|
|
canceled: false,
|
|
},
|
|
},
|
|
}
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
s := &spooledHandler{
|
|
Handler: tt.fields.currentHandler,
|
|
}
|
|
errs := make(chan error)
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
go s.awaitError(cancel, errs)
|
|
errs <- tt.fields.err
|
|
|
|
if ctx.Err() == nil {
|
|
t.Error("cancel function was not called")
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
// TestSpooler_load checks if load terminates
|
|
func TestSpooler_load(t *testing.T) {
|
|
type fields struct {
|
|
currentHandler Handler
|
|
locker *testLocker
|
|
lockID string
|
|
eventstore eventstore.Eventstore
|
|
}
|
|
tests := []struct {
|
|
name string
|
|
fields fields
|
|
}{
|
|
{
|
|
"lock exists",
|
|
fields{
|
|
currentHandler: &testHandler{processSleep: 500 * time.Millisecond, viewModel: "testView", cycleDuration: 1 * time.Second},
|
|
lockID: "testID",
|
|
locker: newTestLocker(t, "testID", "testView").expectRenew(t, fmt.Errorf("lock already exists"), 2000*time.Millisecond),
|
|
},
|
|
},
|
|
{
|
|
"lock fails",
|
|
fields{
|
|
currentHandler: &testHandler{processSleep: 100 * time.Millisecond, viewModel: "testView", cycleDuration: 1 * time.Second},
|
|
lockID: "testID",
|
|
locker: newTestLocker(t, "testID", "testView").expectRenew(t, fmt.Errorf("fail"), 2000*time.Millisecond),
|
|
eventstore: &eventstoreStub{events: []*models.Event{&models.Event{}}},
|
|
},
|
|
},
|
|
{
|
|
"query fails",
|
|
fields{
|
|
currentHandler: &testHandler{processSleep: 100 * time.Millisecond, viewModel: "testView", queryError: fmt.Errorf("query fail"), cycleDuration: 1 * time.Second},
|
|
lockID: "testID",
|
|
locker: newTestLocker(t, "testID", "testView").expectRenew(t, nil, 2000*time.Millisecond),
|
|
eventstore: &eventstoreStub{err: fmt.Errorf("fail")},
|
|
},
|
|
},
|
|
{
|
|
"process event fails",
|
|
fields{
|
|
currentHandler: &testHandler{processError: fmt.Errorf("oups"), processSleep: 100 * time.Millisecond, viewModel: "testView", cycleDuration: 500 * time.Millisecond},
|
|
lockID: "testID",
|
|
locker: newTestLocker(t, "testID", "testView").expectRenew(t, nil, 1000*time.Millisecond),
|
|
eventstore: &eventstoreStub{events: []*models.Event{&models.Event{}}},
|
|
},
|
|
},
|
|
}
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
defer tt.fields.locker.finish()
|
|
s := &spooledHandler{
|
|
Handler: tt.fields.currentHandler,
|
|
locker: tt.fields.locker.mock,
|
|
lockID: tt.fields.lockID,
|
|
eventstore: tt.fields.eventstore,
|
|
}
|
|
s.load()
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestSpooler_lock(t *testing.T) {
|
|
type fields struct {
|
|
currentHandler Handler
|
|
locker *testLocker
|
|
lockID string
|
|
expectsErr bool
|
|
}
|
|
type args struct {
|
|
deadline time.Time
|
|
}
|
|
tests := []struct {
|
|
name string
|
|
fields fields
|
|
args args
|
|
}{
|
|
{
|
|
"renew correct",
|
|
fields{
|
|
currentHandler: &testHandler{cycleDuration: 1 * time.Second, viewModel: "testView"},
|
|
lockID: "testID",
|
|
locker: newTestLocker(t, "testID", "testView").expectRenew(t, nil, 2000*time.Millisecond),
|
|
expectsErr: false,
|
|
},
|
|
args{
|
|
deadline: time.Now().Add(1 * time.Second),
|
|
},
|
|
},
|
|
{
|
|
"renew fails",
|
|
fields{
|
|
currentHandler: &testHandler{cycleDuration: 900 * time.Millisecond, viewModel: "testView"},
|
|
lockID: "testID",
|
|
locker: newTestLocker(t, "testID", "testView").expectRenew(t, fmt.Errorf("renew failed"), 1800*time.Millisecond),
|
|
expectsErr: true,
|
|
},
|
|
args{
|
|
deadline: time.Now().Add(5 * time.Second),
|
|
},
|
|
},
|
|
}
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
defer tt.fields.locker.finish()
|
|
s := &spooledHandler{
|
|
Handler: tt.fields.currentHandler,
|
|
locker: tt.fields.locker.mock,
|
|
lockID: tt.fields.lockID,
|
|
}
|
|
|
|
errs := make(chan error, 1)
|
|
ctx, _ := context.WithDeadline(context.Background(), tt.args.deadline)
|
|
|
|
locked := s.lock(ctx, errs)
|
|
|
|
if tt.fields.expectsErr {
|
|
err := <-errs
|
|
if err == nil {
|
|
t.Error("No error in error queue")
|
|
}
|
|
} else {
|
|
lock := <-locked
|
|
if !lock {
|
|
t.Error("lock should be true")
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
type testLocker struct {
|
|
mock *mock.MockLocker
|
|
lockerID string
|
|
viewName string
|
|
ctrl *gomock.Controller
|
|
}
|
|
|
|
func newTestLocker(t *testing.T, lockerID, viewName string) *testLocker {
|
|
ctrl := gomock.NewController(t)
|
|
return &testLocker{mock.NewMockLocker(ctrl), lockerID, viewName, ctrl}
|
|
}
|
|
|
|
func (l *testLocker) expectRenew(t *testing.T, err error, waitTime time.Duration) *testLocker {
|
|
l.mock.EXPECT().Renew(l.lockerID, l.viewName, gomock.Any()).DoAndReturn(
|
|
func(_, _ string, gotten time.Duration) error {
|
|
if waitTime-gotten != 0 {
|
|
t.Errorf("expected waittime %v got %v", waitTime, gotten)
|
|
}
|
|
return err
|
|
}).Times(1)
|
|
|
|
return l
|
|
}
|
|
|
|
func (l *testLocker) finish() {
|
|
l.ctrl.Finish()
|
|
}
|
|
|
|
func TestHandleError(t *testing.T) {
|
|
type args struct {
|
|
event *models.Event
|
|
failedErr error
|
|
latestFailedEvent func(sequence uint64) (*repository.FailedEvent, error)
|
|
errorCountUntilSkip uint64
|
|
}
|
|
type res struct {
|
|
wantErr bool
|
|
shouldProcessSequence bool
|
|
}
|
|
tests := []struct {
|
|
name string
|
|
args args
|
|
res res
|
|
}{
|
|
{
|
|
name: "should process sequence already too high",
|
|
args: args{
|
|
event: &models.Event{Sequence: 30000000},
|
|
failedErr: errors.ThrowInternal(nil, "SPOOL-Wk53B", "this was wrong"),
|
|
latestFailedEvent: func(s uint64) (*repository.FailedEvent, error) {
|
|
return &repository.FailedEvent{
|
|
ErrMsg: "blub",
|
|
FailedSequence: s - 1,
|
|
FailureCount: 6,
|
|
ViewName: "super.table",
|
|
}, nil
|
|
},
|
|
errorCountUntilSkip: 5,
|
|
},
|
|
res: res{
|
|
shouldProcessSequence: true,
|
|
},
|
|
},
|
|
{
|
|
name: "should process sequence after this event too high",
|
|
args: args{
|
|
event: &models.Event{Sequence: 30000000},
|
|
failedErr: errors.ThrowInternal(nil, "SPOOL-Wk53B", "this was wrong"),
|
|
latestFailedEvent: func(s uint64) (*repository.FailedEvent, error) {
|
|
return &repository.FailedEvent{
|
|
ErrMsg: "blub",
|
|
FailedSequence: s - 1,
|
|
FailureCount: 5,
|
|
ViewName: "super.table",
|
|
}, nil
|
|
},
|
|
errorCountUntilSkip: 6,
|
|
},
|
|
res: res{
|
|
shouldProcessSequence: true,
|
|
},
|
|
},
|
|
{
|
|
name: "should not process sequence",
|
|
args: args{
|
|
event: &models.Event{Sequence: 30000000},
|
|
failedErr: errors.ThrowInternal(nil, "SPOOL-Wk53B", "this was wrong"),
|
|
latestFailedEvent: func(s uint64) (*repository.FailedEvent, error) {
|
|
return &repository.FailedEvent{
|
|
ErrMsg: "blub",
|
|
FailedSequence: s - 1,
|
|
FailureCount: 3,
|
|
ViewName: "super.table",
|
|
}, nil
|
|
},
|
|
errorCountUntilSkip: 5,
|
|
},
|
|
res: res{
|
|
shouldProcessSequence: false,
|
|
},
|
|
},
|
|
}
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
processedSequence := false
|
|
err := HandleError(
|
|
tt.args.event,
|
|
tt.args.failedErr,
|
|
tt.args.latestFailedEvent,
|
|
func(*repository.FailedEvent) error {
|
|
return nil
|
|
},
|
|
func(uint64) error {
|
|
processedSequence = true
|
|
return nil
|
|
},
|
|
tt.args.errorCountUntilSkip)
|
|
|
|
if (err != nil) != tt.res.wantErr {
|
|
t.Errorf("HandleError() error = %v, wantErr %v", err, tt.res.wantErr)
|
|
}
|
|
if tt.res.shouldProcessSequence != processedSequence {
|
|
t.Error("should not process sequence")
|
|
}
|
|
})
|
|
}
|
|
}
|