fix(db): add additional connection pool for projection spooling (#7094)

* fix(db): add additional connection pool for projection spooling

* use correct connection pool for projections

---------

Co-authored-by: Livio Spring <livio.a@gmail.com>
This commit is contained in:
Tim Möhlmann 2023-12-20 18:13:04 +02:00 committed by GitHub
parent f4e73b9b75
commit fe1337536f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 478 additions and 119 deletions

View File

@ -88,10 +88,17 @@ HTTP1HostHeader: "host" # ZITADEL_HTTP1HOSTHEADER
WebAuthNName: ZITADEL # ZITADEL_WEBAUTHN_NAME WebAuthNName: ZITADEL # ZITADEL_WEBAUTHN_NAME
Database: Database:
# This setting defines the ratio of how many connections defined below # ZITADEL manages three database connection pools.
# are used to push events. ZITADEL manages two database connection pools # The *ConnRatio settings define the ratio of how many connections from
# one to push events and one for the remaining queries. # MaxOpenConns and MaxIdleConns are used to push events and spool projections.
# Remaining connection are used for queries (search).
# Values may not be negative and the sum of the ratios must always be less than 1.
# For example this defaults define 40 MaxOpenConns overall.
# - 40*0.2=8 connections are allocated to the event pusher;
# - 40*0.2=8 connections are allocated to the projection spooler;
# - 40-(8+8)=24 connections are remaining for queries;
EventPushConnRatio: 0.2 # ZITADEL_DATABASE_COCKROACH_EVENTPUSHCONNRATIO EventPushConnRatio: 0.2 # ZITADEL_DATABASE_COCKROACH_EVENTPUSHCONNRATIO
ProjectionSpoolerConnRatio: 0.2 # ZITADEL_DATABASE_COCKROACH_PROJECTIONSPOOLERCONNRATIO
# CockroachDB is the default database of ZITADEL # CockroachDB is the default database of ZITADEL
cockroach: cockroach:
Host: localhost # ZITADEL_DATABASE_COCKROACH_HOST Host: localhost # ZITADEL_DATABASE_COCKROACH_HOST

View File

@ -8,6 +8,7 @@ import (
"github.com/zitadel/logging" "github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/database" "github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/database/dialect"
) )
var ( var (
@ -75,7 +76,7 @@ func initialise(config database.Config, steps ...func(*database.DB) error) error
return err return err
} }
db, err := database.Connect(config, true, false) db, err := database.Connect(config, true, dialect.DBPurposeQuery)
if err != nil { if err != nil {
return err return err
} }

View File

@ -9,6 +9,7 @@ import (
"github.com/zitadel/logging" "github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/database" "github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/database/dialect"
) )
func newZitadel() *cobra.Command { func newZitadel() *cobra.Command {
@ -75,7 +76,7 @@ func VerifyZitadel(db *database.DB, config database.Config) error {
func verifyZitadel(config database.Config) error { func verifyZitadel(config database.Config) error {
logging.WithFields("database", config.DatabaseName()).Info("verify zitadel") logging.WithFields("database", config.DatabaseName()).Info("verify zitadel")
db, err := database.Connect(config, false, false) db, err := database.Connect(config, false, dialect.DBPurposeQuery)
if err != nil { if err != nil {
return err return err
} }

View File

@ -12,6 +12,7 @@ import (
"github.com/zitadel/zitadel/internal/crypto" "github.com/zitadel/zitadel/internal/crypto"
cryptoDB "github.com/zitadel/zitadel/internal/crypto/database" cryptoDB "github.com/zitadel/zitadel/internal/crypto/database"
"github.com/zitadel/zitadel/internal/database" "github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/database/dialect"
"github.com/zitadel/zitadel/internal/zerrors" "github.com/zitadel/zitadel/internal/zerrors"
) )
@ -123,7 +124,7 @@ func openFile(fileName string) (io.Reader, error) {
} }
func keyStorage(config database.Config, masterKey string) (crypto.KeyStorage, error) { func keyStorage(config database.Config, masterKey string) (crypto.KeyStorage, error) {
db, err := database.Connect(config, false, false) db, err := database.Connect(config, false, dialect.DBPurposeQuery)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -8,6 +8,7 @@ import (
"github.com/zitadel/logging" "github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/database" "github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/database/dialect"
"github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/eventstore"
old_es "github.com/zitadel/zitadel/internal/eventstore/repository/sql" old_es "github.com/zitadel/zitadel/internal/eventstore/repository/sql"
new_es "github.com/zitadel/zitadel/internal/eventstore/v3" new_es "github.com/zitadel/zitadel/internal/eventstore/v3"
@ -31,13 +32,13 @@ func Cleanup(config *Config) {
logging.Info("cleanup started") logging.Info("cleanup started")
zitadelDBClient, err := database.Connect(config.Database, false, false) queryDBClient, err := database.Connect(config.Database, false, dialect.DBPurposeQuery)
logging.OnError(err).Fatal("unable to connect to database") logging.OnError(err).Fatal("unable to connect to database")
esPusherDBClient, err := database.Connect(config.Database, false, true) esPusherDBClient, err := database.Connect(config.Database, false, dialect.DBPurposeEventPusher)
logging.OnError(err).Fatal("unable to connect to database") logging.OnError(err).Fatal("unable to connect to database")
config.Eventstore.Pusher = new_es.NewEventstore(esPusherDBClient) config.Eventstore.Pusher = new_es.NewEventstore(esPusherDBClient)
config.Eventstore.Querier = old_es.NewCRDB(zitadelDBClient) config.Eventstore.Querier = old_es.NewCRDB(queryDBClient)
es := eventstore.NewEventstore(config.Eventstore) es := eventstore.NewEventstore(config.Eventstore)
migration.RegisterMappers(es) migration.RegisterMappers(es)

View File

@ -13,6 +13,7 @@ import (
"github.com/zitadel/zitadel/cmd/key" "github.com/zitadel/zitadel/cmd/key"
"github.com/zitadel/zitadel/cmd/tls" "github.com/zitadel/zitadel/cmd/tls"
"github.com/zitadel/zitadel/internal/database" "github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/database/dialect"
"github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/eventstore"
old_es "github.com/zitadel/zitadel/internal/eventstore/repository/sql" old_es "github.com/zitadel/zitadel/internal/eventstore/repository/sql"
new_es "github.com/zitadel/zitadel/internal/eventstore/v3" new_es "github.com/zitadel/zitadel/internal/eventstore/v3"
@ -67,26 +68,28 @@ func Setup(config *Config, steps *Steps, masterKey string) {
i18n.MustLoadSupportedLanguagesFromDir() i18n.MustLoadSupportedLanguagesFromDir()
zitadelDBClient, err := database.Connect(config.Database, false, false) queryDBClient, err := database.Connect(config.Database, false, dialect.DBPurposeQuery)
logging.OnError(err).Fatal("unable to connect to database") logging.OnError(err).Fatal("unable to connect to database")
esPusherDBClient, err := database.Connect(config.Database, false, true) esPusherDBClient, err := database.Connect(config.Database, false, dialect.DBPurposeEventPusher)
logging.OnError(err).Fatal("unable to connect to database")
projectionDBClient, err := database.Connect(config.Database, false, dialect.DBPurposeProjectionSpooler)
logging.OnError(err).Fatal("unable to connect to database") logging.OnError(err).Fatal("unable to connect to database")
config.Eventstore.Querier = old_es.NewCRDB(zitadelDBClient) config.Eventstore.Querier = old_es.NewCRDB(queryDBClient)
config.Eventstore.Pusher = new_es.NewEventstore(esPusherDBClient) config.Eventstore.Pusher = new_es.NewEventstore(esPusherDBClient)
eventstoreClient := eventstore.NewEventstore(config.Eventstore) eventstoreClient := eventstore.NewEventstore(config.Eventstore)
logging.OnError(err).Fatal("unable to start eventstore") logging.OnError(err).Fatal("unable to start eventstore")
migration.RegisterMappers(eventstoreClient) migration.RegisterMappers(eventstoreClient)
steps.s1ProjectionTable = &ProjectionTable{dbClient: zitadelDBClient.DB} steps.s1ProjectionTable = &ProjectionTable{dbClient: queryDBClient.DB}
steps.s2AssetsTable = &AssetTable{dbClient: zitadelDBClient.DB} steps.s2AssetsTable = &AssetTable{dbClient: queryDBClient.DB}
steps.FirstInstance.instanceSetup = config.DefaultInstance steps.FirstInstance.instanceSetup = config.DefaultInstance
steps.FirstInstance.userEncryptionKey = config.EncryptionKeys.User steps.FirstInstance.userEncryptionKey = config.EncryptionKeys.User
steps.FirstInstance.smtpEncryptionKey = config.EncryptionKeys.SMTP steps.FirstInstance.smtpEncryptionKey = config.EncryptionKeys.SMTP
steps.FirstInstance.oidcEncryptionKey = config.EncryptionKeys.OIDC steps.FirstInstance.oidcEncryptionKey = config.EncryptionKeys.OIDC
steps.FirstInstance.masterKey = masterKey steps.FirstInstance.masterKey = masterKey
steps.FirstInstance.db = zitadelDBClient steps.FirstInstance.db = queryDBClient
steps.FirstInstance.es = eventstoreClient steps.FirstInstance.es = eventstoreClient
steps.FirstInstance.defaults = config.SystemDefaults steps.FirstInstance.defaults = config.SystemDefaults
steps.FirstInstance.zitadelRoles = config.InternalAuthZ.RolePermissionMappings steps.FirstInstance.zitadelRoles = config.InternalAuthZ.RolePermissionMappings
@ -94,20 +97,20 @@ func Setup(config *Config, steps *Steps, masterKey string) {
steps.FirstInstance.externalSecure = config.ExternalSecure steps.FirstInstance.externalSecure = config.ExternalSecure
steps.FirstInstance.externalPort = config.ExternalPort steps.FirstInstance.externalPort = config.ExternalPort
steps.s5LastFailed = &LastFailed{dbClient: zitadelDBClient.DB} steps.s5LastFailed = &LastFailed{dbClient: queryDBClient.DB}
steps.s6OwnerRemoveColumns = &OwnerRemoveColumns{dbClient: zitadelDBClient.DB} steps.s6OwnerRemoveColumns = &OwnerRemoveColumns{dbClient: queryDBClient.DB}
steps.s7LogstoreTables = &LogstoreTables{dbClient: zitadelDBClient.DB, username: config.Database.Username(), dbType: config.Database.Type()} steps.s7LogstoreTables = &LogstoreTables{dbClient: queryDBClient.DB, username: config.Database.Username(), dbType: config.Database.Type()}
steps.s8AuthTokens = &AuthTokenIndexes{dbClient: zitadelDBClient} steps.s8AuthTokens = &AuthTokenIndexes{dbClient: queryDBClient}
steps.CorrectCreationDate.dbClient = esPusherDBClient steps.CorrectCreationDate.dbClient = esPusherDBClient
steps.s12AddOTPColumns = &AddOTPColumns{dbClient: zitadelDBClient} steps.s12AddOTPColumns = &AddOTPColumns{dbClient: queryDBClient}
steps.s13FixQuotaProjection = &FixQuotaConstraints{dbClient: zitadelDBClient} steps.s13FixQuotaProjection = &FixQuotaConstraints{dbClient: queryDBClient}
steps.s14NewEventsTable = &NewEventsTable{dbClient: esPusherDBClient} steps.s14NewEventsTable = &NewEventsTable{dbClient: esPusherDBClient}
steps.s15CurrentStates = &CurrentProjectionState{dbClient: zitadelDBClient} steps.s15CurrentStates = &CurrentProjectionState{dbClient: queryDBClient}
steps.s16UniqueConstraintsLower = &UniqueConstraintToLower{dbClient: zitadelDBClient} steps.s16UniqueConstraintsLower = &UniqueConstraintToLower{dbClient: queryDBClient}
steps.s17AddOffsetToUniqueConstraints = &AddOffsetToCurrentStates{dbClient: zitadelDBClient} steps.s17AddOffsetToUniqueConstraints = &AddOffsetToCurrentStates{dbClient: queryDBClient}
steps.s18AddLowerFieldsToLoginNames = &AddLowerFieldsToLoginNames{dbClient: zitadelDBClient} steps.s18AddLowerFieldsToLoginNames = &AddLowerFieldsToLoginNames{dbClient: queryDBClient}
err = projection.Create(ctx, zitadelDBClient, eventstoreClient, config.Projections, nil, nil, nil) err = projection.Create(ctx, projectionDBClient, eventstoreClient, config.Projections, nil, nil, nil)
logging.OnError(err).Fatal("unable to start projections") logging.OnError(err).Fatal("unable to start projections")
repeatableSteps := []migration.RepeatableMigration{ repeatableSteps := []migration.RepeatableMigration{

View File

@ -58,6 +58,7 @@ import (
"github.com/zitadel/zitadel/internal/crypto" "github.com/zitadel/zitadel/internal/crypto"
cryptoDB "github.com/zitadel/zitadel/internal/crypto/database" cryptoDB "github.com/zitadel/zitadel/internal/crypto/database"
"github.com/zitadel/zitadel/internal/database" "github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/database/dialect"
"github.com/zitadel/zitadel/internal/domain" "github.com/zitadel/zitadel/internal/domain"
"github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/eventstore"
old_es "github.com/zitadel/zitadel/internal/eventstore/repository/sql" old_es "github.com/zitadel/zitadel/internal/eventstore/repository/sql"
@ -125,16 +126,20 @@ func startZitadel(config *Config, masterKey string, server chan<- *Server) error
i18n.MustLoadSupportedLanguagesFromDir() i18n.MustLoadSupportedLanguagesFromDir()
zitadelDBClient, err := database.Connect(config.Database, false, false) queryDBClient, err := database.Connect(config.Database, false, dialect.DBPurposeQuery)
if err != nil { if err != nil {
return fmt.Errorf("cannot start client for projection: %w", err) return fmt.Errorf("cannot start DB client for queries: %w", err)
} }
esPusherDBClient, err := database.Connect(config.Database, false, true) esPusherDBClient, err := database.Connect(config.Database, false, dialect.DBPurposeEventPusher)
if err != nil { if err != nil {
return fmt.Errorf("cannot start client for projection: %w", err) return fmt.Errorf("cannot start client for event store pusher: %w", err)
}
projectionDBClient, err := database.Connect(config.Database, false, dialect.DBPurposeProjectionSpooler)
if err != nil {
return fmt.Errorf("cannot start client for projection spooler: %w", err)
} }
keyStorage, err := cryptoDB.NewKeyStorage(zitadelDBClient, masterKey) keyStorage, err := cryptoDB.NewKeyStorage(queryDBClient, masterKey)
if err != nil { if err != nil {
return fmt.Errorf("cannot start key storage: %w", err) return fmt.Errorf("cannot start key storage: %w", err)
} }
@ -144,7 +149,7 @@ func startZitadel(config *Config, masterKey string, server chan<- *Server) error
} }
config.Eventstore.Pusher = new_es.NewEventstore(esPusherDBClient) config.Eventstore.Pusher = new_es.NewEventstore(esPusherDBClient)
config.Eventstore.Querier = old_es.NewCRDB(zitadelDBClient) config.Eventstore.Querier = old_es.NewCRDB(queryDBClient)
eventstoreClient := eventstore.NewEventstore(config.Eventstore) eventstoreClient := eventstore.NewEventstore(config.Eventstore)
sessionTokenVerifier := internal_authz.SessionTokenVerifier(keys.OIDC) sessionTokenVerifier := internal_authz.SessionTokenVerifier(keys.OIDC)
@ -152,7 +157,8 @@ func startZitadel(config *Config, masterKey string, server chan<- *Server) error
queries, err := query.StartQueries( queries, err := query.StartQueries(
ctx, ctx,
eventstoreClient, eventstoreClient,
zitadelDBClient, queryDBClient,
projectionDBClient,
config.Projections, config.Projections,
config.SystemDefaults, config.SystemDefaults,
keys.IDPConfig, keys.IDPConfig,
@ -173,7 +179,7 @@ func startZitadel(config *Config, masterKey string, server chan<- *Server) error
return fmt.Errorf("cannot start queries: %w", err) return fmt.Errorf("cannot start queries: %w", err)
} }
authZRepo, err := authz.Start(queries, eventstoreClient, zitadelDBClient, keys.OIDC, config.ExternalSecure) authZRepo, err := authz.Start(queries, eventstoreClient, queryDBClient, keys.OIDC, config.ExternalSecure)
if err != nil { if err != nil {
return fmt.Errorf("error starting authz repo: %w", err) return fmt.Errorf("error starting authz repo: %w", err)
} }
@ -181,7 +187,7 @@ func startZitadel(config *Config, masterKey string, server chan<- *Server) error
return internal_authz.CheckPermission(ctx, authZRepo, config.InternalAuthZ.RolePermissionMappings, permission, orgID, resourceID) return internal_authz.CheckPermission(ctx, authZRepo, config.InternalAuthZ.RolePermissionMappings, permission, orgID, resourceID)
} }
storage, err := config.AssetStorage.NewStorage(zitadelDBClient.DB) storage, err := config.AssetStorage.NewStorage(queryDBClient.DB)
if err != nil { if err != nil {
return fmt.Errorf("cannot start asset storage client: %w", err) return fmt.Errorf("cannot start asset storage client: %w", err)
} }
@ -224,7 +230,7 @@ func startZitadel(config *Config, masterKey string, server chan<- *Server) error
if err != nil { if err != nil {
return err return err
} }
actionsExecutionDBEmitter, err := logstore.NewEmitter[*record.ExecutionLog](ctx, clock, config.Quotas.Execution, execution.NewDatabaseLogStorage(zitadelDBClient, commands, queries)) actionsExecutionDBEmitter, err := logstore.NewEmitter[*record.ExecutionLog](ctx, clock, config.Quotas.Execution, execution.NewDatabaseLogStorage(queryDBClient, commands, queries))
if err != nil { if err != nil {
return err return err
} }
@ -263,7 +269,7 @@ func startZitadel(config *Config, masterKey string, server chan<- *Server) error
commands, commands,
queries, queries,
eventstoreClient, eventstoreClient,
zitadelDBClient, queryDBClient,
config, config,
storage, storage,
authZRepo, authZRepo,
@ -280,7 +286,7 @@ func startZitadel(config *Config, masterKey string, server chan<- *Server) error
if server != nil { if server != nil {
server <- &Server{ server <- &Server{
Config: config, Config: config,
DB: zitadelDBClient, DB: queryDBClient,
KeyStorage: keyStorage, KeyStorage: keyStorage,
Keys: keys, Keys: keys,
Eventstore: eventstoreClient, Eventstore: eventstoreClient,

View File

@ -68,28 +68,19 @@ func (c *Config) Decode(configs []interface{}) (dialect.Connector, error) {
return c, nil return c, nil
} }
func (c *Config) Connect(useAdmin, isEventPusher bool, pusherRatio float32, appName string) (*sql.DB, error) { func (c *Config) Connect(useAdmin bool, pusherRatio, spoolerRatio float64, purpose dialect.DBPurpose) (*sql.DB, error) {
client, err := sql.Open("pgx", c.String(useAdmin, appName)) client, err := sql.Open("pgx", c.String(useAdmin, purpose.AppName()))
if err != nil { if err != nil {
return nil, err return nil, err
} }
connInfo, err := dialect.NewConnectionInfo(c.MaxOpenConns, c.MaxIdleConns, float64(pusherRatio)) connConfig, err := dialect.NewConnectionConfig(c.MaxOpenConns, c.MaxIdleConns, spoolerRatio, pusherRatio, purpose)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var maxConns, maxIdleConns uint32 client.SetMaxOpenConns(int(connConfig.MaxIdleConns))
if isEventPusher { client.SetMaxIdleConns(int(connConfig.MaxIdleConns))
maxConns = connInfo.EventstorePusher.MaxOpenConns
maxIdleConns = connInfo.EventstorePusher.MaxIdleConns
} else {
maxConns = connInfo.ZITADEL.MaxOpenConns
maxIdleConns = connInfo.ZITADEL.MaxIdleConns
}
client.SetMaxOpenConns(int(maxConns))
client.SetMaxIdleConns(int(maxIdleConns))
client.SetConnMaxLifetime(c.MaxConnLifetime) client.SetConnMaxLifetime(c.MaxConnLifetime)
client.SetConnMaxIdleTime(c.MaxConnIdleTime) client.SetConnMaxIdleTime(c.MaxConnIdleTime)

View File

@ -17,9 +17,10 @@ import (
) )
type Config struct { type Config struct {
Dialects map[string]interface{} `mapstructure:",remain"` Dialects map[string]interface{} `mapstructure:",remain"`
EventPushConnRatio float32 EventPushConnRatio float64
connector dialect.Connector ProjectionSpoolerConnRatio float64
connector dialect.Connector
} }
func (c *Config) SetConnector(connector dialect.Connector) { func (c *Config) SetConnector(connector dialect.Connector) {
@ -109,18 +110,8 @@ func QueryJSONObject[T any](ctx context.Context, db *DB, query string, args ...a
return obj, nil return obj, nil
} }
const ( func Connect(config Config, useAdmin bool, purpose dialect.DBPurpose) (*DB, error) {
zitadelAppName = "zitadel" client, err := config.connector.Connect(useAdmin, config.EventPushConnRatio, config.ProjectionSpoolerConnRatio, purpose)
EventstorePusherAppName = "zitadel_es_pusher"
)
func Connect(config Config, useAdmin, isEventPusher bool) (*DB, error) {
appName := zitadelAppName
if isEventPusher {
appName = EventstorePusherAppName
}
client, err := config.connector.Connect(useAdmin, isEventPusher, config.EventPushConnRatio, appName)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -23,8 +23,37 @@ type Matcher interface {
Decode([]interface{}) (Connector, error) Decode([]interface{}) (Connector, error)
} }
const (
QueryAppName = "zitadel_queries"
EventstorePusherAppName = "zitadel_es_pusher"
ProjectionSpoolerAppName = "zitadel_projection_spooler"
defaultAppName = "zitadel"
)
// DBPurpose is what the resulting connection pool is used for.
type DBPurpose int
const (
DBPurposeQuery DBPurpose = iota
DBPurposeEventPusher
DBPurposeProjectionSpooler
)
func (p DBPurpose) AppName() string {
switch p {
case DBPurposeQuery:
return QueryAppName
case DBPurposeEventPusher:
return EventstorePusherAppName
case DBPurposeProjectionSpooler:
return ProjectionSpoolerAppName
default:
return defaultAppName
}
}
type Connector interface { type Connector interface {
Connect(useAdmin, isEventPusher bool, pusherRatio float32, appName string) (*sql.DB, error) Connect(useAdmin bool, pusherRatio, spoolerRatio float64, purpose DBPurpose) (*sql.DB, error)
Password() string Password() string
Database Database
} }

View File

@ -0,0 +1,36 @@
package dialect
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestDBPurpose_AppName(t *testing.T) {
tests := []struct {
p DBPurpose
want string
}{
{
p: DBPurposeQuery,
want: QueryAppName,
},
{
p: DBPurposeEventPusher,
want: EventstorePusherAppName,
},
{
p: DBPurposeProjectionSpooler,
want: ProjectionSpoolerAppName,
},
{
p: 99,
want: defaultAppName,
},
}
for _, tt := range tests {
t.Run(tt.want, func(t *testing.T) {
assert.Equal(t, tt.want, tt.p.AppName())
})
}
}

View File

@ -1,43 +1,90 @@
package dialect package dialect
import "errors" import (
"errors"
"fmt"
)
type ConnectionInfo struct { var (
EventstorePusher ConnectionConfig ErrNegativeRatio = errors.New("ratio cannot be negative")
ZITADEL ConnectionConfig ErrHighSumRatio = errors.New("sum of pusher and projection ratios must be < 1")
} ErrIllegalMaxOpenConns = errors.New("MaxOpenConns of the database must be higher than 3 or 0 for unlimited")
ErrIllegalMaxIdleConns = errors.New("MaxIdleConns of the database must be higher than 3 or 0 for unlimited")
ErrInvalidPurpose = errors.New("DBPurpose out of range")
)
// ConnectionConfig defines the Max Open and Idle connections for a DB connection pool.
type ConnectionConfig struct { type ConnectionConfig struct {
MaxOpenConns, MaxOpenConns,
MaxIdleConns uint32 MaxIdleConns uint32
} }
func NewConnectionInfo(openConns, idleConns uint32, pusherRatio float64) (*ConnectionInfo, error) { // takeRatio of MaxOpenConns and MaxIdleConns from config and returns
if pusherRatio < 0 || pusherRatio > 1 { // a new ConnectionConfig with the resulting values.
return nil, errors.New("EventPushConnRatio must be between 0 and 1") func (c *ConnectionConfig) takeRatio(ratio float64) (*ConnectionConfig, error) {
} if ratio < 0 {
if openConns != 0 && openConns < 2 { return nil, ErrNegativeRatio
return nil, errors.New("MaxOpenConns of the database must be higher that 1")
} }
info := new(ConnectionInfo) out := &ConnectionConfig{
MaxOpenConns: uint32(ratio * float64(c.MaxOpenConns)),
info.EventstorePusher.MaxOpenConns = uint32(pusherRatio * float64(openConns)) MaxIdleConns: uint32(ratio * float64(c.MaxIdleConns)),
info.EventstorePusher.MaxIdleConns = uint32(pusherRatio * float64(idleConns))
if openConns != 0 && info.EventstorePusher.MaxOpenConns < 1 && pusherRatio > 0 {
info.EventstorePusher.MaxOpenConns = 1
} }
if idleConns != 0 && info.EventstorePusher.MaxIdleConns < 1 && pusherRatio > 0 { if c.MaxOpenConns != 0 && out.MaxOpenConns < 1 && ratio > 0 {
info.EventstorePusher.MaxIdleConns = 1 out.MaxOpenConns = 1
}
if c.MaxIdleConns != 0 && out.MaxIdleConns < 1 && ratio > 0 {
out.MaxIdleConns = 1
} }
if openConns != 0 { return out, nil
info.ZITADEL.MaxOpenConns = openConns - info.EventstorePusher.MaxOpenConns }
}
if idleConns != 0 { // NewConnectionConfig calculates [ConnectionConfig] values from the passed ratios
info.ZITADEL.MaxIdleConns = idleConns - info.EventstorePusher.MaxIdleConns // and returns the config applicable for the requested purpose.
} //
// openConns and idleConns must be at least 3 or 0, which means no limit.
return info, nil // The pusherRatio and spoolerRatio must be between 0 and 1.
func NewConnectionConfig(openConns, idleConns uint32, pusherRatio, projectionRatio float64, purpose DBPurpose) (*ConnectionConfig, error) {
if openConns != 0 && openConns < 3 {
return nil, ErrIllegalMaxOpenConns
}
if idleConns != 0 && idleConns < 3 {
return nil, ErrIllegalMaxIdleConns
}
if pusherRatio+projectionRatio >= 1 {
return nil, ErrHighSumRatio
}
queryConfig := &ConnectionConfig{
MaxOpenConns: openConns,
MaxIdleConns: idleConns,
}
pusherConfig, err := queryConfig.takeRatio(pusherRatio)
if err != nil {
return nil, fmt.Errorf("event pusher: %w", err)
}
spoolerConfig, err := queryConfig.takeRatio(projectionRatio)
if err != nil {
return nil, fmt.Errorf("projection spooler: %w", err)
}
// subtract the claimed amount
if queryConfig.MaxOpenConns > 0 {
queryConfig.MaxOpenConns -= pusherConfig.MaxOpenConns + spoolerConfig.MaxOpenConns
}
if queryConfig.MaxIdleConns > 0 {
queryConfig.MaxIdleConns -= pusherConfig.MaxIdleConns + spoolerConfig.MaxIdleConns
}
switch purpose {
case DBPurposeQuery:
return queryConfig, nil
case DBPurposeEventPusher:
return pusherConfig, nil
case DBPurposeProjectionSpooler:
return spoolerConfig, nil
default:
return nil, fmt.Errorf("%w: %v", ErrInvalidPurpose, purpose)
}
} }

View File

@ -0,0 +1,252 @@
package dialect
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestConnectionConfig_takeRatio(t *testing.T) {
type fields struct {
MaxOpenConns uint32
MaxIdleConns uint32
}
tests := []struct {
name string
fields fields
ratio float64
wantOut *ConnectionConfig
wantErr error
}{
{
name: "ratio less than 0 error",
ratio: -0.1,
wantErr: ErrNegativeRatio,
},
{
name: "zero values",
fields: fields{
MaxOpenConns: 0,
MaxIdleConns: 0,
},
ratio: 0,
wantOut: &ConnectionConfig{
MaxOpenConns: 0,
MaxIdleConns: 0,
},
},
{
name: "max conns, ratio 0",
fields: fields{
MaxOpenConns: 10,
MaxIdleConns: 5,
},
ratio: 0,
wantOut: &ConnectionConfig{
MaxOpenConns: 0,
MaxIdleConns: 0,
},
},
{
name: "half ratio",
fields: fields{
MaxOpenConns: 10,
MaxIdleConns: 5,
},
ratio: 0.5,
wantOut: &ConnectionConfig{
MaxOpenConns: 5,
MaxIdleConns: 2,
},
},
{
name: "minimal 1",
fields: fields{
MaxOpenConns: 2,
MaxIdleConns: 2,
},
ratio: 0.1,
wantOut: &ConnectionConfig{
MaxOpenConns: 1,
MaxIdleConns: 1,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
in := &ConnectionConfig{
MaxOpenConns: tt.fields.MaxOpenConns,
MaxIdleConns: tt.fields.MaxIdleConns,
}
got, err := in.takeRatio(tt.ratio)
require.ErrorIs(t, err, tt.wantErr)
assert.Equal(t, tt.wantOut, got)
})
}
}
func TestNewConnectionConfig(t *testing.T) {
type args struct {
openConns uint32
idleConns uint32
pusherRatio float64
projectionRatio float64
purpose DBPurpose
}
tests := []struct {
name string
args args
want *ConnectionConfig
wantErr error
}{
{
name: "illegal open conns error",
args: args{
openConns: 2,
idleConns: 3,
},
wantErr: ErrIllegalMaxOpenConns,
},
{
name: "illegal idle conns error",
args: args{
openConns: 3,
idleConns: 2,
},
wantErr: ErrIllegalMaxIdleConns,
},
{
name: "high ration sum error",
args: args{
openConns: 3,
idleConns: 3,
pusherRatio: 0.5,
projectionRatio: 0.5,
},
wantErr: ErrHighSumRatio,
},
{
name: "illegal pusher ratio error",
args: args{
openConns: 3,
idleConns: 3,
pusherRatio: -0.1,
projectionRatio: 0.5,
},
wantErr: ErrNegativeRatio,
},
{
name: "illegal projection ratio error",
args: args{
openConns: 3,
idleConns: 3,
pusherRatio: 0.5,
projectionRatio: -0.1,
},
wantErr: ErrNegativeRatio,
},
{
name: "invalid purpose error",
args: args{
openConns: 3,
idleConns: 3,
pusherRatio: 0.4,
projectionRatio: 0.4,
purpose: 99,
},
wantErr: ErrInvalidPurpose,
},
{
name: "min values, query purpose",
args: args{
openConns: 3,
idleConns: 3,
pusherRatio: 0.2,
projectionRatio: 0.2,
purpose: DBPurposeQuery,
},
want: &ConnectionConfig{
MaxOpenConns: 1,
MaxIdleConns: 1,
},
},
{
name: "min values, pusher purpose",
args: args{
openConns: 3,
idleConns: 3,
pusherRatio: 0.2,
projectionRatio: 0.2,
purpose: DBPurposeEventPusher,
},
want: &ConnectionConfig{
MaxOpenConns: 1,
MaxIdleConns: 1,
},
},
{
name: "min values, projection purpose",
args: args{
openConns: 3,
idleConns: 3,
pusherRatio: 0.2,
projectionRatio: 0.2,
purpose: DBPurposeProjectionSpooler,
},
want: &ConnectionConfig{
MaxOpenConns: 1,
MaxIdleConns: 1,
},
},
{
name: "high values, query purpose",
args: args{
openConns: 10,
idleConns: 5,
pusherRatio: 0.2,
projectionRatio: 0.2,
purpose: DBPurposeQuery,
},
want: &ConnectionConfig{
MaxOpenConns: 6,
MaxIdleConns: 3,
},
},
{
name: "high values, pusher purpose",
args: args{
openConns: 10,
idleConns: 5,
pusherRatio: 0.2,
projectionRatio: 0.2,
purpose: DBPurposeEventPusher,
},
want: &ConnectionConfig{
MaxOpenConns: 2,
MaxIdleConns: 1,
},
},
{
name: "high values, projection purpose",
args: args{
openConns: 10,
idleConns: 5,
pusherRatio: 0.2,
projectionRatio: 0.2,
purpose: DBPurposeProjectionSpooler,
},
want: &ConnectionConfig{
MaxOpenConns: 2,
MaxIdleConns: 1,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := NewConnectionConfig(tt.args.openConns, tt.args.idleConns, tt.args.pusherRatio, tt.args.projectionRatio, tt.args.purpose)
require.ErrorIs(t, err, tt.wantErr)
assert.Equal(t, tt.want, got)
})
}
}

View File

@ -69,31 +69,23 @@ func (c *Config) Decode(configs []interface{}) (dialect.Connector, error) {
return c, nil return c, nil
} }
func (c *Config) Connect(useAdmin, isEventPusher bool, pusherRatio float32, appName string) (*sql.DB, error) { func (c *Config) Connect(useAdmin bool, pusherRatio, spoolerRatio float64, purpose dialect.DBPurpose) (*sql.DB, error) {
db, err := sql.Open("pgx", c.String(useAdmin, appName)) client, err := sql.Open("pgx", c.String(useAdmin, purpose.AppName()))
if err != nil { if err != nil {
return nil, err return nil, err
} }
connInfo, err := dialect.NewConnectionInfo(c.MaxOpenConns, c.MaxIdleConns, float64(pusherRatio)) connConfig, err := dialect.NewConnectionConfig(c.MaxOpenConns, c.MaxIdleConns, spoolerRatio, pusherRatio, purpose)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var maxConns, maxIdleConns uint32 client.SetMaxOpenConns(int(connConfig.MaxIdleConns))
if isEventPusher { client.SetMaxIdleConns(int(connConfig.MaxIdleConns))
maxConns = connInfo.EventstorePusher.MaxOpenConns client.SetConnMaxLifetime(c.MaxConnLifetime)
maxIdleConns = connInfo.EventstorePusher.MaxIdleConns client.SetConnMaxIdleTime(c.MaxConnIdleTime)
} else {
maxConns = connInfo.ZITADEL.MaxOpenConns
maxIdleConns = connInfo.ZITADEL.MaxIdleConns
}
db.SetMaxOpenConns(int(maxConns))
db.SetMaxIdleConns(int(maxIdleConns))
db.SetConnMaxLifetime(c.MaxConnLifetime)
db.SetConnMaxIdleTime(c.MaxConnIdleTime)
return db, nil return client, nil
} }
func (c *Config) DatabaseName() string { func (c *Config) DatabaseName() string {

View File

@ -16,6 +16,7 @@ import (
"github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/database" "github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/database/dialect"
"github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/eventstore/repository" "github.com/zitadel/zitadel/internal/eventstore/repository"
"github.com/zitadel/zitadel/internal/zerrors" "github.com/zitadel/zitadel/internal/zerrors"
@ -124,11 +125,11 @@ type CRDB struct {
func NewCRDB(client *database.DB) *CRDB { func NewCRDB(client *database.DB) *CRDB {
switch client.Type() { switch client.Type() {
case "cockroach": case "cockroach":
awaitOpenTransactionsV1 = " AND creation_date::TIMESTAMP < (SELECT COALESCE(MIN(start), NOW())::TIMESTAMP FROM crdb_internal.cluster_transactions where application_name = '" + database.EventstorePusherAppName + "')" awaitOpenTransactionsV1 = " AND creation_date::TIMESTAMP < (SELECT COALESCE(MIN(start), NOW())::TIMESTAMP FROM crdb_internal.cluster_transactions where application_name = '" + dialect.EventstorePusherAppName + "')"
awaitOpenTransactionsV2 = ` AND hlc_to_timestamp("position") < (SELECT COALESCE(MIN(start), NOW())::TIMESTAMP FROM crdb_internal.cluster_transactions where application_name = '` + database.EventstorePusherAppName + `')` awaitOpenTransactionsV2 = ` AND hlc_to_timestamp("position") < (SELECT COALESCE(MIN(start), NOW())::TIMESTAMP FROM crdb_internal.cluster_transactions where application_name = '` + dialect.EventstorePusherAppName + `')`
case "postgres": case "postgres":
awaitOpenTransactionsV1 = ` AND EXTRACT(EPOCH FROM created_at) < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = '` + database.EventstorePusherAppName + `' AND state <> 'idle')` awaitOpenTransactionsV1 = ` AND EXTRACT(EPOCH FROM created_at) < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = '` + dialect.EventstorePusherAppName + `' AND state <> 'idle')`
awaitOpenTransactionsV2 = ` AND "position" < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = '` + database.EventstorePusherAppName + `' AND state <> 'idle')` awaitOpenTransactionsV2 = ` AND "position" < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = '` + dialect.EventstorePusherAppName + `' AND state <> 'idle')`
} }
return &CRDB{client} return &CRDB{client}

View File

@ -58,7 +58,7 @@ type Queries struct {
func StartQueries( func StartQueries(
ctx context.Context, ctx context.Context,
es *eventstore.Eventstore, es *eventstore.Eventstore,
sqlClient *database.DB, querySqlClient, projectionSqlClient *database.DB,
projections projection.Config, projections projection.Config,
defaults sd.SystemDefaults, defaults sd.SystemDefaults,
idpConfigEncryption, otpEncryption, keyEncryptionAlgorithm, certEncryptionAlgorithm crypto.EncryptionAlgorithm, idpConfigEncryption, otpEncryption, keyEncryptionAlgorithm, certEncryptionAlgorithm crypto.EncryptionAlgorithm,
@ -70,7 +70,7 @@ func StartQueries(
) (repo *Queries, err error) { ) (repo *Queries, err error) {
repo = &Queries{ repo = &Queries{
eventstore: es, eventstore: es,
client: sqlClient, client: querySqlClient,
DefaultLanguage: language.Und, DefaultLanguage: language.Und,
LoginTranslationFileContents: make(map[string][]byte), LoginTranslationFileContents: make(map[string][]byte),
NotificationTranslationFileContents: make(map[string][]byte), NotificationTranslationFileContents: make(map[string][]byte),
@ -104,7 +104,7 @@ func StartQueries(
repo.checkPermission = permissionCheck(repo) repo.checkPermission = permissionCheck(repo)
err = projection.Create(ctx, sqlClient, es, projections, keyEncryptionAlgorithm, certEncryptionAlgorithm, systemAPIUsers) err = projection.Create(ctx, projectionSqlClient, es, projections, keyEncryptionAlgorithm, certEncryptionAlgorithm, systemAPIUsers)
if err != nil { if err != nil {
return nil, err return nil, err
} }