This commit is contained in:
adlerhurst
2025-02-17 07:55:00 +01:00
parent b85460152c
commit 77ab8226a5
14 changed files with 676 additions and 0 deletions

View File

@@ -0,0 +1,56 @@
package configure
import (
"fmt"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/zitadel/zitadel/backend/storage/database/dialect"
)
var (
// ConfigureCmd represents the config command
ConfigureCmd = &cobra.Command{
Use: "configure",
Short: "Guides you through configuring Zitadel",
// Long: `A longer description that spans multiple lines and likely contains examples
// and usage of using your command. For example:
// Cobra is a CLI library for Go that empowers applications.
// This application is a tool to generate the needed files
// to quickly create a Cobra application.`,
Run: func(cmd *cobra.Command, args []string) {
fmt.Println("config called")
fmt.Println(viper.AllSettings())
fmt.Println(viper.Sub("database").AllSettings())
pool, err := config.Database.Connect(cmd.Context())
_, _ = pool, err
},
PreRun: ReadConfigPreRun[Config](viper.GetViper(), &config),
}
config Config
)
func init() {
// Here you will define your flags and configuration settings.
ConfigureCmd.Flags().BoolVarP(&config.upgrade, "upgrade", "u", false, "Only changed configuration values since the previously used version will be asked for")
// Cobra supports Persistent Flags which will work for this command
// and all subcommands, e.g.:
// configureCmd.PersistentFlags().String("foo", "", "A help for foo")
// Cobra supports local flags which will only run when this command
// is called directly, e.g.:
}
type Config struct {
Database dialect.Config
upgrade bool
}
func (c Config) Hooks() []viper.DecoderConfigOption {
return c.Database.Hooks()
}

View File

@@ -0,0 +1,26 @@
package configure
import (
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
type Unmarshaller interface {
Hooks() []viper.DecoderConfigOption
}
func ReadConfigPreRun[C Unmarshaller](v *viper.Viper, config *C) func(cmd *cobra.Command, args []string) {
return func(cmd *cobra.Command, args []string) {
if err := v.Unmarshal(config, (*config).Hooks()...); err != nil {
panic(err)
}
}
}
func ReadConfig[C Unmarshaller](v *viper.Viper) (*C, error) {
var config C
if err := v.Unmarshal(&config, config.Hooks()...); err != nil {
return nil, err
}
return &config, nil
}

View File

@@ -0,0 +1,23 @@
package prepare
import (
"context"
"github.com/zitadel/zitadel/backend/storage/database"
"github.com/zitadel/zitadel/backend/storage/eventstore"
)
type Step001 struct {
Database database.Pool
}
func (v *Step001) Migrate(ctx context.Context) error {
conn, err := v.Database.Acquire(ctx)
if err != nil {
return err
}
defer conn.Release(ctx)
eventstore.New(conn).
return nil
}

View File

@@ -0,0 +1,15 @@
package start
import (
"github.com/spf13/viper"
"github.com/zitadel/zitadel/backend/storage/database/dialect"
)
type Config struct {
Database dialect.Config
}
func (c Config) Hooks() []viper.DecoderConfigOption {
return c.Database.Hooks()
}

View File

@@ -0,0 +1,87 @@
package dialect
import (
"context"
"errors"
"reflect"
"github.com/mitchellh/mapstructure"
"github.com/spf13/viper"
"github.com/zitadel/zitadel/backend/storage/database"
"github.com/zitadel/zitadel/backend/storage/database/dialect/gosql"
"github.com/zitadel/zitadel/backend/storage/database/dialect/postgres"
)
type Hook struct {
Match func(string) bool
Decode func(name string, config any) (database.Connector, error)
}
var hooks = make([]Hook, 0)
func init() {
hooks = append(hooks,
Hook{
Match: postgres.NameMatcher,
Decode: postgres.DecodeConfig,
},
Hook{
Match: gosql.NameMatcher,
Decode: gosql.DecodeConfig,
},
)
}
type Config struct {
Dialects map[string]any `mapstructure:",remain"`
connector database.Connector
}
// Hooks implements [configure.Unmarshaller].
func (c Config) Hooks() []viper.DecoderConfigOption {
return []viper.DecoderConfigOption{
viper.DecodeHook(decodeHook),
}
}
func (c Config) Connect(ctx context.Context) (database.Pool, error) {
return c.connector.Connect(ctx)
}
func (c *Config) decodeDialect() error {
for _, hook := range hooks {
for name, config := range c.Dialects {
if !hook.Match(name) {
continue
}
connector, err := hook.Decode(name, config)
if err != nil {
return err
}
c.connector = connector
return nil
}
}
return errors.New("no dialect found")
}
func decodeHook(from, to reflect.Value) (_ interface{}, err error) {
if to.Type() != reflect.TypeOf(Config{}) {
return from.Interface(), nil
}
config := new(Config)
if err = mapstructure.Decode(from.Interface(), config); err != nil {
return nil, err
}
if err = config.decodeDialect(); err != nil {
return nil, err
}
return config, nil
}

View File

@@ -0,0 +1,48 @@
package gosql
import (
"context"
"database/sql"
"errors"
"strings"
"github.com/zitadel/zitadel/backend/storage/database"
)
var _ database.Connector = (*Config)(nil)
type Config struct {
db *sql.DB
}
// Connect implements [database.Connector].
func (c *Config) Connect(ctx context.Context) (database.Pool, error) {
if err := c.db.PingContext(ctx); err != nil {
return nil, err
}
return &sqlPool{c.db}, nil
}
func NameMatcher(name string) bool {
name = strings.ToLower(name)
for _, driver := range sql.Drivers() {
if driver == name {
return true
}
}
return false
}
func DecodeConfig(name string, config any) (database.Connector, error) {
switch c := config.(type) {
case string:
db, err := sql.Open(name, c)
if err != nil {
return nil, err
}
return &Config{db}, nil
case map[string]any:
return nil, errors.New("map configuration not implemented")
}
return nil, errors.New("invalid configuration")
}

View File

@@ -0,0 +1,45 @@
package gosql
import (
"context"
"database/sql"
"github.com/zitadel/zitadel/backend/storage/database"
)
type sqlConn struct{ *sql.Conn }
var _ database.Client = (*sqlConn)(nil)
// Release implements [database.Client].
func (c *sqlConn) Release(_ context.Context) error {
return c.Conn.Close()
}
// Begin implements [database.Client].
func (c *sqlConn) Begin(ctx context.Context, opts *database.TransactionOptions) (database.Transaction, error) {
tx, err := c.Conn.BeginTx(ctx, transactionOptionsToSql(opts))
if err != nil {
return nil, err
}
return &sqlTx{tx}, nil
}
// Query implements sql.Client.
// Subtle: this method shadows the method (*Conn).Query of pgxConn.Conn.
func (c *sqlConn) Query(ctx context.Context, sql string, args ...any) (database.Rows, error) {
return c.Conn.QueryContext(ctx, sql, args...)
}
// QueryRow implements sql.Client.
// Subtle: this method shadows the method (*Conn).QueryRow of pgxConn.Conn.
func (c *sqlConn) QueryRow(ctx context.Context, sql string, args ...any) database.Row {
return c.Conn.QueryRowContext(ctx, sql, args...)
}
// Exec implements [database.Pool].
// Subtle: this method shadows the method (Pool).Exec of pgxPool.Pool.
func (c *sqlConn) Exec(ctx context.Context, sql string, args ...any) error {
_, err := c.Conn.ExecContext(ctx, sql, args...)
return err
}

View File

@@ -0,0 +1,54 @@
package gosql
import (
"context"
"database/sql"
"github.com/zitadel/zitadel/backend/storage/database"
)
type sqlPool struct{ *sql.DB }
var _ database.Pool = (*sqlPool)(nil)
// Acquire implements [database.Pool].
func (c *sqlPool) Acquire(ctx context.Context) (database.Client, error) {
conn, err := c.DB.Conn(ctx)
if err != nil {
return nil, err
}
return &sqlConn{conn}, nil
}
// Query implements [database.Pool].
// Subtle: this method shadows the method (Pool).Query of pgxPool.Pool.
func (c *sqlPool) Query(ctx context.Context, sql string, args ...any) (database.Rows, error) {
return c.DB.QueryContext(ctx, sql, args...)
}
// QueryRow implements [database.Pool].
// Subtle: this method shadows the method (Pool).QueryRow of pgxPool.Pool.
func (c *sqlPool) QueryRow(ctx context.Context, sql string, args ...any) database.Row {
return c.DB.QueryRowContext(ctx, sql, args...)
}
// Exec implements [database.Pool].
// Subtle: this method shadows the method (Pool).Exec of pgxPool.Pool.
func (c *sqlPool) Exec(ctx context.Context, sql string, args ...any) error {
_, err := c.DB.ExecContext(ctx, sql, args...)
return err
}
// Begin implements [database.Pool].
func (c *sqlPool) Begin(ctx context.Context, opts *database.TransactionOptions) (database.Transaction, error) {
tx, err := c.DB.BeginTx(ctx, transactionOptionsToSql(opts))
if err != nil {
return nil, err
}
return &sqlTx{tx}, nil
}
// Close implements [database.Pool].
func (c *sqlPool) Close(_ context.Context) error {
return c.DB.Close()
}

View File

@@ -0,0 +1,72 @@
package gosql
import (
"context"
"database/sql"
"github.com/zitadel/zitadel/backend/storage/database"
)
type sqlTx struct{ *sql.Tx }
var _ database.Transaction = (*sqlTx)(nil)
// Commit implements [database.Transaction].
func (tx *sqlTx) Commit(_ context.Context) error {
return tx.Tx.Commit()
}
// Rollback implements [database.Transaction].
func (tx *sqlTx) Rollback(_ context.Context) error {
return tx.Tx.Rollback()
}
// End implements [database.Transaction].
func (tx *sqlTx) End(ctx context.Context, err error) error {
if err != nil {
tx.Rollback(ctx)
return err
}
return tx.Commit(ctx)
}
// Query implements [database.Transaction].
// Subtle: this method shadows the method (Tx).Query of pgxTx.Tx.
func (tx *sqlTx) Query(ctx context.Context, sql string, args ...any) (database.Rows, error) {
return tx.Tx.QueryContext(ctx, sql, args...)
}
// QueryRow implements [database.Transaction].
// Subtle: this method shadows the method (Tx).QueryRow of pgxTx.Tx.
func (tx *sqlTx) QueryRow(ctx context.Context, sql string, args ...any) database.Row {
return tx.Tx.QueryRowContext(ctx, sql, args...)
}
// Exec implements [database.Pool].
// Subtle: this method shadows the method (Pool).Exec of pgxPool.Pool.
func (tx *sqlTx) Exec(ctx context.Context, sql string, args ...any) error {
_, err := tx.Tx.ExecContext(ctx, sql, args...)
return err
}
func transactionOptionsToSql(opts *database.TransactionOptions) *sql.TxOptions {
if opts == nil {
return nil
}
return &sql.TxOptions{
Isolation: isolationToSql(opts.IsolationLevel),
ReadOnly: opts.AccessMode == database.AccessModeReadOnly,
}
}
func isolationToSql(isolation database.IsolationLevel) sql.IsolationLevel {
switch isolation {
case database.IsolationLevelSerializable:
return sql.LevelSerializable
case database.IsolationLevelReadCommitted:
return sql.LevelReadCommitted
default:
return sql.LevelSerializable
}
}

View File

@@ -0,0 +1,46 @@
package postgres
import (
"context"
"errors"
"slices"
"strings"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/zitadel/zitadel/backend/storage/database"
)
var _ database.Connector = (*Config)(nil)
type Config struct{ *pgxpool.Config }
// Connect implements [database.Connector].
func (c *Config) Connect(ctx context.Context) (database.Pool, error) {
pool, err := pgxpool.NewWithConfig(ctx, c.Config)
if err != nil {
return nil, err
}
if err = pool.Ping(ctx); err != nil {
return nil, err
}
return &pgxPool{pool}, nil
}
func NameMatcher(name string) bool {
return slices.Contains([]string{"postgres", "pg"}, strings.ToLower(name))
}
func DecodeConfig(_ string, config any) (database.Connector, error) {
switch c := config.(type) {
case string:
config, err := pgxpool.ParseConfig(c)
if err != nil {
return nil, err
}
return &Config{config}, nil
case map[string]any:
return nil, errors.New("map configuration not implemented")
}
return nil, errors.New("invalid configuration")
}

View File

@@ -0,0 +1,47 @@
package postgres
import (
"context"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/zitadel/zitadel/backend/storage/database"
)
type pgxConn struct{ *pgxpool.Conn }
var _ database.Client = (*pgxConn)(nil)
// Release implements [database.Client].
func (c *pgxConn) Release(_ context.Context) error {
c.Conn.Release()
return nil
}
// Begin implements [database.Client].
func (c *pgxConn) Begin(ctx context.Context, opts *database.TransactionOptions) (database.Transaction, error) {
tx, err := c.Conn.BeginTx(ctx, transactionOptionsToPgx(opts))
if err != nil {
return nil, err
}
return &pgxTx{tx}, nil
}
// Query implements sql.Client.
// Subtle: this method shadows the method (*Conn).Query of pgxConn.Conn.
func (c *pgxConn) Query(ctx context.Context, sql string, args ...any) (database.Rows, error) {
rows, err := c.Conn.Query(ctx, sql, args...)
return &Rows{rows}, err
}
// QueryRow implements sql.Client.
// Subtle: this method shadows the method (*Conn).QueryRow of pgxConn.Conn.
func (c *pgxConn) QueryRow(ctx context.Context, sql string, args ...any) database.Row {
return c.Conn.QueryRow(ctx, sql, args...)
}
// Exec implements [database.Pool].
// Subtle: this method shadows the method (Pool).Exec of pgxPool.Pool.
func (c *pgxConn) Exec(ctx context.Context, sql string, args ...any) error {
_, err := c.Conn.Exec(ctx, sql, args...)
return err
}

View File

@@ -0,0 +1,56 @@
package postgres
import (
"context"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/zitadel/zitadel/backend/storage/database"
)
type pgxPool struct{ *pgxpool.Pool }
var _ database.Pool = (*pgxPool)(nil)
// Acquire implements [database.Pool].
func (c *pgxPool) Acquire(ctx context.Context) (database.Client, error) {
conn, err := c.Pool.Acquire(ctx)
if err != nil {
return nil, err
}
return &pgxConn{conn}, nil
}
// Query implements [database.Pool].
// Subtle: this method shadows the method (Pool).Query of pgxPool.Pool.
func (c *pgxPool) Query(ctx context.Context, sql string, args ...any) (database.Rows, error) {
rows, err := c.Pool.Query(ctx, sql, args...)
return &Rows{rows}, err
}
// QueryRow implements [database.Pool].
// Subtle: this method shadows the method (Pool).QueryRow of pgxPool.Pool.
func (c *pgxPool) QueryRow(ctx context.Context, sql string, args ...any) database.Row {
return c.Pool.QueryRow(ctx, sql, args...)
}
// Exec implements [database.Pool].
// Subtle: this method shadows the method (Pool).Exec of pgxPool.Pool.
func (c *pgxPool) Exec(ctx context.Context, sql string, args ...any) error {
_, err := c.Pool.Exec(ctx, sql, args...)
return err
}
// Begin implements [database.Pool].
func (c *pgxPool) Begin(ctx context.Context, opts *database.TransactionOptions) (database.Transaction, error) {
tx, err := c.Pool.BeginTx(ctx, transactionOptionsToPgx(opts))
if err != nil {
return nil, err
}
return &pgxTx{tx}, nil
}
// Close implements [database.Pool].
func (c *pgxPool) Close(_ context.Context) error {
c.Pool.Close()
return nil
}

View File

@@ -0,0 +1,17 @@
package postgres
import (
"github.com/jackc/pgx/v5"
"github.com/zitadel/zitadel/backend/storage/database"
)
var _ database.Rows = (*Rows)(nil)
type Rows struct{ pgx.Rows }
// Close implements [database.Rows].
// Subtle: this method shadows the method (Rows).Close of Rows.Rows.
func (r *Rows) Close() error {
r.Rows.Close()
return nil
}

View File

@@ -0,0 +1,84 @@
package postgres
import (
"context"
"github.com/jackc/pgx/v5"
"github.com/zitadel/zitadel/backend/storage/database"
)
type pgxTx struct{ pgx.Tx }
var _ database.Transaction = (*pgxTx)(nil)
// Commit implements [database.Transaction].
func (tx *pgxTx) Commit(ctx context.Context) error {
return tx.Tx.Commit(ctx)
}
// Rollback implements [database.Transaction].
func (tx *pgxTx) Rollback(ctx context.Context) error {
return tx.Tx.Rollback(ctx)
}
// End implements [database.Transaction].
func (tx *pgxTx) End(ctx context.Context, err error) error {
if err != nil {
tx.Rollback(ctx)
return err
}
return tx.Commit(ctx)
}
// Query implements [database.Transaction].
// Subtle: this method shadows the method (Tx).Query of pgxTx.Tx.
func (tx *pgxTx) Query(ctx context.Context, sql string, args ...any) (database.Rows, error) {
rows, err := tx.Tx.Query(ctx, sql, args...)
return &Rows{rows}, err
}
// QueryRow implements [database.Transaction].
// Subtle: this method shadows the method (Tx).QueryRow of pgxTx.Tx.
func (tx *pgxTx) QueryRow(ctx context.Context, sql string, args ...any) database.Row {
return tx.Tx.QueryRow(ctx, sql, args...)
}
// Exec implements [database.Pool].
// Subtle: this method shadows the method (Pool).Exec of pgxPool.Pool.
func (tx *pgxTx) Exec(ctx context.Context, sql string, args ...any) error {
_, err := tx.Tx.Exec(ctx, sql, args...)
return err
}
func transactionOptionsToPgx(opts *database.TransactionOptions) pgx.TxOptions {
if opts == nil {
return pgx.TxOptions{}
}
return pgx.TxOptions{
IsoLevel: isolationToPgx(opts.IsolationLevel),
AccessMode: accessModeToPgx(opts.AccessMode),
}
}
func isolationToPgx(isolation database.IsolationLevel) pgx.TxIsoLevel {
switch isolation {
case database.IsolationLevelSerializable:
return pgx.Serializable
case database.IsolationLevelReadCommitted:
return pgx.ReadCommitted
default:
return pgx.Serializable
}
}
func accessModeToPgx(accessMode database.AccessMode) pgx.TxAccessMode {
switch accessMode {
case database.AccessModeReadWrite:
return pgx.ReadWrite
case database.AccessModeReadOnly:
return pgx.ReadOnly
default:
return pgx.ReadWrite
}
}