mirror of
https://github.com/zitadel/zitadel.git
synced 2024-12-13 03:24:26 +00:00
fix: wait for projection initialization to be done (#4473)
* fix: wait for projection initialization to be done * close channel Co-authored-by: Silvan <silvan.reusser@gmail.com>
This commit is contained in:
parent
df624f0de5
commit
fcb36cd406
@ -74,9 +74,11 @@ func NewStatementHandler(
|
|||||||
bulkLimit: config.BulkLimit,
|
bulkLimit: config.BulkLimit,
|
||||||
Locker: NewLocker(config.Client, config.LockTable, config.ProjectionName),
|
Locker: NewLocker(config.Client, config.LockTable, config.ProjectionName),
|
||||||
}
|
}
|
||||||
h.ProjectionHandler = handler.NewProjectionHandler(ctx, config.ProjectionHandlerConfig, h.reduce, h.Update, h.SearchQuery, h.Lock, h.Unlock)
|
|
||||||
|
|
||||||
err := h.Init(ctx, config.InitCheck)
|
initialized := make(chan bool)
|
||||||
|
h.ProjectionHandler = handler.NewProjectionHandler(ctx, config.ProjectionHandlerConfig, h.reduce, h.Update, h.SearchQuery, h.Lock, h.Unlock, initialized)
|
||||||
|
|
||||||
|
err := h.Init(ctx, initialized, config.InitCheck)
|
||||||
logging.OnError(err).WithField("projection", config.ProjectionName).Fatal("unable to initialize projections")
|
logging.OnError(err).WithField("projection", config.ProjectionName).Fatal("unable to initialize projections")
|
||||||
|
|
||||||
h.Subscribe(h.aggregates...)
|
h.Subscribe(h.aggregates...)
|
||||||
|
@ -186,9 +186,11 @@ type ForeignKey struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Init implements handler.Init
|
// Init implements handler.Init
|
||||||
func (h *StatementHandler) Init(ctx context.Context, checks ...*handler.Check) error {
|
func (h *StatementHandler) Init(ctx context.Context, initialized chan<- bool, checks ...*handler.Check) error {
|
||||||
for _, check := range checks {
|
for _, check := range checks {
|
||||||
if check == nil || check.IsNoop() {
|
if check == nil || check.IsNoop() {
|
||||||
|
initialized <- true
|
||||||
|
close(initialized)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
tx, err := h.client.BeginTx(ctx, nil)
|
tx, err := h.client.BeginTx(ctx, nil)
|
||||||
@ -211,6 +213,8 @@ func (h *StatementHandler) Init(ctx context.Context, checks ...*handler.Check) e
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
initialized <- true
|
||||||
|
close(initialized)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -62,6 +62,7 @@ func NewProjectionHandler(
|
|||||||
query SearchQuery,
|
query SearchQuery,
|
||||||
lock Lock,
|
lock Lock,
|
||||||
unlock Unlock,
|
unlock Unlock,
|
||||||
|
initialized <-chan bool,
|
||||||
) *ProjectionHandler {
|
) *ProjectionHandler {
|
||||||
concurrentInstances := int(config.ConcurrentInstances)
|
concurrentInstances := int(config.ConcurrentInstances)
|
||||||
if concurrentInstances < 1 {
|
if concurrentInstances < 1 {
|
||||||
@ -82,9 +83,12 @@ func NewProjectionHandler(
|
|||||||
concurrentInstances: concurrentInstances,
|
concurrentInstances: concurrentInstances,
|
||||||
}
|
}
|
||||||
|
|
||||||
go h.subscribe(ctx)
|
go func() {
|
||||||
|
<-initialized
|
||||||
|
go h.subscribe(ctx)
|
||||||
|
|
||||||
go h.schedule(ctx)
|
go h.schedule(ctx)
|
||||||
|
}()
|
||||||
|
|
||||||
return h
|
return h
|
||||||
}
|
}
|
||||||
|
@ -337,6 +337,7 @@ func TestProjectionHandler_Process(t *testing.T) {
|
|||||||
nil,
|
nil,
|
||||||
nil,
|
nil,
|
||||||
nil,
|
nil,
|
||||||
|
nil,
|
||||||
)
|
)
|
||||||
|
|
||||||
index, err := h.Process(tt.args.ctx, tt.args.events...)
|
index, err := h.Process(tt.args.ctx, tt.args.events...)
|
||||||
|
Loading…
Reference in New Issue
Block a user