From a95b1ab3d0624e6f90fd46f12a70208cb9ec18aa Mon Sep 17 00:00:00 2001 From: Silvan Date: Thu, 19 May 2022 13:44:16 +0200 Subject: [PATCH] fix(storage): resolve deadlock occuring durring projection (#3671) --- docs/docs/apis/assets/assets.md | 0 internal/eventstore/handler/crdb/handler_stmt.go | 7 +++---- internal/eventstore/handler/crdb/handler_stmt_test.go | 2 +- internal/eventstore/repository/search_query.go | 7 ++++++- internal/eventstore/repository/sql/query.go | 10 +++++++++- internal/eventstore/search_query.go | 10 ++++++++++ 6 files changed, 29 insertions(+), 7 deletions(-) mode change 100644 => 100755 docs/docs/apis/assets/assets.md diff --git a/docs/docs/apis/assets/assets.md b/docs/docs/apis/assets/assets.md old mode 100644 new mode 100755 diff --git a/internal/eventstore/handler/crdb/handler_stmt.go b/internal/eventstore/handler/crdb/handler_stmt.go index 6a50ecd1c9..63da8a7aea 100644 --- a/internal/eventstore/handler/crdb/handler_stmt.go +++ b/internal/eventstore/handler/crdb/handler_stmt.go @@ -149,7 +149,7 @@ func (h *StatementHandler) Update(ctx context.Context, stmts []*handler.Statemen // because there could be events between current sequence and a creation event // and we cannot check via stmt.PreviousSequence if stmts[0].PreviousSequence == 0 { - previousStmts, err := h.fetchPreviousStmts(ctx, stmts[0].Sequence, stmts[0].InstanceID, sequences, reduce) + previousStmts, err := h.fetchPreviousStmts(ctx, tx, stmts[0].Sequence, stmts[0].InstanceID, sequences, reduce) if err != nil { tx.Rollback() return stmts, err @@ -186,9 +186,8 @@ func (h *StatementHandler) Update(ctx context.Context, stmts []*handler.Statemen return unexecutedStmts, nil } -func (h *StatementHandler) fetchPreviousStmts(ctx context.Context, stmtSeq uint64, instanceID string, sequences currentSequences, reduce handler.Reduce) (previousStmts []*handler.Statement, err error) { - - query := eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent) +func (h *StatementHandler) fetchPreviousStmts(ctx context.Context, tx *sql.Tx, stmtSeq uint64, instanceID string, sequences currentSequences, reduce handler.Reduce) (previousStmts []*handler.Statement, err error) { + query := eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).SetTx(tx) queriesAdded := false for _, aggregateType := range h.aggregates { for _, sequence := range sequences[aggregateType] { diff --git a/internal/eventstore/handler/crdb/handler_stmt_test.go b/internal/eventstore/handler/crdb/handler_stmt_test.go index f2bb26428d..18bbb1aa64 100644 --- a/internal/eventstore/handler/crdb/handler_stmt_test.go +++ b/internal/eventstore/handler/crdb/handler_stmt_test.go @@ -701,7 +701,7 @@ func TestProjectionHandler_fetchPreviousStmts(t *testing.T) { }), aggregates: tt.fields.aggregates, } - stmts, err := h.fetchPreviousStmts(tt.args.ctx, tt.args.stmtSeq, "", tt.args.sequences, tt.args.reduce) + stmts, err := h.fetchPreviousStmts(tt.args.ctx, nil, tt.args.stmtSeq, "", tt.args.sequences, tt.args.reduce) if !tt.want.isErr(err) { t.Errorf("ProjectionHandler.prepareBulkStmts() error = %v", err) return diff --git a/internal/eventstore/repository/search_query.go b/internal/eventstore/repository/search_query.go index 7bd6d09604..cf342c804b 100644 --- a/internal/eventstore/repository/search_query.go +++ b/internal/eventstore/repository/search_query.go @@ -1,6 +1,10 @@ package repository -import "github.com/zitadel/zitadel/internal/errors" +import ( + "database/sql" + + "github.com/zitadel/zitadel/internal/errors" +) //SearchQuery defines the which and how data are queried type SearchQuery struct { @@ -8,6 +12,7 @@ type SearchQuery struct { Limit uint64 Desc bool Filters [][]*Filter + Tx *sql.Tx } //Columns defines which fields of the event are needed for the query diff --git a/internal/eventstore/repository/sql/query.go b/internal/eventstore/repository/sql/query.go index a9116fb4b4..de78f20686 100644 --- a/internal/eventstore/repository/sql/query.go +++ b/internal/eventstore/repository/sql/query.go @@ -47,7 +47,15 @@ func query(ctx context.Context, criteria querier, searchQuery *repository.Search query = criteria.placeholder(query) - rows, err := criteria.db().QueryContext(ctx, query, values...) + var contextQuerier interface { + QueryContext(context.Context, string, ...interface{}) (*sql.Rows, error) + } + contextQuerier = criteria.db() + if searchQuery.Tx != nil { + contextQuerier = searchQuery.Tx + } + + rows, err := contextQuerier.QueryContext(ctx, query, values...) if err != nil { logging.New().WithError(err).Info("query failed") return z_errors.ThrowInternal(err, "SQL-KyeAx", "unable to filter events") diff --git a/internal/eventstore/search_query.go b/internal/eventstore/search_query.go index e6f51fa6c3..4a3fb486f5 100644 --- a/internal/eventstore/search_query.go +++ b/internal/eventstore/search_query.go @@ -1,6 +1,8 @@ package eventstore import ( + "database/sql" + "github.com/zitadel/zitadel/internal/errors" "github.com/zitadel/zitadel/internal/eventstore/repository" ) @@ -14,6 +16,7 @@ type SearchQueryBuilder struct { resourceOwner string instanceID string queries []*SearchQuery + tx *sql.Tx } type SearchQuery struct { @@ -110,6 +113,12 @@ func (builder *SearchQueryBuilder) OrderAsc() *SearchQueryBuilder { return builder } +//SetTx ensures that the eventstore library uses the existing transaction +func (builder *SearchQueryBuilder) SetTx(tx *sql.Tx) *SearchQueryBuilder { + builder.tx = tx + return builder +} + //AddQuery creates a new sub query. //All fields in the sub query are AND-connected in the storage request. //Multiple sub queries are OR-connected in the storage request. @@ -240,6 +249,7 @@ func (builder *SearchQueryBuilder) build(instanceID string) (*repository.SearchQ Limit: builder.limit, Desc: builder.desc, Filters: filters, + Tx: builder.tx, }, nil }