From 21167a4bba8b74720422f2b09ff6753ddb24b67d Mon Sep 17 00:00:00 2001 From: Stefan Benz <46600784+stebenz@users.noreply.github.com> Date: Wed, 7 May 2025 16:26:53 +0200 Subject: [PATCH] fix: add current state for execution handler into setup (#9863) # Which Problems Are Solved The execution handler projection handles all events to check if an execution has to be provided to the worker to execute. In this logic all events would be processed from the beginning which is not necessary. # How the Problems Are Solved Add the current state to the execution handler projection, to avoid processing all existing events. # Additional Changes Add custom configuration to the default, so that the transactions are limited to some events. # Additional Context None --- cmd/defaults.yaml | 3 ++- cmd/setup/38.go | 1 - cmd/setup/55.go | 27 +++++++++++++++++++++++++++ cmd/setup/55.sql | 22 ++++++++++++++++++++++ cmd/setup/config.go | 1 + cmd/setup/setup.go | 4 +++- cmd/start/start.go | 2 +- 7 files changed, 56 insertions(+), 4 deletions(-) create mode 100644 cmd/setup/55.go create mode 100644 cmd/setup/55.sql diff --git a/cmd/defaults.yaml b/cmd/defaults.yaml index 0d71b4d817..c13d5337b1 100644 --- a/cmd/defaults.yaml +++ b/cmd/defaults.yaml @@ -387,7 +387,8 @@ Projections: org_domain_verified_fields: TransactionDuration: 0s BulkLimit: 2000 - + execution_handler: + BulkLimit: 10 # The Notifications projection is used for preparing the messages (emails and SMS) to be sent to users Notifications: # As notification projections don't result in database statements, retries don't have an effect diff --git a/cmd/setup/38.go b/cmd/setup/38.go index 0a102c9d12..810510bfdd 100644 --- a/cmd/setup/38.go +++ b/cmd/setup/38.go @@ -15,7 +15,6 @@ var ( type BackChannelLogoutNotificationStart struct { dbClient *database.DB - esClient *eventstore.Eventstore } func (mig *BackChannelLogoutNotificationStart) Execute(ctx context.Context, e eventstore.Event) error { diff --git a/cmd/setup/55.go b/cmd/setup/55.go new file mode 100644 index 0000000000..19083515e5 --- /dev/null +++ b/cmd/setup/55.go @@ -0,0 +1,27 @@ +package setup + +import ( + "context" + _ "embed" + + "github.com/zitadel/zitadel/internal/database" + "github.com/zitadel/zitadel/internal/eventstore" +) + +var ( + //go:embed 55.sql + executionHandlerCurrentState string +) + +type ExecutionHandlerStart struct { + dbClient *database.DB +} + +func (mig *ExecutionHandlerStart) Execute(ctx context.Context, e eventstore.Event) error { + _, err := mig.dbClient.ExecContext(ctx, executionHandlerCurrentState, e.Sequence(), e.CreatedAt(), e.Position()) + return err +} + +func (mig *ExecutionHandlerStart) String() string { + return "55_execution_handler_start" +} diff --git a/cmd/setup/55.sql b/cmd/setup/55.sql new file mode 100644 index 0000000000..60c45d5f94 --- /dev/null +++ b/cmd/setup/55.sql @@ -0,0 +1,22 @@ +INSERT INTO projections.current_states AS cs ( instance_id + , projection_name + , last_updated + , sequence + , event_date + , position + , filter_offset) +SELECT instance_id + , 'projections.execution_handler' + , now() + , $1 + , $2 + , $3 + , 0 +FROM eventstore.events2 AS e +WHERE aggregate_type = 'instance' + AND event_type = 'instance.added' +ON CONFLICT (instance_id, projection_name) DO UPDATE SET last_updated = EXCLUDED.last_updated, + sequence = EXCLUDED.sequence, + event_date = EXCLUDED.event_date, + position = EXCLUDED.position, + filter_offset = EXCLUDED.filter_offset; \ No newline at end of file diff --git a/cmd/setup/config.go b/cmd/setup/config.go index 127f9d7599..5e5c842b14 100644 --- a/cmd/setup/config.go +++ b/cmd/setup/config.go @@ -151,6 +151,7 @@ type Steps struct { s52IDPTemplate6LDAP2 *IDPTemplate6LDAP2 s53InitPermittedOrgsFunction *InitPermittedOrgsFunction53 s54InstancePositionIndex *InstancePositionIndex + s55ExecutionHandlerStart *ExecutionHandlerStart } func MustNewSteps(v *viper.Viper) *Steps { diff --git a/cmd/setup/setup.go b/cmd/setup/setup.go index eead1980ed..58bc89d2e4 100644 --- a/cmd/setup/setup.go +++ b/cmd/setup/setup.go @@ -198,7 +198,7 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string) steps.s35AddPositionToIndexEsWm = &AddPositionToIndexEsWm{dbClient: dbClient} steps.s36FillV2Milestones = &FillV3Milestones{dbClient: dbClient, eventstore: eventstoreClient} steps.s37Apps7OIDConfigsBackChannelLogoutURI = &Apps7OIDConfigsBackChannelLogoutURI{dbClient: dbClient} - steps.s38BackChannelLogoutNotificationStart = &BackChannelLogoutNotificationStart{dbClient: dbClient, esClient: eventstoreClient} + steps.s38BackChannelLogoutNotificationStart = &BackChannelLogoutNotificationStart{dbClient: dbClient} steps.s40InitPushFunc = &InitPushFunc{dbClient: dbClient} steps.s42Apps7OIDCConfigsLoginVersion = &Apps7OIDCConfigsLoginVersion{dbClient: dbClient} steps.s43CreateFieldsDomainIndex = &CreateFieldsDomainIndex{dbClient: dbClient} @@ -213,6 +213,7 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string) steps.s52IDPTemplate6LDAP2 = &IDPTemplate6LDAP2{dbClient: dbClient} steps.s53InitPermittedOrgsFunction = &InitPermittedOrgsFunction53{dbClient: dbClient} steps.s54InstancePositionIndex = &InstancePositionIndex{dbClient: dbClient} + steps.s55ExecutionHandlerStart = &ExecutionHandlerStart{dbClient: dbClient} err = projection.Create(ctx, dbClient, eventstoreClient, config.Projections, nil, nil, nil) logging.OnError(err).Fatal("unable to start projections") @@ -256,6 +257,7 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string) steps.s52IDPTemplate6LDAP2, steps.s53InitPermittedOrgsFunction, steps.s54InstancePositionIndex, + steps.s55ExecutionHandlerStart, } { setupErr = executeMigration(ctx, eventstoreClient, step, "migration failed") if setupErr != nil { diff --git a/cmd/start/start.go b/cmd/start/start.go index e3d84625b4..52d9c6fba8 100644 --- a/cmd/start/start.go +++ b/cmd/start/start.go @@ -304,7 +304,7 @@ func startZitadel(ctx context.Context, config *Config, masterKey string, server execution.Register( ctx, - config.Projections.Customizations["executions"], + config.Projections.Customizations["execution_handler"], config.Executions, queries, eventstoreClient.EventTypes(),