fix(spooler): correct workers (#508)

* one concurrent task

* disable spooler

* fix: improve concurrency in spooler

* fix: dont block lock

* fix: break if lock failed

* fix: check if handler is working

* fix: worker id

* fix: test

* fix: use limit for spoolers configured in startup.yaml

* fix test

* fix: factory

* fix(key): only reduce if not expired

* fix(searchQueryFactory): check for string-slice in aggregateID

* fix(migrations): combine migrations

* fix: allow saving multiple objects in one request

* fix(eventstore): logging

* fix(eventstore): rethink insert i locks table

* fix: ignore failed tests for the moment

* fix: tuubel

* fix: for tests in io

* fix: ignore tests for io

* fix: rename concurrent tasks to workers

* fix: incomment tests and remove some tests

* fix: refert changes for io

* refactor(eventstore): combine types of sql in one file

* refactor(eventstore): logs, TODO's, tests

* fix(eventstore): sql package

* test(eventstore): add tests for search query factory

* chore: logs

* fix(spooler): optimize lock query
chore(migrations): rename locks.object_type to view_name
chore(migrations): refactor migrations

* test: incomment tests

* fix: rename PrepareSaves to PrepareBulkSave

* chore: go dependencies

* fix(migrations): add id in events table

* refactor(lock): less magic numbers

Co-authored-by: Livio Amstutz <livio.a@gmail.com>
This commit is contained in:
Silvan
2020-07-28 09:42:21 +02:00
committed by GitHub
parent 531060ab67
commit 41e1a7cc7b
90 changed files with 2210 additions and 1471 deletions

View File

@@ -2,13 +2,13 @@ package handler
import (
"github.com/caos/logging"
"github.com/caos/zitadel/internal/eventstore/models"
"github.com/caos/zitadel/internal/eventstore/spooler"
"github.com/caos/zitadel/internal/project/repository/eventsourcing"
proj_event "github.com/caos/zitadel/internal/project/repository/eventsourcing"
es_model "github.com/caos/zitadel/internal/project/repository/eventsourcing/model"
view_model "github.com/caos/zitadel/internal/project/repository/view/model"
"time"
)
type Application struct {
@@ -20,8 +20,6 @@ const (
applicationTable = "auth.applications"
)
func (p *Application) MinimumCycleDuration() time.Duration { return p.cycleDuration }
func (p *Application) ViewModel() string {
return applicationTable
}

View File

@@ -1,16 +1,17 @@
package handler
import (
"time"
sd "github.com/caos/zitadel/internal/config/systemdefaults"
"github.com/caos/zitadel/internal/eventstore"
"github.com/caos/zitadel/internal/eventstore/query"
iam_events "github.com/caos/zitadel/internal/iam/repository/eventsourcing"
org_events "github.com/caos/zitadel/internal/org/repository/eventsourcing"
proj_event "github.com/caos/zitadel/internal/project/repository/eventsourcing"
"time"
"github.com/caos/zitadel/internal/auth/repository/eventsourcing/view"
"github.com/caos/zitadel/internal/config/types"
"github.com/caos/zitadel/internal/eventstore/spooler"
usr_event "github.com/caos/zitadel/internal/user/repository/eventsourcing"
)
@@ -34,8 +35,8 @@ type EventstoreRepos struct {
IamEvents *iam_events.IamEventstore
}
func Register(configs Configs, bulkLimit, errorCount uint64, view *view.View, eventstore eventstore.Eventstore, repos EventstoreRepos, systemDefaults sd.SystemDefaults) []spooler.Handler {
return []spooler.Handler{
func Register(configs Configs, bulkLimit, errorCount uint64, view *view.View, eventstore eventstore.Eventstore, repos EventstoreRepos, systemDefaults sd.SystemDefaults) []query.Handler {
return []query.Handler{
&User{handler: handler{view, bulkLimit, configs.cycleDuration("User"), errorCount}, orgEvents: repos.OrgEvents},
&UserSession{handler: handler{view, bulkLimit, configs.cycleDuration("UserSession"), errorCount}, userEvents: repos.UserEvents},
&Token{handler: handler{view, bulkLimit, configs.cycleDuration("Token"), errorCount}, ProjectEvents: repos.ProjectEvents},
@@ -60,3 +61,11 @@ func (configs Configs) cycleDuration(viewModel string) time.Duration {
}
return c.MinimumCycleDuration.Duration
}
func (h *handler) MinimumCycleDuration() time.Duration {
return h.cycleDuration
}
func (h *handler) QueryLimit() uint64 {
return h.bulkLimit
}

View File

@@ -21,8 +21,6 @@ const (
keyTable = "auth.keys"
)
func (k *Key) MinimumCycleDuration() time.Duration { return k.cycleDuration }
func (k *Key) ViewModel() string {
return keyTable
}
@@ -42,11 +40,13 @@ func (k *Key) Reduce(event *models.Event) error {
if err != nil {
return err
}
if privateKey.Expiry.Before(time.Now()) && publicKey.Expiry.Before(time.Now()) {
return k.view.ProcessedKeySequence(event.Sequence)
}
return k.view.PutKeys(privateKey, publicKey, event.Sequence)
default:
return k.view.ProcessedKeySequence(event.Sequence)
}
return nil
}
func (k *Key) OnError(event *models.Event, err error) error {

View File

@@ -2,12 +2,12 @@ package handler
import (
"github.com/caos/logging"
es_models "github.com/caos/zitadel/internal/eventstore/models"
"github.com/caos/zitadel/internal/eventstore/spooler"
"github.com/caos/zitadel/internal/org/repository/eventsourcing"
"github.com/caos/zitadel/internal/org/repository/eventsourcing/model"
org_model "github.com/caos/zitadel/internal/org/repository/view/model"
"time"
)
type Org struct {
@@ -18,8 +18,6 @@ const (
orgTable = "auth.orgs"
)
func (o *Org) MinimumCycleDuration() time.Duration { return o.cycleDuration }
func (o *Org) ViewModel() string {
return orgTable
}

View File

@@ -3,7 +3,6 @@ package handler
import (
"context"
"encoding/json"
"time"
"github.com/caos/logging"
@@ -25,8 +24,6 @@ const (
tokenTable = "auth.tokens"
)
func (u *Token) MinimumCycleDuration() time.Duration { return u.cycleDuration }
func (u *Token) ViewModel() string {
return tokenTable
}

View File

@@ -2,7 +2,6 @@ package handler
import (
"context"
"time"
es_models "github.com/caos/zitadel/internal/eventstore/models"
org_model "github.com/caos/zitadel/internal/org/model"
@@ -28,8 +27,6 @@ const (
userTable = "auth.users"
)
func (p *User) MinimumCycleDuration() time.Duration { return p.cycleDuration }
func (p *User) ViewModel() string {
return userTable
}

View File

@@ -3,9 +3,9 @@ package handler
import (
"context"
"strings"
"time"
"github.com/caos/logging"
"github.com/caos/zitadel/internal/errors"
caos_errs "github.com/caos/zitadel/internal/errors"
"github.com/caos/zitadel/internal/eventstore"
@@ -42,8 +42,6 @@ const (
userGrantTable = "auth.user_grants"
)
func (u *UserGrant) MinimumCycleDuration() time.Duration { return u.cycleDuration }
func (u *UserGrant) ViewModel() string {
return userGrantTable
}

View File

@@ -1,8 +1,6 @@
package handler
import (
"time"
req_model "github.com/caos/zitadel/internal/auth_request/model"
"github.com/caos/zitadel/internal/errors"
es_model "github.com/caos/zitadel/internal/user/repository/eventsourcing/model"
@@ -25,8 +23,6 @@ const (
userSessionTable = "auth.user_sessions"
)
func (u *UserSession) MinimumCycleDuration() time.Duration { return u.cycleDuration }
func (u *UserSession) ViewModel() string {
return userSessionTable
}

View File

@@ -1,127 +0,0 @@
package spooler
import (
"database/sql"
"testing"
"time"
"github.com/DATA-DOG/go-sqlmock"
)
type dbMock struct {
db *sql.DB
mock sqlmock.Sqlmock
}
func mockDB(t *testing.T) *dbMock {
mockDB := dbMock{}
var err error
mockDB.db, mockDB.mock, err = sqlmock.New()
if err != nil {
t.Fatalf("error occured while creating stub db %v", err)
}
mockDB.mock.MatchExpectationsInOrder(true)
return &mockDB
}
func (db *dbMock) expectCommit() *dbMock {
db.mock.ExpectCommit()
return db
}
func (db *dbMock) expectRollback() *dbMock {
db.mock.ExpectRollback()
return db
}
func (db *dbMock) expectBegin() *dbMock {
db.mock.ExpectBegin()
return db
}
func (db *dbMock) expectSavepoint() *dbMock {
db.mock.ExpectExec("SAVEPOINT").WillReturnResult(sqlmock.NewResult(1, 1))
return db
}
func (db *dbMock) expectReleaseSavepoint() *dbMock {
db.mock.ExpectExec("RELEASE SAVEPOINT").WillReturnResult(sqlmock.NewResult(1, 1))
return db
}
func (db *dbMock) expectRenew(lockerID, view string, affectedRows int64) *dbMock {
query := db.mock.
ExpectExec(`INSERT INTO auth\.locks \(object_type, locker_id, locked_until\) VALUES \(\$1, \$2, now\(\)\+\$3\) ON CONFLICT \(object_type\) DO UPDATE SET locked_until = now\(\)\+\$4, locker_id = \$5 WHERE \(locks\.locked_until < now\(\) OR locks\.locker_id = \$6\) AND locks\.object_type = \$7`).
WithArgs(view, lockerID, sqlmock.AnyArg(), sqlmock.AnyArg(), lockerID, lockerID, view).
WillReturnResult(sqlmock.NewResult(1, 1))
if affectedRows == 0 {
query.WillReturnResult(sqlmock.NewResult(0, 0))
} else {
query.WillReturnResult(sqlmock.NewResult(1, affectedRows))
}
return db
}
func Test_locker_Renew(t *testing.T) {
type fields struct {
db *dbMock
}
type args struct {
lockerID string
viewModel string
waitTime time.Duration
}
tests := []struct {
name string
fields fields
args args
wantErr bool
}{
{
name: "renew succeeded",
fields: fields{
db: mockDB(t).
expectBegin().
expectSavepoint().
expectRenew("locker", "view", 1).
expectReleaseSavepoint().
expectCommit(),
},
args: args{lockerID: "locker", viewModel: "view", waitTime: 1 * time.Second},
wantErr: false,
},
{
name: "renew now rows updated",
fields: fields{
db: mockDB(t).
expectBegin().
expectSavepoint().
expectRenew("locker", "view", 0).
expectRollback(),
},
args: args{lockerID: "locker", viewModel: "view", waitTime: 1 * time.Second},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
l := &locker{
dbClient: tt.fields.db.db,
}
if err := l.Renew(tt.args.lockerID, tt.args.viewModel, tt.args.waitTime); (err != nil) != tt.wantErr {
t.Errorf("locker.Renew() error = %v, wantErr %v", err, tt.wantErr)
}
if err := tt.fields.db.mock.ExpectationsWereMet(); err != nil {
t.Errorf("not all database expectations met: %v", err)
}
})
}
}

View File

@@ -2,6 +2,7 @@ package spooler
import (
"database/sql"
sd "github.com/caos/zitadel/internal/config/systemdefaults"
"github.com/caos/zitadel/internal/auth/repository/eventsourcing/handler"
@@ -14,16 +15,16 @@ import (
type SpoolerConfig struct {
BulkLimit uint64
FailureCountUntilSkip uint64
ConcurrentTasks int
ConcurrentWorkers int
Handlers handler.Configs
}
func StartSpooler(c SpoolerConfig, es eventstore.Eventstore, view *view.View, sql *sql.DB, repos handler.EventstoreRepos, systemDefaults sd.SystemDefaults) *spooler.Spooler {
func StartSpooler(c SpoolerConfig, es eventstore.Eventstore, view *view.View, client *sql.DB, repos handler.EventstoreRepos, systemDefaults sd.SystemDefaults) *spooler.Spooler {
spoolerConfig := spooler.Config{
Eventstore: es,
Locker: &locker{dbClient: sql},
ConcurrentTasks: c.ConcurrentTasks,
ViewHandlers: handler.Register(c.Handlers, c.BulkLimit, c.FailureCountUntilSkip, view, es, repos, systemDefaults),
Eventstore: es,
Locker: &locker{dbClient: client},
ConcurrentWorkers: c.ConcurrentWorkers,
ViewHandlers: handler.Register(c.Handlers, c.BulkLimit, c.FailureCountUntilSkip, view, es, repos, systemDefaults),
}
spool := spoolerConfig.New()
spool.Start()