fix(storage): resolve deadlock occuring durring projection (#3671)

This commit is contained in:
Silvan 2022-05-19 13:44:16 +02:00 committed by GitHub
parent 2f8c50aa4c
commit a95b1ab3d0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 29 additions and 7 deletions

0
docs/docs/apis/assets/assets.md Normal file → Executable file
View File

View File

@ -149,7 +149,7 @@ func (h *StatementHandler) Update(ctx context.Context, stmts []*handler.Statemen
// because there could be events between current sequence and a creation event // because there could be events between current sequence and a creation event
// and we cannot check via stmt.PreviousSequence // and we cannot check via stmt.PreviousSequence
if stmts[0].PreviousSequence == 0 { if stmts[0].PreviousSequence == 0 {
previousStmts, err := h.fetchPreviousStmts(ctx, stmts[0].Sequence, stmts[0].InstanceID, sequences, reduce) previousStmts, err := h.fetchPreviousStmts(ctx, tx, stmts[0].Sequence, stmts[0].InstanceID, sequences, reduce)
if err != nil { if err != nil {
tx.Rollback() tx.Rollback()
return stmts, err return stmts, err
@ -186,9 +186,8 @@ func (h *StatementHandler) Update(ctx context.Context, stmts []*handler.Statemen
return unexecutedStmts, nil return unexecutedStmts, nil
} }
func (h *StatementHandler) fetchPreviousStmts(ctx context.Context, stmtSeq uint64, instanceID string, sequences currentSequences, reduce handler.Reduce) (previousStmts []*handler.Statement, err error) { func (h *StatementHandler) fetchPreviousStmts(ctx context.Context, tx *sql.Tx, stmtSeq uint64, instanceID string, sequences currentSequences, reduce handler.Reduce) (previousStmts []*handler.Statement, err error) {
query := eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).SetTx(tx)
query := eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent)
queriesAdded := false queriesAdded := false
for _, aggregateType := range h.aggregates { for _, aggregateType := range h.aggregates {
for _, sequence := range sequences[aggregateType] { for _, sequence := range sequences[aggregateType] {

View File

@ -701,7 +701,7 @@ func TestProjectionHandler_fetchPreviousStmts(t *testing.T) {
}), }),
aggregates: tt.fields.aggregates, aggregates: tt.fields.aggregates,
} }
stmts, err := h.fetchPreviousStmts(tt.args.ctx, tt.args.stmtSeq, "", tt.args.sequences, tt.args.reduce) stmts, err := h.fetchPreviousStmts(tt.args.ctx, nil, tt.args.stmtSeq, "", tt.args.sequences, tt.args.reduce)
if !tt.want.isErr(err) { if !tt.want.isErr(err) {
t.Errorf("ProjectionHandler.prepareBulkStmts() error = %v", err) t.Errorf("ProjectionHandler.prepareBulkStmts() error = %v", err)
return return

View File

@ -1,6 +1,10 @@
package repository package repository
import "github.com/zitadel/zitadel/internal/errors" import (
"database/sql"
"github.com/zitadel/zitadel/internal/errors"
)
//SearchQuery defines the which and how data are queried //SearchQuery defines the which and how data are queried
type SearchQuery struct { type SearchQuery struct {
@ -8,6 +12,7 @@ type SearchQuery struct {
Limit uint64 Limit uint64
Desc bool Desc bool
Filters [][]*Filter Filters [][]*Filter
Tx *sql.Tx
} }
//Columns defines which fields of the event are needed for the query //Columns defines which fields of the event are needed for the query

View File

@ -47,7 +47,15 @@ func query(ctx context.Context, criteria querier, searchQuery *repository.Search
query = criteria.placeholder(query) query = criteria.placeholder(query)
rows, err := criteria.db().QueryContext(ctx, query, values...) var contextQuerier interface {
QueryContext(context.Context, string, ...interface{}) (*sql.Rows, error)
}
contextQuerier = criteria.db()
if searchQuery.Tx != nil {
contextQuerier = searchQuery.Tx
}
rows, err := contextQuerier.QueryContext(ctx, query, values...)
if err != nil { if err != nil {
logging.New().WithError(err).Info("query failed") logging.New().WithError(err).Info("query failed")
return z_errors.ThrowInternal(err, "SQL-KyeAx", "unable to filter events") return z_errors.ThrowInternal(err, "SQL-KyeAx", "unable to filter events")

View File

@ -1,6 +1,8 @@
package eventstore package eventstore
import ( import (
"database/sql"
"github.com/zitadel/zitadel/internal/errors" "github.com/zitadel/zitadel/internal/errors"
"github.com/zitadel/zitadel/internal/eventstore/repository" "github.com/zitadel/zitadel/internal/eventstore/repository"
) )
@ -14,6 +16,7 @@ type SearchQueryBuilder struct {
resourceOwner string resourceOwner string
instanceID string instanceID string
queries []*SearchQuery queries []*SearchQuery
tx *sql.Tx
} }
type SearchQuery struct { type SearchQuery struct {
@ -110,6 +113,12 @@ func (builder *SearchQueryBuilder) OrderAsc() *SearchQueryBuilder {
return builder return builder
} }
//SetTx ensures that the eventstore library uses the existing transaction
func (builder *SearchQueryBuilder) SetTx(tx *sql.Tx) *SearchQueryBuilder {
builder.tx = tx
return builder
}
//AddQuery creates a new sub query. //AddQuery creates a new sub query.
//All fields in the sub query are AND-connected in the storage request. //All fields in the sub query are AND-connected in the storage request.
//Multiple sub queries are OR-connected in the storage request. //Multiple sub queries are OR-connected in the storage request.
@ -240,6 +249,7 @@ func (builder *SearchQueryBuilder) build(instanceID string) (*repository.SearchQ
Limit: builder.limit, Limit: builder.limit,
Desc: builder.desc, Desc: builder.desc,
Filters: filters, Filters: filters,
Tx: builder.tx,
}, nil }, nil
} }