From 5d309bcf5bbb4b7cb2fd3c582803b137192a259d Mon Sep 17 00:00:00 2001 From: adlerhurst <27845747+adlerhurst@users.noreply.github.com> Date: Fri, 25 Jul 2025 16:16:45 +0200 Subject: [PATCH] feat(database): implement sql adapter for backwards compatibility --- .../v3/storage/database/dialect/sql/conn.go | 59 ++++++++++ .../v3/storage/database/dialect/sql/doc.go | 3 + .../v3/storage/database/dialect/sql/error.go | 41 +++++++ .../v3/storage/database/dialect/sql/pool.go | 74 +++++++++++++ .../v3/storage/database/dialect/sql/rows.go | 79 ++++++++++++++ .../storage/database/dialect/sql/savepoint.go | 69 ++++++++++++ backend/v3/storage/database/dialect/sql/tx.go | 103 ++++++++++++++++++ 7 files changed, 428 insertions(+) create mode 100644 backend/v3/storage/database/dialect/sql/conn.go create mode 100644 backend/v3/storage/database/dialect/sql/doc.go create mode 100644 backend/v3/storage/database/dialect/sql/error.go create mode 100644 backend/v3/storage/database/dialect/sql/pool.go create mode 100644 backend/v3/storage/database/dialect/sql/rows.go create mode 100644 backend/v3/storage/database/dialect/sql/savepoint.go create mode 100644 backend/v3/storage/database/dialect/sql/tx.go diff --git a/backend/v3/storage/database/dialect/sql/conn.go b/backend/v3/storage/database/dialect/sql/conn.go new file mode 100644 index 0000000000..967ed2d1e8 --- /dev/null +++ b/backend/v3/storage/database/dialect/sql/conn.go @@ -0,0 +1,59 @@ +package sql + +import ( + "context" + "database/sql" + + "github.com/zitadel/zitadel/backend/v3/storage/database" +) + +type sqlConn struct { + *sql.Conn +} + +var _ database.Client = (*sqlConn)(nil) + +// Release implements [database.Client]. +func (c *sqlConn) Release(_ context.Context) error { + return c.Close() +} + +// Begin implements [database.Client]. +func (c *sqlConn) Begin(ctx context.Context, opts *database.TransactionOptions) (database.Transaction, error) { + tx, err := c.BeginTx(ctx, transactionOptionsToSQL(opts)) + if err != nil { + return nil, wrapError(err) + } + return &sqlTx{tx}, nil +} + +// Query implements sql.Client. +// Subtle: this method shadows the method (*Conn).Query of pgxConn.Conn. +func (c *sqlConn) Query(ctx context.Context, sql string, args ...any) (database.Rows, error) { + rows, err := c.QueryContext(ctx, sql, args...) + if err != nil { + return nil, wrapError(err) + } + return &Rows{rows}, nil +} + +// QueryRow implements sql.Client. +// Subtle: this method shadows the method (*Conn).QueryRow of pgxConn.Conn. +func (c *sqlConn) QueryRow(ctx context.Context, sql string, args ...any) database.Row { + return &Row{c.QueryRowContext(ctx, sql, args...)} +} + +// Exec implements [database.Pool]. +// Subtle: this method shadows the method (Pool).Exec of pgxPool.Pool. +func (c *sqlConn) Exec(ctx context.Context, sql string, args ...any) (int64, error) { + res, err := c.ExecContext(ctx, sql, args...) + if err != nil { + return 0, wrapError(err) + } + return res.RowsAffected() +} + +// Migrate implements [database.Migrator]. +func (c *sqlConn) Migrate(ctx context.Context) error { + return ErrMigrate +} diff --git a/backend/v3/storage/database/dialect/sql/doc.go b/backend/v3/storage/database/dialect/sql/doc.go new file mode 100644 index 0000000000..a6b2782dc5 --- /dev/null +++ b/backend/v3/storage/database/dialect/sql/doc.go @@ -0,0 +1,3 @@ +// [database/sql] implementation of the interfaces defined in the database package. +// This package is used to migrate from event driven to relational Zitadel. +package sql diff --git a/backend/v3/storage/database/dialect/sql/error.go b/backend/v3/storage/database/dialect/sql/error.go new file mode 100644 index 0000000000..f01b1e70e1 --- /dev/null +++ b/backend/v3/storage/database/dialect/sql/error.go @@ -0,0 +1,41 @@ +package sql + +import ( + "database/sql" + "errors" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" + + "github.com/zitadel/zitadel/backend/v3/storage/database" +) + +var ErrMigrate = errors.New("sql does not support migrations, use a different dialect") + +func wrapError(err error) error { + if err == nil { + return nil + } + if errors.Is(err, pgx.ErrNoRows) || errors.Is(err, sql.ErrNoRows) { + return database.NewNoRowFoundError(err) + } + var pgxErr *pgconn.PgError + if !errors.As(err, &pgxErr) { + return database.NewUnknownError(err) + } + switch pgxErr.Code { + // 23514: check_violation - A value violates a CHECK constraint. + case "23514": + return database.NewCheckError(pgxErr.TableName, pgxErr.ConstraintName, pgxErr) + // 23505: unique_violation - A value violates a UNIQUE constraint. + case "23505": + return database.NewUniqueError(pgxErr.TableName, pgxErr.ConstraintName, pgxErr) + // 23503: foreign_key_violation - A value violates a foreign key constraint. + case "23503": + return database.NewForeignKeyError(pgxErr.TableName, pgxErr.ConstraintName, pgxErr) + // 23502: not_null_violation - A value violates a NOT NULL constraint. + case "23502": + return database.NewNotNullError(pgxErr.TableName, pgxErr.ConstraintName, pgxErr) + } + return database.NewUnknownError(err) +} diff --git a/backend/v3/storage/database/dialect/sql/pool.go b/backend/v3/storage/database/dialect/sql/pool.go new file mode 100644 index 0000000000..0c4a0cac66 --- /dev/null +++ b/backend/v3/storage/database/dialect/sql/pool.go @@ -0,0 +1,74 @@ +package sql + +import ( + "context" + "database/sql" + + "github.com/zitadel/zitadel/backend/v3/storage/database" +) + +type sqlPool struct { + *sql.DB +} + +var _ database.Pool = (*sqlPool)(nil) + +func SQLPool(db *sql.DB) *sqlPool { + return &sqlPool{ + DB: db, + } +} + +// Acquire implements [database.Pool]. +func (c *sqlPool) Acquire(ctx context.Context) (database.Client, error) { + conn, err := c.Conn(ctx) + if err != nil { + return nil, wrapError(err) + } + return &sqlConn{Conn: conn}, nil +} + +// Query implements [database.Pool]. +// Subtle: this method shadows the method (Pool).Query of pgxPool.Pool. +func (c *sqlPool) Query(ctx context.Context, sql string, args ...any) (database.Rows, error) { + rows, err := c.QueryContext(ctx, sql, args...) + if err != nil { + return nil, wrapError(err) + } + return &Rows{rows}, nil +} + +// QueryRow implements [database.Pool]. +// Subtle: this method shadows the method (Pool).QueryRow of pgxPool.Pool. +func (c *sqlPool) QueryRow(ctx context.Context, sql string, args ...any) database.Row { + return &Row{c.QueryRowContext(ctx, sql, args...)} +} + +// Exec implements [database.Pool]. +// Subtle: this method shadows the method (Pool).Exec of pgxPool.Pool. +func (c *sqlPool) Exec(ctx context.Context, sql string, args ...any) (int64, error) { + res, err := c.ExecContext(ctx, sql, args...) + if err != nil { + return 0, wrapError(err) + } + return res.RowsAffected() +} + +// Begin implements [database.Pool]. +func (c *sqlPool) Begin(ctx context.Context, opts *database.TransactionOptions) (database.Transaction, error) { + tx, err := c.BeginTx(ctx, transactionOptionsToSQL(opts)) + if err != nil { + return nil, wrapError(err) + } + return &sqlTx{tx}, nil +} + +// Close implements [database.Pool]. +func (c *sqlPool) Close(_ context.Context) error { + return c.DB.Close() +} + +// Migrate implements [database.Migrator]. +func (c *sqlPool) Migrate(ctx context.Context) error { + return ErrMigrate +} diff --git a/backend/v3/storage/database/dialect/sql/rows.go b/backend/v3/storage/database/dialect/sql/rows.go new file mode 100644 index 0000000000..b2265ca1ca --- /dev/null +++ b/backend/v3/storage/database/dialect/sql/rows.go @@ -0,0 +1,79 @@ +package sql + +import ( + "database/sql" + + pgxscan "github.com/georgysavva/scany/v2/dbscan" + "github.com/jackc/pgx/v5" + + "github.com/zitadel/zitadel/backend/v3/storage/database" +) + +var ( + _ database.Rows = (*Rows)(nil) + _ database.CollectableRows = (*Rows)(nil) + _ database.Row = (*Row)(nil) +) + +type Row struct{ pgx.Row } + +// Scan implements [database.Row]. +// Subtle: this method shadows the method ([pgx.Row]).Scan of Row.Row. +func (r *Row) Scan(dest ...any) error { + return wrapError(r.Row.Scan(dest...)) +} + +type Rows struct{ *sql.Rows } + +// Err implements [database.Rows]. +// Subtle: this method shadows the method ([pgx.Rows]).Err of Rows.Rows. +func (r *Rows) Err() error { + return wrapError(r.Rows.Err()) +} + +func (r *Rows) Scan(dest ...any) error { + return wrapError(r.Rows.Scan(dest...)) +} + +// Collect implements [database.CollectableRows]. +// See [this page](https://github.com/georgysavva/scany/blob/master/dbscan/doc.go#L8) for additional details. +func (r *Rows) Collect(dest any) (err error) { + defer func() { + closeErr := r.Close() + if err == nil { + err = closeErr + } + }() + return wrapError(pgxscan.ScanAll(dest, r.Rows)) +} + +// CollectFirst implements [database.CollectableRows]. +// See [this page](https://github.com/georgysavva/scany/blob/master/dbscan/doc.go#L8) for additional details. +func (r *Rows) CollectFirst(dest any) (err error) { + defer func() { + closeErr := r.Close() + if err == nil { + err = closeErr + } + }() + return wrapError(pgxscan.ScanRow(dest, r.Rows)) +} + +// CollectExactlyOneRow implements [database.CollectableRows]. +// See [this page](https://github.com/georgysavva/scany/blob/master/dbscan/doc.go#L8) for additional details. +func (r *Rows) CollectExactlyOneRow(dest any) (err error) { + defer func() { + closeErr := r.Close() + if err == nil { + err = closeErr + } + }() + return wrapError(pgxscan.ScanOne(dest, r.Rows)) +} + +// Close implements [database.Rows]. +// Subtle: this method shadows the method (Rows).Close of Rows.Rows. +func (r *Rows) Close() error { + r.Rows.Close() + return nil +} diff --git a/backend/v3/storage/database/dialect/sql/savepoint.go b/backend/v3/storage/database/dialect/sql/savepoint.go new file mode 100644 index 0000000000..1933771e9a --- /dev/null +++ b/backend/v3/storage/database/dialect/sql/savepoint.go @@ -0,0 +1,69 @@ +package sql + +import ( + "context" + "errors" + + "github.com/zitadel/zitadel/backend/v3/storage/database" +) + +var _ database.Transaction = (*sqlSavepoint)(nil) + +const ( + savepointName = "zitadel_savepoint" + createSavepoint = "SAVEPOINT " + savepointName + rollbackToSavepoint = "ROLLBACK TO SAVEPOINT " + savepointName + commitSavepoint = "RELEASE SAVEPOINT " + savepointName +) + +type sqlSavepoint struct { + parent database.Transaction +} + +// Commit implements [database.Transaction]. +func (s *sqlSavepoint) Commit(ctx context.Context) error { + _, err := s.parent.Exec(ctx, commitSavepoint) + return wrapError(err) +} + +// Rollback implements [database.Transaction]. +func (s *sqlSavepoint) Rollback(ctx context.Context) error { + _, err := s.parent.Exec(ctx, rollbackToSavepoint) + return wrapError(err) +} + +// End implements [database.Transaction]. +func (s *sqlSavepoint) End(ctx context.Context, err error) error { + if err != nil { + rollbackErr := s.Rollback(ctx) + if rollbackErr != nil { + err = errors.Join(err, rollbackErr) + } + return err + } + return s.Commit(ctx) +} + +// Query implements [database.Transaction]. +// Subtle: this method shadows the method (Tx).Query of pgxTx.Tx. +func (s *sqlSavepoint) Query(ctx context.Context, sql string, args ...any) (database.Rows, error) { + return s.parent.Query(ctx, sql, args...) +} + +// QueryRow implements [database.Transaction]. +// Subtle: this method shadows the method (Tx).QueryRow of pgxTx.Tx. +func (s *sqlSavepoint) QueryRow(ctx context.Context, sql string, args ...any) database.Row { + return s.parent.QueryRow(ctx, sql, args...) +} + +// Exec implements [database.Transaction]. +// Subtle: this method shadows the method (Pool).Exec of pgxPool.Pool. +func (s *sqlSavepoint) Exec(ctx context.Context, sql string, args ...any) (int64, error) { + return s.parent.Exec(ctx, sql, args...) +} + +// Begin implements [database.Transaction]. +// As postgres does not support nested transactions we use savepoints to emulate them. +func (s *sqlSavepoint) Begin(ctx context.Context) (database.Transaction, error) { + return s.parent.Begin(ctx) +} diff --git a/backend/v3/storage/database/dialect/sql/tx.go b/backend/v3/storage/database/dialect/sql/tx.go new file mode 100644 index 0000000000..c2da0e650b --- /dev/null +++ b/backend/v3/storage/database/dialect/sql/tx.go @@ -0,0 +1,103 @@ +package sql + +import ( + "context" + "database/sql" + "errors" + + "github.com/zitadel/zitadel/backend/v3/storage/database" +) + +type sqlTx struct{ *sql.Tx } + +var _ database.Transaction = (*sqlTx)(nil) + +func SQLTx(tx *sql.Tx) *sqlTx { + return &sqlTx{ + Tx: tx, + } +} + +// Commit implements [database.Transaction]. +func (tx *sqlTx) Commit(ctx context.Context) error { + return wrapError(tx.Tx.Commit()) +} + +// Rollback implements [database.Transaction]. +func (tx *sqlTx) Rollback(ctx context.Context) error { + return wrapError(tx.Tx.Rollback()) +} + +// End implements [database.Transaction]. +func (tx *sqlTx) End(ctx context.Context, err error) error { + if err != nil { + rollbackErr := tx.Rollback(ctx) + if rollbackErr != nil { + err = errors.Join(err, rollbackErr) + } + return err + } + return tx.Commit(ctx) +} + +// Query implements [database.Transaction]. +// Subtle: this method shadows the method (Tx).Query of pgxTx.Tx. +func (tx *sqlTx) Query(ctx context.Context, sql string, args ...any) (database.Rows, error) { + rows, err := tx.QueryContext(ctx, sql, args...) + if err != nil { + return nil, wrapError(err) + } + return &Rows{rows}, nil +} + +// QueryRow implements [database.Transaction]. +// Subtle: this method shadows the method (Tx).QueryRow of pgxTx.Tx. +func (tx *sqlTx) QueryRow(ctx context.Context, sql string, args ...any) database.Row { + return &Row{tx.QueryRowContext(ctx, sql, args...)} +} + +// Exec implements [database.Transaction]. +// Subtle: this method shadows the method (Pool).Exec of pgxPool.Pool. +func (tx *sqlTx) Exec(ctx context.Context, sql string, args ...any) (int64, error) { + res, err := tx.ExecContext(ctx, sql, args...) + if err != nil { + return 0, wrapError(err) + } + return res.RowsAffected() +} + +// Begin implements [database.Transaction]. +// As postgres does not support nested transactions we use savepoints to emulate them. +func (tx *sqlTx) Begin(ctx context.Context) (database.Transaction, error) { + _, err := tx.ExecContext(ctx, createSavepoint) + if err != nil { + return nil, wrapError(err) + } + return &sqlSavepoint{tx}, nil +} + +func transactionOptionsToSQL(opts *database.TransactionOptions) *sql.TxOptions { + if opts == nil { + return nil + } + + return &sql.TxOptions{ + Isolation: isolationToSQL(opts.IsolationLevel), + ReadOnly: accessModeToSQL(opts.AccessMode), + } +} + +func isolationToSQL(isolation database.IsolationLevel) sql.IsolationLevel { + switch isolation { + case database.IsolationLevelSerializable: + return sql.LevelSerializable + case database.IsolationLevelReadCommitted: + return sql.LevelReadCommitted + default: + return sql.LevelSerializable + } +} + +func accessModeToSQL(accessMode database.AccessMode) bool { + return accessMode == database.AccessModeReadOnly +}