From a6c4702b8ed651beb4d37dacb02b81cb1e3f898d Mon Sep 17 00:00:00 2001 From: Silvan Date: Tue, 22 Dec 2020 12:27:55 +0100 Subject: [PATCH] fix: lock again (#1132) * start sub * start implement subsciptions * start subscription * implementation for member done * admin done * fix: tests * extend handlers * prepary notification * no errors in adminapi * changed current sequence in all packages * ignore mocks * works * subscriptions as singleton * tests * refactor: rename function scope var * fix: process ALL previous sequences * fix: spooler and pubsub * handler check * fix: process events until all done * fix break on query err * fix: handler * fix: process sequence or return error * check aggregate id * fix: log only in error case * fix tests * fix: handlers * fix: spooler * fix: spooler * fix: tests * fix: continue * fix: locker duration * fix: variable lock duration * fix: test * fix: test * fix: test min max time Co-authored-by: Livio Amstutz --- .../eventsourcing/handler/handler.go | 4 +++ .../repository/eventsourcing/spooler/lock.go | 14 ++------ .../eventsourcing/handler/handler.go | 4 +++ .../repository/eventsourcing/spooler/lock.go | 7 +++- .../eventsourcing/handler/handler.go | 9 +++-- .../repository/eventsourcing/spooler/lock.go | 3 +- internal/eventstore/query/handler.go | 1 + internal/eventstore/spooler/spooler.go | 4 +-- internal/eventstore/spooler/spooler_test.go | 34 +++++++++++++------ .../eventsourcing/handler/handler.go | 4 +++ .../repository/eventsourcing/spooler/lock.go | 3 +- .../eventsourcing/handler/handler.go | 4 +++ .../repository/eventsourcing/spooler/lock.go | 3 +- 13 files changed, 63 insertions(+), 31 deletions(-) diff --git a/internal/admin/repository/eventsourcing/handler/handler.go b/internal/admin/repository/eventsourcing/handler/handler.go index 60dcb9dac1..e6fdc3e531 100644 --- a/internal/admin/repository/eventsourcing/handler/handler.go +++ b/internal/admin/repository/eventsourcing/handler/handler.go @@ -95,6 +95,10 @@ func (h *handler) MinimumCycleDuration() time.Duration { return h.cycleDuration } +func (h *handler) LockDuration() time.Duration { + return h.cycleDuration / 3 +} + func (h *handler) QueryLimit() uint64 { return h.bulkLimit } diff --git a/internal/admin/repository/eventsourcing/spooler/lock.go b/internal/admin/repository/eventsourcing/spooler/lock.go index 0c04a3bf07..f07b190c1c 100644 --- a/internal/admin/repository/eventsourcing/spooler/lock.go +++ b/internal/admin/repository/eventsourcing/spooler/lock.go @@ -2,26 +2,18 @@ package spooler import ( "database/sql" + es_locker "github.com/caos/zitadel/internal/eventstore/locker" "time" ) const ( - lockTable = "adminapi.locks" - lockedUntilKey = "locked_until" - lockerIDKey = "locker_id" - objectTypeKey = "object_type" + lockTable = "adminapi.locks" ) type locker struct { dbClient *sql.DB } -type lock struct { - LockerID string `gorm:"column:locker_id;primary_key"` - LockedUntil time.Time `gorm:"column:locked_until"` - ViewName string `gorm:"column:object_type;primary_key"` -} - func (l *locker) Renew(lockerID, viewModel string, waitTime time.Duration) error { - return nil + return es_locker.Renew(l.dbClient, lockTable, lockerID, viewModel, waitTime) } diff --git a/internal/auth/repository/eventsourcing/handler/handler.go b/internal/auth/repository/eventsourcing/handler/handler.go index 0235304ce7..c5825c5abe 100644 --- a/internal/auth/repository/eventsourcing/handler/handler.go +++ b/internal/auth/repository/eventsourcing/handler/handler.go @@ -108,6 +108,10 @@ func (h *handler) MinimumCycleDuration() time.Duration { return h.cycleDuration } +func (h *handler) LockDuration() time.Duration { + return h.cycleDuration / 3 +} + func (h *handler) QueryLimit() uint64 { return h.bulkLimit } diff --git a/internal/auth/repository/eventsourcing/spooler/lock.go b/internal/auth/repository/eventsourcing/spooler/lock.go index 5fdbc5f0ae..f24f98f6cc 100644 --- a/internal/auth/repository/eventsourcing/spooler/lock.go +++ b/internal/auth/repository/eventsourcing/spooler/lock.go @@ -2,13 +2,18 @@ package spooler import ( "database/sql" + es_locker "github.com/caos/zitadel/internal/eventstore/locker" "time" ) +const ( + lockTable = "management.locks" +) + type locker struct { dbClient *sql.DB } func (l *locker) Renew(lockerID, viewModel string, waitTime time.Duration) error { - return nil + return es_locker.Renew(l.dbClient, lockTable, lockerID, viewModel, waitTime) } diff --git a/internal/authz/repository/eventsourcing/handler/handler.go b/internal/authz/repository/eventsourcing/handler/handler.go index 2d5f816c97..ee21d1b020 100644 --- a/internal/authz/repository/eventsourcing/handler/handler.go +++ b/internal/authz/repository/eventsourcing/handler/handler.go @@ -3,13 +3,12 @@ package handler import ( "time" + "github.com/caos/zitadel/internal/authz/repository/eventsourcing/view" sd "github.com/caos/zitadel/internal/config/systemdefaults" + "github.com/caos/zitadel/internal/config/types" "github.com/caos/zitadel/internal/eventstore" "github.com/caos/zitadel/internal/eventstore/query" iam_events "github.com/caos/zitadel/internal/iam/repository/eventsourcing" - - "github.com/caos/zitadel/internal/authz/repository/eventsourcing/view" - "github.com/caos/zitadel/internal/config/types" ) type Configs map[string]*Config @@ -60,6 +59,10 @@ func (h *handler) MinimumCycleDuration() time.Duration { return h.cycleDuration } +func (h *handler) LockDuration() time.Duration { + return h.cycleDuration / 3 +} + func (h *handler) QueryLimit() uint64 { return h.bulkLimit } diff --git a/internal/authz/repository/eventsourcing/spooler/lock.go b/internal/authz/repository/eventsourcing/spooler/lock.go index 0397382276..6100f470d3 100644 --- a/internal/authz/repository/eventsourcing/spooler/lock.go +++ b/internal/authz/repository/eventsourcing/spooler/lock.go @@ -2,6 +2,7 @@ package spooler import ( "database/sql" + es_locker "github.com/caos/zitadel/internal/eventstore/locker" "time" ) @@ -14,5 +15,5 @@ type locker struct { } func (l *locker) Renew(lockerID, viewModel string, waitTime time.Duration) error { - return nil + return es_locker.Renew(l.dbClient, lockTable, lockerID, viewModel, waitTime) } diff --git a/internal/eventstore/query/handler.go b/internal/eventstore/query/handler.go index 84a79d3079..622a69156e 100755 --- a/internal/eventstore/query/handler.go +++ b/internal/eventstore/query/handler.go @@ -20,6 +20,7 @@ type Handler interface { OnError(event *models.Event, err error) error OnSuccess() error MinimumCycleDuration() time.Duration + LockDuration() time.Duration QueryLimit() uint64 AggregateTypes() []models.AggregateType diff --git a/internal/eventstore/spooler/spooler.go b/internal/eventstore/spooler/spooler.go index b5c844e482..40d124a6ed 100644 --- a/internal/eventstore/spooler/spooler.go +++ b/internal/eventstore/spooler/spooler.go @@ -160,12 +160,12 @@ func (s *spooledHandler) lock(ctx context.Context, errs chan<- error, workerID s case <-ctx.Done(): return case <-renewTimer: - err := s.locker.Renew(workerID, s.ViewModel(), s.MinimumCycleDuration()*2) + err := s.locker.Renew(workerID, s.ViewModel(), s.LockDuration()) firstLock.Do(func() { locked <- err == nil }) if err == nil { - renewTimer = time.After(s.MinimumCycleDuration()) + renewTimer = time.After(s.LockDuration()) continue } diff --git a/internal/eventstore/spooler/spooler_test.go b/internal/eventstore/spooler/spooler_test.go index 8192e4b908..5b02977ea1 100644 --- a/internal/eventstore/spooler/spooler_test.go +++ b/internal/eventstore/spooler/spooler_test.go @@ -40,16 +40,19 @@ func (h *testHandler) Eventstore() eventstore.Eventstore { func (h *testHandler) ViewModel() string { return h.viewModel } + func (h *testHandler) EventQuery() (*models.SearchQuery, error) { if h.queryError != nil { return nil, h.queryError } return &models.SearchQuery{}, nil } + func (h *testHandler) Reduce(*models.Event) error { <-time.After(h.processSleep) return h.processError } + func (h *testHandler) OnError(event *models.Event, err error) error { if h.maxErrCount == 2 { return nil @@ -57,12 +60,19 @@ func (h *testHandler) OnError(event *models.Event, err error) error { h.maxErrCount++ return err } + func (h *testHandler) OnSuccess() error { return nil } + func (h *testHandler) MinimumCycleDuration() time.Duration { return h.cycleDuration } + +func (h *testHandler) LockDuration() time.Duration { + return h.cycleDuration / 2 +} + func (h *testHandler) QueryLimit() uint64 { return h.bulkLimit } @@ -232,31 +242,31 @@ func TestSpooler_load(t *testing.T) { { "lock exists", fields{ - currentHandler: &testHandler{processSleep: 500 * time.Millisecond, viewModel: "testView", cycleDuration: 1 * time.Second, bulkLimit: 10}, - locker: newTestLocker(t, "testID", "testView").expectRenew(t, fmt.Errorf("lock already exists"), 2000*time.Millisecond), + currentHandler: &testHandler{processSleep: 500 * time.Millisecond, viewModel: "testView1", cycleDuration: 1 * time.Second, bulkLimit: 10}, + locker: newTestLocker(t, "testID", "testView1").expectRenew(t, fmt.Errorf("lock already exists"), 500*time.Millisecond), }, }, { "lock fails", fields{ - currentHandler: &testHandler{processSleep: 100 * time.Millisecond, viewModel: "testView", cycleDuration: 1 * time.Second, bulkLimit: 10}, - locker: newTestLocker(t, "testID", "testView").expectRenew(t, fmt.Errorf("fail"), 2000*time.Millisecond), + currentHandler: &testHandler{processSleep: 100 * time.Millisecond, viewModel: "testView2", cycleDuration: 1 * time.Second, bulkLimit: 10}, + locker: newTestLocker(t, "testID", "testView2").expectRenew(t, fmt.Errorf("fail"), 500*time.Millisecond), eventstore: &eventstoreStub{events: []*models.Event{{}}}, }, }, { "query fails", fields{ - currentHandler: &testHandler{processSleep: 100 * time.Millisecond, viewModel: "testView", queryError: fmt.Errorf("query fail"), cycleDuration: 1 * time.Second, bulkLimit: 10}, - locker: newTestLocker(t, "testID", "testView").expectRenew(t, nil, 2000*time.Millisecond), + currentHandler: &testHandler{processSleep: 100 * time.Millisecond, viewModel: "testView3", queryError: fmt.Errorf("query fail"), cycleDuration: 1 * time.Second, bulkLimit: 10}, + locker: newTestLocker(t, "testID", "testView3").expectRenew(t, nil, 500*time.Millisecond), eventstore: &eventstoreStub{err: fmt.Errorf("fail")}, }, }, { "process event fails", fields{ - currentHandler: &testHandler{processError: fmt.Errorf("oups"), processSleep: 100 * time.Millisecond, viewModel: "testView", cycleDuration: 500 * time.Millisecond, bulkLimit: 10}, - locker: newTestLocker(t, "testID", "testView").expectRenew(t, nil, 1000*time.Millisecond).expectRenew(t, nil, 1000*time.Millisecond), + currentHandler: &testHandler{processError: fmt.Errorf("oups"), processSleep: 100 * time.Millisecond, viewModel: "testView4", cycleDuration: 500 * time.Millisecond, bulkLimit: 10}, + locker: newTestLocker(t, "testID", "testView4").expectRenew(t, nil, 250*time.Millisecond), eventstore: &eventstoreStub{events: []*models.Event{{}}}, }, }, @@ -292,7 +302,7 @@ func TestSpooler_lock(t *testing.T) { "renew correct", fields{ currentHandler: &testHandler{cycleDuration: 1 * time.Second, viewModel: "testView"}, - locker: newTestLocker(t, "testID", "testView").expectRenew(t, nil, 2000*time.Millisecond), + locker: newTestLocker(t, "testID", "testView").expectRenew(t, nil, 500*time.Millisecond), expectsErr: false, }, args{ @@ -303,7 +313,7 @@ func TestSpooler_lock(t *testing.T) { "renew fails", fields{ currentHandler: &testHandler{cycleDuration: 900 * time.Millisecond, viewModel: "testView"}, - locker: newTestLocker(t, "testID", "testView").expectRenew(t, fmt.Errorf("renew failed"), 1800*time.Millisecond), + locker: newTestLocker(t, "testID", "testView").expectRenew(t, fmt.Errorf("renew failed"), 450*time.Millisecond), expectsErr: true, }, args{ @@ -357,13 +367,15 @@ func newTestLocker(t *testing.T, lockerID, viewName string) *testLocker { } func (l *testLocker) expectRenew(t *testing.T, err error, waitTime time.Duration) *testLocker { + t.Helper() l.mock.EXPECT().Renew(gomock.Any(), l.viewName, gomock.Any()).DoAndReturn( func(_, _ string, gotten time.Duration) error { + t.Helper() if waitTime-gotten != 0 { t.Errorf("expected waittime %v got %v", waitTime, gotten) } return err - }).Times(1) + }).MinTimes(1).MaxTimes(3) return l } diff --git a/internal/management/repository/eventsourcing/handler/handler.go b/internal/management/repository/eventsourcing/handler/handler.go index 56ea997f2f..addd360caf 100644 --- a/internal/management/repository/eventsourcing/handler/handler.go +++ b/internal/management/repository/eventsourcing/handler/handler.go @@ -118,6 +118,10 @@ func (h *handler) MinimumCycleDuration() time.Duration { return h.cycleDuration } +func (h *handler) LockDuration() time.Duration { + return h.cycleDuration / 3 +} + func (h *handler) QueryLimit() uint64 { return h.bulkLimit } diff --git a/internal/management/repository/eventsourcing/spooler/lock.go b/internal/management/repository/eventsourcing/spooler/lock.go index 9dd53a4cc3..f24f98f6cc 100644 --- a/internal/management/repository/eventsourcing/spooler/lock.go +++ b/internal/management/repository/eventsourcing/spooler/lock.go @@ -2,6 +2,7 @@ package spooler import ( "database/sql" + es_locker "github.com/caos/zitadel/internal/eventstore/locker" "time" ) @@ -14,5 +15,5 @@ type locker struct { } func (l *locker) Renew(lockerID, viewModel string, waitTime time.Duration) error { - return nil + return es_locker.Renew(l.dbClient, lockTable, lockerID, viewModel, waitTime) } diff --git a/internal/notification/repository/eventsourcing/handler/handler.go b/internal/notification/repository/eventsourcing/handler/handler.go index aaf553ebdf..eb8035ed88 100644 --- a/internal/notification/repository/eventsourcing/handler/handler.go +++ b/internal/notification/repository/eventsourcing/handler/handler.go @@ -77,6 +77,10 @@ func (h *handler) MinimumCycleDuration() time.Duration { return h.cycleDuration } +func (h *handler) LockDuration() time.Duration { + return h.cycleDuration / 3 +} + func (h *handler) QueryLimit() uint64 { return h.bulkLimit } diff --git a/internal/notification/repository/eventsourcing/spooler/lock.go b/internal/notification/repository/eventsourcing/spooler/lock.go index fe14ec49fc..46325b1495 100644 --- a/internal/notification/repository/eventsourcing/spooler/lock.go +++ b/internal/notification/repository/eventsourcing/spooler/lock.go @@ -2,6 +2,7 @@ package spooler import ( "database/sql" + es_locker "github.com/caos/zitadel/internal/eventstore/locker" "time" ) @@ -14,5 +15,5 @@ type locker struct { } func (l *locker) Renew(lockerID, viewModel string, waitTime time.Duration) error { - return nil + return es_locker.Renew(l.dbClient, lockTable, lockerID, viewModel, waitTime) }