mirror of
https://github.com/zitadel/zitadel.git
synced 2025-08-11 20:37:30 +00:00
feat(database): implement sql adapter for backwards compatibility
This commit is contained in:
59
backend/v3/storage/database/dialect/sql/conn.go
Normal file
59
backend/v3/storage/database/dialect/sql/conn.go
Normal file
@@ -0,0 +1,59 @@
|
|||||||
|
package sql
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
|
||||||
|
"github.com/zitadel/zitadel/backend/v3/storage/database"
|
||||||
|
)
|
||||||
|
|
||||||
|
type sqlConn struct {
|
||||||
|
*sql.Conn
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ database.Client = (*sqlConn)(nil)
|
||||||
|
|
||||||
|
// Release implements [database.Client].
|
||||||
|
func (c *sqlConn) Release(_ context.Context) error {
|
||||||
|
return c.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Begin implements [database.Client].
|
||||||
|
func (c *sqlConn) Begin(ctx context.Context, opts *database.TransactionOptions) (database.Transaction, error) {
|
||||||
|
tx, err := c.BeginTx(ctx, transactionOptionsToSQL(opts))
|
||||||
|
if err != nil {
|
||||||
|
return nil, wrapError(err)
|
||||||
|
}
|
||||||
|
return &sqlTx{tx}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Query implements sql.Client.
|
||||||
|
// Subtle: this method shadows the method (*Conn).Query of pgxConn.Conn.
|
||||||
|
func (c *sqlConn) Query(ctx context.Context, sql string, args ...any) (database.Rows, error) {
|
||||||
|
rows, err := c.QueryContext(ctx, sql, args...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, wrapError(err)
|
||||||
|
}
|
||||||
|
return &Rows{rows}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueryRow implements sql.Client.
|
||||||
|
// Subtle: this method shadows the method (*Conn).QueryRow of pgxConn.Conn.
|
||||||
|
func (c *sqlConn) QueryRow(ctx context.Context, sql string, args ...any) database.Row {
|
||||||
|
return &Row{c.QueryRowContext(ctx, sql, args...)}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Exec implements [database.Pool].
|
||||||
|
// Subtle: this method shadows the method (Pool).Exec of pgxPool.Pool.
|
||||||
|
func (c *sqlConn) Exec(ctx context.Context, sql string, args ...any) (int64, error) {
|
||||||
|
res, err := c.ExecContext(ctx, sql, args...)
|
||||||
|
if err != nil {
|
||||||
|
return 0, wrapError(err)
|
||||||
|
}
|
||||||
|
return res.RowsAffected()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Migrate implements [database.Migrator].
|
||||||
|
func (c *sqlConn) Migrate(ctx context.Context) error {
|
||||||
|
return ErrMigrate
|
||||||
|
}
|
3
backend/v3/storage/database/dialect/sql/doc.go
Normal file
3
backend/v3/storage/database/dialect/sql/doc.go
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
// [database/sql] implementation of the interfaces defined in the database package.
|
||||||
|
// This package is used to migrate from event driven to relational Zitadel.
|
||||||
|
package sql
|
41
backend/v3/storage/database/dialect/sql/error.go
Normal file
41
backend/v3/storage/database/dialect/sql/error.go
Normal file
@@ -0,0 +1,41 @@
|
|||||||
|
package sql
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql"
|
||||||
|
"errors"
|
||||||
|
|
||||||
|
"github.com/jackc/pgx/v5"
|
||||||
|
"github.com/jackc/pgx/v5/pgconn"
|
||||||
|
|
||||||
|
"github.com/zitadel/zitadel/backend/v3/storage/database"
|
||||||
|
)
|
||||||
|
|
||||||
|
var ErrMigrate = errors.New("sql does not support migrations, use a different dialect")
|
||||||
|
|
||||||
|
func wrapError(err error) error {
|
||||||
|
if err == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if errors.Is(err, pgx.ErrNoRows) || errors.Is(err, sql.ErrNoRows) {
|
||||||
|
return database.NewNoRowFoundError(err)
|
||||||
|
}
|
||||||
|
var pgxErr *pgconn.PgError
|
||||||
|
if !errors.As(err, &pgxErr) {
|
||||||
|
return database.NewUnknownError(err)
|
||||||
|
}
|
||||||
|
switch pgxErr.Code {
|
||||||
|
// 23514: check_violation - A value violates a CHECK constraint.
|
||||||
|
case "23514":
|
||||||
|
return database.NewCheckError(pgxErr.TableName, pgxErr.ConstraintName, pgxErr)
|
||||||
|
// 23505: unique_violation - A value violates a UNIQUE constraint.
|
||||||
|
case "23505":
|
||||||
|
return database.NewUniqueError(pgxErr.TableName, pgxErr.ConstraintName, pgxErr)
|
||||||
|
// 23503: foreign_key_violation - A value violates a foreign key constraint.
|
||||||
|
case "23503":
|
||||||
|
return database.NewForeignKeyError(pgxErr.TableName, pgxErr.ConstraintName, pgxErr)
|
||||||
|
// 23502: not_null_violation - A value violates a NOT NULL constraint.
|
||||||
|
case "23502":
|
||||||
|
return database.NewNotNullError(pgxErr.TableName, pgxErr.ConstraintName, pgxErr)
|
||||||
|
}
|
||||||
|
return database.NewUnknownError(err)
|
||||||
|
}
|
74
backend/v3/storage/database/dialect/sql/pool.go
Normal file
74
backend/v3/storage/database/dialect/sql/pool.go
Normal file
@@ -0,0 +1,74 @@
|
|||||||
|
package sql
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
|
||||||
|
"github.com/zitadel/zitadel/backend/v3/storage/database"
|
||||||
|
)
|
||||||
|
|
||||||
|
type sqlPool struct {
|
||||||
|
*sql.DB
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ database.Pool = (*sqlPool)(nil)
|
||||||
|
|
||||||
|
func SQLPool(db *sql.DB) *sqlPool {
|
||||||
|
return &sqlPool{
|
||||||
|
DB: db,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Acquire implements [database.Pool].
|
||||||
|
func (c *sqlPool) Acquire(ctx context.Context) (database.Client, error) {
|
||||||
|
conn, err := c.Conn(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, wrapError(err)
|
||||||
|
}
|
||||||
|
return &sqlConn{Conn: conn}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Query implements [database.Pool].
|
||||||
|
// Subtle: this method shadows the method (Pool).Query of pgxPool.Pool.
|
||||||
|
func (c *sqlPool) Query(ctx context.Context, sql string, args ...any) (database.Rows, error) {
|
||||||
|
rows, err := c.QueryContext(ctx, sql, args...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, wrapError(err)
|
||||||
|
}
|
||||||
|
return &Rows{rows}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueryRow implements [database.Pool].
|
||||||
|
// Subtle: this method shadows the method (Pool).QueryRow of pgxPool.Pool.
|
||||||
|
func (c *sqlPool) QueryRow(ctx context.Context, sql string, args ...any) database.Row {
|
||||||
|
return &Row{c.QueryRowContext(ctx, sql, args...)}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Exec implements [database.Pool].
|
||||||
|
// Subtle: this method shadows the method (Pool).Exec of pgxPool.Pool.
|
||||||
|
func (c *sqlPool) Exec(ctx context.Context, sql string, args ...any) (int64, error) {
|
||||||
|
res, err := c.ExecContext(ctx, sql, args...)
|
||||||
|
if err != nil {
|
||||||
|
return 0, wrapError(err)
|
||||||
|
}
|
||||||
|
return res.RowsAffected()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Begin implements [database.Pool].
|
||||||
|
func (c *sqlPool) Begin(ctx context.Context, opts *database.TransactionOptions) (database.Transaction, error) {
|
||||||
|
tx, err := c.BeginTx(ctx, transactionOptionsToSQL(opts))
|
||||||
|
if err != nil {
|
||||||
|
return nil, wrapError(err)
|
||||||
|
}
|
||||||
|
return &sqlTx{tx}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close implements [database.Pool].
|
||||||
|
func (c *sqlPool) Close(_ context.Context) error {
|
||||||
|
return c.DB.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Migrate implements [database.Migrator].
|
||||||
|
func (c *sqlPool) Migrate(ctx context.Context) error {
|
||||||
|
return ErrMigrate
|
||||||
|
}
|
79
backend/v3/storage/database/dialect/sql/rows.go
Normal file
79
backend/v3/storage/database/dialect/sql/rows.go
Normal file
@@ -0,0 +1,79 @@
|
|||||||
|
package sql
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql"
|
||||||
|
|
||||||
|
pgxscan "github.com/georgysavva/scany/v2/dbscan"
|
||||||
|
"github.com/jackc/pgx/v5"
|
||||||
|
|
||||||
|
"github.com/zitadel/zitadel/backend/v3/storage/database"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
_ database.Rows = (*Rows)(nil)
|
||||||
|
_ database.CollectableRows = (*Rows)(nil)
|
||||||
|
_ database.Row = (*Row)(nil)
|
||||||
|
)
|
||||||
|
|
||||||
|
type Row struct{ pgx.Row }
|
||||||
|
|
||||||
|
// Scan implements [database.Row].
|
||||||
|
// Subtle: this method shadows the method ([pgx.Row]).Scan of Row.Row.
|
||||||
|
func (r *Row) Scan(dest ...any) error {
|
||||||
|
return wrapError(r.Row.Scan(dest...))
|
||||||
|
}
|
||||||
|
|
||||||
|
type Rows struct{ *sql.Rows }
|
||||||
|
|
||||||
|
// Err implements [database.Rows].
|
||||||
|
// Subtle: this method shadows the method ([pgx.Rows]).Err of Rows.Rows.
|
||||||
|
func (r *Rows) Err() error {
|
||||||
|
return wrapError(r.Rows.Err())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Rows) Scan(dest ...any) error {
|
||||||
|
return wrapError(r.Rows.Scan(dest...))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Collect implements [database.CollectableRows].
|
||||||
|
// See [this page](https://github.com/georgysavva/scany/blob/master/dbscan/doc.go#L8) for additional details.
|
||||||
|
func (r *Rows) Collect(dest any) (err error) {
|
||||||
|
defer func() {
|
||||||
|
closeErr := r.Close()
|
||||||
|
if err == nil {
|
||||||
|
err = closeErr
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return wrapError(pgxscan.ScanAll(dest, r.Rows))
|
||||||
|
}
|
||||||
|
|
||||||
|
// CollectFirst implements [database.CollectableRows].
|
||||||
|
// See [this page](https://github.com/georgysavva/scany/blob/master/dbscan/doc.go#L8) for additional details.
|
||||||
|
func (r *Rows) CollectFirst(dest any) (err error) {
|
||||||
|
defer func() {
|
||||||
|
closeErr := r.Close()
|
||||||
|
if err == nil {
|
||||||
|
err = closeErr
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return wrapError(pgxscan.ScanRow(dest, r.Rows))
|
||||||
|
}
|
||||||
|
|
||||||
|
// CollectExactlyOneRow implements [database.CollectableRows].
|
||||||
|
// See [this page](https://github.com/georgysavva/scany/blob/master/dbscan/doc.go#L8) for additional details.
|
||||||
|
func (r *Rows) CollectExactlyOneRow(dest any) (err error) {
|
||||||
|
defer func() {
|
||||||
|
closeErr := r.Close()
|
||||||
|
if err == nil {
|
||||||
|
err = closeErr
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return wrapError(pgxscan.ScanOne(dest, r.Rows))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close implements [database.Rows].
|
||||||
|
// Subtle: this method shadows the method (Rows).Close of Rows.Rows.
|
||||||
|
func (r *Rows) Close() error {
|
||||||
|
r.Rows.Close()
|
||||||
|
return nil
|
||||||
|
}
|
69
backend/v3/storage/database/dialect/sql/savepoint.go
Normal file
69
backend/v3/storage/database/dialect/sql/savepoint.go
Normal file
@@ -0,0 +1,69 @@
|
|||||||
|
package sql
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
|
||||||
|
"github.com/zitadel/zitadel/backend/v3/storage/database"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ database.Transaction = (*sqlSavepoint)(nil)
|
||||||
|
|
||||||
|
const (
|
||||||
|
savepointName = "zitadel_savepoint"
|
||||||
|
createSavepoint = "SAVEPOINT " + savepointName
|
||||||
|
rollbackToSavepoint = "ROLLBACK TO SAVEPOINT " + savepointName
|
||||||
|
commitSavepoint = "RELEASE SAVEPOINT " + savepointName
|
||||||
|
)
|
||||||
|
|
||||||
|
type sqlSavepoint struct {
|
||||||
|
parent database.Transaction
|
||||||
|
}
|
||||||
|
|
||||||
|
// Commit implements [database.Transaction].
|
||||||
|
func (s *sqlSavepoint) Commit(ctx context.Context) error {
|
||||||
|
_, err := s.parent.Exec(ctx, commitSavepoint)
|
||||||
|
return wrapError(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Rollback implements [database.Transaction].
|
||||||
|
func (s *sqlSavepoint) Rollback(ctx context.Context) error {
|
||||||
|
_, err := s.parent.Exec(ctx, rollbackToSavepoint)
|
||||||
|
return wrapError(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// End implements [database.Transaction].
|
||||||
|
func (s *sqlSavepoint) End(ctx context.Context, err error) error {
|
||||||
|
if err != nil {
|
||||||
|
rollbackErr := s.Rollback(ctx)
|
||||||
|
if rollbackErr != nil {
|
||||||
|
err = errors.Join(err, rollbackErr)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return s.Commit(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Query implements [database.Transaction].
|
||||||
|
// Subtle: this method shadows the method (Tx).Query of pgxTx.Tx.
|
||||||
|
func (s *sqlSavepoint) Query(ctx context.Context, sql string, args ...any) (database.Rows, error) {
|
||||||
|
return s.parent.Query(ctx, sql, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueryRow implements [database.Transaction].
|
||||||
|
// Subtle: this method shadows the method (Tx).QueryRow of pgxTx.Tx.
|
||||||
|
func (s *sqlSavepoint) QueryRow(ctx context.Context, sql string, args ...any) database.Row {
|
||||||
|
return s.parent.QueryRow(ctx, sql, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Exec implements [database.Transaction].
|
||||||
|
// Subtle: this method shadows the method (Pool).Exec of pgxPool.Pool.
|
||||||
|
func (s *sqlSavepoint) Exec(ctx context.Context, sql string, args ...any) (int64, error) {
|
||||||
|
return s.parent.Exec(ctx, sql, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Begin implements [database.Transaction].
|
||||||
|
// As postgres does not support nested transactions we use savepoints to emulate them.
|
||||||
|
func (s *sqlSavepoint) Begin(ctx context.Context) (database.Transaction, error) {
|
||||||
|
return s.parent.Begin(ctx)
|
||||||
|
}
|
103
backend/v3/storage/database/dialect/sql/tx.go
Normal file
103
backend/v3/storage/database/dialect/sql/tx.go
Normal file
@@ -0,0 +1,103 @@
|
|||||||
|
package sql
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"errors"
|
||||||
|
|
||||||
|
"github.com/zitadel/zitadel/backend/v3/storage/database"
|
||||||
|
)
|
||||||
|
|
||||||
|
type sqlTx struct{ *sql.Tx }
|
||||||
|
|
||||||
|
var _ database.Transaction = (*sqlTx)(nil)
|
||||||
|
|
||||||
|
func SQLTx(tx *sql.Tx) *sqlTx {
|
||||||
|
return &sqlTx{
|
||||||
|
Tx: tx,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Commit implements [database.Transaction].
|
||||||
|
func (tx *sqlTx) Commit(ctx context.Context) error {
|
||||||
|
return wrapError(tx.Tx.Commit())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Rollback implements [database.Transaction].
|
||||||
|
func (tx *sqlTx) Rollback(ctx context.Context) error {
|
||||||
|
return wrapError(tx.Tx.Rollback())
|
||||||
|
}
|
||||||
|
|
||||||
|
// End implements [database.Transaction].
|
||||||
|
func (tx *sqlTx) End(ctx context.Context, err error) error {
|
||||||
|
if err != nil {
|
||||||
|
rollbackErr := tx.Rollback(ctx)
|
||||||
|
if rollbackErr != nil {
|
||||||
|
err = errors.Join(err, rollbackErr)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return tx.Commit(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Query implements [database.Transaction].
|
||||||
|
// Subtle: this method shadows the method (Tx).Query of pgxTx.Tx.
|
||||||
|
func (tx *sqlTx) Query(ctx context.Context, sql string, args ...any) (database.Rows, error) {
|
||||||
|
rows, err := tx.QueryContext(ctx, sql, args...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, wrapError(err)
|
||||||
|
}
|
||||||
|
return &Rows{rows}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueryRow implements [database.Transaction].
|
||||||
|
// Subtle: this method shadows the method (Tx).QueryRow of pgxTx.Tx.
|
||||||
|
func (tx *sqlTx) QueryRow(ctx context.Context, sql string, args ...any) database.Row {
|
||||||
|
return &Row{tx.QueryRowContext(ctx, sql, args...)}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Exec implements [database.Transaction].
|
||||||
|
// Subtle: this method shadows the method (Pool).Exec of pgxPool.Pool.
|
||||||
|
func (tx *sqlTx) Exec(ctx context.Context, sql string, args ...any) (int64, error) {
|
||||||
|
res, err := tx.ExecContext(ctx, sql, args...)
|
||||||
|
if err != nil {
|
||||||
|
return 0, wrapError(err)
|
||||||
|
}
|
||||||
|
return res.RowsAffected()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Begin implements [database.Transaction].
|
||||||
|
// As postgres does not support nested transactions we use savepoints to emulate them.
|
||||||
|
func (tx *sqlTx) Begin(ctx context.Context) (database.Transaction, error) {
|
||||||
|
_, err := tx.ExecContext(ctx, createSavepoint)
|
||||||
|
if err != nil {
|
||||||
|
return nil, wrapError(err)
|
||||||
|
}
|
||||||
|
return &sqlSavepoint{tx}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func transactionOptionsToSQL(opts *database.TransactionOptions) *sql.TxOptions {
|
||||||
|
if opts == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return &sql.TxOptions{
|
||||||
|
Isolation: isolationToSQL(opts.IsolationLevel),
|
||||||
|
ReadOnly: accessModeToSQL(opts.AccessMode),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func isolationToSQL(isolation database.IsolationLevel) sql.IsolationLevel {
|
||||||
|
switch isolation {
|
||||||
|
case database.IsolationLevelSerializable:
|
||||||
|
return sql.LevelSerializable
|
||||||
|
case database.IsolationLevelReadCommitted:
|
||||||
|
return sql.LevelReadCommitted
|
||||||
|
default:
|
||||||
|
return sql.LevelSerializable
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func accessModeToSQL(accessMode database.AccessMode) bool {
|
||||||
|
return accessMode == database.AccessModeReadOnly
|
||||||
|
}
|
Reference in New Issue
Block a user