This commit is contained in:
adlerhurst
2025-03-24 11:12:47 +01:00
parent d0044058ec
commit a41a998028
12 changed files with 418 additions and 90 deletions

View File

@@ -21,6 +21,8 @@ type Transaction interface {
Rollback(ctx context.Context) error
End(ctx context.Context, err error) error
Begin(ctx context.Context, opts *TransactionOptions) (Transaction, error)
QueryExecutor
}

View File

@@ -3,6 +3,7 @@ package gosql
import (
"context"
"database/sql"
"errors"
"github.com/zitadel/zitadel/backend/storage/database"
)
@@ -49,6 +50,12 @@ func (tx *sqlTx) Exec(ctx context.Context, sql string, args ...any) error {
return err
}
// Begin implements [database.Transaction].
// it is unimplemented
func (tx *sqlTx) Begin(ctx context.Context, opts *database.TransactionOptions) (database.Transaction, error) {
return nil, errors.New("nested transactions are not supported")
}
func transactionOptionsToSql(opts *database.TransactionOptions) *sql.TxOptions {
if opts == nil {
return nil

View File

@@ -44,13 +44,24 @@ func (tx *pgxTx) QueryRow(ctx context.Context, sql string, args ...any) database
return tx.Tx.QueryRow(ctx, sql, args...)
}
// Exec implements [database.Pool].
// Exec implements [database.Transaction].
// Subtle: this method shadows the method (Pool).Exec of pgxPool.Pool.
func (tx *pgxTx) Exec(ctx context.Context, sql string, args ...any) error {
_, err := tx.Tx.Exec(ctx, sql, args...)
return err
}
// Begin implements [database.Transaction].
// As postgres does not support nested transactions we use savepoints to emulate them.
// TransactionOptions are ignored as savepoints do not support changing isolation levels.
func (tx *pgxTx) Begin(ctx context.Context, _ *database.TransactionOptions) (database.Transaction, error) {
savepoint, err := tx.Tx.Begin(ctx)
if err != nil {
return nil, err
}
return &pgxTx{savepoint}, nil
}
func transactionOptionsToPgx(opts *database.TransactionOptions) pgx.TxOptions {
if opts == nil {
return pgx.TxOptions{}

View File

@@ -0,0 +1,52 @@
package database
// import (
// "context"
// "fmt"
// "github.com/zitadel/zitadel/backend/handler"
// )
// func Begin[In, Out, NextOut any](ctx context.Context, beginner Beginner, opts *TransactionOptions) handler.Defer[In, Out, NextOut] {
// // func(ctx context.Context, in *VerifyEmail) (_ *VerifyEmail, _ func(context.Context, error) error, err error) {
// return func(handle handler.DeferrableHandle[In, Out], next handler.Handle[Out, NextOut]) handler.Handle[In, NextOut] {
// return func(ctx context.Context, in In) (out NextOut, err error) {
// tx, err := beginner.Begin(ctx, opts)
// if err != nil {
// return out, err
// }
// defer func() {
// if err != nil {
// rollbackErr := tx.Rollback(ctx)
// if rollbackErr != nil {
// err = fmt.Errorf("query failed: %w, rollback failed: %v", err, rollbackErr)
// }
// } else {
// err = tx.Commit(ctx)
// }
// }()
// return handle(ctx, in, tx)
// }
// }
// }
// type QueryExecutorSetter interface {
// SetQueryExecutor(QueryExecutor)
// }
// func Begin[In QueryExecutorSetter](ctx context.Context, beginner Beginner, in In) (_ In, _ func(context.Context, error) error, err error) {
// tx, err := beginner.Begin(ctx, nil)
// if err != nil {
// return in, nil, err
// }
// in.SetQueryExecutor(tx)
// return in, func(ctx context.Context, err error) error {
// err = tx.End(ctx, err)
// if err != nil {
// return err
// }
// in.SetQueryExecutor(beginner)
// return nil
// }, err
// }

View File

@@ -118,6 +118,12 @@ func (tx *Transaction) Exec(ctx context.Context, stmt string, args ...any) error
return nil
}
// Begin implements [database.Transaction].
// it is unimplemented
func (tx *Transaction) Begin(ctx context.Context, opts *database.TransactionOptions) (database.Transaction, error) {
return nil, errors.New("nested transactions are not supported")
}
// Query implements [database.Transaction].
func (tx *Transaction) Query(ctx context.Context, stmt string, args ...any) (database.Rows, error) {
e := tx.nextExpecter()