From fe1337536f9d7e055e53850b060e9ee806b6c1cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tim=20M=C3=B6hlmann?= Date: Wed, 20 Dec 2023 18:13:04 +0200 Subject: [PATCH] fix(db): add additional connection pool for projection spooling (#7094) * fix(db): add additional connection pool for projection spooling * use correct connection pool for projections --------- Co-authored-by: Livio Spring --- cmd/defaults.yaml | 13 +- cmd/initialise/init.go | 3 +- cmd/initialise/verify_zitadel.go | 3 +- cmd/key/key.go | 3 +- cmd/setup/cleanup.go | 7 +- cmd/setup/setup.go | 37 +-- cmd/start/start.go | 30 ++- internal/database/cockroach/crdb.go | 19 +- internal/database/database.go | 21 +- internal/database/dialect/config.go | 31 ++- internal/database/dialect/config_test.go | 36 +++ internal/database/dialect/connections.go | 103 +++++-- internal/database/dialect/connections_test.go | 252 ++++++++++++++++++ internal/database/postgres/pg.go | 24 +- internal/eventstore/repository/sql/crdb.go | 9 +- internal/query/query.go | 6 +- 16 files changed, 478 insertions(+), 119 deletions(-) create mode 100644 internal/database/dialect/config_test.go create mode 100644 internal/database/dialect/connections_test.go diff --git a/cmd/defaults.yaml b/cmd/defaults.yaml index bff039bdf8..ab158f36a7 100644 --- a/cmd/defaults.yaml +++ b/cmd/defaults.yaml @@ -88,10 +88,17 @@ HTTP1HostHeader: "host" # ZITADEL_HTTP1HOSTHEADER WebAuthNName: ZITADEL # ZITADEL_WEBAUTHN_NAME Database: - # This setting defines the ratio of how many connections defined below - # are used to push events. ZITADEL manages two database connection pools - # one to push events and one for the remaining queries. + # ZITADEL manages three database connection pools. + # The *ConnRatio settings define the ratio of how many connections from + # MaxOpenConns and MaxIdleConns are used to push events and spool projections. + # Remaining connection are used for queries (search). + # Values may not be negative and the sum of the ratios must always be less than 1. + # For example this defaults define 40 MaxOpenConns overall. + # - 40*0.2=8 connections are allocated to the event pusher; + # - 40*0.2=8 connections are allocated to the projection spooler; + # - 40-(8+8)=24 connections are remaining for queries; EventPushConnRatio: 0.2 # ZITADEL_DATABASE_COCKROACH_EVENTPUSHCONNRATIO + ProjectionSpoolerConnRatio: 0.2 # ZITADEL_DATABASE_COCKROACH_PROJECTIONSPOOLERCONNRATIO # CockroachDB is the default database of ZITADEL cockroach: Host: localhost # ZITADEL_DATABASE_COCKROACH_HOST diff --git a/cmd/initialise/init.go b/cmd/initialise/init.go index 69aadd87be..0af04925c9 100644 --- a/cmd/initialise/init.go +++ b/cmd/initialise/init.go @@ -8,6 +8,7 @@ import ( "github.com/zitadel/logging" "github.com/zitadel/zitadel/internal/database" + "github.com/zitadel/zitadel/internal/database/dialect" ) var ( @@ -75,7 +76,7 @@ func initialise(config database.Config, steps ...func(*database.DB) error) error return err } - db, err := database.Connect(config, true, false) + db, err := database.Connect(config, true, dialect.DBPurposeQuery) if err != nil { return err } diff --git a/cmd/initialise/verify_zitadel.go b/cmd/initialise/verify_zitadel.go index 1128227ce8..7307ad94d9 100644 --- a/cmd/initialise/verify_zitadel.go +++ b/cmd/initialise/verify_zitadel.go @@ -9,6 +9,7 @@ import ( "github.com/zitadel/logging" "github.com/zitadel/zitadel/internal/database" + "github.com/zitadel/zitadel/internal/database/dialect" ) func newZitadel() *cobra.Command { @@ -75,7 +76,7 @@ func VerifyZitadel(db *database.DB, config database.Config) error { func verifyZitadel(config database.Config) error { logging.WithFields("database", config.DatabaseName()).Info("verify zitadel") - db, err := database.Connect(config, false, false) + db, err := database.Connect(config, false, dialect.DBPurposeQuery) if err != nil { return err } diff --git a/cmd/key/key.go b/cmd/key/key.go index 833fbb70d3..b18bb867e8 100644 --- a/cmd/key/key.go +++ b/cmd/key/key.go @@ -12,6 +12,7 @@ import ( "github.com/zitadel/zitadel/internal/crypto" cryptoDB "github.com/zitadel/zitadel/internal/crypto/database" "github.com/zitadel/zitadel/internal/database" + "github.com/zitadel/zitadel/internal/database/dialect" "github.com/zitadel/zitadel/internal/zerrors" ) @@ -123,7 +124,7 @@ func openFile(fileName string) (io.Reader, error) { } func keyStorage(config database.Config, masterKey string) (crypto.KeyStorage, error) { - db, err := database.Connect(config, false, false) + db, err := database.Connect(config, false, dialect.DBPurposeQuery) if err != nil { return nil, err } diff --git a/cmd/setup/cleanup.go b/cmd/setup/cleanup.go index 8da873b40e..af93cdd1e2 100644 --- a/cmd/setup/cleanup.go +++ b/cmd/setup/cleanup.go @@ -8,6 +8,7 @@ import ( "github.com/zitadel/logging" "github.com/zitadel/zitadel/internal/database" + "github.com/zitadel/zitadel/internal/database/dialect" "github.com/zitadel/zitadel/internal/eventstore" old_es "github.com/zitadel/zitadel/internal/eventstore/repository/sql" new_es "github.com/zitadel/zitadel/internal/eventstore/v3" @@ -31,13 +32,13 @@ func Cleanup(config *Config) { logging.Info("cleanup started") - zitadelDBClient, err := database.Connect(config.Database, false, false) + queryDBClient, err := database.Connect(config.Database, false, dialect.DBPurposeQuery) logging.OnError(err).Fatal("unable to connect to database") - esPusherDBClient, err := database.Connect(config.Database, false, true) + esPusherDBClient, err := database.Connect(config.Database, false, dialect.DBPurposeEventPusher) logging.OnError(err).Fatal("unable to connect to database") config.Eventstore.Pusher = new_es.NewEventstore(esPusherDBClient) - config.Eventstore.Querier = old_es.NewCRDB(zitadelDBClient) + config.Eventstore.Querier = old_es.NewCRDB(queryDBClient) es := eventstore.NewEventstore(config.Eventstore) migration.RegisterMappers(es) diff --git a/cmd/setup/setup.go b/cmd/setup/setup.go index 7a1a2342f0..476068fea3 100644 --- a/cmd/setup/setup.go +++ b/cmd/setup/setup.go @@ -13,6 +13,7 @@ import ( "github.com/zitadel/zitadel/cmd/key" "github.com/zitadel/zitadel/cmd/tls" "github.com/zitadel/zitadel/internal/database" + "github.com/zitadel/zitadel/internal/database/dialect" "github.com/zitadel/zitadel/internal/eventstore" old_es "github.com/zitadel/zitadel/internal/eventstore/repository/sql" new_es "github.com/zitadel/zitadel/internal/eventstore/v3" @@ -67,26 +68,28 @@ func Setup(config *Config, steps *Steps, masterKey string) { i18n.MustLoadSupportedLanguagesFromDir() - zitadelDBClient, err := database.Connect(config.Database, false, false) + queryDBClient, err := database.Connect(config.Database, false, dialect.DBPurposeQuery) logging.OnError(err).Fatal("unable to connect to database") - esPusherDBClient, err := database.Connect(config.Database, false, true) + esPusherDBClient, err := database.Connect(config.Database, false, dialect.DBPurposeEventPusher) + logging.OnError(err).Fatal("unable to connect to database") + projectionDBClient, err := database.Connect(config.Database, false, dialect.DBPurposeProjectionSpooler) logging.OnError(err).Fatal("unable to connect to database") - config.Eventstore.Querier = old_es.NewCRDB(zitadelDBClient) + config.Eventstore.Querier = old_es.NewCRDB(queryDBClient) config.Eventstore.Pusher = new_es.NewEventstore(esPusherDBClient) eventstoreClient := eventstore.NewEventstore(config.Eventstore) logging.OnError(err).Fatal("unable to start eventstore") migration.RegisterMappers(eventstoreClient) - steps.s1ProjectionTable = &ProjectionTable{dbClient: zitadelDBClient.DB} - steps.s2AssetsTable = &AssetTable{dbClient: zitadelDBClient.DB} + steps.s1ProjectionTable = &ProjectionTable{dbClient: queryDBClient.DB} + steps.s2AssetsTable = &AssetTable{dbClient: queryDBClient.DB} steps.FirstInstance.instanceSetup = config.DefaultInstance steps.FirstInstance.userEncryptionKey = config.EncryptionKeys.User steps.FirstInstance.smtpEncryptionKey = config.EncryptionKeys.SMTP steps.FirstInstance.oidcEncryptionKey = config.EncryptionKeys.OIDC steps.FirstInstance.masterKey = masterKey - steps.FirstInstance.db = zitadelDBClient + steps.FirstInstance.db = queryDBClient steps.FirstInstance.es = eventstoreClient steps.FirstInstance.defaults = config.SystemDefaults steps.FirstInstance.zitadelRoles = config.InternalAuthZ.RolePermissionMappings @@ -94,20 +97,20 @@ func Setup(config *Config, steps *Steps, masterKey string) { steps.FirstInstance.externalSecure = config.ExternalSecure steps.FirstInstance.externalPort = config.ExternalPort - steps.s5LastFailed = &LastFailed{dbClient: zitadelDBClient.DB} - steps.s6OwnerRemoveColumns = &OwnerRemoveColumns{dbClient: zitadelDBClient.DB} - steps.s7LogstoreTables = &LogstoreTables{dbClient: zitadelDBClient.DB, username: config.Database.Username(), dbType: config.Database.Type()} - steps.s8AuthTokens = &AuthTokenIndexes{dbClient: zitadelDBClient} + steps.s5LastFailed = &LastFailed{dbClient: queryDBClient.DB} + steps.s6OwnerRemoveColumns = &OwnerRemoveColumns{dbClient: queryDBClient.DB} + steps.s7LogstoreTables = &LogstoreTables{dbClient: queryDBClient.DB, username: config.Database.Username(), dbType: config.Database.Type()} + steps.s8AuthTokens = &AuthTokenIndexes{dbClient: queryDBClient} steps.CorrectCreationDate.dbClient = esPusherDBClient - steps.s12AddOTPColumns = &AddOTPColumns{dbClient: zitadelDBClient} - steps.s13FixQuotaProjection = &FixQuotaConstraints{dbClient: zitadelDBClient} + steps.s12AddOTPColumns = &AddOTPColumns{dbClient: queryDBClient} + steps.s13FixQuotaProjection = &FixQuotaConstraints{dbClient: queryDBClient} steps.s14NewEventsTable = &NewEventsTable{dbClient: esPusherDBClient} - steps.s15CurrentStates = &CurrentProjectionState{dbClient: zitadelDBClient} - steps.s16UniqueConstraintsLower = &UniqueConstraintToLower{dbClient: zitadelDBClient} - steps.s17AddOffsetToUniqueConstraints = &AddOffsetToCurrentStates{dbClient: zitadelDBClient} - steps.s18AddLowerFieldsToLoginNames = &AddLowerFieldsToLoginNames{dbClient: zitadelDBClient} + steps.s15CurrentStates = &CurrentProjectionState{dbClient: queryDBClient} + steps.s16UniqueConstraintsLower = &UniqueConstraintToLower{dbClient: queryDBClient} + steps.s17AddOffsetToUniqueConstraints = &AddOffsetToCurrentStates{dbClient: queryDBClient} + steps.s18AddLowerFieldsToLoginNames = &AddLowerFieldsToLoginNames{dbClient: queryDBClient} - err = projection.Create(ctx, zitadelDBClient, eventstoreClient, config.Projections, nil, nil, nil) + err = projection.Create(ctx, projectionDBClient, eventstoreClient, config.Projections, nil, nil, nil) logging.OnError(err).Fatal("unable to start projections") repeatableSteps := []migration.RepeatableMigration{ diff --git a/cmd/start/start.go b/cmd/start/start.go index f4be4d33ad..f367d923fd 100644 --- a/cmd/start/start.go +++ b/cmd/start/start.go @@ -58,6 +58,7 @@ import ( "github.com/zitadel/zitadel/internal/crypto" cryptoDB "github.com/zitadel/zitadel/internal/crypto/database" "github.com/zitadel/zitadel/internal/database" + "github.com/zitadel/zitadel/internal/database/dialect" "github.com/zitadel/zitadel/internal/domain" "github.com/zitadel/zitadel/internal/eventstore" old_es "github.com/zitadel/zitadel/internal/eventstore/repository/sql" @@ -125,16 +126,20 @@ func startZitadel(config *Config, masterKey string, server chan<- *Server) error i18n.MustLoadSupportedLanguagesFromDir() - zitadelDBClient, err := database.Connect(config.Database, false, false) + queryDBClient, err := database.Connect(config.Database, false, dialect.DBPurposeQuery) if err != nil { - return fmt.Errorf("cannot start client for projection: %w", err) + return fmt.Errorf("cannot start DB client for queries: %w", err) } - esPusherDBClient, err := database.Connect(config.Database, false, true) + esPusherDBClient, err := database.Connect(config.Database, false, dialect.DBPurposeEventPusher) if err != nil { - return fmt.Errorf("cannot start client for projection: %w", err) + return fmt.Errorf("cannot start client for event store pusher: %w", err) + } + projectionDBClient, err := database.Connect(config.Database, false, dialect.DBPurposeProjectionSpooler) + if err != nil { + return fmt.Errorf("cannot start client for projection spooler: %w", err) } - keyStorage, err := cryptoDB.NewKeyStorage(zitadelDBClient, masterKey) + keyStorage, err := cryptoDB.NewKeyStorage(queryDBClient, masterKey) if err != nil { return fmt.Errorf("cannot start key storage: %w", err) } @@ -144,7 +149,7 @@ func startZitadel(config *Config, masterKey string, server chan<- *Server) error } config.Eventstore.Pusher = new_es.NewEventstore(esPusherDBClient) - config.Eventstore.Querier = old_es.NewCRDB(zitadelDBClient) + config.Eventstore.Querier = old_es.NewCRDB(queryDBClient) eventstoreClient := eventstore.NewEventstore(config.Eventstore) sessionTokenVerifier := internal_authz.SessionTokenVerifier(keys.OIDC) @@ -152,7 +157,8 @@ func startZitadel(config *Config, masterKey string, server chan<- *Server) error queries, err := query.StartQueries( ctx, eventstoreClient, - zitadelDBClient, + queryDBClient, + projectionDBClient, config.Projections, config.SystemDefaults, keys.IDPConfig, @@ -173,7 +179,7 @@ func startZitadel(config *Config, masterKey string, server chan<- *Server) error return fmt.Errorf("cannot start queries: %w", err) } - authZRepo, err := authz.Start(queries, eventstoreClient, zitadelDBClient, keys.OIDC, config.ExternalSecure) + authZRepo, err := authz.Start(queries, eventstoreClient, queryDBClient, keys.OIDC, config.ExternalSecure) if err != nil { return fmt.Errorf("error starting authz repo: %w", err) } @@ -181,7 +187,7 @@ func startZitadel(config *Config, masterKey string, server chan<- *Server) error return internal_authz.CheckPermission(ctx, authZRepo, config.InternalAuthZ.RolePermissionMappings, permission, orgID, resourceID) } - storage, err := config.AssetStorage.NewStorage(zitadelDBClient.DB) + storage, err := config.AssetStorage.NewStorage(queryDBClient.DB) if err != nil { return fmt.Errorf("cannot start asset storage client: %w", err) } @@ -224,7 +230,7 @@ func startZitadel(config *Config, masterKey string, server chan<- *Server) error if err != nil { return err } - actionsExecutionDBEmitter, err := logstore.NewEmitter[*record.ExecutionLog](ctx, clock, config.Quotas.Execution, execution.NewDatabaseLogStorage(zitadelDBClient, commands, queries)) + actionsExecutionDBEmitter, err := logstore.NewEmitter[*record.ExecutionLog](ctx, clock, config.Quotas.Execution, execution.NewDatabaseLogStorage(queryDBClient, commands, queries)) if err != nil { return err } @@ -263,7 +269,7 @@ func startZitadel(config *Config, masterKey string, server chan<- *Server) error commands, queries, eventstoreClient, - zitadelDBClient, + queryDBClient, config, storage, authZRepo, @@ -280,7 +286,7 @@ func startZitadel(config *Config, masterKey string, server chan<- *Server) error if server != nil { server <- &Server{ Config: config, - DB: zitadelDBClient, + DB: queryDBClient, KeyStorage: keyStorage, Keys: keys, Eventstore: eventstoreClient, diff --git a/internal/database/cockroach/crdb.go b/internal/database/cockroach/crdb.go index 7f43bb109c..487db73e47 100644 --- a/internal/database/cockroach/crdb.go +++ b/internal/database/cockroach/crdb.go @@ -68,28 +68,19 @@ func (c *Config) Decode(configs []interface{}) (dialect.Connector, error) { return c, nil } -func (c *Config) Connect(useAdmin, isEventPusher bool, pusherRatio float32, appName string) (*sql.DB, error) { - client, err := sql.Open("pgx", c.String(useAdmin, appName)) +func (c *Config) Connect(useAdmin bool, pusherRatio, spoolerRatio float64, purpose dialect.DBPurpose) (*sql.DB, error) { + client, err := sql.Open("pgx", c.String(useAdmin, purpose.AppName())) if err != nil { return nil, err } - connInfo, err := dialect.NewConnectionInfo(c.MaxOpenConns, c.MaxIdleConns, float64(pusherRatio)) + connConfig, err := dialect.NewConnectionConfig(c.MaxOpenConns, c.MaxIdleConns, spoolerRatio, pusherRatio, purpose) if err != nil { return nil, err } - var maxConns, maxIdleConns uint32 - if isEventPusher { - maxConns = connInfo.EventstorePusher.MaxOpenConns - maxIdleConns = connInfo.EventstorePusher.MaxIdleConns - } else { - maxConns = connInfo.ZITADEL.MaxOpenConns - maxIdleConns = connInfo.ZITADEL.MaxIdleConns - } - - client.SetMaxOpenConns(int(maxConns)) - client.SetMaxIdleConns(int(maxIdleConns)) + client.SetMaxOpenConns(int(connConfig.MaxIdleConns)) + client.SetMaxIdleConns(int(connConfig.MaxIdleConns)) client.SetConnMaxLifetime(c.MaxConnLifetime) client.SetConnMaxIdleTime(c.MaxConnIdleTime) diff --git a/internal/database/database.go b/internal/database/database.go index ce48b89af3..30e03150c5 100644 --- a/internal/database/database.go +++ b/internal/database/database.go @@ -17,9 +17,10 @@ import ( ) type Config struct { - Dialects map[string]interface{} `mapstructure:",remain"` - EventPushConnRatio float32 - connector dialect.Connector + Dialects map[string]interface{} `mapstructure:",remain"` + EventPushConnRatio float64 + ProjectionSpoolerConnRatio float64 + connector dialect.Connector } func (c *Config) SetConnector(connector dialect.Connector) { @@ -109,18 +110,8 @@ func QueryJSONObject[T any](ctx context.Context, db *DB, query string, args ...a return obj, nil } -const ( - zitadelAppName = "zitadel" - EventstorePusherAppName = "zitadel_es_pusher" -) - -func Connect(config Config, useAdmin, isEventPusher bool) (*DB, error) { - appName := zitadelAppName - if isEventPusher { - appName = EventstorePusherAppName - } - - client, err := config.connector.Connect(useAdmin, isEventPusher, config.EventPushConnRatio, appName) +func Connect(config Config, useAdmin bool, purpose dialect.DBPurpose) (*DB, error) { + client, err := config.connector.Connect(useAdmin, config.EventPushConnRatio, config.ProjectionSpoolerConnRatio, purpose) if err != nil { return nil, err } diff --git a/internal/database/dialect/config.go b/internal/database/dialect/config.go index 210e7126d3..645fc6efac 100644 --- a/internal/database/dialect/config.go +++ b/internal/database/dialect/config.go @@ -23,8 +23,37 @@ type Matcher interface { Decode([]interface{}) (Connector, error) } +const ( + QueryAppName = "zitadel_queries" + EventstorePusherAppName = "zitadel_es_pusher" + ProjectionSpoolerAppName = "zitadel_projection_spooler" + defaultAppName = "zitadel" +) + +// DBPurpose is what the resulting connection pool is used for. +type DBPurpose int + +const ( + DBPurposeQuery DBPurpose = iota + DBPurposeEventPusher + DBPurposeProjectionSpooler +) + +func (p DBPurpose) AppName() string { + switch p { + case DBPurposeQuery: + return QueryAppName + case DBPurposeEventPusher: + return EventstorePusherAppName + case DBPurposeProjectionSpooler: + return ProjectionSpoolerAppName + default: + return defaultAppName + } +} + type Connector interface { - Connect(useAdmin, isEventPusher bool, pusherRatio float32, appName string) (*sql.DB, error) + Connect(useAdmin bool, pusherRatio, spoolerRatio float64, purpose DBPurpose) (*sql.DB, error) Password() string Database } diff --git a/internal/database/dialect/config_test.go b/internal/database/dialect/config_test.go new file mode 100644 index 0000000000..d7297f8b67 --- /dev/null +++ b/internal/database/dialect/config_test.go @@ -0,0 +1,36 @@ +package dialect + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestDBPurpose_AppName(t *testing.T) { + tests := []struct { + p DBPurpose + want string + }{ + { + p: DBPurposeQuery, + want: QueryAppName, + }, + { + p: DBPurposeEventPusher, + want: EventstorePusherAppName, + }, + { + p: DBPurposeProjectionSpooler, + want: ProjectionSpoolerAppName, + }, + { + p: 99, + want: defaultAppName, + }, + } + for _, tt := range tests { + t.Run(tt.want, func(t *testing.T) { + assert.Equal(t, tt.want, tt.p.AppName()) + }) + } +} diff --git a/internal/database/dialect/connections.go b/internal/database/dialect/connections.go index cd6e87daa8..48f8d6e223 100644 --- a/internal/database/dialect/connections.go +++ b/internal/database/dialect/connections.go @@ -1,43 +1,90 @@ package dialect -import "errors" +import ( + "errors" + "fmt" +) -type ConnectionInfo struct { - EventstorePusher ConnectionConfig - ZITADEL ConnectionConfig -} +var ( + ErrNegativeRatio = errors.New("ratio cannot be negative") + ErrHighSumRatio = errors.New("sum of pusher and projection ratios must be < 1") + ErrIllegalMaxOpenConns = errors.New("MaxOpenConns of the database must be higher than 3 or 0 for unlimited") + ErrIllegalMaxIdleConns = errors.New("MaxIdleConns of the database must be higher than 3 or 0 for unlimited") + ErrInvalidPurpose = errors.New("DBPurpose out of range") +) +// ConnectionConfig defines the Max Open and Idle connections for a DB connection pool. type ConnectionConfig struct { MaxOpenConns, MaxIdleConns uint32 } -func NewConnectionInfo(openConns, idleConns uint32, pusherRatio float64) (*ConnectionInfo, error) { - if pusherRatio < 0 || pusherRatio > 1 { - return nil, errors.New("EventPushConnRatio must be between 0 and 1") - } - if openConns != 0 && openConns < 2 { - return nil, errors.New("MaxOpenConns of the database must be higher that 1") +// takeRatio of MaxOpenConns and MaxIdleConns from config and returns +// a new ConnectionConfig with the resulting values. +func (c *ConnectionConfig) takeRatio(ratio float64) (*ConnectionConfig, error) { + if ratio < 0 { + return nil, ErrNegativeRatio } - info := new(ConnectionInfo) - - info.EventstorePusher.MaxOpenConns = uint32(pusherRatio * float64(openConns)) - info.EventstorePusher.MaxIdleConns = uint32(pusherRatio * float64(idleConns)) - - if openConns != 0 && info.EventstorePusher.MaxOpenConns < 1 && pusherRatio > 0 { - info.EventstorePusher.MaxOpenConns = 1 + out := &ConnectionConfig{ + MaxOpenConns: uint32(ratio * float64(c.MaxOpenConns)), + MaxIdleConns: uint32(ratio * float64(c.MaxIdleConns)), } - if idleConns != 0 && info.EventstorePusher.MaxIdleConns < 1 && pusherRatio > 0 { - info.EventstorePusher.MaxIdleConns = 1 + if c.MaxOpenConns != 0 && out.MaxOpenConns < 1 && ratio > 0 { + out.MaxOpenConns = 1 + } + if c.MaxIdleConns != 0 && out.MaxIdleConns < 1 && ratio > 0 { + out.MaxIdleConns = 1 } - if openConns != 0 { - info.ZITADEL.MaxOpenConns = openConns - info.EventstorePusher.MaxOpenConns - } - if idleConns != 0 { - info.ZITADEL.MaxIdleConns = idleConns - info.EventstorePusher.MaxIdleConns - } - - return info, nil + return out, nil +} + +// NewConnectionConfig calculates [ConnectionConfig] values from the passed ratios +// and returns the config applicable for the requested purpose. +// +// openConns and idleConns must be at least 3 or 0, which means no limit. +// The pusherRatio and spoolerRatio must be between 0 and 1. +func NewConnectionConfig(openConns, idleConns uint32, pusherRatio, projectionRatio float64, purpose DBPurpose) (*ConnectionConfig, error) { + if openConns != 0 && openConns < 3 { + return nil, ErrIllegalMaxOpenConns + } + if idleConns != 0 && idleConns < 3 { + return nil, ErrIllegalMaxIdleConns + } + if pusherRatio+projectionRatio >= 1 { + return nil, ErrHighSumRatio + } + + queryConfig := &ConnectionConfig{ + MaxOpenConns: openConns, + MaxIdleConns: idleConns, + } + pusherConfig, err := queryConfig.takeRatio(pusherRatio) + if err != nil { + return nil, fmt.Errorf("event pusher: %w", err) + } + spoolerConfig, err := queryConfig.takeRatio(projectionRatio) + if err != nil { + return nil, fmt.Errorf("projection spooler: %w", err) + } + + // subtract the claimed amount + if queryConfig.MaxOpenConns > 0 { + queryConfig.MaxOpenConns -= pusherConfig.MaxOpenConns + spoolerConfig.MaxOpenConns + } + if queryConfig.MaxIdleConns > 0 { + queryConfig.MaxIdleConns -= pusherConfig.MaxIdleConns + spoolerConfig.MaxIdleConns + } + + switch purpose { + case DBPurposeQuery: + return queryConfig, nil + case DBPurposeEventPusher: + return pusherConfig, nil + case DBPurposeProjectionSpooler: + return spoolerConfig, nil + default: + return nil, fmt.Errorf("%w: %v", ErrInvalidPurpose, purpose) + } } diff --git a/internal/database/dialect/connections_test.go b/internal/database/dialect/connections_test.go new file mode 100644 index 0000000000..6256658d0a --- /dev/null +++ b/internal/database/dialect/connections_test.go @@ -0,0 +1,252 @@ +package dialect + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestConnectionConfig_takeRatio(t *testing.T) { + type fields struct { + MaxOpenConns uint32 + MaxIdleConns uint32 + } + tests := []struct { + name string + fields fields + ratio float64 + wantOut *ConnectionConfig + wantErr error + }{ + { + name: "ratio less than 0 error", + ratio: -0.1, + wantErr: ErrNegativeRatio, + }, + { + name: "zero values", + fields: fields{ + MaxOpenConns: 0, + MaxIdleConns: 0, + }, + ratio: 0, + wantOut: &ConnectionConfig{ + MaxOpenConns: 0, + MaxIdleConns: 0, + }, + }, + { + name: "max conns, ratio 0", + fields: fields{ + MaxOpenConns: 10, + MaxIdleConns: 5, + }, + ratio: 0, + wantOut: &ConnectionConfig{ + MaxOpenConns: 0, + MaxIdleConns: 0, + }, + }, + { + name: "half ratio", + fields: fields{ + MaxOpenConns: 10, + MaxIdleConns: 5, + }, + ratio: 0.5, + wantOut: &ConnectionConfig{ + MaxOpenConns: 5, + MaxIdleConns: 2, + }, + }, + { + name: "minimal 1", + fields: fields{ + MaxOpenConns: 2, + MaxIdleConns: 2, + }, + ratio: 0.1, + wantOut: &ConnectionConfig{ + MaxOpenConns: 1, + MaxIdleConns: 1, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + in := &ConnectionConfig{ + MaxOpenConns: tt.fields.MaxOpenConns, + MaxIdleConns: tt.fields.MaxIdleConns, + } + got, err := in.takeRatio(tt.ratio) + require.ErrorIs(t, err, tt.wantErr) + assert.Equal(t, tt.wantOut, got) + }) + } +} + +func TestNewConnectionConfig(t *testing.T) { + type args struct { + openConns uint32 + idleConns uint32 + pusherRatio float64 + projectionRatio float64 + purpose DBPurpose + } + tests := []struct { + name string + args args + want *ConnectionConfig + wantErr error + }{ + { + name: "illegal open conns error", + args: args{ + openConns: 2, + idleConns: 3, + }, + wantErr: ErrIllegalMaxOpenConns, + }, + { + name: "illegal idle conns error", + args: args{ + openConns: 3, + idleConns: 2, + }, + wantErr: ErrIllegalMaxIdleConns, + }, + { + name: "high ration sum error", + args: args{ + openConns: 3, + idleConns: 3, + pusherRatio: 0.5, + projectionRatio: 0.5, + }, + wantErr: ErrHighSumRatio, + }, + { + name: "illegal pusher ratio error", + args: args{ + openConns: 3, + idleConns: 3, + pusherRatio: -0.1, + projectionRatio: 0.5, + }, + wantErr: ErrNegativeRatio, + }, + { + name: "illegal projection ratio error", + args: args{ + openConns: 3, + idleConns: 3, + pusherRatio: 0.5, + projectionRatio: -0.1, + }, + wantErr: ErrNegativeRatio, + }, + { + name: "invalid purpose error", + args: args{ + openConns: 3, + idleConns: 3, + pusherRatio: 0.4, + projectionRatio: 0.4, + purpose: 99, + }, + wantErr: ErrInvalidPurpose, + }, + { + name: "min values, query purpose", + args: args{ + openConns: 3, + idleConns: 3, + pusherRatio: 0.2, + projectionRatio: 0.2, + purpose: DBPurposeQuery, + }, + want: &ConnectionConfig{ + MaxOpenConns: 1, + MaxIdleConns: 1, + }, + }, + { + name: "min values, pusher purpose", + args: args{ + openConns: 3, + idleConns: 3, + pusherRatio: 0.2, + projectionRatio: 0.2, + purpose: DBPurposeEventPusher, + }, + want: &ConnectionConfig{ + MaxOpenConns: 1, + MaxIdleConns: 1, + }, + }, + { + name: "min values, projection purpose", + args: args{ + openConns: 3, + idleConns: 3, + pusherRatio: 0.2, + projectionRatio: 0.2, + purpose: DBPurposeProjectionSpooler, + }, + want: &ConnectionConfig{ + MaxOpenConns: 1, + MaxIdleConns: 1, + }, + }, + { + name: "high values, query purpose", + args: args{ + openConns: 10, + idleConns: 5, + pusherRatio: 0.2, + projectionRatio: 0.2, + purpose: DBPurposeQuery, + }, + want: &ConnectionConfig{ + MaxOpenConns: 6, + MaxIdleConns: 3, + }, + }, + { + name: "high values, pusher purpose", + args: args{ + openConns: 10, + idleConns: 5, + pusherRatio: 0.2, + projectionRatio: 0.2, + purpose: DBPurposeEventPusher, + }, + want: &ConnectionConfig{ + MaxOpenConns: 2, + MaxIdleConns: 1, + }, + }, + { + name: "high values, projection purpose", + args: args{ + openConns: 10, + idleConns: 5, + pusherRatio: 0.2, + projectionRatio: 0.2, + purpose: DBPurposeProjectionSpooler, + }, + want: &ConnectionConfig{ + MaxOpenConns: 2, + MaxIdleConns: 1, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := NewConnectionConfig(tt.args.openConns, tt.args.idleConns, tt.args.pusherRatio, tt.args.projectionRatio, tt.args.purpose) + require.ErrorIs(t, err, tt.wantErr) + assert.Equal(t, tt.want, got) + }) + } +} diff --git a/internal/database/postgres/pg.go b/internal/database/postgres/pg.go index f46067a08d..008f399ea1 100644 --- a/internal/database/postgres/pg.go +++ b/internal/database/postgres/pg.go @@ -69,31 +69,23 @@ func (c *Config) Decode(configs []interface{}) (dialect.Connector, error) { return c, nil } -func (c *Config) Connect(useAdmin, isEventPusher bool, pusherRatio float32, appName string) (*sql.DB, error) { - db, err := sql.Open("pgx", c.String(useAdmin, appName)) +func (c *Config) Connect(useAdmin bool, pusherRatio, spoolerRatio float64, purpose dialect.DBPurpose) (*sql.DB, error) { + client, err := sql.Open("pgx", c.String(useAdmin, purpose.AppName())) if err != nil { return nil, err } - connInfo, err := dialect.NewConnectionInfo(c.MaxOpenConns, c.MaxIdleConns, float64(pusherRatio)) + connConfig, err := dialect.NewConnectionConfig(c.MaxOpenConns, c.MaxIdleConns, spoolerRatio, pusherRatio, purpose) if err != nil { return nil, err } - var maxConns, maxIdleConns uint32 - if isEventPusher { - maxConns = connInfo.EventstorePusher.MaxOpenConns - maxIdleConns = connInfo.EventstorePusher.MaxIdleConns - } else { - maxConns = connInfo.ZITADEL.MaxOpenConns - maxIdleConns = connInfo.ZITADEL.MaxIdleConns - } - db.SetMaxOpenConns(int(maxConns)) - db.SetMaxIdleConns(int(maxIdleConns)) - db.SetConnMaxLifetime(c.MaxConnLifetime) - db.SetConnMaxIdleTime(c.MaxConnIdleTime) + client.SetMaxOpenConns(int(connConfig.MaxIdleConns)) + client.SetMaxIdleConns(int(connConfig.MaxIdleConns)) + client.SetConnMaxLifetime(c.MaxConnLifetime) + client.SetConnMaxIdleTime(c.MaxConnIdleTime) - return db, nil + return client, nil } func (c *Config) DatabaseName() string { diff --git a/internal/eventstore/repository/sql/crdb.go b/internal/eventstore/repository/sql/crdb.go index e782a4c1be..5e53614be4 100644 --- a/internal/eventstore/repository/sql/crdb.go +++ b/internal/eventstore/repository/sql/crdb.go @@ -16,6 +16,7 @@ import ( "github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/database" + "github.com/zitadel/zitadel/internal/database/dialect" "github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/eventstore/repository" "github.com/zitadel/zitadel/internal/zerrors" @@ -124,11 +125,11 @@ type CRDB struct { func NewCRDB(client *database.DB) *CRDB { switch client.Type() { case "cockroach": - awaitOpenTransactionsV1 = " AND creation_date::TIMESTAMP < (SELECT COALESCE(MIN(start), NOW())::TIMESTAMP FROM crdb_internal.cluster_transactions where application_name = '" + database.EventstorePusherAppName + "')" - awaitOpenTransactionsV2 = ` AND hlc_to_timestamp("position") < (SELECT COALESCE(MIN(start), NOW())::TIMESTAMP FROM crdb_internal.cluster_transactions where application_name = '` + database.EventstorePusherAppName + `')` + awaitOpenTransactionsV1 = " AND creation_date::TIMESTAMP < (SELECT COALESCE(MIN(start), NOW())::TIMESTAMP FROM crdb_internal.cluster_transactions where application_name = '" + dialect.EventstorePusherAppName + "')" + awaitOpenTransactionsV2 = ` AND hlc_to_timestamp("position") < (SELECT COALESCE(MIN(start), NOW())::TIMESTAMP FROM crdb_internal.cluster_transactions where application_name = '` + dialect.EventstorePusherAppName + `')` case "postgres": - awaitOpenTransactionsV1 = ` AND EXTRACT(EPOCH FROM created_at) < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = '` + database.EventstorePusherAppName + `' AND state <> 'idle')` - awaitOpenTransactionsV2 = ` AND "position" < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = '` + database.EventstorePusherAppName + `' AND state <> 'idle')` + awaitOpenTransactionsV1 = ` AND EXTRACT(EPOCH FROM created_at) < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = '` + dialect.EventstorePusherAppName + `' AND state <> 'idle')` + awaitOpenTransactionsV2 = ` AND "position" < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = '` + dialect.EventstorePusherAppName + `' AND state <> 'idle')` } return &CRDB{client} diff --git a/internal/query/query.go b/internal/query/query.go index b746909ef3..1b48c44a4f 100644 --- a/internal/query/query.go +++ b/internal/query/query.go @@ -58,7 +58,7 @@ type Queries struct { func StartQueries( ctx context.Context, es *eventstore.Eventstore, - sqlClient *database.DB, + querySqlClient, projectionSqlClient *database.DB, projections projection.Config, defaults sd.SystemDefaults, idpConfigEncryption, otpEncryption, keyEncryptionAlgorithm, certEncryptionAlgorithm crypto.EncryptionAlgorithm, @@ -70,7 +70,7 @@ func StartQueries( ) (repo *Queries, err error) { repo = &Queries{ eventstore: es, - client: sqlClient, + client: querySqlClient, DefaultLanguage: language.Und, LoginTranslationFileContents: make(map[string][]byte), NotificationTranslationFileContents: make(map[string][]byte), @@ -104,7 +104,7 @@ func StartQueries( repo.checkPermission = permissionCheck(repo) - err = projection.Create(ctx, sqlClient, es, projections, keyEncryptionAlgorithm, certEncryptionAlgorithm, systemAPIUsers) + err = projection.Create(ctx, projectionSqlClient, es, projections, keyEncryptionAlgorithm, certEncryptionAlgorithm, systemAPIUsers) if err != nil { return nil, err }