fix(eventstore): retry push on primary key sequence collision (#7420)

* fix(eventstore): retry push on primary key sequence collision

* MaxRetries config option and unit test
This commit is contained in:
Tim Möhlmann 2024-02-23 10:29:10 +02:00 committed by GitHub
parent 71373caab3
commit 1890e28f79
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 244 additions and 9 deletions

View File

@ -546,6 +546,8 @@ Quotas:
Eventstore:
# Sets the maximum duration of transactions pushing events
PushTimeout: 15s #ZITADEL_EVENTSTORE_PUSHTIMEOUT
# Maximum amount of push retries in case of primary key violation on the sequence
MaxRetries: 5 #ZITADEL_EVENTSTORE_MAXRETRIES
DefaultInstance:
InstanceName: ZITADEL # ZITADEL_DEFAULTINSTANCE_INSTANCENAME

View File

@ -6,6 +6,7 @@ import (
type Config struct {
PushTimeout time.Duration
MaxRetries uint32
Pusher Pusher
Querier Querier

View File

@ -2,10 +2,14 @@ package eventstore
import (
"context"
"errors"
"sort"
"sync"
"time"
"github.com/jackc/pgconn"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/api/authz"
)
@ -13,6 +17,7 @@ import (
// and filters the stored events
type Eventstore struct {
PushTimeout time.Duration
maxRetries int
pusher Pusher
querier Querier
@ -52,6 +57,7 @@ type eventTypeInterceptors struct {
func NewEventstore(config *Config) *Eventstore {
return &Eventstore{
PushTimeout: config.PushTimeout,
maxRetries: int(config.MaxRetries),
pusher: config.Pusher,
querier: config.Querier,
@ -77,7 +83,23 @@ func (es *Eventstore) Push(ctx context.Context, cmds ...Command) ([]Event, error
ctx, cancel = context.WithTimeout(ctx, es.PushTimeout)
defer cancel()
}
events, err := es.pusher.Push(ctx, cmds...)
var (
events []Event
err error
)
// Retry when there is a collision of the sequence as part of the primary key.
// "duplicate key value violates unique constraint \"events2_pkey\" (SQLSTATE 23505)"
// https://github.com/zitadel/zitadel/issues/7202
retry:
for i := 0; i <= es.maxRetries; i++ {
events, err = es.pusher.Push(ctx, cmds...)
var pgErr *pgconn.PgError
if !errors.As(err, &pgErr) || pgErr.ConstraintName != "events2_pkey" || pgErr.SQLState() != "23505" {
break retry
}
logging.WithError(err).Info("eventstore push retry")
}
if err != nil {
return nil, err
}

View File

@ -8,6 +8,8 @@ import (
"testing"
"time"
"github.com/jackc/pgconn"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/api/service"
"github.com/zitadel/zitadel/internal/zerrors"
@ -329,7 +331,7 @@ func Test_eventData(t *testing.T) {
type testPusher struct {
events []Event
err error
errs []error
t *testing.T
}
@ -339,8 +341,9 @@ func (repo *testPusher) Health(ctx context.Context) error {
}
func (repo *testPusher) Push(ctx context.Context, commands ...Command) (events []Event, err error) {
if repo.err != nil {
return nil, repo.err
if len(repo.errs) != 0 {
err, repo.errs = repo.errs[0], repo.errs[1:]
return nil, err
}
if len(repo.events) != len(commands) {
@ -439,6 +442,7 @@ func TestEventstore_Push(t *testing.T) {
events []Command
}
type fields struct {
maxRetries int
pusher *testPusher
eventMapper map[EventType]func(Event) (Event, error)
}
@ -465,6 +469,51 @@ func TestEventstore_Push(t *testing.T) {
},
},
fields: fields{
maxRetries: 1,
pusher: &testPusher{
t: t,
events: []Event{
&BaseEvent{
Agg: &Aggregate{
ID: "1",
Type: "test.aggregate",
ResourceOwner: "caos",
InstanceID: "zitadel",
},
Data: []byte(nil),
User: "editorUser",
EventType: "test.event",
},
},
},
eventMapper: map[EventType]func(Event) (Event, error){
"test.event": func(e Event) (Event, error) {
return &testEvent{
BaseEvent: BaseEvent{
Agg: &Aggregate{
Type: e.Aggregate().Type,
},
},
}, nil
},
},
},
},
{
name: "one aggregate one event, retry disabled",
args: args{
events: []Command{
newTestEvent(
"1",
"",
func() interface{} {
return []byte(nil)
},
false),
},
},
fields: fields{
maxRetries: 0,
pusher: &testPusher{
t: t,
events: []Event{
@ -515,6 +564,7 @@ func TestEventstore_Push(t *testing.T) {
},
},
fields: fields{
maxRetries: 1,
pusher: &testPusher{
t: t,
events: []Event{
@ -586,6 +636,7 @@ func TestEventstore_Push(t *testing.T) {
},
},
fields: fields{
maxRetries: 1,
pusher: &testPusher{
t: t,
events: combineEventLists(
@ -658,9 +709,10 @@ func TestEventstore_Push(t *testing.T) {
},
},
fields: fields{
maxRetries: 1,
pusher: &testPusher{
t: t,
err: zerrors.ThrowInternal(nil, "V2-qaa4S", "test err"),
t: t,
errs: []error{zerrors.ThrowInternal(nil, "V2-qaa4S", "test err")},
},
},
res: res{
@ -681,21 +733,179 @@ func TestEventstore_Push(t *testing.T) {
},
},
fields: fields{
maxRetries: 1,
pusher: &testPusher{
t: t,
err: zerrors.ThrowInternal(nil, "V2-qaa4S", "test err"),
t: t,
errs: []error{zerrors.ThrowInternal(nil, "V2-qaa4S", "test err")},
},
},
res: res{
wantErr: true,
},
},
{
name: "retry succeeds",
args: args{
events: []Command{
newTestEvent(
"1",
"",
func() interface{} {
return []byte(nil)
},
false),
},
},
fields: fields{
maxRetries: 1,
pusher: &testPusher{
t: t,
events: []Event{
&BaseEvent{
Agg: &Aggregate{
ID: "1",
Type: "test.aggregate",
ResourceOwner: "caos",
InstanceID: "zitadel",
},
Data: []byte(nil),
User: "editorUser",
EventType: "test.event",
},
},
errs: []error{
zerrors.ThrowInternal(&pgconn.PgError{
ConstraintName: "events2_pkey",
Code: "23505",
}, "foo-err", "Errors.Internal"),
},
},
eventMapper: map[EventType]func(Event) (Event, error){
"test.event": func(e Event) (Event, error) {
return &testEvent{
BaseEvent: BaseEvent{
Agg: &Aggregate{
Type: e.Aggregate().Type,
},
},
}, nil
},
},
},
},
{
name: "retry fails",
args: args{
events: []Command{
newTestEvent(
"1",
"",
func() interface{} {
return []byte(nil)
},
false),
},
},
fields: fields{
maxRetries: 1,
pusher: &testPusher{
t: t,
events: []Event{
&BaseEvent{
Agg: &Aggregate{
ID: "1",
Type: "test.aggregate",
ResourceOwner: "caos",
InstanceID: "zitadel",
},
Data: []byte(nil),
User: "editorUser",
EventType: "test.event",
},
},
errs: []error{
zerrors.ThrowInternal(&pgconn.PgError{
ConstraintName: "events2_pkey",
Code: "23505",
}, "foo-err", "Errors.Internal"),
zerrors.ThrowInternal(&pgconn.PgError{
ConstraintName: "events2_pkey",
Code: "23505",
}, "foo-err", "Errors.Internal"),
},
},
eventMapper: map[EventType]func(Event) (Event, error){
"test.event": func(e Event) (Event, error) {
return &testEvent{
BaseEvent: BaseEvent{
Agg: &Aggregate{
Type: e.Aggregate().Type,
},
},
}, nil
},
},
},
res: res{wantErr: true},
},
{
name: "retry disabled",
args: args{
events: []Command{
newTestEvent(
"1",
"",
func() interface{} {
return []byte(nil)
},
false),
},
},
fields: fields{
maxRetries: 0,
pusher: &testPusher{
t: t,
events: []Event{
&BaseEvent{
Agg: &Aggregate{
ID: "1",
Type: "test.aggregate",
ResourceOwner: "caos",
InstanceID: "zitadel",
},
Data: []byte(nil),
User: "editorUser",
EventType: "test.event",
},
},
errs: []error{
zerrors.ThrowInternal(&pgconn.PgError{
ConstraintName: "events2_pkey",
Code: "23505",
}, "foo-err", "Errors.Internal"),
},
},
eventMapper: map[EventType]func(Event) (Event, error){
"test.event": func(e Event) (Event, error) {
return &testEvent{
BaseEvent: BaseEvent{
Agg: &Aggregate{
Type: e.Aggregate().Type,
},
},
}, nil
},
},
},
res: res{wantErr: true},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
eventInterceptors = map[EventType]eventTypeInterceptors{}
es := &Eventstore{
pusher: tt.fields.pusher,
maxRetries: tt.fields.maxRetries,
pusher: tt.fields.pusher,
}
for eventType, mapper := range tt.fields.eventMapper {
RegisterFilterEventMapper("test", eventType, mapper)