feat(eventstore): add row locking option (#8939)

# Which Problems Are Solved

We need a reliable way to lock events that are being processed as part
of a job queue. For example in the notification handlers.

# How the Problems Are Solved

Allow setting `FOR UPDATE [ NOWAIT | SKIP LOCKED ]` to the eventstore
query builder using an open transaction.

- NOWAIT returns an errors if the lock cannot be obtained
- SKIP LOCKED only returns row which are not locked.
- Default is to wait for the lock to be released.

# Additional Changes

- none

# Additional Context

- [Locking
docs](https://www.postgresql.org/docs/17/sql-select.html#SQL-FOR-UPDATE-SHARE)
- Related to https://github.com/zitadel/zitadel/issues/8931
This commit is contained in:
Tim Möhlmann 2024-11-21 16:46:30 +02:00 committed by GitHub
parent b65266907c
commit d4389ab359
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 188 additions and 0 deletions

View File

@ -14,6 +14,8 @@ type SearchQuery struct {
SubQueries [][]*Filter
Tx *sql.Tx
LockRows bool
LockOption eventstore.LockOption
AllowTimeTravel bool
AwaitOpenTransactions bool
Limit uint64
@ -130,6 +132,7 @@ func QueryFromBuilder(builder *eventstore.SearchQueryBuilder) (*SearchQuery, err
AwaitOpenTransactions: builder.GetAwaitOpenTransactions(),
SubQueries: make([][]*Filter, len(builder.GetQueries())),
}
query.LockRows, query.LockOption = builder.GetLockRows()
for _, f := range []func(builder *eventstore.SearchQueryBuilder, query *SearchQuery) *Filter{
instanceIDFilter,

View File

@ -105,6 +105,18 @@ func query(ctx context.Context, criteria querier, searchQuery *eventstore.Search
query += " OFFSET ?"
}
if q.LockRows {
query += " FOR UPDATE"
switch q.LockOption {
case eventstore.LockOptionWait: // default behavior
case eventstore.LockOptionNoWait:
query += " NOWAIT"
case eventstore.LockOptionSkipLocked:
query += " SKIP LOCKED"
}
}
query = criteria.placeholder(query)
var contextQuerier interface {

View File

@ -657,6 +657,89 @@ func Test_query_events_with_crdb(t *testing.T) {
}
}
/* Cockroach test DB doesn't seem to lock
func Test_query_events_with_crdb_locking(t *testing.T) {
type args struct {
searchQuery *eventstore.SearchQueryBuilder
}
type fields struct {
existingEvents []eventstore.Command
client *sql.DB
}
tests := []struct {
name string
fields fields
args args
lockOption eventstore.LockOption
wantErr bool
}{
{
name: "skip locked",
fields: fields{
client: testCRDBClient,
existingEvents: []eventstore.Command{
generateEvent(t, "306", func(e *repository.Event) { e.ResourceOwner = sql.NullString{String: "caos", Valid: true} }),
generateEvent(t, "307", func(e *repository.Event) { e.ResourceOwner = sql.NullString{String: "caos", Valid: true} }),
generateEvent(t, "308", func(e *repository.Event) { e.ResourceOwner = sql.NullString{String: "caos", Valid: true} }),
},
},
args: args{
searchQuery: eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
ResourceOwner("caos"),
},
lockOption: eventstore.LockOptionNoWait,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
db := &CRDB{
DB: &database.DB{
DB: tt.fields.client,
Database: new(testDB),
},
}
// setup initial data for query
if _, err := db.Push(context.Background(), tt.fields.existingEvents...); err != nil {
t.Errorf("error in setup = %v", err)
return
}
// first TX should lock and return all events
tx1, err := db.DB.Begin()
require.NoError(t, err)
defer func() {
require.NoError(t, tx1.Rollback())
}()
searchQuery1 := tt.args.searchQuery.LockRowsDuringTx(tx1, tt.lockOption)
gotEvents1 := []eventstore.Event{}
err = query(context.Background(), db, searchQuery1, eventstore.Reducer(func(event eventstore.Event) error {
gotEvents1 = append(gotEvents1, event)
return nil
}), true)
require.NoError(t, err)
assert.Len(t, gotEvents1, len(tt.fields.existingEvents))
// second TX should not return the events, and might return an error
tx2, err := db.DB.Begin()
require.NoError(t, err)
defer func() {
require.NoError(t, tx2.Rollback())
}()
searchQuery2 := tt.args.searchQuery.LockRowsDuringTx(tx1, tt.lockOption)
gotEvents2 := []eventstore.Event{}
err = query(context.Background(), db, searchQuery2, eventstore.Reducer(func(event eventstore.Event) error {
gotEvents2 = append(gotEvents2, event)
return nil
}), true)
if tt.wantErr {
require.Error(t, err)
}
require.NoError(t, err)
assert.Len(t, gotEvents2, 0)
})
}
}
*/
func Test_query_events_mocked(t *testing.T) {
type args struct {
query *eventstore.SearchQueryBuilder
@ -762,6 +845,69 @@ func Test_query_events_mocked(t *testing.T) {
wantErr: false,
},
},
{
name: "lock, wait",
args: args{
dest: &[]*repository.Event{},
query: eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
OrderDesc().
Limit(5).
AddQuery().
AggregateTypes("user").
Builder().LockRowsDuringTx(nil, eventstore.LockOptionWait),
},
fields: fields{
mock: newMockClient(t).expectQuery(t,
`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = \$1 ORDER BY event_sequence DESC LIMIT \$2 FOR UPDATE`,
[]driver.Value{eventstore.AggregateType("user"), uint64(5)},
),
},
res: res{
wantErr: false,
},
},
{
name: "lock, no wait",
args: args{
dest: &[]*repository.Event{},
query: eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
OrderDesc().
Limit(5).
AddQuery().
AggregateTypes("user").
Builder().LockRowsDuringTx(nil, eventstore.LockOptionNoWait),
},
fields: fields{
mock: newMockClient(t).expectQuery(t,
`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = \$1 ORDER BY event_sequence DESC LIMIT \$2 FOR UPDATE NOWAIT`,
[]driver.Value{eventstore.AggregateType("user"), uint64(5)},
),
},
res: res{
wantErr: false,
},
},
{
name: "lock, skip locked",
args: args{
dest: &[]*repository.Event{},
query: eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
OrderDesc().
Limit(5).
AddQuery().
AggregateTypes("user").
Builder().LockRowsDuringTx(nil, eventstore.LockOptionSkipLocked),
},
fields: fields{
mock: newMockClient(t).expectQuery(t,
`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = \$1 ORDER BY event_sequence DESC LIMIT \$2 FOR UPDATE SKIP LOCKED`,
[]driver.Value{eventstore.AggregateType("user"), uint64(5)},
),
},
res: res{
wantErr: false,
},
},
{
name: "error sql conn closed",
args: args{

View File

@ -22,6 +22,8 @@ type SearchQueryBuilder struct {
editorUser string
queries []*SearchQuery
tx *sql.Tx
lockRows bool
lockOption LockOption
allowTimeTravel bool
positionAfter float64
awaitOpenTransactions bool
@ -94,6 +96,10 @@ func (q SearchQueryBuilder) GetCreationDateBefore() time.Time {
return q.creationDateBefore
}
func (q SearchQueryBuilder) GetLockRows() (bool, LockOption) {
return q.lockRows, q.lockOption
}
// ensureInstanceID makes sure that the instance id is always set
func (b *SearchQueryBuilder) ensureInstanceID(ctx context.Context) {
if b.instanceID == nil && len(b.instanceIDs) == 0 && authz.GetInstance(ctx).InstanceID() != "" {
@ -307,6 +313,27 @@ func (builder *SearchQueryBuilder) CreationDateBefore(creationDate time.Time) *S
return builder
}
type LockOption int
const (
// Wait until the previous lock on all of the selected rows is released (default)
LockOptionWait LockOption = iota
// With NOWAIT, the statement reports an error, rather than waiting, if a selected row cannot be locked immediately.
LockOptionNoWait
// With SKIP LOCKED, any selected rows that cannot be immediately locked are skipped.
LockOptionSkipLocked
)
// LockRowsDuringTx locks the found rows for the duration of the transaction,
// using the [`FOR UPDATE`](https://www.postgresql.org/docs/17/sql-select.html#SQL-FOR-UPDATE-SHARE) lock strength.
// The lock is removed on transaction commit or rollback.
func (builder *SearchQueryBuilder) LockRowsDuringTx(tx *sql.Tx, option LockOption) *SearchQueryBuilder {
builder.tx = tx
builder.lockRows = true
builder.lockOption = option
return builder
}
// AddQuery creates a new sub query.
// All fields in the sub query are AND-connected in the storage request.
// Multiple sub queries are OR-connected in the storage request.