diff --git a/internal/eventstore/handler/v2/handler.go b/internal/eventstore/handler/v2/handler.go index 0d671c4c7f..0c3fa9201c 100644 --- a/internal/eventstore/handler/v2/handler.go +++ b/internal/eventstore/handler/v2/handler.go @@ -513,7 +513,7 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add return additionalIteration, err } - lastProcessedIndex, err := h.executeStatements(ctx, tx, currentState, statements) + lastProcessedIndex, err := h.executeStatements(ctx, tx, statements) h.log().OnError(err).WithField("lastProcessedIndex", lastProcessedIndex).Debug("execution of statements failed") if lastProcessedIndex < 0 { return false, err @@ -585,7 +585,7 @@ func skipPreviouslyReduced(statements []*Statement, currentState *state) int { return -1 } -func (h *Handler) executeStatements(ctx context.Context, tx *sql.Tx, currentState *state, statements []*Statement) (lastProcessedIndex int, err error) { +func (h *Handler) executeStatements(ctx context.Context, tx *sql.Tx, statements []*Statement) (lastProcessedIndex int, err error) { lastProcessedIndex = -1 for i, statement := range statements { @@ -593,7 +593,7 @@ func (h *Handler) executeStatements(ctx context.Context, tx *sql.Tx, currentStat case <-ctx.Done(): break default: - err := h.executeStatement(ctx, tx, currentState, statement) + err := h.executeStatement(ctx, tx, statement) if err != nil { return lastProcessedIndex, err } @@ -603,28 +603,24 @@ func (h *Handler) executeStatements(ctx context.Context, tx *sql.Tx, currentStat return lastProcessedIndex, nil } -func (h *Handler) executeStatement(ctx context.Context, tx *sql.Tx, currentState *state, statement *Statement) (err error) { +func (h *Handler) executeStatement(ctx context.Context, tx *sql.Tx, statement *Statement) (err error) { if statement.Execute == nil { return nil } - _, err = tx.Exec("SAVEPOINT exec") + _, err = tx.ExecContext(ctx, "SAVEPOINT exec_stmt") if err != nil { h.log().WithError(err).Debug("create savepoint failed") return err } - var shouldContinue bool - defer func() { - _, errSave := tx.Exec("RELEASE SAVEPOINT exec") - if err == nil { - err = errSave - } - }() if err = statement.Execute(tx, h.projection.Name()); err != nil { h.log().WithError(err).Error("statement execution failed") - shouldContinue = h.handleFailedStmt(tx, failureFromStatement(statement, err)) + _, rollbackErr := tx.ExecContext(ctx, "ROLLBACK TO SAVEPOINT exec_stmt") + h.log().OnError(rollbackErr).Error("rollback to savepoint failed") + + shouldContinue := h.handleFailedStmt(tx, failureFromStatement(statement, err)) if shouldContinue { return nil } diff --git a/internal/eventstore/handler/v2/init.go b/internal/eventstore/handler/v2/init.go index c703e8ee3a..ead1c806d0 100644 --- a/internal/eventstore/handler/v2/init.go +++ b/internal/eventstore/handler/v2/init.go @@ -264,11 +264,23 @@ func NewViewCheck(selectStmt string, secondaryTables ...*SuffixedTable) *handler } func execNextIfExists(config execConfig, q query, opts []execOption, executeNext bool) func(handler.Executer, string) (bool, error) { - return func(handler handler.Executer, name string) (bool, error) { - err := exec(config, q, opts)(handler, name) - if isErrAlreadyExists(err) { - return executeNext, nil + return func(handler handler.Executer, name string) (shouldExecuteNext bool, err error) { + _, err = handler.Exec("SAVEPOINT exec_stmt") + if err != nil { + return false, zerrors.ThrowInternal(err, "V2-U1wlz", "create savepoint failed") } + defer func() { + if err == nil { + return + } + + if isErrAlreadyExists(err) { + _, err = handler.Exec("ROLLBACK TO SAVEPOINT exec_stmt") + shouldExecuteNext = executeNext + return + } + }() + err = exec(config, q, opts)(handler, name) return false, err } } diff --git a/internal/eventstore/handler/v2/statement.go b/internal/eventstore/handler/v2/statement.go index 2eb9193083..15fc92f9c3 100644 --- a/internal/eventstore/handler/v2/statement.go +++ b/internal/eventstore/handler/v2/statement.go @@ -601,18 +601,6 @@ func exec(config execConfig, q query, opts []execOption) Exec { opt(&config) } - _, err = ex.Exec("SAVEPOINT stmt_exec") - if err != nil { - return zerrors.ThrowInternal(err, "CRDB-YdOXD", "create savepoint failed") - } - defer func() { - if err != nil { - _, rollbackErr := ex.Exec("ROLLBACK TO SAVEPOINT stmt_exec") - logging.OnError(rollbackErr).Debug("rollback failed") - return - } - _, err = ex.Exec("RELEASE SAVEPOINT stmt_exec") - }() _, err = ex.Exec(q(config), config.args...) if err != nil { return zerrors.ThrowInternal(err, "CRDB-pKtsr", "exec failed") diff --git a/internal/query/projection/executer_test.go b/internal/query/projection/executer_test.go index 9c1dd021fc..7af4e66a6a 100644 --- a/internal/query/projection/executer_test.go +++ b/internal/query/projection/executer_test.go @@ -25,7 +25,7 @@ type execution struct { type anyArg struct{} func (e *testExecuter) Exec(stmt string, args ...interface{}) (sql.Result, error) { - if stmt == "SAVEPOINT stmt_exec" || stmt == "RELEASE SAVEPOINT stmt_exec" { + if stmt == "SAVEPOINT exec_stmt" { return nil, nil }