From fcb36cd406288d5d1de8f6da2ee4136e48153f4e Mon Sep 17 00:00:00 2001 From: Livio Spring Date: Mon, 3 Oct 2022 16:09:59 +0200 Subject: [PATCH] fix: wait for projection initialization to be done (#4473) * fix: wait for projection initialization to be done * close channel Co-authored-by: Silvan --- internal/eventstore/handler/crdb/handler_stmt.go | 6 ++++-- internal/eventstore/handler/crdb/init.go | 6 +++++- internal/eventstore/handler/handler_projection.go | 8 ++++++-- internal/eventstore/handler/handler_projection_test.go | 1 + 4 files changed, 16 insertions(+), 5 deletions(-) diff --git a/internal/eventstore/handler/crdb/handler_stmt.go b/internal/eventstore/handler/crdb/handler_stmt.go index 4c858ac3b4..fb1e165307 100644 --- a/internal/eventstore/handler/crdb/handler_stmt.go +++ b/internal/eventstore/handler/crdb/handler_stmt.go @@ -74,9 +74,11 @@ func NewStatementHandler( bulkLimit: config.BulkLimit, Locker: NewLocker(config.Client, config.LockTable, config.ProjectionName), } - h.ProjectionHandler = handler.NewProjectionHandler(ctx, config.ProjectionHandlerConfig, h.reduce, h.Update, h.SearchQuery, h.Lock, h.Unlock) - err := h.Init(ctx, config.InitCheck) + initialized := make(chan bool) + h.ProjectionHandler = handler.NewProjectionHandler(ctx, config.ProjectionHandlerConfig, h.reduce, h.Update, h.SearchQuery, h.Lock, h.Unlock, initialized) + + err := h.Init(ctx, initialized, config.InitCheck) logging.OnError(err).WithField("projection", config.ProjectionName).Fatal("unable to initialize projections") h.Subscribe(h.aggregates...) diff --git a/internal/eventstore/handler/crdb/init.go b/internal/eventstore/handler/crdb/init.go index b928ee8b42..4c76e9f288 100644 --- a/internal/eventstore/handler/crdb/init.go +++ b/internal/eventstore/handler/crdb/init.go @@ -186,9 +186,11 @@ type ForeignKey struct { } // Init implements handler.Init -func (h *StatementHandler) Init(ctx context.Context, checks ...*handler.Check) error { +func (h *StatementHandler) Init(ctx context.Context, initialized chan<- bool, checks ...*handler.Check) error { for _, check := range checks { if check == nil || check.IsNoop() { + initialized <- true + close(initialized) return nil } tx, err := h.client.BeginTx(ctx, nil) @@ -211,6 +213,8 @@ func (h *StatementHandler) Init(ctx context.Context, checks ...*handler.Check) e return err } } + initialized <- true + close(initialized) return nil } diff --git a/internal/eventstore/handler/handler_projection.go b/internal/eventstore/handler/handler_projection.go index 856cf3042d..abb52604a0 100644 --- a/internal/eventstore/handler/handler_projection.go +++ b/internal/eventstore/handler/handler_projection.go @@ -62,6 +62,7 @@ func NewProjectionHandler( query SearchQuery, lock Lock, unlock Unlock, + initialized <-chan bool, ) *ProjectionHandler { concurrentInstances := int(config.ConcurrentInstances) if concurrentInstances < 1 { @@ -82,9 +83,12 @@ func NewProjectionHandler( concurrentInstances: concurrentInstances, } - go h.subscribe(ctx) + go func() { + <-initialized + go h.subscribe(ctx) - go h.schedule(ctx) + go h.schedule(ctx) + }() return h } diff --git a/internal/eventstore/handler/handler_projection_test.go b/internal/eventstore/handler/handler_projection_test.go index ada896b89e..7242dc9eba 100644 --- a/internal/eventstore/handler/handler_projection_test.go +++ b/internal/eventstore/handler/handler_projection_test.go @@ -337,6 +337,7 @@ func TestProjectionHandler_Process(t *testing.T) { nil, nil, nil, + nil, ) index, err := h.Process(tt.args.ctx, tt.args.events...)