diff --git a/internal/eventstore/handler/v2/handler.go b/internal/eventstore/handler/v2/handler.go index 37bb9718d0..fd9c0ec637 100644 --- a/internal/eventstore/handler/v2/handler.go +++ b/internal/eventstore/handler/v2/handler.go @@ -471,11 +471,11 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add txCtx := ctx if h.txDuration > 0 { - var cancel func() - ctx, cancel = context.WithTimeout(ctx, h.txDuration) - defer cancel() + var cancel, cancelTx func() // add 100ms to store current state if iteration takes too long - txCtx, cancel = context.WithTimeout(ctx, h.txDuration+100*time.Millisecond) + txCtx, cancelTx = context.WithTimeout(ctx, h.txDuration+100*time.Millisecond) + defer cancelTx() + ctx, cancel = context.WithTimeout(ctx, h.txDuration) defer cancel() } diff --git a/internal/eventstore/handler/v2/state.go b/internal/eventstore/handler/v2/state.go index cdd9a3b9b9..d3b6953488 100644 --- a/internal/eventstore/handler/v2/state.go +++ b/internal/eventstore/handler/v2/state.go @@ -93,8 +93,8 @@ func (h *Handler) setState(tx *sql.Tx, updatedState *state) error { updatedState.offset, ) if err != nil { - h.log().WithError(err).Debug("unable to update state") - return err + h.log().WithError(err).Warn("unable to update state") + return zerrors.ThrowInternal(err, "V2-WF23g2", "unable to update state") } if affected, err := res.RowsAffected(); affected == 0 { h.log().OnError(err).Error("unable to check if states are updated")