fix(query): realtime data on defined requests (#3726)

* feat: directly specify factors on addCustomLoginPolicy and return on LoginPolicy responses

* fix proto

* update login policy

* feat: directly specify idp on addCustomLoginPolicy and return on LoginPolicy responses

* fix: tests

* fix(projection): trigger bulk

* refactor: clean projection pkg

* instance should bulk

* fix(query): should trigger bulk on id calls

* tests

* build prerelease

* fix: add shouldTriggerBulk

* fix: test

Co-authored-by: Livio Amstutz <livio.a@gmail.com>
Co-authored-by: Max Peintner <max@caos.ch>
This commit is contained in:
Silvan
2022-06-14 07:51:00 +02:00
committed by GitHub
parent 5c805c48db
commit dd2f31683c
146 changed files with 1097 additions and 1239 deletions

View File

@@ -10,6 +10,7 @@ import (
"github.com/zitadel/zitadel/internal/errors"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/eventstore/handler"
"github.com/zitadel/zitadel/internal/telemetry/tracing"
)
var (
@@ -62,7 +63,6 @@ func NewStatementHandler(
}
h := StatementHandler{
ProjectionHandler: handler.NewProjectionHandler(config.ProjectionHandlerConfig),
client: config.Client,
sequenceTable: config.SequenceTable,
maxFailureCount: config.MaxFailureCount,
@@ -75,11 +75,12 @@ func NewStatementHandler(
bulkLimit: config.BulkLimit,
Locker: NewLocker(config.Client, config.LockTable, config.ProjectionHandlerConfig.ProjectionName),
}
h.ProjectionHandler = handler.NewProjectionHandler(config.ProjectionHandlerConfig, h.reduce, h.Update, h.SearchQuery)
err := h.Init(ctx, config.InitCheck)
logging.OnError(err).Fatal("unable to initialize projections")
go h.ProjectionHandler.Process(
go h.Process(
ctx,
h.reduce,
h.Update,
@@ -88,11 +89,20 @@ func NewStatementHandler(
h.SearchQuery,
)
h.ProjectionHandler.Handler.Subscribe(h.aggregates...)
h.Subscribe(h.aggregates...)
return h
}
func (h *StatementHandler) TriggerBulk(ctx context.Context) {
ctx, span := tracing.NewSpan(ctx)
var err error
defer span.EndWithError(err)
err = h.ProjectionHandler.TriggerBulk(ctx, h.Lock, h.Unlock)
logging.OnError(err).WithField("projection", h.ProjectionName).Warn("unable to trigger bulk")
}
func (h *StatementHandler) SearchQuery(ctx context.Context) (*eventstore.SearchQueryBuilder, uint64, error) {
sequences, err := h.currentSequences(ctx, h.client.QueryContext)
if err != nil {

View File

@@ -694,15 +694,19 @@ func TestProjectionHandler_fetchPreviousStmts(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
h := &StatementHandler{
ProjectionHandler: handler.NewProjectionHandler(handler.ProjectionHandlerConfig{
HandlerConfig: handler.HandlerConfig{
Eventstore: tt.fields.eventstore,
},
ProjectionName: "my_projection",
RequeueEvery: 0,
}),
aggregates: tt.fields.aggregates,
}
h.ProjectionHandler = handler.NewProjectionHandler(handler.ProjectionHandlerConfig{
HandlerConfig: handler.HandlerConfig{
Eventstore: tt.fields.eventstore,
},
ProjectionName: "my_projection",
RequeueEvery: 0,
},
h.reduce,
h.Update,
h.SearchQuery,
)
stmts, err := h.fetchPreviousStmts(tt.args.ctx, nil, tt.args.stmtSeq, "", tt.args.sequences, tt.args.reduce)
if !tt.want.isErr(err) {
t.Errorf("ProjectionHandler.prepareBulkStmts() error = %v", err)

View File

@@ -42,6 +42,9 @@ type ProjectionHandler struct {
requeueAfter time.Duration
shouldBulk *time.Timer
bulkMu sync.Mutex
bulkLocked bool
execBulk executeBulk
retryFailedAfter time.Duration
shouldPush *time.Timer
@@ -53,7 +56,12 @@ type ProjectionHandler struct {
stmts []*Statement
}
func NewProjectionHandler(config ProjectionHandlerConfig) *ProjectionHandler {
func NewProjectionHandler(
config ProjectionHandlerConfig,
reduce Reduce,
update Update,
query SearchQuery,
) *ProjectionHandler {
h := &ProjectionHandler{
Handler: NewHandler(config.HandlerConfig),
ProjectionName: config.ProjectionName,
@@ -64,6 +72,8 @@ func NewProjectionHandler(config ProjectionHandlerConfig) *ProjectionHandler {
retryFailedAfter: config.RetryFailedAfter,
}
h.execBulk = h.prepareExecuteBulk(query, reduce, update)
//unitialized timer
//https://github.com/golang/go/issues/12721
<-h.shouldPush.C
@@ -113,7 +123,6 @@ func (h *ProjectionHandler) Process(
logging.WithFields("projection", h.ProjectionName, "cause", cause, "stack", string(debug.Stack())).Error("projection handler paniced")
}()
execBulk := h.prepareExecuteBulk(query, reduce, update)
for {
select {
case <-ctx.Done():
@@ -122,15 +131,19 @@ func (h *ProjectionHandler) Process(
}
h.shutdown()
return
case event := <-h.Handler.EventQueue:
case event := <-h.EventQueue:
if err := h.processEvent(ctx, event, reduce); err != nil {
logging.WithFields("projection", h.ProjectionName).WithError(err).Warn("process failed")
continue
}
h.triggerShouldPush(0)
case <-h.shouldBulk.C:
h.bulk(ctx, lock, execBulk, unlock)
h.bulkMu.Lock()
h.bulkLocked = true
h.bulk(ctx, lock, unlock)
h.ResetShouldBulk()
h.bulkLocked = false
h.bulkMu.Unlock()
default:
//lower prio select with push
select {
@@ -140,15 +153,19 @@ func (h *ProjectionHandler) Process(
}
h.shutdown()
return
case event := <-h.Handler.EventQueue:
case event := <-h.EventQueue:
if err := h.processEvent(ctx, event, reduce); err != nil {
logging.WithFields("projection", h.ProjectionName).WithError(err).Warn("process failed")
continue
}
h.triggerShouldPush(0)
case <-h.shouldBulk.C:
h.bulk(ctx, lock, execBulk, unlock)
h.bulkMu.Lock()
h.bulkLocked = true
h.bulk(ctx, lock, unlock)
h.ResetShouldBulk()
h.bulkLocked = false
h.bulkMu.Unlock()
case <-h.shouldPush.C:
h.push(ctx, update, reduce)
h.ResetShouldBulk()
@@ -176,10 +193,38 @@ func (h *ProjectionHandler) processEvent(
return nil
}
func (h *ProjectionHandler) TriggerBulk(
ctx context.Context,
lock Lock,
unlock Unlock,
) error {
if !h.shouldBulk.Stop() {
//make sure to flush shouldBulk chan
select {
case <-h.shouldBulk.C:
default:
}
}
defer h.ResetShouldBulk()
h.bulkMu.Lock()
if h.bulkLocked {
logging.WithFields("projection", h.ProjectionName).Debugf("waiting for existing bulk to finish")
h.bulkMu.Unlock()
return nil
}
h.bulkLocked = true
defer func() {
h.bulkLocked = false
h.bulkMu.Unlock()
}()
return h.bulk(ctx, lock, unlock)
}
func (h *ProjectionHandler) bulk(
ctx context.Context,
lock Lock,
executeBulk executeBulk,
unlock Unlock,
) error {
ctx, cancel := context.WithCancel(ctx)
@@ -193,7 +238,7 @@ func (h *ProjectionHandler) bulk(
}
go h.cancelOnErr(ctx, errs, cancel)
execErr := executeBulk(ctx)
execErr := h.execBulk(ctx)
logging.WithFields("projection", h.ProjectionName).OnError(execErr).Warn("unable to execute")
unlockErr := unlock(systemID)

View File

@@ -8,6 +8,7 @@ import (
"testing"
"time"
"github.com/golang/mock/gomock"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/eventstore/repository"
es_repo_mock "github.com/zitadel/zitadel/internal/eventstore/repository/mock"
@@ -126,7 +127,11 @@ func TestProjectionHandler_processEvent(t *testing.T) {
},
ProjectionName: "",
RequeueEvery: -1,
})
},
nil,
nil,
nil,
)
h.stmts = tt.fields.stmts
h.pushSet = tt.fields.pushSet
h.shouldPush = tt.fields.shouldPush
@@ -408,7 +413,11 @@ func TestProjectionHandler_push(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
h := NewProjectionHandler(ProjectionHandlerConfig{
HandlerConfig: HandlerConfig{},
})
},
nil,
nil,
nil,
)
h.stmts = tt.fields.stmts
h.pushSet = tt.fields.pushSet
if tt.args.previousLock > 0 {
@@ -655,8 +664,13 @@ func TestProjectionHandler_bulk(t *testing.T) {
HandlerConfig: HandlerConfig{},
ProjectionName: "",
RequeueEvery: -1,
})
err := h.bulk(tt.args.ctx, tt.args.lock.lock(), tt.args.executeBulk.executeBulk(), tt.args.unlock.unlock())
},
tt.args.executeBulk.Reduce,
tt.args.executeBulk.Update,
tt.args.executeBulk.Query,
)
h.Eventstore = tt.args.executeBulk.es(t)
err := h.bulk(tt.args.ctx, tt.args.lock.lock(), tt.args.unlock.unlock())
if !tt.res.isErr(err) {
t.Errorf("unexpected error %v", err)
}
@@ -873,6 +887,26 @@ type executeBulkMock struct {
canceled chan bool
}
func (m *executeBulkMock) Query(context.Context) (*eventstore.SearchQueryBuilder, uint64, error) {
m.callCount++
if m.err != nil {
return nil, 0, m.err
}
return eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).AddQuery().SequenceGreater(0).Builder(), 0, nil
}
func (m *executeBulkMock) Reduce(eventstore.Event) (*Statement, error) {
return nil, nil
}
func (m *executeBulkMock) Update(ctx context.Context, stmts []*Statement, reduce Reduce) (unexecutedStmts []*Statement, err error) {
return nil, nil
}
func (m *executeBulkMock) es(t *testing.T) *eventstore.Eventstore {
mock := es_repo_mock.NewRepo(t)
mock.EXPECT().Filter(gomock.Any(), gomock.Any()).AnyTimes().Return(nil, nil)
return eventstore.NewEventstore(mock)
}
func (m *executeBulkMock) executeBulk() executeBulk {
return func(ctx context.Context) error {
m.callCount++