diff --git a/cmd/defaults.yaml b/cmd/defaults.yaml index 4176747ee6..326dcc69a8 100644 --- a/cmd/defaults.yaml +++ b/cmd/defaults.yaml @@ -110,24 +110,13 @@ PublicHostHeaders: # ZITADEL_PUBLICHOSTHEADERS WebAuthNName: ZITADEL # ZITADEL_WEBAUTHNNAME Database: - # ZITADEL manages three database connection pools. - # The *ConnRatio settings define the ratio of how many connections from - # MaxOpenConns and MaxIdleConns are used to push events and spool projections. - # Remaining connection are used for queries (search). - # Values may not be negative and the sum of the ratios must always be less than 1. - # For example this defaults define 15 MaxOpenConns overall. - # - 15*0.2=3 connections are allocated to the event pusher; - # - 15*0.135=2 connections are allocated to the projection spooler; - # - 15-(3+2)=10 connections are remaining for queries; - EventPushConnRatio: 0.2 # ZITADEL_DATABASE_COCKROACH_EVENTPUSHCONNRATIO - ProjectionSpoolerConnRatio: 0.135 # ZITADEL_DATABASE_COCKROACH_PROJECTIONSPOOLERCONNRATIO # CockroachDB is the default database of ZITADEL cockroach: Host: localhost # ZITADEL_DATABASE_COCKROACH_HOST Port: 26257 # ZITADEL_DATABASE_COCKROACH_PORT Database: zitadel # ZITADEL_DATABASE_COCKROACH_DATABASE - MaxOpenConns: 15 # ZITADEL_DATABASE_COCKROACH_MAXOPENCONNS - MaxIdleConns: 12 # ZITADEL_DATABASE_COCKROACH_MAXIDLECONNS + MaxOpenConns: 5 # ZITADEL_DATABASE_COCKROACH_MAXOPENCONNS + MaxIdleConns: 2 # ZITADEL_DATABASE_COCKROACH_MAXIDLECONNS MaxConnLifetime: 30m # ZITADEL_DATABASE_COCKROACH_MAXCONNLIFETIME MaxConnIdleTime: 5m # ZITADEL_DATABASE_COCKROACH_MAXCONNIDLETIME Options: "" # ZITADEL_DATABASE_COCKROACH_OPTIONS diff --git a/cmd/initialise/init.go b/cmd/initialise/init.go index fba5098fa2..02fd481eab 100644 --- a/cmd/initialise/init.go +++ b/cmd/initialise/init.go @@ -9,7 +9,6 @@ import ( "github.com/zitadel/logging" "github.com/zitadel/zitadel/internal/database" - "github.com/zitadel/zitadel/internal/database/dialect" ) var ( @@ -79,7 +78,7 @@ func initialise(ctx context.Context, config database.Config, steps ...func(conte return err } - db, err := database.Connect(config, true, dialect.DBPurposeQuery) + db, err := database.Connect(config, true) if err != nil { return err } diff --git a/cmd/initialise/verify_zitadel.go b/cmd/initialise/verify_zitadel.go index a5ce1fd57c..1ae85a21fa 100644 --- a/cmd/initialise/verify_zitadel.go +++ b/cmd/initialise/verify_zitadel.go @@ -11,7 +11,6 @@ import ( "github.com/zitadel/logging" "github.com/zitadel/zitadel/internal/database" - "github.com/zitadel/zitadel/internal/database/dialect" es_v3 "github.com/zitadel/zitadel/internal/eventstore/v3" ) @@ -85,7 +84,7 @@ func VerifyZitadel(ctx context.Context, db *database.DB, config database.Config) func verifyZitadel(ctx context.Context, config database.Config) error { logging.WithFields("database", config.DatabaseName()).Info("verify zitadel") - db, err := database.Connect(config, false, dialect.DBPurposeQuery) + db, err := database.Connect(config, false) if err != nil { return err } diff --git a/cmd/key/key.go b/cmd/key/key.go index 2691932784..1dba8fd969 100644 --- a/cmd/key/key.go +++ b/cmd/key/key.go @@ -12,7 +12,6 @@ import ( "github.com/zitadel/zitadel/internal/crypto" cryptoDB "github.com/zitadel/zitadel/internal/crypto/database" "github.com/zitadel/zitadel/internal/database" - "github.com/zitadel/zitadel/internal/database/dialect" "github.com/zitadel/zitadel/internal/zerrors" ) @@ -124,7 +123,7 @@ func openFile(fileName string) (io.Reader, error) { } func keyStorage(config database.Config, masterKey string) (crypto.KeyStorage, error) { - db, err := database.Connect(config, false, dialect.DBPurposeQuery) + db, err := database.Connect(config, false) if err != nil { return nil, err } diff --git a/cmd/mirror/auth.go b/cmd/mirror/auth.go index df94708e71..0eba10d05f 100644 --- a/cmd/mirror/auth.go +++ b/cmd/mirror/auth.go @@ -12,7 +12,6 @@ import ( "github.com/zitadel/logging" "github.com/zitadel/zitadel/internal/database" - "github.com/zitadel/zitadel/internal/database/dialect" ) func authCmd() *cobra.Command { @@ -34,11 +33,11 @@ Only auth requests are mirrored`, } func copyAuth(ctx context.Context, config *Migration) { - sourceClient, err := database.Connect(config.Source, false, dialect.DBPurposeQuery) + sourceClient, err := database.Connect(config.Source, false) logging.OnError(err).Fatal("unable to connect to source database") defer sourceClient.Close() - destClient, err := database.Connect(config.Destination, false, dialect.DBPurposeEventPusher) + destClient, err := database.Connect(config.Destination, false) logging.OnError(err).Fatal("unable to connect to destination database") defer destClient.Close() diff --git a/cmd/mirror/event_store.go b/cmd/mirror/event_store.go index 23145bdc37..3825462126 100644 --- a/cmd/mirror/event_store.go +++ b/cmd/mirror/event_store.go @@ -14,7 +14,6 @@ import ( "github.com/zitadel/logging" db "github.com/zitadel/zitadel/internal/database" - "github.com/zitadel/zitadel/internal/database/dialect" "github.com/zitadel/zitadel/internal/id" "github.com/zitadel/zitadel/internal/v2/database" "github.com/zitadel/zitadel/internal/v2/eventstore" @@ -44,11 +43,11 @@ Migrate only copies events2 and unique constraints`, } func copyEventstore(ctx context.Context, config *Migration) { - sourceClient, err := db.Connect(config.Source, false, dialect.DBPurposeEventPusher) + sourceClient, err := db.Connect(config.Source, false) logging.OnError(err).Fatal("unable to connect to source database") defer sourceClient.Close() - destClient, err := db.Connect(config.Destination, false, dialect.DBPurposeEventPusher) + destClient, err := db.Connect(config.Destination, false) logging.OnError(err).Fatal("unable to connect to destination database") defer destClient.Close() diff --git a/cmd/mirror/projections.go b/cmd/mirror/projections.go index ae903d90c5..a4987a48f6 100644 --- a/cmd/mirror/projections.go +++ b/cmd/mirror/projections.go @@ -30,7 +30,6 @@ import ( "github.com/zitadel/zitadel/internal/config/systemdefaults" crypto_db "github.com/zitadel/zitadel/internal/crypto/database" "github.com/zitadel/zitadel/internal/database" - "github.com/zitadel/zitadel/internal/database/dialect" "github.com/zitadel/zitadel/internal/domain" "github.com/zitadel/zitadel/internal/eventstore" old_es "github.com/zitadel/zitadel/internal/eventstore/repository/sql" @@ -106,7 +105,7 @@ func projections( ) { start := time.Now() - client, err := database.Connect(config.Destination, false, dialect.DBPurposeQuery) + client, err := database.Connect(config.Destination, false) logging.OnError(err).Fatal("unable to connect to database") keyStorage, err := crypto_db.NewKeyStorage(client, masterKey) @@ -119,9 +118,7 @@ func projections( logging.OnError(err).Fatal("unable create static storage") config.Eventstore.Querier = old_es.NewCRDB(client) - esPusherDBClient, err := database.Connect(config.Destination, false, dialect.DBPurposeEventPusher) - logging.OnError(err).Fatal("unable to connect eventstore push client") - config.Eventstore.Pusher = new_es.NewEventstore(esPusherDBClient) + config.Eventstore.Pusher = new_es.NewEventstore(client) es := eventstore.NewEventstore(config.Eventstore) esV4 := es_v4.NewEventstoreFromOne(es_v4_pg.New(client, &es_v4_pg.Config{ MaxRetries: config.Eventstore.MaxRetries, diff --git a/cmd/mirror/system.go b/cmd/mirror/system.go index e16836aa8c..00b48eb491 100644 --- a/cmd/mirror/system.go +++ b/cmd/mirror/system.go @@ -12,7 +12,6 @@ import ( "github.com/zitadel/logging" "github.com/zitadel/zitadel/internal/database" - "github.com/zitadel/zitadel/internal/database/dialect" ) func systemCmd() *cobra.Command { @@ -34,11 +33,11 @@ Only keys and assets are mirrored`, } func copySystem(ctx context.Context, config *Migration) { - sourceClient, err := database.Connect(config.Source, false, dialect.DBPurposeQuery) + sourceClient, err := database.Connect(config.Source, false) logging.OnError(err).Fatal("unable to connect to source database") defer sourceClient.Close() - destClient, err := database.Connect(config.Destination, false, dialect.DBPurposeEventPusher) + destClient, err := database.Connect(config.Destination, false) logging.OnError(err).Fatal("unable to connect to destination database") defer destClient.Close() diff --git a/cmd/mirror/verify.go b/cmd/mirror/verify.go index 68c927d091..e1a507d9fe 100644 --- a/cmd/mirror/verify.go +++ b/cmd/mirror/verify.go @@ -13,7 +13,6 @@ import ( cryptoDatabase "github.com/zitadel/zitadel/internal/crypto/database" "github.com/zitadel/zitadel/internal/database" - "github.com/zitadel/zitadel/internal/database/dialect" "github.com/zitadel/zitadel/internal/query/projection" ) @@ -37,11 +36,11 @@ var schemas = []string{ } func verifyMigration(ctx context.Context, config *Migration) { - sourceClient, err := database.Connect(config.Source, false, dialect.DBPurposeQuery) + sourceClient, err := database.Connect(config.Source, false) logging.OnError(err).Fatal("unable to connect to source database") defer sourceClient.Close() - destClient, err := database.Connect(config.Destination, false, dialect.DBPurposeEventPusher) + destClient, err := database.Connect(config.Destination, false) logging.OnError(err).Fatal("unable to connect to destination database") defer destClient.Close() diff --git a/cmd/setup/cleanup.go b/cmd/setup/cleanup.go index e9bc832d21..943ac164ea 100644 --- a/cmd/setup/cleanup.go +++ b/cmd/setup/cleanup.go @@ -8,7 +8,6 @@ import ( "github.com/zitadel/logging" "github.com/zitadel/zitadel/internal/database" - "github.com/zitadel/zitadel/internal/database/dialect" "github.com/zitadel/zitadel/internal/eventstore" old_es "github.com/zitadel/zitadel/internal/eventstore/repository/sql" new_es "github.com/zitadel/zitadel/internal/eventstore/v3" @@ -32,13 +31,11 @@ func Cleanup(config *Config) { logging.Info("cleanup started") - queryDBClient, err := database.Connect(config.Database, false, dialect.DBPurposeQuery) - logging.OnError(err).Fatal("unable to connect to database") - esPusherDBClient, err := database.Connect(config.Database, false, dialect.DBPurposeEventPusher) + dbClient, err := database.Connect(config.Database, false) logging.OnError(err).Fatal("unable to connect to database") - config.Eventstore.Pusher = new_es.NewEventstore(esPusherDBClient) - config.Eventstore.Querier = old_es.NewCRDB(queryDBClient) + config.Eventstore.Pusher = new_es.NewEventstore(dbClient) + config.Eventstore.Querier = old_es.NewCRDB(dbClient) es := eventstore.NewEventstore(config.Eventstore) step, err := migration.LastStuckStep(ctx, es) diff --git a/cmd/setup/setup.go b/cmd/setup/setup.go index 33dba00602..cd9d3d9673 100644 --- a/cmd/setup/setup.go +++ b/cmd/setup/setup.go @@ -26,7 +26,6 @@ import ( "github.com/zitadel/zitadel/internal/command" cryptoDB "github.com/zitadel/zitadel/internal/crypto/database" "github.com/zitadel/zitadel/internal/database" - "github.com/zitadel/zitadel/internal/database/dialect" "github.com/zitadel/zitadel/internal/domain" "github.com/zitadel/zitadel/internal/eventstore" old_es "github.com/zitadel/zitadel/internal/eventstore/repository/sql" @@ -102,26 +101,22 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string) i18n.MustLoadSupportedLanguagesFromDir() - queryDBClient, err := database.Connect(config.Database, false, dialect.DBPurposeQuery) - logging.OnError(err).Fatal("unable to connect to database") - esPusherDBClient, err := database.Connect(config.Database, false, dialect.DBPurposeEventPusher) - logging.OnError(err).Fatal("unable to connect to database") - projectionDBClient, err := database.Connect(config.Database, false, dialect.DBPurposeProjectionSpooler) + dbClient, err := database.Connect(config.Database, false) logging.OnError(err).Fatal("unable to connect to database") - config.Eventstore.Querier = old_es.NewCRDB(queryDBClient) - esV3 := new_es.NewEventstore(esPusherDBClient) + config.Eventstore.Querier = old_es.NewCRDB(dbClient) + esV3 := new_es.NewEventstore(dbClient) config.Eventstore.Pusher = esV3 config.Eventstore.Searcher = esV3 eventstoreClient := eventstore.NewEventstore(config.Eventstore) logging.OnError(err).Fatal("unable to start eventstore") - eventstoreV4 := es_v4.NewEventstoreFromOne(es_v4_pg.New(queryDBClient, &es_v4_pg.Config{ + eventstoreV4 := es_v4.NewEventstoreFromOne(es_v4_pg.New(dbClient, &es_v4_pg.Config{ MaxRetries: config.Eventstore.MaxRetries, })) - steps.s1ProjectionTable = &ProjectionTable{dbClient: queryDBClient.DB} - steps.s2AssetsTable = &AssetTable{dbClient: queryDBClient.DB} + steps.s1ProjectionTable = &ProjectionTable{dbClient: dbClient.DB} + steps.s2AssetsTable = &AssetTable{dbClient: dbClient.DB} steps.FirstInstance.Skip = config.ForMirror || steps.FirstInstance.Skip steps.FirstInstance.instanceSetup = config.DefaultInstance @@ -129,7 +124,7 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string) steps.FirstInstance.smtpEncryptionKey = config.EncryptionKeys.SMTP steps.FirstInstance.oidcEncryptionKey = config.EncryptionKeys.OIDC steps.FirstInstance.masterKey = masterKey - steps.FirstInstance.db = queryDBClient + steps.FirstInstance.db = dbClient steps.FirstInstance.es = eventstoreClient steps.FirstInstance.defaults = config.SystemDefaults steps.FirstInstance.zitadelRoles = config.InternalAuthZ.RolePermissionMappings @@ -137,46 +132,46 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string) steps.FirstInstance.externalSecure = config.ExternalSecure steps.FirstInstance.externalPort = config.ExternalPort - steps.s5LastFailed = &LastFailed{dbClient: queryDBClient.DB} - steps.s6OwnerRemoveColumns = &OwnerRemoveColumns{dbClient: queryDBClient.DB} - steps.s7LogstoreTables = &LogstoreTables{dbClient: queryDBClient.DB, username: config.Database.Username(), dbType: config.Database.Type()} - steps.s8AuthTokens = &AuthTokenIndexes{dbClient: queryDBClient} - steps.CorrectCreationDate.dbClient = esPusherDBClient - steps.s12AddOTPColumns = &AddOTPColumns{dbClient: queryDBClient} - steps.s13FixQuotaProjection = &FixQuotaConstraints{dbClient: queryDBClient} - steps.s14NewEventsTable = &NewEventsTable{dbClient: esPusherDBClient} - steps.s15CurrentStates = &CurrentProjectionState{dbClient: queryDBClient} - steps.s16UniqueConstraintsLower = &UniqueConstraintToLower{dbClient: queryDBClient} - steps.s17AddOffsetToUniqueConstraints = &AddOffsetToCurrentStates{dbClient: queryDBClient} - steps.s18AddLowerFieldsToLoginNames = &AddLowerFieldsToLoginNames{dbClient: queryDBClient} - steps.s19AddCurrentStatesIndex = &AddCurrentSequencesIndex{dbClient: queryDBClient} - steps.s20AddByUserSessionIndex = &AddByUserIndexToSession{dbClient: queryDBClient} - steps.s21AddBlockFieldToLimits = &AddBlockFieldToLimits{dbClient: queryDBClient} - steps.s22ActiveInstancesIndex = &ActiveInstanceEvents{dbClient: queryDBClient} - steps.s23CorrectGlobalUniqueConstraints = &CorrectGlobalUniqueConstraints{dbClient: esPusherDBClient} - steps.s24AddActorToAuthTokens = &AddActorToAuthTokens{dbClient: queryDBClient} - steps.s25User11AddLowerFieldsToVerifiedEmail = &User11AddLowerFieldsToVerifiedEmail{dbClient: esPusherDBClient} - steps.s26AuthUsers3 = &AuthUsers3{dbClient: esPusherDBClient} - steps.s27IDPTemplate6SAMLNameIDFormat = &IDPTemplate6SAMLNameIDFormat{dbClient: esPusherDBClient} - steps.s28AddFieldTable = &AddFieldTable{dbClient: esPusherDBClient} + steps.s5LastFailed = &LastFailed{dbClient: dbClient.DB} + steps.s6OwnerRemoveColumns = &OwnerRemoveColumns{dbClient: dbClient.DB} + steps.s7LogstoreTables = &LogstoreTables{dbClient: dbClient.DB, username: config.Database.Username(), dbType: config.Database.Type()} + steps.s8AuthTokens = &AuthTokenIndexes{dbClient: dbClient} + steps.CorrectCreationDate.dbClient = dbClient + steps.s12AddOTPColumns = &AddOTPColumns{dbClient: dbClient} + steps.s13FixQuotaProjection = &FixQuotaConstraints{dbClient: dbClient} + steps.s14NewEventsTable = &NewEventsTable{dbClient: dbClient} + steps.s15CurrentStates = &CurrentProjectionState{dbClient: dbClient} + steps.s16UniqueConstraintsLower = &UniqueConstraintToLower{dbClient: dbClient} + steps.s17AddOffsetToUniqueConstraints = &AddOffsetToCurrentStates{dbClient: dbClient} + steps.s18AddLowerFieldsToLoginNames = &AddLowerFieldsToLoginNames{dbClient: dbClient} + steps.s19AddCurrentStatesIndex = &AddCurrentSequencesIndex{dbClient: dbClient} + steps.s20AddByUserSessionIndex = &AddByUserIndexToSession{dbClient: dbClient} + steps.s21AddBlockFieldToLimits = &AddBlockFieldToLimits{dbClient: dbClient} + steps.s22ActiveInstancesIndex = &ActiveInstanceEvents{dbClient: dbClient} + steps.s23CorrectGlobalUniqueConstraints = &CorrectGlobalUniqueConstraints{dbClient: dbClient} + steps.s24AddActorToAuthTokens = &AddActorToAuthTokens{dbClient: dbClient} + steps.s25User11AddLowerFieldsToVerifiedEmail = &User11AddLowerFieldsToVerifiedEmail{dbClient: dbClient} + steps.s26AuthUsers3 = &AuthUsers3{dbClient: dbClient} + steps.s27IDPTemplate6SAMLNameIDFormat = &IDPTemplate6SAMLNameIDFormat{dbClient: dbClient} + steps.s28AddFieldTable = &AddFieldTable{dbClient: dbClient} steps.s29FillFieldsForProjectGrant = &FillFieldsForProjectGrant{eventstore: eventstoreClient} steps.s30FillFieldsForOrgDomainVerified = &FillFieldsForOrgDomainVerified{eventstore: eventstoreClient} - steps.s31AddAggregateIndexToFields = &AddAggregateIndexToFields{dbClient: esPusherDBClient} - steps.s32AddAuthSessionID = &AddAuthSessionID{dbClient: esPusherDBClient} - steps.s33SMSConfigs3TwilioAddVerifyServiceSid = &SMSConfigs3TwilioAddVerifyServiceSid{dbClient: esPusherDBClient} - steps.s34AddCacheSchema = &AddCacheSchema{dbClient: queryDBClient} - steps.s35AddPositionToIndexEsWm = &AddPositionToIndexEsWm{dbClient: esPusherDBClient} - steps.s36FillV2Milestones = &FillV3Milestones{dbClient: queryDBClient, eventstore: eventstoreClient} - steps.s37Apps7OIDConfigsBackChannelLogoutURI = &Apps7OIDConfigsBackChannelLogoutURI{dbClient: esPusherDBClient} - steps.s38BackChannelLogoutNotificationStart = &BackChannelLogoutNotificationStart{dbClient: esPusherDBClient, esClient: eventstoreClient} - steps.s40InitPushFunc = &InitPushFunc{dbClient: esPusherDBClient} - steps.s42Apps7OIDCConfigsLoginVersion = &Apps7OIDCConfigsLoginVersion{dbClient: esPusherDBClient} - steps.s43CreateFieldsDomainIndex = &CreateFieldsDomainIndex{dbClient: queryDBClient} - steps.s44ReplaceCurrentSequencesIndex = &ReplaceCurrentSequencesIndex{dbClient: esPusherDBClient} + steps.s31AddAggregateIndexToFields = &AddAggregateIndexToFields{dbClient: dbClient} + steps.s32AddAuthSessionID = &AddAuthSessionID{dbClient: dbClient} + steps.s33SMSConfigs3TwilioAddVerifyServiceSid = &SMSConfigs3TwilioAddVerifyServiceSid{dbClient: dbClient} + steps.s34AddCacheSchema = &AddCacheSchema{dbClient: dbClient} + steps.s35AddPositionToIndexEsWm = &AddPositionToIndexEsWm{dbClient: dbClient} + steps.s36FillV2Milestones = &FillV3Milestones{dbClient: dbClient, eventstore: eventstoreClient} + steps.s37Apps7OIDConfigsBackChannelLogoutURI = &Apps7OIDConfigsBackChannelLogoutURI{dbClient: dbClient} + steps.s38BackChannelLogoutNotificationStart = &BackChannelLogoutNotificationStart{dbClient: dbClient, esClient: eventstoreClient} + steps.s40InitPushFunc = &InitPushFunc{dbClient: dbClient} + steps.s42Apps7OIDCConfigsLoginVersion = &Apps7OIDCConfigsLoginVersion{dbClient: dbClient} + steps.s43CreateFieldsDomainIndex = &CreateFieldsDomainIndex{dbClient: dbClient} + steps.s44ReplaceCurrentSequencesIndex = &ReplaceCurrentSequencesIndex{dbClient: dbClient} steps.s45CorrectProjectOwners = &CorrectProjectOwners{eventstore: eventstoreClient} - steps.s46InitPermissionFunctions = &InitPermissionFunctions{eventstoreClient: esPusherDBClient} + steps.s46InitPermissionFunctions = &InitPermissionFunctions{eventstoreClient: dbClient} - err = projection.Create(ctx, projectionDBClient, eventstoreClient, config.Projections, nil, nil, nil) + err = projection.Create(ctx, dbClient, eventstoreClient, config.Projections, nil, nil, nil) logging.OnError(err).Fatal("unable to start projections") repeatableSteps := []migration.RepeatableMigration{ @@ -264,8 +259,8 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string) ctx, eventstoreClient, eventstoreV4, - queryDBClient, - projectionDBClient, + dbClient, + dbClient, masterKey, config, ) diff --git a/cmd/start/start.go b/cmd/start/start.go index db9c9afc54..4091213d2d 100644 --- a/cmd/start/start.go +++ b/cmd/start/start.go @@ -77,7 +77,6 @@ import ( "github.com/zitadel/zitadel/internal/crypto" cryptoDB "github.com/zitadel/zitadel/internal/crypto/database" "github.com/zitadel/zitadel/internal/database" - "github.com/zitadel/zitadel/internal/database/dialect" "github.com/zitadel/zitadel/internal/domain" "github.com/zitadel/zitadel/internal/eventstore" old_es "github.com/zitadel/zitadel/internal/eventstore/repository/sql" @@ -150,20 +149,12 @@ func startZitadel(ctx context.Context, config *Config, masterKey string, server i18n.MustLoadSupportedLanguagesFromDir() - queryDBClient, err := database.Connect(config.Database, false, dialect.DBPurposeQuery) + dbClient, err := database.Connect(config.Database, false) if err != nil { return fmt.Errorf("cannot start DB client for queries: %w", err) } - esPusherDBClient, err := database.Connect(config.Database, false, dialect.DBPurposeEventPusher) - if err != nil { - return fmt.Errorf("cannot start client for event store pusher: %w", err) - } - projectionDBClient, err := database.Connect(config.Database, false, dialect.DBPurposeProjectionSpooler) - if err != nil { - return fmt.Errorf("cannot start client for projection spooler: %w", err) - } - keyStorage, err := cryptoDB.NewKeyStorage(queryDBClient, masterKey) + keyStorage, err := cryptoDB.NewKeyStorage(dbClient, masterKey) if err != nil { return fmt.Errorf("cannot start key storage: %w", err) } @@ -172,16 +163,16 @@ func startZitadel(ctx context.Context, config *Config, masterKey string, server return err } - config.Eventstore.Pusher = new_es.NewEventstore(esPusherDBClient) - config.Eventstore.Searcher = new_es.NewEventstore(queryDBClient) - config.Eventstore.Querier = old_es.NewCRDB(queryDBClient) + config.Eventstore.Pusher = new_es.NewEventstore(dbClient) + config.Eventstore.Searcher = new_es.NewEventstore(dbClient) + config.Eventstore.Querier = old_es.NewCRDB(dbClient) eventstoreClient := eventstore.NewEventstore(config.Eventstore) - eventstoreV4 := es_v4.NewEventstoreFromOne(es_v4_pg.New(queryDBClient, &es_v4_pg.Config{ + eventstoreV4 := es_v4.NewEventstoreFromOne(es_v4_pg.New(dbClient, &es_v4_pg.Config{ MaxRetries: config.Eventstore.MaxRetries, })) sessionTokenVerifier := internal_authz.SessionTokenVerifier(keys.OIDC) - cacheConnectors, err := connector.StartConnectors(config.Caches, queryDBClient) + cacheConnectors, err := connector.StartConnectors(config.Caches, dbClient) if err != nil { return fmt.Errorf("unable to start caches: %w", err) } @@ -190,8 +181,8 @@ func startZitadel(ctx context.Context, config *Config, masterKey string, server ctx, eventstoreClient, eventstoreV4.Querier, - queryDBClient, - projectionDBClient, + dbClient, + dbClient, cacheConnectors, config.Projections, config.SystemDefaults, @@ -215,7 +206,7 @@ func startZitadel(ctx context.Context, config *Config, masterKey string, server return fmt.Errorf("cannot start queries: %w", err) } - authZRepo, err := authz.Start(queries, eventstoreClient, queryDBClient, keys.OIDC, config.ExternalSecure) + authZRepo, err := authz.Start(queries, eventstoreClient, dbClient, keys.OIDC, config.ExternalSecure) if err != nil { return fmt.Errorf("error starting authz repo: %w", err) } @@ -223,7 +214,7 @@ func startZitadel(ctx context.Context, config *Config, masterKey string, server return internal_authz.CheckPermission(ctx, authZRepo, config.InternalAuthZ.RolePermissionMappings, permission, orgID, resourceID) } - storage, err := config.AssetStorage.NewStorage(queryDBClient.DB) + storage, err := config.AssetStorage.NewStorage(dbClient.DB) if err != nil { return fmt.Errorf("cannot start asset storage client: %w", err) } @@ -268,7 +259,7 @@ func startZitadel(ctx context.Context, config *Config, masterKey string, server if err != nil { return err } - actionsExecutionDBEmitter, err := logstore.NewEmitter[*record.ExecutionLog](ctx, clock, config.Quotas.Execution, execution.NewDatabaseLogStorage(queryDBClient, commands, queries)) + actionsExecutionDBEmitter, err := logstore.NewEmitter[*record.ExecutionLog](ctx, clock, config.Quotas.Execution, execution.NewDatabaseLogStorage(dbClient, commands, queries)) if err != nil { return err } @@ -297,7 +288,7 @@ func startZitadel(ctx context.Context, config *Config, masterKey string, server keys.SMS, keys.OIDC, config.OIDC.DefaultBackChannelLogoutLifetime, - queryDBClient, + dbClient, ) notification.Start(ctx) @@ -313,7 +304,7 @@ func startZitadel(ctx context.Context, config *Config, masterKey string, server commands, queries, eventstoreClient, - queryDBClient, + dbClient, config, storage, authZRepo, @@ -333,7 +324,7 @@ func startZitadel(ctx context.Context, config *Config, masterKey string, server if server != nil { server <- &Server{ Config: config, - DB: queryDBClient, + DB: dbClient, KeyStorage: keyStorage, Keys: keys, Eventstore: eventstoreClient, diff --git a/internal/database/cockroach/crdb.go b/internal/database/cockroach/crdb.go index cc89be8687..48e912b5f5 100644 --- a/internal/database/cockroach/crdb.go +++ b/internal/database/cockroach/crdb.go @@ -3,7 +3,6 @@ package cockroach import ( "context" "database/sql" - "fmt" "strconv" "strings" "time" @@ -14,7 +13,6 @@ import ( "github.com/mitchellh/mapstructure" "github.com/zitadel/logging" - "github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/database/dialect" ) @@ -74,19 +72,16 @@ func (_ *Config) Decode(configs []interface{}) (dialect.Connector, error) { return connector, nil } -func (c *Config) Connect(useAdmin bool, pusherRatio, spoolerRatio float64, purpose dialect.DBPurpose) (*sql.DB, *pgxpool.Pool, error) { +func (c *Config) Connect(useAdmin bool) (*sql.DB, *pgxpool.Pool, error) { dialect.RegisterAfterConnect(func(ctx context.Context, c *pgx.Conn) error { // CockroachDB by default does not allow multiple modifications of the same table using ON CONFLICT // This is needed to fill the fields table of the eventstore during eventstore.Push. _, err := c.Exec(ctx, "SET enable_multiple_modifications_of_table = on") return err }) - connConfig, err := dialect.NewConnectionConfig(c.MaxOpenConns, c.MaxIdleConns, pusherRatio, spoolerRatio, purpose) - if err != nil { - return nil, nil, err - } + connConfig := dialect.NewConnectionConfig(c.MaxOpenConns, c.MaxIdleConns) - config, err := pgxpool.ParseConfig(c.String(useAdmin, purpose.AppName())) + config, err := pgxpool.ParseConfig(c.String(useAdmin)) if err != nil { return nil, nil, err } @@ -102,18 +97,6 @@ func (c *Config) Connect(useAdmin bool, pusherRatio, spoolerRatio float64, purpo } } - // For the pusher we set the app name with the instance ID - if purpose == dialect.DBPurposeEventPusher { - config.BeforeAcquire = func(ctx context.Context, conn *pgx.Conn) bool { - return setAppNameWithID(ctx, conn, purpose, authz.GetInstance(ctx).InstanceID()) - } - config.AfterRelease = func(conn *pgx.Conn) bool { - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - return setAppNameWithID(ctx, conn, purpose, "IDLE") - } - } - if connConfig.MaxOpenConns != 0 { config.MaxConns = int32(connConfig.MaxOpenConns) } @@ -195,7 +178,7 @@ func (c *Config) checkSSL(user User) { } } -func (c Config) String(useAdmin bool, appName string) string { +func (c Config) String(useAdmin bool) string { user := c.User if useAdmin { user = c.Admin.User @@ -206,7 +189,7 @@ func (c Config) String(useAdmin bool, appName string) string { "port=" + strconv.Itoa(int(c.Port)), "user=" + user.Username, "dbname=" + c.Database, - "application_name=" + appName, + "application_name=" + dialect.DefaultAppName, "sslmode=" + user.SSL.Mode, } if c.Options != "" { @@ -232,11 +215,3 @@ func (c Config) String(useAdmin bool, appName string) string { return strings.Join(fields, " ") } - -func setAppNameWithID(ctx context.Context, conn *pgx.Conn, purpose dialect.DBPurpose, id string) bool { - // needs to be set like this because psql complains about parameters in the SET statement - query := fmt.Sprintf("SET application_name = '%s_%s'", purpose.AppName(), id) - _, err := conn.Exec(ctx, query) - logging.OnError(err).Warn("failed to set application name") - return err == nil -} diff --git a/internal/database/database.go b/internal/database/database.go index b86a9f247c..e254edadc1 100644 --- a/internal/database/database.go +++ b/internal/database/database.go @@ -65,10 +65,8 @@ func CloseTransaction(tx Tx, err error) error { } type Config struct { - Dialects map[string]interface{} `mapstructure:",remain"` - EventPushConnRatio float64 - ProjectionSpoolerConnRatio float64 - connector dialect.Connector + Dialects map[string]interface{} `mapstructure:",remain"` + connector dialect.Connector } func (c *Config) SetConnector(connector dialect.Connector) { @@ -134,8 +132,8 @@ func QueryJSONObject[T any](ctx context.Context, db *DB, query string, args ...a return obj, nil } -func Connect(config Config, useAdmin bool, purpose dialect.DBPurpose) (*DB, error) { - client, pool, err := config.connector.Connect(useAdmin, config.EventPushConnRatio, config.ProjectionSpoolerConnRatio, purpose) +func Connect(config Config, useAdmin bool) (*DB, error) { + client, pool, err := config.connector.Connect(useAdmin) if err != nil { return nil, err } diff --git a/internal/database/dialect/config.go b/internal/database/dialect/config.go index 8ca4e7f748..71fb477ea1 100644 --- a/internal/database/dialect/config.go +++ b/internal/database/dialect/config.go @@ -26,36 +26,11 @@ type Matcher interface { } const ( - QueryAppName = "zitadel_queries" - EventstorePusherAppName = "zitadel_es_pusher" - ProjectionSpoolerAppName = "zitadel_projection_spooler" - defaultAppName = "zitadel" + DefaultAppName = "zitadel" ) -// DBPurpose is what the resulting connection pool is used for. -type DBPurpose int - -const ( - DBPurposeQuery DBPurpose = iota - DBPurposeEventPusher - DBPurposeProjectionSpooler -) - -func (p DBPurpose) AppName() string { - switch p { - case DBPurposeQuery: - return QueryAppName - case DBPurposeEventPusher: - return EventstorePusherAppName - case DBPurposeProjectionSpooler: - return ProjectionSpoolerAppName - default: - return defaultAppName - } -} - type Connector interface { - Connect(useAdmin bool, pusherRatio, spoolerRatio float64, purpose DBPurpose) (*sql.DB, *pgxpool.Pool, error) + Connect(useAdmin bool) (*sql.DB, *pgxpool.Pool, error) Password() string Database } diff --git a/internal/database/dialect/config_test.go b/internal/database/dialect/config_test.go deleted file mode 100644 index d7297f8b67..0000000000 --- a/internal/database/dialect/config_test.go +++ /dev/null @@ -1,36 +0,0 @@ -package dialect - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestDBPurpose_AppName(t *testing.T) { - tests := []struct { - p DBPurpose - want string - }{ - { - p: DBPurposeQuery, - want: QueryAppName, - }, - { - p: DBPurposeEventPusher, - want: EventstorePusherAppName, - }, - { - p: DBPurposeProjectionSpooler, - want: ProjectionSpoolerAppName, - }, - { - p: 99, - want: defaultAppName, - }, - } - for _, tt := range tests { - t.Run(tt.want, func(t *testing.T) { - assert.Equal(t, tt.want, tt.p.AppName()) - }) - } -} diff --git a/internal/database/dialect/connections.go b/internal/database/dialect/connections.go index f957870df0..13a4d657c3 100644 --- a/internal/database/dialect/connections.go +++ b/internal/database/dialect/connections.go @@ -3,7 +3,6 @@ package dialect import ( "context" "errors" - "fmt" "reflect" "github.com/jackc/pgx/v5" @@ -11,11 +10,8 @@ import ( ) var ( - ErrNegativeRatio = errors.New("ratio cannot be negative") - ErrHighSumRatio = errors.New("sum of pusher and projection ratios must be < 1") ErrIllegalMaxOpenConns = errors.New("MaxOpenConns of the database must be higher than 3 or 0 for unlimited") ErrIllegalMaxIdleConns = errors.New("MaxIdleConns of the database must be higher than 3 or 0 for unlimited") - ErrInvalidPurpose = errors.New("DBPurpose out of range") ) // ConnectionConfig defines the Max Open and Idle connections for a DB connection pool. @@ -25,28 +21,6 @@ type ConnectionConfig struct { AfterConnect []func(ctx context.Context, c *pgx.Conn) error } -// takeRatio of MaxOpenConns and MaxIdleConns from config and returns -// a new ConnectionConfig with the resulting values. -func (c *ConnectionConfig) takeRatio(ratio float64) (*ConnectionConfig, error) { - if ratio < 0 { - return nil, ErrNegativeRatio - } - - out := &ConnectionConfig{ - MaxOpenConns: uint32(ratio * float64(c.MaxOpenConns)), - MaxIdleConns: uint32(ratio * float64(c.MaxIdleConns)), - AfterConnect: c.AfterConnect, - } - if c.MaxOpenConns != 0 && out.MaxOpenConns < 1 && ratio > 0 { - out.MaxOpenConns = 1 - } - if c.MaxIdleConns != 0 && out.MaxIdleConns < 1 && ratio > 0 { - out.MaxIdleConns = 1 - } - - return out, nil -} - var afterConnectFuncs []func(ctx context.Context, c *pgx.Conn) error func RegisterAfterConnect(f func(ctx context.Context, c *pgx.Conn) error) { @@ -82,48 +56,10 @@ func RegisterDefaultPgTypeVariants[T any](m *pgtype.Map, name, arrayName string) // // openConns and idleConns must be at least 3 or 0, which means no limit. // The pusherRatio and spoolerRatio must be between 0 and 1. -func NewConnectionConfig(openConns, idleConns uint32, pusherRatio, projectionRatio float64, purpose DBPurpose) (*ConnectionConfig, error) { - if openConns != 0 && openConns < 3 { - return nil, ErrIllegalMaxOpenConns - } - if idleConns != 0 && idleConns < 3 { - return nil, ErrIllegalMaxIdleConns - } - if pusherRatio+projectionRatio >= 1 { - return nil, ErrHighSumRatio - } - - queryConfig := &ConnectionConfig{ +func NewConnectionConfig(openConns, idleConns uint32) *ConnectionConfig { + return &ConnectionConfig{ MaxOpenConns: openConns, MaxIdleConns: idleConns, AfterConnect: afterConnectFuncs, } - pusherConfig, err := queryConfig.takeRatio(pusherRatio) - if err != nil { - return nil, fmt.Errorf("event pusher: %w", err) - } - - spoolerConfig, err := queryConfig.takeRatio(projectionRatio) - if err != nil { - return nil, fmt.Errorf("projection spooler: %w", err) - } - - // subtract the claimed amount - if queryConfig.MaxOpenConns > 0 { - queryConfig.MaxOpenConns -= pusherConfig.MaxOpenConns + spoolerConfig.MaxOpenConns - } - if queryConfig.MaxIdleConns > 0 { - queryConfig.MaxIdleConns -= pusherConfig.MaxIdleConns + spoolerConfig.MaxIdleConns - } - - switch purpose { - case DBPurposeQuery: - return queryConfig, nil - case DBPurposeEventPusher: - return pusherConfig, nil - case DBPurposeProjectionSpooler: - return spoolerConfig, nil - default: - return nil, fmt.Errorf("%w: %v", ErrInvalidPurpose, purpose) - } } diff --git a/internal/database/dialect/connections_test.go b/internal/database/dialect/connections_test.go deleted file mode 100644 index 6256658d0a..0000000000 --- a/internal/database/dialect/connections_test.go +++ /dev/null @@ -1,252 +0,0 @@ -package dialect - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestConnectionConfig_takeRatio(t *testing.T) { - type fields struct { - MaxOpenConns uint32 - MaxIdleConns uint32 - } - tests := []struct { - name string - fields fields - ratio float64 - wantOut *ConnectionConfig - wantErr error - }{ - { - name: "ratio less than 0 error", - ratio: -0.1, - wantErr: ErrNegativeRatio, - }, - { - name: "zero values", - fields: fields{ - MaxOpenConns: 0, - MaxIdleConns: 0, - }, - ratio: 0, - wantOut: &ConnectionConfig{ - MaxOpenConns: 0, - MaxIdleConns: 0, - }, - }, - { - name: "max conns, ratio 0", - fields: fields{ - MaxOpenConns: 10, - MaxIdleConns: 5, - }, - ratio: 0, - wantOut: &ConnectionConfig{ - MaxOpenConns: 0, - MaxIdleConns: 0, - }, - }, - { - name: "half ratio", - fields: fields{ - MaxOpenConns: 10, - MaxIdleConns: 5, - }, - ratio: 0.5, - wantOut: &ConnectionConfig{ - MaxOpenConns: 5, - MaxIdleConns: 2, - }, - }, - { - name: "minimal 1", - fields: fields{ - MaxOpenConns: 2, - MaxIdleConns: 2, - }, - ratio: 0.1, - wantOut: &ConnectionConfig{ - MaxOpenConns: 1, - MaxIdleConns: 1, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - in := &ConnectionConfig{ - MaxOpenConns: tt.fields.MaxOpenConns, - MaxIdleConns: tt.fields.MaxIdleConns, - } - got, err := in.takeRatio(tt.ratio) - require.ErrorIs(t, err, tt.wantErr) - assert.Equal(t, tt.wantOut, got) - }) - } -} - -func TestNewConnectionConfig(t *testing.T) { - type args struct { - openConns uint32 - idleConns uint32 - pusherRatio float64 - projectionRatio float64 - purpose DBPurpose - } - tests := []struct { - name string - args args - want *ConnectionConfig - wantErr error - }{ - { - name: "illegal open conns error", - args: args{ - openConns: 2, - idleConns: 3, - }, - wantErr: ErrIllegalMaxOpenConns, - }, - { - name: "illegal idle conns error", - args: args{ - openConns: 3, - idleConns: 2, - }, - wantErr: ErrIllegalMaxIdleConns, - }, - { - name: "high ration sum error", - args: args{ - openConns: 3, - idleConns: 3, - pusherRatio: 0.5, - projectionRatio: 0.5, - }, - wantErr: ErrHighSumRatio, - }, - { - name: "illegal pusher ratio error", - args: args{ - openConns: 3, - idleConns: 3, - pusherRatio: -0.1, - projectionRatio: 0.5, - }, - wantErr: ErrNegativeRatio, - }, - { - name: "illegal projection ratio error", - args: args{ - openConns: 3, - idleConns: 3, - pusherRatio: 0.5, - projectionRatio: -0.1, - }, - wantErr: ErrNegativeRatio, - }, - { - name: "invalid purpose error", - args: args{ - openConns: 3, - idleConns: 3, - pusherRatio: 0.4, - projectionRatio: 0.4, - purpose: 99, - }, - wantErr: ErrInvalidPurpose, - }, - { - name: "min values, query purpose", - args: args{ - openConns: 3, - idleConns: 3, - pusherRatio: 0.2, - projectionRatio: 0.2, - purpose: DBPurposeQuery, - }, - want: &ConnectionConfig{ - MaxOpenConns: 1, - MaxIdleConns: 1, - }, - }, - { - name: "min values, pusher purpose", - args: args{ - openConns: 3, - idleConns: 3, - pusherRatio: 0.2, - projectionRatio: 0.2, - purpose: DBPurposeEventPusher, - }, - want: &ConnectionConfig{ - MaxOpenConns: 1, - MaxIdleConns: 1, - }, - }, - { - name: "min values, projection purpose", - args: args{ - openConns: 3, - idleConns: 3, - pusherRatio: 0.2, - projectionRatio: 0.2, - purpose: DBPurposeProjectionSpooler, - }, - want: &ConnectionConfig{ - MaxOpenConns: 1, - MaxIdleConns: 1, - }, - }, - { - name: "high values, query purpose", - args: args{ - openConns: 10, - idleConns: 5, - pusherRatio: 0.2, - projectionRatio: 0.2, - purpose: DBPurposeQuery, - }, - want: &ConnectionConfig{ - MaxOpenConns: 6, - MaxIdleConns: 3, - }, - }, - { - name: "high values, pusher purpose", - args: args{ - openConns: 10, - idleConns: 5, - pusherRatio: 0.2, - projectionRatio: 0.2, - purpose: DBPurposeEventPusher, - }, - want: &ConnectionConfig{ - MaxOpenConns: 2, - MaxIdleConns: 1, - }, - }, - { - name: "high values, projection purpose", - args: args{ - openConns: 10, - idleConns: 5, - pusherRatio: 0.2, - projectionRatio: 0.2, - purpose: DBPurposeProjectionSpooler, - }, - want: &ConnectionConfig{ - MaxOpenConns: 2, - MaxIdleConns: 1, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := NewConnectionConfig(tt.args.openConns, tt.args.idleConns, tt.args.pusherRatio, tt.args.projectionRatio, tt.args.purpose) - require.ErrorIs(t, err, tt.wantErr) - assert.Equal(t, tt.want, got) - }) - } -} diff --git a/internal/database/postgres/pg.go b/internal/database/postgres/pg.go index c12e122437..5f4d9a6c9b 100644 --- a/internal/database/postgres/pg.go +++ b/internal/database/postgres/pg.go @@ -3,7 +3,6 @@ package postgres import ( "context" "database/sql" - "fmt" "strconv" "strings" "time" @@ -14,7 +13,6 @@ import ( "github.com/mitchellh/mapstructure" "github.com/zitadel/logging" - "github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/database/dialect" ) @@ -75,13 +73,10 @@ func (_ *Config) Decode(configs []interface{}) (dialect.Connector, error) { return connector, nil } -func (c *Config) Connect(useAdmin bool, pusherRatio, spoolerRatio float64, purpose dialect.DBPurpose) (*sql.DB, *pgxpool.Pool, error) { - connConfig, err := dialect.NewConnectionConfig(c.MaxOpenConns, c.MaxIdleConns, pusherRatio, spoolerRatio, purpose) - if err != nil { - return nil, nil, err - } +func (c *Config) Connect(useAdmin bool) (*sql.DB, *pgxpool.Pool, error) { + connConfig := dialect.NewConnectionConfig(c.MaxOpenConns, c.MaxIdleConns) - config, err := pgxpool.ParseConfig(c.String(useAdmin, purpose.AppName())) + config, err := pgxpool.ParseConfig(c.String(useAdmin)) if err != nil { return nil, nil, err } @@ -95,18 +90,6 @@ func (c *Config) Connect(useAdmin bool, pusherRatio, spoolerRatio float64, purpo return nil } - // For the pusher we set the app name with the instance ID - if purpose == dialect.DBPurposeEventPusher { - config.BeforeAcquire = func(ctx context.Context, conn *pgx.Conn) bool { - return setAppNameWithID(ctx, conn, purpose, authz.GetInstance(ctx).InstanceID()) - } - config.AfterRelease = func(conn *pgx.Conn) bool { - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - return setAppNameWithID(ctx, conn, purpose, "IDLE") - } - } - if connConfig.MaxOpenConns != 0 { config.MaxConns = int32(connConfig.MaxOpenConns) } @@ -191,7 +174,7 @@ func (s *Config) checkSSL(user User) { } } -func (c Config) String(useAdmin bool, appName string) string { +func (c Config) String(useAdmin bool) string { user := c.User if useAdmin { user = c.Admin.User @@ -201,7 +184,7 @@ func (c Config) String(useAdmin bool, appName string) string { "host=" + c.Host, "port=" + strconv.Itoa(int(c.Port)), "user=" + user.Username, - "application_name=" + appName, + "application_name=" + dialect.DefaultAppName, "sslmode=" + user.SSL.Mode, } if c.Options != "" { @@ -233,11 +216,3 @@ func (c Config) String(useAdmin bool, appName string) string { return strings.Join(fields, " ") } - -func setAppNameWithID(ctx context.Context, conn *pgx.Conn, purpose dialect.DBPurpose, id string) bool { - // needs to be set like this because psql complains about parameters in the SET statement - query := fmt.Sprintf("SET application_name = '%s_%s'", purpose.AppName(), id) - _, err := conn.Exec(ctx, query) - logging.OnError(err).Warn("failed to set application name") - return err == nil -} diff --git a/internal/eventstore/repository/sql/query.go b/internal/eventstore/repository/sql/query.go index b93e663b17..4e1cc87aff 100644 --- a/internal/eventstore/repository/sql/query.go +++ b/internal/eventstore/repository/sql/query.go @@ -309,7 +309,7 @@ func prepareConditions(criteria querier, query *repository.SearchQuery, useV1 bo } for i := range instanceIDs { - instanceIDs[i] = dialect.DBPurposeEventPusher.AppName() + "_" + instanceIDs[i] + instanceIDs[i] = "zitadel_es_pusher_" + instanceIDs[i] } clauses += awaitOpenTransactions(useV1) diff --git a/internal/eventstore/v3/push.go b/internal/eventstore/v3/push.go index fb597021e2..6497b96ed8 100644 --- a/internal/eventstore/v3/push.go +++ b/internal/eventstore/v3/push.go @@ -4,9 +4,11 @@ import ( "context" "database/sql" _ "embed" + "fmt" "github.com/zitadel/logging" + "github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/database" "github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/telemetry/tracing" @@ -55,6 +57,11 @@ func (es *Eventstore) writeCommands(ctx context.Context, client database.Context }() } + _, err = tx.ExecContext(ctx, fmt.Sprintf("SET LOCAL application_name = '%s'", fmt.Sprintf("zitadel_es_pusher_%s", authz.GetInstance(ctx).InstanceID()))) + if err != nil { + return nil, err + } + events, err := writeEvents(ctx, tx, commands) if err != nil { return nil, err