mirror of
https://github.com/zitadel/zitadel.git
synced 2025-08-13 21:36:03 +00:00
fix(eventstore): use decimal, correct mirror (#9903)
back port #9812, #9878, #9881, #9884 --------- Co-authored-by: Livio Spring <livio.a@gmail.com> Co-authored-by: Stefan Benz <46600784+stebenz@users.noreply.github.com>
This commit is contained in:
@@ -75,10 +75,16 @@ func (_ *Config) Decode(configs []interface{}) (dialect.Connector, error) {
|
||||
}
|
||||
|
||||
func (c *Config) Connect(useAdmin bool, pusherRatio, spoolerRatio float64, purpose dialect.DBPurpose) (*sql.DB, *pgxpool.Pool, error) {
|
||||
dialect.RegisterAfterConnect(func(ctx context.Context, c *pgx.Conn) error {
|
||||
dialect.RegisterAfterConnect(func(ctx context.Context, conn *pgx.Conn) error {
|
||||
// CockroachDB by default does not allow multiple modifications of the same table using ON CONFLICT
|
||||
// This is needed to fill the fields table of the eventstore during eventstore.Push.
|
||||
_, err := c.Exec(ctx, "SET enable_multiple_modifications_of_table = on")
|
||||
|
||||
// This modification is only needed on crdb so we check if the connection is crdb
|
||||
// postgres doesn't have parameter so we check if the parameter is empty
|
||||
if conn.PgConn().ParameterStatus("crdb_version") == "" {
|
||||
return nil
|
||||
}
|
||||
_, err := conn.Exec(ctx, "SET enable_multiple_modifications_of_table = on")
|
||||
return err
|
||||
})
|
||||
connConfig, err := dialect.NewConnectionConfig(c.MaxOpenConns, c.MaxIdleConns, pusherRatio, spoolerRatio, purpose)
|
||||
|
@@ -6,6 +6,7 @@ import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
|
||||
pgxdecimal "github.com/jackc/pgx-shopspring-decimal"
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
)
|
||||
@@ -47,7 +48,12 @@ func (c *ConnectionConfig) takeRatio(ratio float64) (*ConnectionConfig, error) {
|
||||
return out, nil
|
||||
}
|
||||
|
||||
var afterConnectFuncs []func(ctx context.Context, c *pgx.Conn) error
|
||||
var afterConnectFuncs = []func(ctx context.Context, c *pgx.Conn) error{
|
||||
func(ctx context.Context, c *pgx.Conn) error {
|
||||
pgxdecimal.Register(c.TypeMap())
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
func RegisterAfterConnect(f func(ctx context.Context, c *pgx.Conn) error) {
|
||||
afterConnectFuncs = append(afterConnectFuncs, f)
|
||||
|
@@ -169,6 +169,7 @@ func TestNewConnectionConfig(t *testing.T) {
|
||||
want: &ConnectionConfig{
|
||||
MaxOpenConns: 1,
|
||||
MaxIdleConns: 1,
|
||||
AfterConnect: afterConnectFuncs,
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -183,6 +184,7 @@ func TestNewConnectionConfig(t *testing.T) {
|
||||
want: &ConnectionConfig{
|
||||
MaxOpenConns: 1,
|
||||
MaxIdleConns: 1,
|
||||
AfterConnect: afterConnectFuncs,
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -197,6 +199,7 @@ func TestNewConnectionConfig(t *testing.T) {
|
||||
want: &ConnectionConfig{
|
||||
MaxOpenConns: 1,
|
||||
MaxIdleConns: 1,
|
||||
AfterConnect: afterConnectFuncs,
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -211,6 +214,7 @@ func TestNewConnectionConfig(t *testing.T) {
|
||||
want: &ConnectionConfig{
|
||||
MaxOpenConns: 6,
|
||||
MaxIdleConns: 3,
|
||||
AfterConnect: afterConnectFuncs,
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -225,6 +229,7 @@ func TestNewConnectionConfig(t *testing.T) {
|
||||
want: &ConnectionConfig{
|
||||
MaxOpenConns: 2,
|
||||
MaxIdleConns: 1,
|
||||
AfterConnect: afterConnectFuncs,
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -239,6 +244,7 @@ func TestNewConnectionConfig(t *testing.T) {
|
||||
want: &ConnectionConfig{
|
||||
MaxOpenConns: 2,
|
||||
MaxIdleConns: 1,
|
||||
AfterConnect: afterConnectFuncs,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
@@ -86,13 +86,15 @@ func (c *Config) Connect(useAdmin bool, pusherRatio, spoolerRatio float64, purpo
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
config.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error {
|
||||
for _, f := range connConfig.AfterConnect {
|
||||
if err := f(ctx, conn); err != nil {
|
||||
return err
|
||||
if len(connConfig.AfterConnect) > 0 {
|
||||
config.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error {
|
||||
for _, f := range connConfig.AfterConnect {
|
||||
if err := f(ctx, conn); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// For the pusher we set the app name with the instance ID
|
||||
|
Reference in New Issue
Block a user