mirror of
https://github.com/zitadel/zitadel.git
synced 2025-08-12 07:47:32 +00:00
multiple tries
This commit is contained in:
92
backend/v3/storage/database/dialect/config.go
Normal file
92
backend/v3/storage/database/dialect/config.go
Normal file
@@ -0,0 +1,92 @@
|
||||
package dialect
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"reflect"
|
||||
|
||||
"github.com/mitchellh/mapstructure"
|
||||
"github.com/spf13/viper"
|
||||
|
||||
"github.com/zitadel/zitadel/backend/storage/database"
|
||||
"github.com/zitadel/zitadel/backend/storage/database/dialect/postgres"
|
||||
)
|
||||
|
||||
type Hook struct {
|
||||
Match func(string) bool
|
||||
Decode func(config any) (database.Connector, error)
|
||||
Name string
|
||||
Constructor func() database.Connector
|
||||
}
|
||||
|
||||
var hooks = []Hook{
|
||||
{
|
||||
Match: postgres.NameMatcher,
|
||||
Decode: postgres.DecodeConfig,
|
||||
Name: postgres.Name,
|
||||
Constructor: func() database.Connector { return new(postgres.Config) },
|
||||
},
|
||||
// {
|
||||
// Match: gosql.NameMatcher,
|
||||
// Decode: gosql.DecodeConfig,
|
||||
// Name: gosql.Name,
|
||||
// Constructor: func() database.Connector { return new(gosql.Config) },
|
||||
// },
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
Dialects map[string]any `mapstructure:",remain" yaml:",inline"`
|
||||
|
||||
connector database.Connector
|
||||
}
|
||||
|
||||
func (c Config) Connect(ctx context.Context) (database.Pool, error) {
|
||||
if len(c.Dialects) != 1 {
|
||||
return nil, errors.New("Exactly one dialect must be configured")
|
||||
}
|
||||
|
||||
return c.connector.Connect(ctx)
|
||||
}
|
||||
|
||||
// Hooks implements [configure.Unmarshaller].
|
||||
func (c Config) Hooks() []viper.DecoderConfigOption {
|
||||
return []viper.DecoderConfigOption{
|
||||
viper.DecodeHook(decodeHook),
|
||||
}
|
||||
}
|
||||
|
||||
func decodeHook(from, to reflect.Value) (_ any, err error) {
|
||||
if to.Type() != reflect.TypeOf(Config{}) {
|
||||
return from.Interface(), nil
|
||||
}
|
||||
|
||||
config := new(Config)
|
||||
if err = mapstructure.Decode(from.Interface(), config); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err = config.decodeDialect(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return config, nil
|
||||
}
|
||||
|
||||
func (c *Config) decodeDialect() error {
|
||||
for _, hook := range hooks {
|
||||
for name, config := range c.Dialects {
|
||||
if !hook.Match(name) {
|
||||
continue
|
||||
}
|
||||
|
||||
connector, err := hook.Decode(config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.connector = connector
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return errors.New("no dialect found")
|
||||
}
|
80
backend/v3/storage/database/dialect/postgres/config.go
Normal file
80
backend/v3/storage/database/dialect/postgres/config.go
Normal file
@@ -0,0 +1,80 @@
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"slices"
|
||||
"strings"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
"github.com/mitchellh/mapstructure"
|
||||
|
||||
"github.com/zitadel/zitadel/backend/v3/storage/database"
|
||||
)
|
||||
|
||||
var (
|
||||
_ database.Connector = (*Config)(nil)
|
||||
Name = "postgres"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
*pgxpool.Config
|
||||
|
||||
// Host string
|
||||
// Port int32
|
||||
// Database string
|
||||
// MaxOpenConns uint32
|
||||
// MaxIdleConns uint32
|
||||
// MaxConnLifetime time.Duration
|
||||
// MaxConnIdleTime time.Duration
|
||||
// User User
|
||||
// // Additional options to be appended as options=<Options>
|
||||
// // The value will be taken as is. Multiple options are space separated.
|
||||
// Options string
|
||||
|
||||
configuredFields []string
|
||||
}
|
||||
|
||||
// Connect implements [database.Connector].
|
||||
func (c *Config) Connect(ctx context.Context) (database.Pool, error) {
|
||||
pool, err := pgxpool.NewWithConfig(ctx, c.Config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = pool.Ping(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &pgxPool{pool}, nil
|
||||
}
|
||||
|
||||
func NameMatcher(name string) bool {
|
||||
return slices.Contains([]string{"postgres", "pg"}, strings.ToLower(name))
|
||||
}
|
||||
|
||||
func DecodeConfig(input any) (database.Connector, error) {
|
||||
switch c := input.(type) {
|
||||
case string:
|
||||
config, err := pgxpool.ParseConfig(c)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Config{Config: config}, nil
|
||||
case map[string]any:
|
||||
connector := new(Config)
|
||||
decoder, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
|
||||
DecodeHook: mapstructure.StringToTimeDurationHookFunc(),
|
||||
WeaklyTypedInput: true,
|
||||
Result: connector,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = decoder.Decode(c); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Config{
|
||||
Config: &pgxpool.Config{},
|
||||
}, nil
|
||||
}
|
||||
return nil, errors.New("invalid configuration")
|
||||
}
|
48
backend/v3/storage/database/dialect/postgres/conn.go
Normal file
48
backend/v3/storage/database/dialect/postgres/conn.go
Normal file
@@ -0,0 +1,48 @@
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
|
||||
"github.com/zitadel/zitadel/backend/v3/storage/database"
|
||||
)
|
||||
|
||||
type pgxConn struct{ *pgxpool.Conn }
|
||||
|
||||
var _ database.Client = (*pgxConn)(nil)
|
||||
|
||||
// Release implements [database.Client].
|
||||
func (c *pgxConn) Release(_ context.Context) error {
|
||||
c.Conn.Release()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Begin implements [database.Client].
|
||||
func (c *pgxConn) Begin(ctx context.Context, opts *database.TransactionOptions) (database.Transaction, error) {
|
||||
tx, err := c.Conn.BeginTx(ctx, transactionOptionsToPgx(opts))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &pgxTx{tx}, nil
|
||||
}
|
||||
|
||||
// Query implements sql.Client.
|
||||
// Subtle: this method shadows the method (*Conn).Query of pgxConn.Conn.
|
||||
func (c *pgxConn) Query(ctx context.Context, sql string, args ...any) (database.Rows, error) {
|
||||
rows, err := c.Conn.Query(ctx, sql, args...)
|
||||
return &Rows{rows}, err
|
||||
}
|
||||
|
||||
// QueryRow implements sql.Client.
|
||||
// Subtle: this method shadows the method (*Conn).QueryRow of pgxConn.Conn.
|
||||
func (c *pgxConn) QueryRow(ctx context.Context, sql string, args ...any) database.Row {
|
||||
return c.Conn.QueryRow(ctx, sql, args...)
|
||||
}
|
||||
|
||||
// Exec implements [database.Pool].
|
||||
// Subtle: this method shadows the method (Pool).Exec of pgxPool.Pool.
|
||||
func (c *pgxConn) Exec(ctx context.Context, sql string, args ...any) error {
|
||||
_, err := c.Conn.Exec(ctx, sql, args...)
|
||||
return err
|
||||
}
|
57
backend/v3/storage/database/dialect/postgres/pool.go
Normal file
57
backend/v3/storage/database/dialect/postgres/pool.go
Normal file
@@ -0,0 +1,57 @@
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
|
||||
"github.com/zitadel/zitadel/backend/v3/storage/database"
|
||||
)
|
||||
|
||||
type pgxPool struct{ *pgxpool.Pool }
|
||||
|
||||
var _ database.Pool = (*pgxPool)(nil)
|
||||
|
||||
// Acquire implements [database.Pool].
|
||||
func (c *pgxPool) Acquire(ctx context.Context) (database.Client, error) {
|
||||
conn, err := c.Pool.Acquire(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &pgxConn{conn}, nil
|
||||
}
|
||||
|
||||
// Query implements [database.Pool].
|
||||
// Subtle: this method shadows the method (Pool).Query of pgxPool.Pool.
|
||||
func (c *pgxPool) Query(ctx context.Context, sql string, args ...any) (database.Rows, error) {
|
||||
rows, err := c.Pool.Query(ctx, sql, args...)
|
||||
return &Rows{rows}, err
|
||||
}
|
||||
|
||||
// QueryRow implements [database.Pool].
|
||||
// Subtle: this method shadows the method (Pool).QueryRow of pgxPool.Pool.
|
||||
func (c *pgxPool) QueryRow(ctx context.Context, sql string, args ...any) database.Row {
|
||||
return c.Pool.QueryRow(ctx, sql, args...)
|
||||
}
|
||||
|
||||
// Exec implements [database.Pool].
|
||||
// Subtle: this method shadows the method (Pool).Exec of pgxPool.Pool.
|
||||
func (c *pgxPool) Exec(ctx context.Context, sql string, args ...any) error {
|
||||
_, err := c.Pool.Exec(ctx, sql, args...)
|
||||
return err
|
||||
}
|
||||
|
||||
// Begin implements [database.Pool].
|
||||
func (c *pgxPool) Begin(ctx context.Context, opts *database.TransactionOptions) (database.Transaction, error) {
|
||||
tx, err := c.Pool.BeginTx(ctx, transactionOptionsToPgx(opts))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &pgxTx{tx}, nil
|
||||
}
|
||||
|
||||
// Close implements [database.Pool].
|
||||
func (c *pgxPool) Close(_ context.Context) error {
|
||||
c.Pool.Close()
|
||||
return nil
|
||||
}
|
18
backend/v3/storage/database/dialect/postgres/rows.go
Normal file
18
backend/v3/storage/database/dialect/postgres/rows.go
Normal file
@@ -0,0 +1,18 @@
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"github.com/jackc/pgx/v5"
|
||||
|
||||
"github.com/zitadel/zitadel/backend/v3/storage/database"
|
||||
)
|
||||
|
||||
var _ database.Rows = (*Rows)(nil)
|
||||
|
||||
type Rows struct{ pgx.Rows }
|
||||
|
||||
// Close implements [database.Rows].
|
||||
// Subtle: this method shadows the method (Rows).Close of Rows.Rows.
|
||||
func (r *Rows) Close() error {
|
||||
r.Rows.Close()
|
||||
return nil
|
||||
}
|
95
backend/v3/storage/database/dialect/postgres/tx.go
Normal file
95
backend/v3/storage/database/dialect/postgres/tx.go
Normal file
@@ -0,0 +1,95 @@
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
|
||||
"github.com/zitadel/zitadel/backend/v3/storage/database"
|
||||
)
|
||||
|
||||
type pgxTx struct{ pgx.Tx }
|
||||
|
||||
var _ database.Transaction = (*pgxTx)(nil)
|
||||
|
||||
// Commit implements [database.Transaction].
|
||||
func (tx *pgxTx) Commit(ctx context.Context) error {
|
||||
return tx.Tx.Commit(ctx)
|
||||
}
|
||||
|
||||
// Rollback implements [database.Transaction].
|
||||
func (tx *pgxTx) Rollback(ctx context.Context) error {
|
||||
return tx.Tx.Rollback(ctx)
|
||||
}
|
||||
|
||||
// End implements [database.Transaction].
|
||||
func (tx *pgxTx) End(ctx context.Context, err error) error {
|
||||
if err != nil {
|
||||
tx.Rollback(ctx)
|
||||
return err
|
||||
}
|
||||
return tx.Commit(ctx)
|
||||
}
|
||||
|
||||
// Query implements [database.Transaction].
|
||||
// Subtle: this method shadows the method (Tx).Query of pgxTx.Tx.
|
||||
func (tx *pgxTx) Query(ctx context.Context, sql string, args ...any) (database.Rows, error) {
|
||||
rows, err := tx.Tx.Query(ctx, sql, args...)
|
||||
return &Rows{rows}, err
|
||||
}
|
||||
|
||||
// QueryRow implements [database.Transaction].
|
||||
// Subtle: this method shadows the method (Tx).QueryRow of pgxTx.Tx.
|
||||
func (tx *pgxTx) QueryRow(ctx context.Context, sql string, args ...any) database.Row {
|
||||
return tx.Tx.QueryRow(ctx, sql, args...)
|
||||
}
|
||||
|
||||
// Exec implements [database.Transaction].
|
||||
// Subtle: this method shadows the method (Pool).Exec of pgxPool.Pool.
|
||||
func (tx *pgxTx) Exec(ctx context.Context, sql string, args ...any) error {
|
||||
_, err := tx.Tx.Exec(ctx, sql, args...)
|
||||
return err
|
||||
}
|
||||
|
||||
// Begin implements [database.Transaction].
|
||||
// As postgres does not support nested transactions we use savepoints to emulate them.
|
||||
func (tx *pgxTx) Begin(ctx context.Context) (database.Transaction, error) {
|
||||
savepoint, err := tx.Tx.Begin(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &pgxTx{savepoint}, nil
|
||||
}
|
||||
|
||||
func transactionOptionsToPgx(opts *database.TransactionOptions) pgx.TxOptions {
|
||||
if opts == nil {
|
||||
return pgx.TxOptions{}
|
||||
}
|
||||
|
||||
return pgx.TxOptions{
|
||||
IsoLevel: isolationToPgx(opts.IsolationLevel),
|
||||
AccessMode: accessModeToPgx(opts.AccessMode),
|
||||
}
|
||||
}
|
||||
|
||||
func isolationToPgx(isolation database.IsolationLevel) pgx.TxIsoLevel {
|
||||
switch isolation {
|
||||
case database.IsolationLevelSerializable:
|
||||
return pgx.Serializable
|
||||
case database.IsolationLevelReadCommitted:
|
||||
return pgx.ReadCommitted
|
||||
default:
|
||||
return pgx.Serializable
|
||||
}
|
||||
}
|
||||
|
||||
func accessModeToPgx(accessMode database.AccessMode) pgx.TxAccessMode {
|
||||
switch accessMode {
|
||||
case database.AccessModeReadWrite:
|
||||
return pgx.ReadWrite
|
||||
case database.AccessModeReadOnly:
|
||||
return pgx.ReadOnly
|
||||
default:
|
||||
return pgx.ReadWrite
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user