Files
zitadel/backend/v3/storage/database/dialect/postgres/tx.go
Silvan 61cab8878e feat(backend): state persisted objects (#9870)
This PR initiates the rework of Zitadel's backend to state-persisted
objects. This change is a step towards a more scalable and maintainable
architecture.

## Changes

* **New `/backend/v3` package**: A new package structure has been
introduced to house the reworked backend logic. This includes:
* `domain`: Contains the core business logic, commands, and repository
interfaces.
* `storage`: Implements the repository interfaces for database
interactions with new transactional tables.
  * `telemetry`: Provides logging and tracing capabilities.
* **Transactional Tables**: New database tables have been defined for
`instances`, `instance_domains`, `organizations`, and `org_domains`.
* **Projections**: New projections have been created to populate the new
relational tables from the existing event store, ensuring data
consistency during the migration.
* **Repositories**: New repositories provide an abstraction layer for
accessing and manipulating the data in the new tables.
* **Setup**: A new setup step for `TransactionalTables` has been added
to manage the database migrations for the new tables.

This PR lays the foundation for future work to fully transition to
state-persisted objects for these components, which will improve
performance and simplify data access patterns.

This PR initiates the rework of ZITADEL's backend to state-persisted
objects. This is a foundational step towards a new architecture that
will improve performance and maintainability.

The following objects are migrated from event-sourced aggregates to
state-persisted objects:

* Instances
  * incl. Domains
* Orgs
  * incl. Domains

The structure of the new backend implementation follows the software
architecture defined in this [wiki
page](https://github.com/zitadel/zitadel/wiki/Software-Architecturel).

This PR includes:

* The initial implementation of the new transactional repositories for
the objects listed above.
* Projections to populate the new relational tables from the existing
event store.
* Adjustments to the build and test process to accommodate the new
backend structure.

This is a work in progress and further changes will be made to complete
the migration.

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Iraq Jaber <iraq+github@zitadel.com>
Co-authored-by: Iraq <66622793+kkrime@users.noreply.github.com>
Co-authored-by: Tim Möhlmann <tim+github@zitadel.com>
2025-09-05 09:54:34 +01:00

108 lines
2.7 KiB
Go

package postgres
import (
"context"
"errors"
"github.com/jackc/pgx/v5"
"github.com/zitadel/zitadel/backend/v3/storage/database"
)
type pgxTx struct{ pgx.Tx }
var _ database.Transaction = (*pgxTx)(nil)
// Commit implements [database.Transaction].
func (tx *pgxTx) Commit(ctx context.Context) error {
err := tx.Tx.Commit(ctx)
return wrapError(err)
}
// Rollback implements [database.Transaction].
func (tx *pgxTx) Rollback(ctx context.Context) error {
err := tx.Tx.Rollback(ctx)
return wrapError(err)
}
// End implements [database.Transaction].
func (tx *pgxTx) End(ctx context.Context, err error) error {
if err != nil {
rollbackErr := tx.Rollback(ctx)
if rollbackErr != nil {
err = errors.Join(err, rollbackErr)
}
return err
}
return tx.Commit(ctx)
}
// Query implements [database.Transaction].
// Subtle: this method shadows the method (Tx).Query of pgxTx.Tx.
func (tx *pgxTx) Query(ctx context.Context, sql string, args ...any) (database.Rows, error) {
rows, err := tx.Tx.Query(ctx, sql, args...)
if err != nil {
return nil, wrapError(err)
}
return &Rows{rows}, nil
}
// QueryRow implements [database.Transaction].
// Subtle: this method shadows the method (Tx).QueryRow of pgxTx.Tx.
func (tx *pgxTx) QueryRow(ctx context.Context, sql string, args ...any) database.Row {
return &Row{tx.Tx.QueryRow(ctx, sql, args...)}
}
// Exec implements [database.Transaction].
// Subtle: this method shadows the method (Pool).Exec of pgxPool.Pool.
func (tx *pgxTx) Exec(ctx context.Context, sql string, args ...any) (int64, error) {
res, err := tx.Tx.Exec(ctx, sql, args...)
if err != nil {
return 0, wrapError(err)
}
return res.RowsAffected(), nil
}
// Begin implements [database.Transaction].
// As postgres does not support nested transactions we use savepoints to emulate them.
func (tx *pgxTx) Begin(ctx context.Context) (database.Transaction, error) {
savepoint, err := tx.Tx.Begin(ctx)
if err != nil {
return nil, wrapError(err)
}
return &pgxTx{savepoint}, nil
}
func transactionOptionsToPgx(opts *database.TransactionOptions) pgx.TxOptions {
if opts == nil {
return pgx.TxOptions{}
}
return pgx.TxOptions{
IsoLevel: isolationToPgx(opts.IsolationLevel),
AccessMode: accessModeToPgx(opts.AccessMode),
}
}
func isolationToPgx(isolation database.IsolationLevel) pgx.TxIsoLevel {
switch isolation {
case database.IsolationLevelSerializable:
return pgx.Serializable
case database.IsolationLevelReadCommitted:
return pgx.ReadCommitted
default:
return pgx.Serializable
}
}
func accessModeToPgx(accessMode database.AccessMode) pgx.TxAccessMode {
switch accessMode {
case database.AccessModeReadWrite:
return pgx.ReadWrite
case database.AccessModeReadOnly:
return pgx.ReadOnly
default:
return pgx.ReadWrite
}
}