mirror of
https://github.com/zitadel/zitadel.git
synced 2024-12-15 04:18:01 +00:00
458a383de2
* fix: use current sequence for refetching of events * fix: use client ids
515 lines
13 KiB
Go
515 lines
13 KiB
Go
package spooler
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/golang/mock/gomock"
|
|
|
|
"github.com/zitadel/zitadel/internal/errors"
|
|
"github.com/zitadel/zitadel/internal/eventstore"
|
|
v1 "github.com/zitadel/zitadel/internal/eventstore/v1"
|
|
"github.com/zitadel/zitadel/internal/eventstore/v1/models"
|
|
"github.com/zitadel/zitadel/internal/eventstore/v1/query"
|
|
"github.com/zitadel/zitadel/internal/eventstore/v1/spooler/mock"
|
|
"github.com/zitadel/zitadel/internal/view/repository"
|
|
)
|
|
|
|
var (
|
|
testNow = time.Now()
|
|
)
|
|
|
|
type testHandler struct {
|
|
cycleDuration time.Duration
|
|
processSleep time.Duration
|
|
processError error
|
|
queryError error
|
|
viewModel string
|
|
bulkLimit uint64
|
|
maxErrCount int
|
|
}
|
|
|
|
func (h *testHandler) AggregateTypes() []models.AggregateType {
|
|
return nil
|
|
}
|
|
|
|
func (h *testHandler) CurrentSequence(ctx context.Context, instanceID string) (uint64, error) {
|
|
return 0, nil
|
|
}
|
|
|
|
func (h *testHandler) Eventstore() v1.Eventstore {
|
|
return nil
|
|
}
|
|
|
|
func (h *testHandler) ViewModel() string {
|
|
return h.viewModel
|
|
}
|
|
|
|
func (h *testHandler) Subscription() *v1.Subscription {
|
|
return nil
|
|
}
|
|
|
|
func (h *testHandler) EventQuery(ctx context.Context, instanceIDs []string) (*models.SearchQuery, error) {
|
|
if h.queryError != nil {
|
|
return nil, h.queryError
|
|
}
|
|
return &models.SearchQuery{}, nil
|
|
}
|
|
|
|
func (h *testHandler) Reduce(*models.Event) error {
|
|
<-time.After(h.processSleep)
|
|
return h.processError
|
|
}
|
|
|
|
func (h *testHandler) OnError(event *models.Event, err error) error {
|
|
if h.maxErrCount == 2 {
|
|
return nil
|
|
}
|
|
h.maxErrCount++
|
|
return err
|
|
}
|
|
|
|
func (h *testHandler) OnSuccess([]string) error {
|
|
return nil
|
|
}
|
|
|
|
func (h *testHandler) MinimumCycleDuration() time.Duration {
|
|
return h.cycleDuration
|
|
}
|
|
|
|
func (h *testHandler) LockDuration() time.Duration {
|
|
return h.cycleDuration / 2
|
|
}
|
|
|
|
func (h *testHandler) QueryLimit() uint64 {
|
|
return h.bulkLimit
|
|
}
|
|
|
|
type eventstoreStub struct {
|
|
events []*models.Event
|
|
err error
|
|
}
|
|
|
|
func (es *eventstoreStub) Subscribe(...models.AggregateType) *v1.Subscription { return nil }
|
|
|
|
func (es *eventstoreStub) Health(ctx context.Context) error {
|
|
return nil
|
|
}
|
|
|
|
func (es *eventstoreStub) AggregateCreator() *models.AggregateCreator {
|
|
return nil
|
|
}
|
|
|
|
func (es *eventstoreStub) FilterEvents(ctx context.Context, in *models.SearchQuery) ([]*models.Event, error) {
|
|
if es.err != nil {
|
|
return nil, es.err
|
|
}
|
|
return es.events, nil
|
|
}
|
|
func (es *eventstoreStub) PushAggregates(ctx context.Context, in ...*models.Aggregate) error {
|
|
return nil
|
|
}
|
|
|
|
func (es *eventstoreStub) LatestSequence(ctx context.Context, in *models.SearchQueryFactory) (uint64, error) {
|
|
return 0, nil
|
|
}
|
|
func (es *eventstoreStub) InstanceIDs(ctx context.Context, in *models.SearchQuery) ([]string, error) {
|
|
return nil, nil
|
|
}
|
|
func (es *eventstoreStub) V2() *eventstore.Eventstore {
|
|
return nil
|
|
}
|
|
|
|
func TestSpooler_process(t *testing.T) {
|
|
type fields struct {
|
|
currentHandler *testHandler
|
|
}
|
|
type args struct {
|
|
timeout time.Duration
|
|
events []*models.Event
|
|
instanceIDs []string
|
|
}
|
|
tests := []struct {
|
|
name string
|
|
fields fields
|
|
args args
|
|
wantErr bool
|
|
wantRetries int
|
|
}{
|
|
{
|
|
name: "process all events",
|
|
fields: fields{
|
|
currentHandler: &testHandler{},
|
|
},
|
|
args: args{
|
|
timeout: 0,
|
|
events: []*models.Event{{}, {}},
|
|
},
|
|
wantErr: false,
|
|
},
|
|
{
|
|
name: "deadline exeeded",
|
|
fields: fields{
|
|
currentHandler: &testHandler{processSleep: 501 * time.Millisecond},
|
|
},
|
|
args: args{
|
|
timeout: 1 * time.Second,
|
|
events: []*models.Event{{}, {}, {}, {}},
|
|
},
|
|
wantErr: false,
|
|
},
|
|
{
|
|
name: "process error",
|
|
fields: fields{
|
|
currentHandler: &testHandler{processSleep: 1 * time.Second, processError: fmt.Errorf("i am an error")},
|
|
},
|
|
args: args{
|
|
events: []*models.Event{{}, {}},
|
|
},
|
|
wantErr: false,
|
|
wantRetries: 2,
|
|
},
|
|
}
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
s := &spooledHandler{
|
|
Handler: tt.fields.currentHandler,
|
|
}
|
|
|
|
ctx := context.Background()
|
|
var start time.Time
|
|
if tt.args.timeout > 0 {
|
|
ctx, _ = context.WithTimeout(ctx, tt.args.timeout)
|
|
start = time.Now()
|
|
}
|
|
|
|
if err := s.process(ctx, tt.args.events, "test", tt.args.instanceIDs); (err != nil) != tt.wantErr {
|
|
t.Errorf("Spooler.process() error = %v, wantErr %v", err, tt.wantErr)
|
|
}
|
|
if tt.fields.currentHandler.maxErrCount != tt.wantRetries {
|
|
t.Errorf("Spooler.process() wrong retry count got: %d want %d", tt.fields.currentHandler.maxErrCount, tt.wantRetries)
|
|
}
|
|
|
|
elapsed := time.Since(start).Round(1 * time.Second)
|
|
if tt.args.timeout != 0 && elapsed != tt.args.timeout {
|
|
t.Errorf("wrong timeout wanted %v elapsed %v since %v", tt.args.timeout, elapsed, time.Since(start))
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestSpooler_awaitError(t *testing.T) {
|
|
type fields struct {
|
|
currentHandler query.Handler
|
|
err error
|
|
canceled bool
|
|
}
|
|
tests := []struct {
|
|
name string
|
|
fields fields
|
|
}{
|
|
{
|
|
"no error",
|
|
fields{
|
|
err: nil,
|
|
currentHandler: &testHandler{processSleep: 500 * time.Millisecond},
|
|
canceled: false,
|
|
},
|
|
},
|
|
{
|
|
"with error",
|
|
fields{
|
|
err: fmt.Errorf("hodor"),
|
|
currentHandler: &testHandler{processSleep: 500 * time.Millisecond},
|
|
canceled: false,
|
|
},
|
|
},
|
|
}
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
s := &spooledHandler{
|
|
Handler: tt.fields.currentHandler,
|
|
}
|
|
c := make(chan interface{})
|
|
errs := make(chan error)
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
go func() {
|
|
s.awaitError(cancel, errs, "test")
|
|
c <- nil
|
|
}()
|
|
errs <- tt.fields.err
|
|
|
|
<-c
|
|
if ctx.Err() == nil {
|
|
t.Error("cancel function was not called")
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
// TestSpooler_load checks if load terminates
|
|
func TestSpooler_load(t *testing.T) {
|
|
type fields struct {
|
|
currentHandler query.Handler
|
|
locker *testLocker
|
|
eventstore v1.Eventstore
|
|
}
|
|
tests := []struct {
|
|
name string
|
|
fields fields
|
|
}{
|
|
{
|
|
"lock exists",
|
|
fields{
|
|
currentHandler: &testHandler{processSleep: 500 * time.Millisecond, viewModel: "testView1", cycleDuration: 1 * time.Second, bulkLimit: 10},
|
|
locker: newTestLocker(t, "testID", "testView1").expectRenew(t, fmt.Errorf("lock already exists"), 500*time.Millisecond),
|
|
},
|
|
},
|
|
{
|
|
"lock fails",
|
|
fields{
|
|
currentHandler: &testHandler{processSleep: 100 * time.Millisecond, viewModel: "testView2", cycleDuration: 1 * time.Second, bulkLimit: 10},
|
|
locker: newTestLocker(t, "testID", "testView2").expectRenew(t, fmt.Errorf("fail"), 500*time.Millisecond),
|
|
eventstore: &eventstoreStub{events: []*models.Event{{}}},
|
|
},
|
|
},
|
|
{
|
|
"query fails",
|
|
fields{
|
|
currentHandler: &testHandler{processSleep: 100 * time.Millisecond, viewModel: "testView3", queryError: fmt.Errorf("query fail"), cycleDuration: 1 * time.Second, bulkLimit: 10},
|
|
locker: newTestLocker(t, "testID", "testView3").expectRenew(t, nil, 500*time.Millisecond),
|
|
eventstore: &eventstoreStub{err: fmt.Errorf("fail")},
|
|
},
|
|
},
|
|
{
|
|
"process event fails",
|
|
fields{
|
|
currentHandler: &testHandler{processError: fmt.Errorf("oups"), processSleep: 100 * time.Millisecond, viewModel: "testView4", cycleDuration: 500 * time.Millisecond, bulkLimit: 10},
|
|
locker: newTestLocker(t, "testID", "testView4").expectRenew(t, nil, 250*time.Millisecond),
|
|
eventstore: &eventstoreStub{events: []*models.Event{{}}},
|
|
},
|
|
},
|
|
}
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
defer tt.fields.locker.finish()
|
|
s := &spooledHandler{
|
|
Handler: tt.fields.currentHandler,
|
|
locker: tt.fields.locker.mock,
|
|
eventstore: tt.fields.eventstore,
|
|
}
|
|
s.load("test-worker")
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestSpooler_lock(t *testing.T) {
|
|
type fields struct {
|
|
currentHandler query.Handler
|
|
locker *testLocker
|
|
expectsErr bool
|
|
}
|
|
type args struct {
|
|
deadline time.Time
|
|
}
|
|
tests := []struct {
|
|
name string
|
|
fields fields
|
|
args args
|
|
}{
|
|
{
|
|
"renew correct",
|
|
fields{
|
|
currentHandler: &testHandler{cycleDuration: 1 * time.Second, viewModel: "testView"},
|
|
locker: newTestLocker(t, "testID", "testView").expectRenew(t, nil, 500*time.Millisecond),
|
|
expectsErr: false,
|
|
},
|
|
args{
|
|
deadline: time.Now().Add(1 * time.Second),
|
|
},
|
|
},
|
|
{
|
|
"renew fails",
|
|
fields{
|
|
currentHandler: &testHandler{cycleDuration: 900 * time.Millisecond, viewModel: "testView"},
|
|
locker: newTestLocker(t, "testID", "testView").expectRenew(t, fmt.Errorf("renew failed"), 450*time.Millisecond),
|
|
expectsErr: true,
|
|
},
|
|
args{
|
|
deadline: time.Now().Add(5 * time.Second),
|
|
},
|
|
},
|
|
}
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
defer tt.fields.locker.finish()
|
|
s := &spooledHandler{
|
|
Handler: tt.fields.currentHandler,
|
|
locker: tt.fields.locker.mock,
|
|
}
|
|
|
|
errs := make(chan error, 1)
|
|
defer close(errs)
|
|
ctx, _ := context.WithDeadline(context.Background(), tt.args.deadline)
|
|
|
|
locked := s.lock(ctx, errs, "test-worker")
|
|
|
|
if tt.fields.expectsErr {
|
|
lock := <-locked
|
|
err := <-errs
|
|
if err == nil {
|
|
t.Error("No error in error queue")
|
|
}
|
|
if lock {
|
|
t.Error("lock should have failed")
|
|
}
|
|
} else {
|
|
lock := <-locked
|
|
if !lock {
|
|
t.Error("lock should be true")
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
type testLocker struct {
|
|
mock *mock.MockLocker
|
|
lockerID string
|
|
viewName string
|
|
ctrl *gomock.Controller
|
|
}
|
|
|
|
func newTestLocker(t *testing.T, lockerID, viewName string) *testLocker {
|
|
ctrl := gomock.NewController(t)
|
|
return &testLocker{mock.NewMockLocker(ctrl), lockerID, viewName, ctrl}
|
|
}
|
|
|
|
func (l *testLocker) expectRenew(t *testing.T, err error, waitTime time.Duration) *testLocker {
|
|
t.Helper()
|
|
l.mock.EXPECT().Renew(gomock.Any(), l.viewName, gomock.Any(), gomock.Any()).DoAndReturn(
|
|
func(_, _, _ string, gotten time.Duration) error {
|
|
t.Helper()
|
|
if waitTime-gotten != 0 {
|
|
t.Errorf("expected waittime %v got %v", waitTime, gotten)
|
|
}
|
|
return err
|
|
}).MinTimes(1).MaxTimes(3)
|
|
|
|
return l
|
|
}
|
|
|
|
func (l *testLocker) finish() {
|
|
l.ctrl.Finish()
|
|
}
|
|
|
|
func TestHandleError(t *testing.T) {
|
|
type args struct {
|
|
event *models.Event
|
|
failedErr error
|
|
latestFailedEvent func(sequence uint64, instanceID string) (*repository.FailedEvent, error)
|
|
errorCountUntilSkip uint64
|
|
}
|
|
type res struct {
|
|
wantErr bool
|
|
shouldProcessSequence bool
|
|
}
|
|
tests := []struct {
|
|
name string
|
|
args args
|
|
res res
|
|
}{
|
|
{
|
|
name: "should process sequence already too high",
|
|
args: args{
|
|
event: &models.Event{Sequence: 30000000},
|
|
failedErr: errors.ThrowInternal(nil, "SPOOL-Wk53B", "this was wrong"),
|
|
latestFailedEvent: func(s uint64, instanceID string) (*repository.FailedEvent, error) {
|
|
return &repository.FailedEvent{
|
|
ErrMsg: "blub",
|
|
FailedSequence: s - 1,
|
|
FailureCount: 6,
|
|
ViewName: "super.table",
|
|
InstanceID: instanceID,
|
|
LastFailed: testNow,
|
|
}, nil
|
|
},
|
|
errorCountUntilSkip: 5,
|
|
},
|
|
res: res{
|
|
shouldProcessSequence: true,
|
|
},
|
|
},
|
|
{
|
|
name: "should process sequence after this event too high",
|
|
args: args{
|
|
event: &models.Event{Sequence: 30000000},
|
|
failedErr: errors.ThrowInternal(nil, "SPOOL-Wk53B", "this was wrong"),
|
|
latestFailedEvent: func(s uint64, instanceID string) (*repository.FailedEvent, error) {
|
|
return &repository.FailedEvent{
|
|
ErrMsg: "blub",
|
|
FailedSequence: s - 1,
|
|
FailureCount: 5,
|
|
ViewName: "super.table",
|
|
InstanceID: instanceID,
|
|
LastFailed: testNow,
|
|
}, nil
|
|
},
|
|
errorCountUntilSkip: 6,
|
|
},
|
|
res: res{
|
|
shouldProcessSequence: true,
|
|
},
|
|
},
|
|
{
|
|
name: "should not process sequence",
|
|
args: args{
|
|
event: &models.Event{Sequence: 30000000},
|
|
failedErr: errors.ThrowInternal(nil, "SPOOL-Wk53B", "this was wrong"),
|
|
latestFailedEvent: func(s uint64, instanceID string) (*repository.FailedEvent, error) {
|
|
return &repository.FailedEvent{
|
|
ErrMsg: "blub",
|
|
FailedSequence: s - 1,
|
|
FailureCount: 3,
|
|
ViewName: "super.table",
|
|
InstanceID: instanceID,
|
|
LastFailed: testNow,
|
|
}, nil
|
|
},
|
|
errorCountUntilSkip: 5,
|
|
},
|
|
res: res{
|
|
shouldProcessSequence: false,
|
|
wantErr: true,
|
|
},
|
|
},
|
|
}
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
processedSequence := false
|
|
err := HandleError(
|
|
tt.args.event,
|
|
tt.args.failedErr,
|
|
tt.args.latestFailedEvent,
|
|
func(*repository.FailedEvent) error {
|
|
return nil
|
|
},
|
|
func(*models.Event) error {
|
|
processedSequence = true
|
|
return nil
|
|
},
|
|
tt.args.errorCountUntilSkip)
|
|
|
|
if (err != nil) != tt.res.wantErr {
|
|
t.Errorf("HandleError() error = %v, wantErr %v", err, tt.res.wantErr)
|
|
}
|
|
if tt.res.shouldProcessSequence != processedSequence {
|
|
t.Error("should not process sequence")
|
|
}
|
|
})
|
|
}
|
|
}
|