fix: lock again (#1132)

* start sub

* start implement subsciptions

* start subscription

* implementation for member done

* admin done

* fix: tests

* extend handlers

* prepary notification

* no errors in adminapi

* changed current sequence in all packages

* ignore mocks

* works

* subscriptions as singleton

* tests

* refactor: rename function scope var

* fix: process ALL previous sequences

* fix: spooler and pubsub

* handler check

* fix: process events until all done

* fix break on query err

* fix: handler

* fix: process sequence or return error

* check aggregate id

* fix: log only in error case

* fix tests

* fix: handlers

* fix: spooler

* fix: spooler

* fix: tests

* fix: continue

* fix: locker duration

* fix: variable lock duration

* fix: test

* fix: test

* fix: test min max time

Co-authored-by: Livio Amstutz <livio.a@gmail.com>
This commit is contained in:
Silvan 2020-12-22 12:27:55 +01:00 committed by GitHub
parent f96838cf62
commit a6c4702b8e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 63 additions and 31 deletions

View File

@ -95,6 +95,10 @@ func (h *handler) MinimumCycleDuration() time.Duration {
return h.cycleDuration
}
func (h *handler) LockDuration() time.Duration {
return h.cycleDuration / 3
}
func (h *handler) QueryLimit() uint64 {
return h.bulkLimit
}

View File

@ -2,26 +2,18 @@ package spooler
import (
"database/sql"
es_locker "github.com/caos/zitadel/internal/eventstore/locker"
"time"
)
const (
lockTable = "adminapi.locks"
lockedUntilKey = "locked_until"
lockerIDKey = "locker_id"
objectTypeKey = "object_type"
)
type locker struct {
dbClient *sql.DB
}
type lock struct {
LockerID string `gorm:"column:locker_id;primary_key"`
LockedUntil time.Time `gorm:"column:locked_until"`
ViewName string `gorm:"column:object_type;primary_key"`
}
func (l *locker) Renew(lockerID, viewModel string, waitTime time.Duration) error {
return nil
return es_locker.Renew(l.dbClient, lockTable, lockerID, viewModel, waitTime)
}

View File

@ -108,6 +108,10 @@ func (h *handler) MinimumCycleDuration() time.Duration {
return h.cycleDuration
}
func (h *handler) LockDuration() time.Duration {
return h.cycleDuration / 3
}
func (h *handler) QueryLimit() uint64 {
return h.bulkLimit
}

View File

@ -2,13 +2,18 @@ package spooler
import (
"database/sql"
es_locker "github.com/caos/zitadel/internal/eventstore/locker"
"time"
)
const (
lockTable = "management.locks"
)
type locker struct {
dbClient *sql.DB
}
func (l *locker) Renew(lockerID, viewModel string, waitTime time.Duration) error {
return nil
return es_locker.Renew(l.dbClient, lockTable, lockerID, viewModel, waitTime)
}

View File

@ -3,13 +3,12 @@ package handler
import (
"time"
"github.com/caos/zitadel/internal/authz/repository/eventsourcing/view"
sd "github.com/caos/zitadel/internal/config/systemdefaults"
"github.com/caos/zitadel/internal/config/types"
"github.com/caos/zitadel/internal/eventstore"
"github.com/caos/zitadel/internal/eventstore/query"
iam_events "github.com/caos/zitadel/internal/iam/repository/eventsourcing"
"github.com/caos/zitadel/internal/authz/repository/eventsourcing/view"
"github.com/caos/zitadel/internal/config/types"
)
type Configs map[string]*Config
@ -60,6 +59,10 @@ func (h *handler) MinimumCycleDuration() time.Duration {
return h.cycleDuration
}
func (h *handler) LockDuration() time.Duration {
return h.cycleDuration / 3
}
func (h *handler) QueryLimit() uint64 {
return h.bulkLimit
}

View File

@ -2,6 +2,7 @@ package spooler
import (
"database/sql"
es_locker "github.com/caos/zitadel/internal/eventstore/locker"
"time"
)
@ -14,5 +15,5 @@ type locker struct {
}
func (l *locker) Renew(lockerID, viewModel string, waitTime time.Duration) error {
return nil
return es_locker.Renew(l.dbClient, lockTable, lockerID, viewModel, waitTime)
}

View File

@ -20,6 +20,7 @@ type Handler interface {
OnError(event *models.Event, err error) error
OnSuccess() error
MinimumCycleDuration() time.Duration
LockDuration() time.Duration
QueryLimit() uint64
AggregateTypes() []models.AggregateType

View File

@ -160,12 +160,12 @@ func (s *spooledHandler) lock(ctx context.Context, errs chan<- error, workerID s
case <-ctx.Done():
return
case <-renewTimer:
err := s.locker.Renew(workerID, s.ViewModel(), s.MinimumCycleDuration()*2)
err := s.locker.Renew(workerID, s.ViewModel(), s.LockDuration())
firstLock.Do(func() {
locked <- err == nil
})
if err == nil {
renewTimer = time.After(s.MinimumCycleDuration())
renewTimer = time.After(s.LockDuration())
continue
}

View File

@ -40,16 +40,19 @@ func (h *testHandler) Eventstore() eventstore.Eventstore {
func (h *testHandler) ViewModel() string {
return h.viewModel
}
func (h *testHandler) EventQuery() (*models.SearchQuery, error) {
if h.queryError != nil {
return nil, h.queryError
}
return &models.SearchQuery{}, nil
}
func (h *testHandler) Reduce(*models.Event) error {
<-time.After(h.processSleep)
return h.processError
}
func (h *testHandler) OnError(event *models.Event, err error) error {
if h.maxErrCount == 2 {
return nil
@ -57,12 +60,19 @@ func (h *testHandler) OnError(event *models.Event, err error) error {
h.maxErrCount++
return err
}
func (h *testHandler) OnSuccess() error {
return nil
}
func (h *testHandler) MinimumCycleDuration() time.Duration {
return h.cycleDuration
}
func (h *testHandler) LockDuration() time.Duration {
return h.cycleDuration / 2
}
func (h *testHandler) QueryLimit() uint64 {
return h.bulkLimit
}
@ -232,31 +242,31 @@ func TestSpooler_load(t *testing.T) {
{
"lock exists",
fields{
currentHandler: &testHandler{processSleep: 500 * time.Millisecond, viewModel: "testView", cycleDuration: 1 * time.Second, bulkLimit: 10},
locker: newTestLocker(t, "testID", "testView").expectRenew(t, fmt.Errorf("lock already exists"), 2000*time.Millisecond),
currentHandler: &testHandler{processSleep: 500 * time.Millisecond, viewModel: "testView1", cycleDuration: 1 * time.Second, bulkLimit: 10},
locker: newTestLocker(t, "testID", "testView1").expectRenew(t, fmt.Errorf("lock already exists"), 500*time.Millisecond),
},
},
{
"lock fails",
fields{
currentHandler: &testHandler{processSleep: 100 * time.Millisecond, viewModel: "testView", cycleDuration: 1 * time.Second, bulkLimit: 10},
locker: newTestLocker(t, "testID", "testView").expectRenew(t, fmt.Errorf("fail"), 2000*time.Millisecond),
currentHandler: &testHandler{processSleep: 100 * time.Millisecond, viewModel: "testView2", cycleDuration: 1 * time.Second, bulkLimit: 10},
locker: newTestLocker(t, "testID", "testView2").expectRenew(t, fmt.Errorf("fail"), 500*time.Millisecond),
eventstore: &eventstoreStub{events: []*models.Event{{}}},
},
},
{
"query fails",
fields{
currentHandler: &testHandler{processSleep: 100 * time.Millisecond, viewModel: "testView", queryError: fmt.Errorf("query fail"), cycleDuration: 1 * time.Second, bulkLimit: 10},
locker: newTestLocker(t, "testID", "testView").expectRenew(t, nil, 2000*time.Millisecond),
currentHandler: &testHandler{processSleep: 100 * time.Millisecond, viewModel: "testView3", queryError: fmt.Errorf("query fail"), cycleDuration: 1 * time.Second, bulkLimit: 10},
locker: newTestLocker(t, "testID", "testView3").expectRenew(t, nil, 500*time.Millisecond),
eventstore: &eventstoreStub{err: fmt.Errorf("fail")},
},
},
{
"process event fails",
fields{
currentHandler: &testHandler{processError: fmt.Errorf("oups"), processSleep: 100 * time.Millisecond, viewModel: "testView", cycleDuration: 500 * time.Millisecond, bulkLimit: 10},
locker: newTestLocker(t, "testID", "testView").expectRenew(t, nil, 1000*time.Millisecond).expectRenew(t, nil, 1000*time.Millisecond),
currentHandler: &testHandler{processError: fmt.Errorf("oups"), processSleep: 100 * time.Millisecond, viewModel: "testView4", cycleDuration: 500 * time.Millisecond, bulkLimit: 10},
locker: newTestLocker(t, "testID", "testView4").expectRenew(t, nil, 250*time.Millisecond),
eventstore: &eventstoreStub{events: []*models.Event{{}}},
},
},
@ -292,7 +302,7 @@ func TestSpooler_lock(t *testing.T) {
"renew correct",
fields{
currentHandler: &testHandler{cycleDuration: 1 * time.Second, viewModel: "testView"},
locker: newTestLocker(t, "testID", "testView").expectRenew(t, nil, 2000*time.Millisecond),
locker: newTestLocker(t, "testID", "testView").expectRenew(t, nil, 500*time.Millisecond),
expectsErr: false,
},
args{
@ -303,7 +313,7 @@ func TestSpooler_lock(t *testing.T) {
"renew fails",
fields{
currentHandler: &testHandler{cycleDuration: 900 * time.Millisecond, viewModel: "testView"},
locker: newTestLocker(t, "testID", "testView").expectRenew(t, fmt.Errorf("renew failed"), 1800*time.Millisecond),
locker: newTestLocker(t, "testID", "testView").expectRenew(t, fmt.Errorf("renew failed"), 450*time.Millisecond),
expectsErr: true,
},
args{
@ -357,13 +367,15 @@ func newTestLocker(t *testing.T, lockerID, viewName string) *testLocker {
}
func (l *testLocker) expectRenew(t *testing.T, err error, waitTime time.Duration) *testLocker {
t.Helper()
l.mock.EXPECT().Renew(gomock.Any(), l.viewName, gomock.Any()).DoAndReturn(
func(_, _ string, gotten time.Duration) error {
t.Helper()
if waitTime-gotten != 0 {
t.Errorf("expected waittime %v got %v", waitTime, gotten)
}
return err
}).Times(1)
}).MinTimes(1).MaxTimes(3)
return l
}

View File

@ -118,6 +118,10 @@ func (h *handler) MinimumCycleDuration() time.Duration {
return h.cycleDuration
}
func (h *handler) LockDuration() time.Duration {
return h.cycleDuration / 3
}
func (h *handler) QueryLimit() uint64 {
return h.bulkLimit
}

View File

@ -2,6 +2,7 @@ package spooler
import (
"database/sql"
es_locker "github.com/caos/zitadel/internal/eventstore/locker"
"time"
)
@ -14,5 +15,5 @@ type locker struct {
}
func (l *locker) Renew(lockerID, viewModel string, waitTime time.Duration) error {
return nil
return es_locker.Renew(l.dbClient, lockTable, lockerID, viewModel, waitTime)
}

View File

@ -77,6 +77,10 @@ func (h *handler) MinimumCycleDuration() time.Duration {
return h.cycleDuration
}
func (h *handler) LockDuration() time.Duration {
return h.cycleDuration / 3
}
func (h *handler) QueryLimit() uint64 {
return h.bulkLimit
}

View File

@ -2,6 +2,7 @@ package spooler
import (
"database/sql"
es_locker "github.com/caos/zitadel/internal/eventstore/locker"
"time"
)
@ -14,5 +15,5 @@ type locker struct {
}
func (l *locker) Renew(lockerID, viewModel string, waitTime time.Duration) error {
return nil
return es_locker.Renew(l.dbClient, lockTable, lockerID, viewModel, waitTime)
}