mirror of
https://github.com/zitadel/zitadel.git
synced 2025-12-24 01:37:11 +00:00
Improves compatibility of eventstore and related database components with the new relational table package. ## Which problems are solved 1. **Incompatible Database Interfaces**: The existing eventstore was tightly coupled to the database package, which is incompatible with the new, more abstract relational table package in v3. This prevented the new command-side logic from pushing events to the legacy eventstore. 2. **Missing Health Checks**: The database interfaces in the new package lacked a Ping method, making it impossible to perform health checks on database connections. 3. **Event Publishing Logic**: The command handling logic in domain needed a way to collect and push events to the legacy eventstore after a command was successfully executed. ## How the problems are solved 1. **`LegacyEventstore` Interface**: * A new `LegacyEventstore` interface is introduced in the new `database/eventstore` . This interface exposes a `PushWithNewClient` method that accepts the new `database.QueryExecutor` interface, decoupling the v3 domain from the legacy implementation. * The `internal/eventstore.Eventstore` now implements this interface. A wrapper, PushWithClient, is added to convert the old database client types (`*sql.DB`, `*sql.Tx`) into the new `QueryExecutor` types before calling `PushWithNewClient`. 2. **Database Interface Updates**: * The `database.Pool` and `database.Client` interfaces in `storage/eventstore` have been updated to include a Ping method, allowing for consistent health checks across different database dialects. * The `postgres` and `sql` dialect implementations have been updated to support this new method. 3. **Command and Invoker Refactoring**: * The `Commander` interface in domain now includes an `Events() []legacy_es.Command` method. This allows commands to declare which events they will generate. * The `eventCollector` in the invoker logic has been redesigned. It now ensures a database transaction is started before executing a command. After successful execution, it calls the `Events()` method on the command to collect the generated events and appends them to a list. * The `eventStoreInvoker` then pushes all collected events to the legacy eventstore using the new `LegacyEventstore` interface, ensuring that events are only pushed if the entire command (and any sub-commands) executes successfully within the transaction. 4. **Testing**: * New unit tests have been added for the invoker to verify that events are correctly collected from single commands, batched commands, and nested commands. These changes create a clean bridge between the new v3 command-side logic and the existing v1 eventstore, allowing for incremental adoption of the new architecture while maintaining full functionality. ## Additional Information closes https://github.com/zitadel/zitadel/issues/10442
114 lines
2.9 KiB
Go
114 lines
2.9 KiB
Go
package postgres
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
|
|
"github.com/jackc/pgx/v5"
|
|
|
|
"github.com/zitadel/zitadel/backend/v3/storage/database"
|
|
)
|
|
|
|
type Transaction struct{ pgx.Tx }
|
|
|
|
func PGxTx(tx pgx.Tx) *Transaction {
|
|
return &Transaction{
|
|
Tx: tx,
|
|
}
|
|
}
|
|
|
|
var _ database.Transaction = (*Transaction)(nil)
|
|
|
|
// Commit implements [database.Transaction].
|
|
func (tx *Transaction) Commit(ctx context.Context) error {
|
|
err := tx.Tx.Commit(ctx)
|
|
return wrapError(err)
|
|
}
|
|
|
|
// Rollback implements [database.Transaction].
|
|
func (tx *Transaction) Rollback(ctx context.Context) error {
|
|
err := tx.Tx.Rollback(ctx)
|
|
return wrapError(err)
|
|
}
|
|
|
|
// End implements [database.Transaction].
|
|
func (tx *Transaction) End(ctx context.Context, err error) error {
|
|
if err != nil {
|
|
rollbackErr := tx.Rollback(ctx)
|
|
if rollbackErr != nil {
|
|
err = errors.Join(err, rollbackErr)
|
|
}
|
|
return err
|
|
}
|
|
return tx.Commit(ctx)
|
|
}
|
|
|
|
// Query implements [database.Transaction].
|
|
// Subtle: this method shadows the method (Tx).Query of pgxTx.Tx.
|
|
func (tx *Transaction) Query(ctx context.Context, sql string, args ...any) (database.Rows, error) {
|
|
rows, err := tx.Tx.Query(ctx, sql, args...)
|
|
if err != nil {
|
|
return nil, wrapError(err)
|
|
}
|
|
return &Rows{rows}, nil
|
|
}
|
|
|
|
// QueryRow implements [database.Transaction].
|
|
// Subtle: this method shadows the method (Tx).QueryRow of pgxTx.Tx.
|
|
func (tx *Transaction) QueryRow(ctx context.Context, sql string, args ...any) database.Row {
|
|
return &Row{tx.Tx.QueryRow(ctx, sql, args...)}
|
|
}
|
|
|
|
// Exec implements [database.Transaction].
|
|
// Subtle: this method shadows the method (Pool).Exec of pgxPool.Pool.
|
|
func (tx *Transaction) Exec(ctx context.Context, sql string, args ...any) (int64, error) {
|
|
res, err := tx.Tx.Exec(ctx, sql, args...)
|
|
if err != nil {
|
|
return 0, wrapError(err)
|
|
}
|
|
return res.RowsAffected(), nil
|
|
}
|
|
|
|
// Begin implements [database.Transaction].
|
|
// As postgres does not support nested transactions we use savepoints to emulate them.
|
|
func (tx *Transaction) Begin(ctx context.Context) (database.Transaction, error) {
|
|
savepoint, err := tx.Tx.Begin(ctx)
|
|
if err != nil {
|
|
return nil, wrapError(err)
|
|
}
|
|
return &Transaction{savepoint}, nil
|
|
}
|
|
|
|
func transactionOptionsToPgx(opts *database.TransactionOptions) pgx.TxOptions {
|
|
if opts == nil {
|
|
return pgx.TxOptions{}
|
|
}
|
|
|
|
return pgx.TxOptions{
|
|
IsoLevel: isolationToPgx(opts.IsolationLevel),
|
|
AccessMode: accessModeToPgx(opts.AccessMode),
|
|
}
|
|
}
|
|
|
|
func isolationToPgx(isolation database.IsolationLevel) pgx.TxIsoLevel {
|
|
switch isolation {
|
|
case database.IsolationLevelSerializable:
|
|
return pgx.Serializable
|
|
case database.IsolationLevelReadCommitted:
|
|
return pgx.ReadCommitted
|
|
default:
|
|
return pgx.Serializable
|
|
}
|
|
}
|
|
|
|
func accessModeToPgx(accessMode database.AccessMode) pgx.TxAccessMode {
|
|
switch accessMode {
|
|
case database.AccessModeReadWrite:
|
|
return pgx.ReadWrite
|
|
case database.AccessModeReadOnly:
|
|
return pgx.ReadOnly
|
|
default:
|
|
return pgx.ReadWrite
|
|
}
|
|
}
|