From 1ee7a1ab7ca30438529ad33f4186c78b886aec0f Mon Sep 17 00:00:00 2001 From: Silvan Date: Fri, 22 Nov 2024 17:25:28 +0100 Subject: [PATCH] feat(eventstore): accept transaction in push (#8945) # Which Problems Are Solved Push is not capable of external transactions. # How the Problems Are Solved A new function `PushWithClient` is added to the eventstore framework which allows to pass a client which can either be a `*sql.Client` or `*sql.Tx` and is used during push. # Additional Changes Added interfaces to database package. # Additional Context - part of https://github.com/zitadel/zitadel/issues/8931 --------- Co-authored-by: Livio Spring Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../instance_debug_notification_log_test.go | 4 +- internal/database/database.go | 36 ++++++++ internal/eventstore/eventstore.go | 30 +++++-- internal/eventstore/eventstore_bench_test.go | 4 +- internal/eventstore/eventstore_pusher_test.go | 4 +- internal/eventstore/eventstore_test.go | 8 +- .../repository/mock/repository.mock.go | 24 +++-- .../repository/mock/repository.mock.impl.go | 17 ++-- internal/eventstore/v3/eventstore.go | 8 ++ internal/eventstore/v3/field.go | 15 ++-- internal/eventstore/v3/push.go | 90 +++++++++++-------- internal/eventstore/v3/sequence.go | 3 +- internal/eventstore/v3/unique_constraints.go | 4 +- 13 files changed, 177 insertions(+), 70 deletions(-) diff --git a/internal/command/instance_debug_notification_log_test.go b/internal/command/instance_debug_notification_log_test.go index 9190064f60..32fdd06618 100644 --- a/internal/command/instance_debug_notification_log_test.go +++ b/internal/command/instance_debug_notification_log_test.go @@ -199,7 +199,7 @@ func TestCommandSide_ChangeDebugNotificationProviderLog(t *testing.T) { }, }, { - name: "change, ok", + name: "change, ok 1", fields: fields{ eventstore: eventstoreExpect( t, @@ -232,7 +232,7 @@ func TestCommandSide_ChangeDebugNotificationProviderLog(t *testing.T) { }, }, { - name: "change, ok", + name: "change, ok 2", fields: fields{ eventstore: eventstoreExpect( t, diff --git a/internal/database/database.go b/internal/database/database.go index d6ccf2873c..0191f34b6d 100644 --- a/internal/database/database.go +++ b/internal/database/database.go @@ -18,6 +18,42 @@ import ( "github.com/zitadel/zitadel/internal/zerrors" ) +type QueryExecuter interface { + Query(query string, args ...any) (*sql.Rows, error) + QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) + Exec(query string, args ...any) (sql.Result, error) + ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) +} + +type Client interface { + QueryExecuter + BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error) + Begin() (*sql.Tx, error) +} + +type Tx interface { + QueryExecuter + Commit() error + Rollback() error +} + +var ( + _ Client = (*sql.DB)(nil) + _ Tx = (*sql.Tx)(nil) +) + +func CloseTransaction(tx Tx, err error) error { + if err != nil { + rollbackErr := tx.Rollback() + logging.OnError(rollbackErr).Error("failed to rollback transaction") + return err + } + + commitErr := tx.Commit() + logging.OnError(commitErr).Error("failed to commit transaction") + return commitErr +} + type Config struct { Dialects map[string]interface{} `mapstructure:",remain"` EventPushConnRatio float64 diff --git a/internal/eventstore/eventstore.go b/internal/eventstore/eventstore.go index a8c8e923b5..6246978739 100644 --- a/internal/eventstore/eventstore.go +++ b/internal/eventstore/eventstore.go @@ -85,6 +85,12 @@ func (es *Eventstore) Health(ctx context.Context) error { // Push pushes the events in a single transaction // an event needs at least an aggregate func (es *Eventstore) Push(ctx context.Context, cmds ...Command) ([]Event, error) { + return es.PushWithClient(ctx, nil, cmds...) +} + +// PushWithClient pushes the events in a single transaction using the provided database client +// an event needs at least an aggregate +func (es *Eventstore) PushWithClient(ctx context.Context, client database.Client, cmds ...Command) ([]Event, error) { if es.PushTimeout > 0 { var cancel func() ctx, cancel = context.WithTimeout(ctx, es.PushTimeout) @@ -100,12 +106,24 @@ func (es *Eventstore) Push(ctx context.Context, cmds ...Command) ([]Event, error // https://github.com/zitadel/zitadel/issues/7202 retry: for i := 0; i <= es.maxRetries; i++ { - events, err = es.pusher.Push(ctx, cmds...) - var pgErr *pgconn.PgError - if !errors.As(err, &pgErr) || pgErr.ConstraintName != "events2_pkey" || pgErr.SQLState() != "23505" { + events, err = es.pusher.Push(ctx, client, cmds...) + // if there is a transaction passed the calling function needs to retry + if _, ok := client.(database.Tx); ok { break retry } - logging.WithError(err).Info("eventstore push retry") + var pgErr *pgconn.PgError + if !errors.As(err, &pgErr) { + break retry + } + if pgErr.ConstraintName == "events2_pkey" && pgErr.SQLState() == "23505" { + logging.WithError(err).Info("eventstore push retry") + continue + } + if pgErr.SQLState() == "CR000" || pgErr.SQLState() == "40001" { + logging.WithError(err).Info("eventstore push retry") + continue + } + break retry } if err != nil { return nil, err @@ -283,7 +301,9 @@ type Pusher interface { // Health checks if the connection to the storage is available Health(ctx context.Context) error // Push stores the actions - Push(ctx context.Context, commands ...Command) (_ []Event, err error) + Push(ctx context.Context, client database.QueryExecuter, commands ...Command) (_ []Event, err error) + // Client returns the underlying database connection + Client() *database.DB } type FillFieldsEvent interface { diff --git a/internal/eventstore/eventstore_bench_test.go b/internal/eventstore/eventstore_bench_test.go index 69b958abd8..582391e09f 100644 --- a/internal/eventstore/eventstore_bench_test.go +++ b/internal/eventstore/eventstore_bench_test.go @@ -69,7 +69,7 @@ func Benchmark_Push_SameAggregate(b *testing.B) { b.StartTimer() for n := 0; n < b.N; n++ { - _, err := store.Push(ctx, cmds...) + _, err := store.Push(ctx, store.Client().DB, cmds...) if err != nil { b.Error(err) } @@ -149,7 +149,7 @@ func Benchmark_Push_MultipleAggregate_Parallel(b *testing.B) { b.RunParallel(func(p *testing.PB) { for p.Next() { i++ - _, err := store.Push(ctx, commandCreator(strconv.Itoa(i))...) + _, err := store.Push(ctx, store.Client().DB, commandCreator(strconv.Itoa(i))...) if err != nil { b.Error(err) } diff --git a/internal/eventstore/eventstore_pusher_test.go b/internal/eventstore/eventstore_pusher_test.go index bd97b2e1e6..4e8e663667 100644 --- a/internal/eventstore/eventstore_pusher_test.go +++ b/internal/eventstore/eventstore_pusher_test.go @@ -607,7 +607,7 @@ func TestCRDB_Push_ResourceOwner(t *testing.T) { } } -func pushAggregates(pusher eventstore.Pusher, aggregateCommands [][]eventstore.Command) []error { +func pushAggregates(es *eventstore.Eventstore, aggregateCommands [][]eventstore.Command) []error { wg := sync.WaitGroup{} errs := make([]error, 0) errsMu := sync.Mutex{} @@ -619,7 +619,7 @@ func pushAggregates(pusher eventstore.Pusher, aggregateCommands [][]eventstore.C go func(events []eventstore.Command) { <-ctx.Done() - _, err := pusher.Push(context.Background(), events...) //nolint:contextcheck + _, err := es.Push(context.Background(), events...) //nolint:contextcheck if err != nil { errsMu.Lock() errs = append(errs, err) diff --git a/internal/eventstore/eventstore_test.go b/internal/eventstore/eventstore_test.go index 33e80892c5..ec28b9c551 100644 --- a/internal/eventstore/eventstore_test.go +++ b/internal/eventstore/eventstore_test.go @@ -330,6 +330,12 @@ func Test_eventData(t *testing.T) { } } +var _ Pusher = (*testPusher)(nil) + +func (repo *testPusher) Client() *database.DB { + return nil +} + type testPusher struct { events []Event errs []error @@ -341,7 +347,7 @@ func (repo *testPusher) Health(ctx context.Context) error { return nil } -func (repo *testPusher) Push(ctx context.Context, commands ...Command) (events []Event, err error) { +func (repo *testPusher) Push(_ context.Context, _ database.QueryExecuter, commands ...Command) (events []Event, err error) { if len(repo.errs) != 0 { err, repo.errs = repo.errs[0], repo.errs[1:] return nil, err diff --git a/internal/eventstore/repository/mock/repository.mock.go b/internal/eventstore/repository/mock/repository.mock.go index 58a6c8f86f..de04fef8c9 100644 --- a/internal/eventstore/repository/mock/repository.mock.go +++ b/internal/eventstore/repository/mock/repository.mock.go @@ -136,6 +136,20 @@ func (m *MockPusher) EXPECT() *MockPusherMockRecorder { return m.recorder } +// Client mocks base method. +func (m *MockPusher) Client() *database.DB { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Client") + ret0, _ := ret[0].(*database.DB) + return ret0 +} + +// Client indicates an expected call of Client. +func (mr *MockPusherMockRecorder) Client() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Client", reflect.TypeOf((*MockPusher)(nil).Client)) +} + // Health mocks base method. func (m *MockPusher) Health(arg0 context.Context) error { m.ctrl.T.Helper() @@ -151,10 +165,10 @@ func (mr *MockPusherMockRecorder) Health(arg0 any) *gomock.Call { } // Push mocks base method. -func (m *MockPusher) Push(arg0 context.Context, arg1 ...eventstore.Command) ([]eventstore.Event, error) { +func (m *MockPusher) Push(arg0 context.Context, arg1 database.QueryExecuter, arg2 ...eventstore.Command) ([]eventstore.Event, error) { m.ctrl.T.Helper() - varargs := []any{arg0} - for _, a := range arg1 { + varargs := []any{arg0, arg1} + for _, a := range arg2 { varargs = append(varargs, a) } ret := m.ctrl.Call(m, "Push", varargs...) @@ -164,8 +178,8 @@ func (m *MockPusher) Push(arg0 context.Context, arg1 ...eventstore.Command) ([]e } // Push indicates an expected call of Push. -func (mr *MockPusherMockRecorder) Push(arg0 any, arg1 ...any) *gomock.Call { +func (mr *MockPusherMockRecorder) Push(arg0, arg1 any, arg2 ...any) *gomock.Call { mr.mock.ctrl.T.Helper() - varargs := append([]any{arg0}, arg1...) + varargs := append([]any{arg0, arg1}, arg2...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Push", reflect.TypeOf((*MockPusher)(nil).Push), varargs...) } diff --git a/internal/eventstore/repository/mock/repository.mock.impl.go b/internal/eventstore/repository/mock/repository.mock.impl.go index d41521ad8f..365da7afe2 100644 --- a/internal/eventstore/repository/mock/repository.mock.impl.go +++ b/internal/eventstore/repository/mock/repository.mock.impl.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/assert" "go.uber.org/mock/gomock" + "github.com/zitadel/zitadel/internal/database" "github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/eventstore/repository" ) @@ -78,8 +79,8 @@ func (m *MockRepository) ExpectInstanceIDsError(err error) *MockRepository { // ExpectPush checks if the expectedCommands are send to the Push method. // The call will sleep at least the amount of passed duration. func (m *MockRepository) ExpectPush(expectedCommands []eventstore.Command, sleep time.Duration) *MockRepository { - m.MockPusher.EXPECT().Push(gomock.Any(), gomock.Any()).DoAndReturn( - func(ctx context.Context, commands ...eventstore.Command) ([]eventstore.Event, error) { + m.MockPusher.EXPECT().Push(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, _ database.QueryExecuter, commands ...eventstore.Command) ([]eventstore.Event, error) { m.MockPusher.ctrl.T.Helper() time.Sleep(sleep) @@ -133,8 +134,8 @@ func (m *MockRepository) ExpectPush(expectedCommands []eventstore.Command, sleep func (m *MockRepository) ExpectPushFailed(err error, expectedCommands []eventstore.Command) *MockRepository { m.MockPusher.ctrl.T.Helper() - m.MockPusher.EXPECT().Push(gomock.Any(), gomock.Any()).DoAndReturn( - func(ctx context.Context, commands ...eventstore.Command) ([]eventstore.Event, error) { + m.MockPusher.EXPECT().Push(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, _ database.QueryExecuter, commands ...eventstore.Command) ([]eventstore.Event, error) { if len(expectedCommands) != len(commands) { return nil, fmt.Errorf("unexpected amount of commands: want %d, got %d", len(expectedCommands), len(commands)) } @@ -195,8 +196,8 @@ func (e *mockEvent) CreatedAt() time.Time { } func (m *MockRepository) ExpectRandomPush(expectedCommands []eventstore.Command) *MockRepository { - m.MockPusher.EXPECT().Push(gomock.Any(), gomock.Any()).DoAndReturn( - func(ctx context.Context, commands ...eventstore.Command) ([]eventstore.Event, error) { + m.MockPusher.EXPECT().Push(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, _ database.QueryExecuter, commands ...eventstore.Command) ([]eventstore.Event, error) { assert.Len(m.MockPusher.ctrl.T, commands, len(expectedCommands)) events := make([]eventstore.Event, len(commands)) @@ -213,8 +214,8 @@ func (m *MockRepository) ExpectRandomPush(expectedCommands []eventstore.Command) } func (m *MockRepository) ExpectRandomPushFailed(err error, expectedEvents []eventstore.Command) *MockRepository { - m.MockPusher.EXPECT().Push(gomock.Any(), gomock.Any()).DoAndReturn( - func(ctx context.Context, events ...eventstore.Command) ([]eventstore.Event, error) { + m.MockPusher.EXPECT().Push(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, _ database.QueryExecuter, events ...eventstore.Command) ([]eventstore.Event, error) { assert.Len(m.MockPusher.ctrl.T, events, len(expectedEvents)) return nil, err }, diff --git a/internal/eventstore/v3/eventstore.go b/internal/eventstore/v3/eventstore.go index 4ecaf6bad2..7c58f53f29 100644 --- a/internal/eventstore/v3/eventstore.go +++ b/internal/eventstore/v3/eventstore.go @@ -4,6 +4,7 @@ import ( "context" "github.com/zitadel/zitadel/internal/database" + "github.com/zitadel/zitadel/internal/eventstore" ) var ( @@ -11,12 +12,19 @@ var ( pushPlaceholderFmt string // uniqueConstraintPlaceholderFmt defines the format of the unique constraint error returned from the database uniqueConstraintPlaceholderFmt string + + _ eventstore.Pusher = (*Eventstore)(nil) ) type Eventstore struct { client *database.DB } +// Client implements the [eventstore.Pusher] +func (es *Eventstore) Client() *database.DB { + return es.client +} + func NewEventstore(client *database.DB) *Eventstore { switch client.Type() { case "cockroach": diff --git a/internal/eventstore/v3/field.go b/internal/eventstore/v3/field.go index 17037f8bcc..cfa9c08bba 100644 --- a/internal/eventstore/v3/field.go +++ b/internal/eventstore/v3/field.go @@ -11,6 +11,7 @@ import ( "strings" "github.com/zitadel/zitadel/internal/api/authz" + "github.com/zitadel/zitadel/internal/database" "github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/telemetry/tracing" "github.com/zitadel/zitadel/internal/zerrors" @@ -142,7 +143,7 @@ func buildSearchCondition(builder *strings.Builder, index int, conditions map[ev return args } -func handleFieldCommands(ctx context.Context, tx *sql.Tx, commands []eventstore.Command) error { +func handleFieldCommands(ctx context.Context, tx database.Tx, commands []eventstore.Command) error { for _, command := range commands { if len(command.Fields()) > 0 { if err := handleFieldOperations(ctx, tx, command.Fields()); err != nil { @@ -153,7 +154,7 @@ func handleFieldCommands(ctx context.Context, tx *sql.Tx, commands []eventstore. return nil } -func handleFieldFillEvents(ctx context.Context, tx *sql.Tx, events []eventstore.FillFieldsEvent) error { +func handleFieldFillEvents(ctx context.Context, tx database.Tx, events []eventstore.FillFieldsEvent) error { for _, event := range events { if len(event.Fields()) > 0 { if err := handleFieldOperations(ctx, tx, event.Fields()); err != nil { @@ -164,7 +165,7 @@ func handleFieldFillEvents(ctx context.Context, tx *sql.Tx, events []eventstore. return nil } -func handleFieldOperations(ctx context.Context, tx *sql.Tx, operations []*eventstore.FieldOperation) error { +func handleFieldOperations(ctx context.Context, tx database.Tx, operations []*eventstore.FieldOperation) error { for _, operation := range operations { if operation.Set != nil { if err := handleFieldSet(ctx, tx, operation.Set); err != nil { @@ -182,7 +183,7 @@ func handleFieldOperations(ctx context.Context, tx *sql.Tx, operations []*events return nil } -func handleFieldSet(ctx context.Context, tx *sql.Tx, field *eventstore.Field) error { +func handleFieldSet(ctx context.Context, tx database.Tx, field *eventstore.Field) error { if len(field.UpsertConflictFields) == 0 { return handleSearchInsert(ctx, tx, field) } @@ -193,7 +194,7 @@ const ( insertField = `INSERT INTO eventstore.fields (instance_id, resource_owner, aggregate_type, aggregate_id, object_type, object_id, object_revision, field_name, value, value_must_be_unique, should_index) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)` ) -func handleSearchInsert(ctx context.Context, tx *sql.Tx, field *eventstore.Field) error { +func handleSearchInsert(ctx context.Context, tx database.Tx, field *eventstore.Field) error { value, err := json.Marshal(field.Value.Value) if err != nil { return zerrors.ThrowInvalidArgument(err, "V3-fcrW1", "unable to marshal field value") @@ -222,7 +223,7 @@ const ( fieldsUpsertSuffix = ` RETURNING * ) INSERT INTO eventstore.fields (instance_id, resource_owner, aggregate_type, aggregate_id, object_type, object_id, object_revision, field_name, value, value_must_be_unique, should_index) SELECT $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11 WHERE NOT EXISTS (SELECT 1 FROM upsert)` ) -func handleSearchUpsert(ctx context.Context, tx *sql.Tx, field *eventstore.Field) error { +func handleSearchUpsert(ctx context.Context, tx database.Tx, field *eventstore.Field) error { value, err := json.Marshal(field.Value.Value) if err != nil { return zerrors.ThrowInvalidArgument(err, "V3-fcrW1", "unable to marshal field value") @@ -268,7 +269,7 @@ func writeUpsertField(fields []eventstore.FieldType) string { const removeSearch = `DELETE FROM eventstore.fields WHERE ` -func handleSearchDelete(ctx context.Context, tx *sql.Tx, clauses map[eventstore.FieldType]any) error { +func handleSearchDelete(ctx context.Context, tx database.Tx, clauses map[eventstore.FieldType]any) error { if len(clauses) == 0 { return zerrors.ThrowInvalidArgument(nil, "V3-oqlBZ", "no conditions") } diff --git a/internal/eventstore/v3/push.go b/internal/eventstore/v3/push.go index 47a4c96dca..c0f66209c3 100644 --- a/internal/eventstore/v3/push.go +++ b/internal/eventstore/v3/push.go @@ -14,6 +14,7 @@ import ( "github.com/zitadel/logging" "github.com/zitadel/zitadel/internal/api/authz" + "github.com/zitadel/zitadel/internal/database" "github.com/zitadel/zitadel/internal/database/dialect" "github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/telemetry/tracing" @@ -22,13 +23,45 @@ import ( var appNamePrefix = dialect.DBPurposeEventPusher.AppName() + "_" -func (es *Eventstore) Push(ctx context.Context, commands ...eventstore.Command) (events []eventstore.Event, err error) { +var pushTxOpts = &sql.TxOptions{ + Isolation: sql.LevelReadCommitted, + ReadOnly: false, +} + +func (es *Eventstore) Push(ctx context.Context, client database.QueryExecuter, commands ...eventstore.Command) (events []eventstore.Event, err error) { ctx, span := tracing.NewSpan(ctx) defer func() { span.EndWithError(err) }() - tx, err := es.client.BeginTx(ctx, nil) - if err != nil { - return nil, err + var tx database.Tx + switch c := client.(type) { + case database.Tx: + tx = c + case database.Client: + // We cannot use READ COMMITTED on CockroachDB because we use cluster_logical_timestamp() which is not supported in this isolation level + var opts *sql.TxOptions + if es.client.Database.Type() == "postgres" { + opts = pushTxOpts + } + tx, err = c.BeginTx(ctx, opts) + if err != nil { + return nil, err + } + defer func() { + err = database.CloseTransaction(tx, err) + }() + default: + // We cannot use READ COMMITTED on CockroachDB because we use cluster_logical_timestamp() which is not supported in this isolation level + var opts *sql.TxOptions + if es.client.Database.Type() == "postgres" { + opts = pushTxOpts + } + tx, err = es.client.BeginTx(ctx, opts) + if err != nil { + return nil, err + } + defer func() { + err = database.CloseTransaction(tx, err) + }() } // tx is not closed because [crdb.ExecuteInTx] takes care of that var ( @@ -42,43 +75,30 @@ func (es *Eventstore) Push(ctx context.Context, commands ...eventstore.Command) return nil, err } - // needs to be set like this because psql complains about parameters in the SET statement - _, err = tx.ExecContext(ctx, "SET application_name = '"+appNamePrefix+authz.GetInstance(ctx).InstanceID()+"'") + sequences, err = latestSequences(ctx, tx, commands) if err != nil { - logging.WithError(err).Warn("failed to set application name") return nil, err } - err = crdb.ExecuteInTx(ctx, &transaction{tx}, func() (err error) { - inTxCtx, span := tracing.NewSpan(ctx) - defer func() { span.EndWithError(err) }() + events, err = insertEvents(ctx, tx, sequences, commands) + if err != nil { + return nil, err + } - sequences, err = latestSequences(inTxCtx, tx, commands) + if err = handleUniqueConstraints(ctx, tx, commands); err != nil { + return nil, err + } + + // CockroachDB by default does not allow multiple modifications of the same table using ON CONFLICT + // Thats why we enable it manually + if es.client.Type() == "cockroach" { + _, err = tx.Exec("SET enable_multiple_modifications_of_table = on") if err != nil { - return err + return nil, err } + } - events, err = insertEvents(inTxCtx, tx, sequences, commands) - if err != nil { - return err - } - - if err = handleUniqueConstraints(inTxCtx, tx, commands); err != nil { - return err - } - - // CockroachDB by default does not allow multiple modifications of the same table using ON CONFLICT - // Thats why we enable it manually - if es.client.Type() == "cockroach" { - _, err = tx.Exec("SET enable_multiple_modifications_of_table = on") - if err != nil { - return err - } - } - - return handleFieldCommands(inTxCtx, tx, commands) - }) - + err = handleFieldCommands(ctx, tx, commands) if err != nil { return nil, err } @@ -89,7 +109,7 @@ func (es *Eventstore) Push(ctx context.Context, commands ...eventstore.Command) //go:embed push.sql var pushStmt string -func insertEvents(ctx context.Context, tx *sql.Tx, sequences []*latestSequence, commands []eventstore.Command) ([]eventstore.Event, error) { +func insertEvents(ctx context.Context, tx database.Tx, sequences []*latestSequence, commands []eventstore.Command) ([]eventstore.Event, error) { events, placeholders, args, err := mapCommands(commands, sequences) if err != nil { return nil, err @@ -186,7 +206,7 @@ func mapCommands(commands []eventstore.Command, sequences []*latestSequence) (ev } type transaction struct { - *sql.Tx + database.Tx } var _ crdb.Tx = (*transaction)(nil) diff --git a/internal/eventstore/v3/sequence.go b/internal/eventstore/v3/sequence.go index 8d84ef4755..7d97e1080d 100644 --- a/internal/eventstore/v3/sequence.go +++ b/internal/eventstore/v3/sequence.go @@ -10,6 +10,7 @@ import ( "github.com/zitadel/logging" "github.com/zitadel/zitadel/internal/api/authz" + "github.com/zitadel/zitadel/internal/database" "github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/zerrors" ) @@ -22,7 +23,7 @@ type latestSequence struct { //go:embed sequences_query.sql var latestSequencesStmt string -func latestSequences(ctx context.Context, tx *sql.Tx, commands []eventstore.Command) ([]*latestSequence, error) { +func latestSequences(ctx context.Context, tx database.Tx, commands []eventstore.Command) ([]*latestSequence, error) { sequences := commandsToSequences(ctx, commands) conditions, args := sequencesToSql(sequences) diff --git a/internal/eventstore/v3/unique_constraints.go b/internal/eventstore/v3/unique_constraints.go index e3bae89805..9c4d1831c4 100644 --- a/internal/eventstore/v3/unique_constraints.go +++ b/internal/eventstore/v3/unique_constraints.go @@ -2,7 +2,6 @@ package eventstore import ( "context" - "database/sql" _ "embed" "errors" "fmt" @@ -11,6 +10,7 @@ import ( "github.com/jackc/pgx/v5/pgconn" "github.com/zitadel/logging" + "github.com/zitadel/zitadel/internal/database" "github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/zerrors" ) @@ -24,7 +24,7 @@ var ( addConstraintStmt string ) -func handleUniqueConstraints(ctx context.Context, tx *sql.Tx, commands []eventstore.Command) error { +func handleUniqueConstraints(ctx context.Context, tx database.Tx, commands []eventstore.Command) error { deletePlaceholders := make([]string, 0) deleteArgs := make([]any, 0)