From c877add3637b678b38265ad8d28feb055b30f220 Mon Sep 17 00:00:00 2001 From: Stefan Benz <46600784+stebenz@users.noreply.github.com> Date: Wed, 7 May 2025 16:26:53 +0200 Subject: [PATCH] fix: add current state for execution handler into setup (#9863) # Which Problems Are Solved The execution handler projection handles all events to check if an execution has to be provided to the worker to execute. In this logic all events would be processed from the beginning which is not necessary. # How the Problems Are Solved Add the current state to the execution handler projection, to avoid processing all existing events. # Additional Changes Add custom configuration to the default, so that the transactions are limited to some events. # Additional Context None (cherry picked from commit 21167a4bba8b74720422f2b09ff6753ddb24b67d) --- cmd/defaults.yaml | 3 ++- cmd/setup/38.go | 1 - cmd/setup/55.go | 27 +++++++++++++++++++++++++++ cmd/setup/55.sql | 22 ++++++++++++++++++++++ cmd/setup/config.go | 1 + cmd/setup/setup.go | 4 +++- cmd/start/start.go | 2 +- 7 files changed, 56 insertions(+), 4 deletions(-) create mode 100644 cmd/setup/55.go create mode 100644 cmd/setup/55.sql diff --git a/cmd/defaults.yaml b/cmd/defaults.yaml index dcde29313f..23d0ad8030 100644 --- a/cmd/defaults.yaml +++ b/cmd/defaults.yaml @@ -387,7 +387,8 @@ Projections: org_domain_verified_fields: TransactionDuration: 0s BulkLimit: 2000 - + execution_handler: + BulkLimit: 10 # The Notifications projection is used for preparing the messages (emails and SMS) to be sent to users Notifications: # As notification projections don't result in database statements, retries don't have an effect diff --git a/cmd/setup/38.go b/cmd/setup/38.go index 0a102c9d12..810510bfdd 100644 --- a/cmd/setup/38.go +++ b/cmd/setup/38.go @@ -15,7 +15,6 @@ var ( type BackChannelLogoutNotificationStart struct { dbClient *database.DB - esClient *eventstore.Eventstore } func (mig *BackChannelLogoutNotificationStart) Execute(ctx context.Context, e eventstore.Event) error { diff --git a/cmd/setup/55.go b/cmd/setup/55.go new file mode 100644 index 0000000000..19083515e5 --- /dev/null +++ b/cmd/setup/55.go @@ -0,0 +1,27 @@ +package setup + +import ( + "context" + _ "embed" + + "github.com/zitadel/zitadel/internal/database" + "github.com/zitadel/zitadel/internal/eventstore" +) + +var ( + //go:embed 55.sql + executionHandlerCurrentState string +) + +type ExecutionHandlerStart struct { + dbClient *database.DB +} + +func (mig *ExecutionHandlerStart) Execute(ctx context.Context, e eventstore.Event) error { + _, err := mig.dbClient.ExecContext(ctx, executionHandlerCurrentState, e.Sequence(), e.CreatedAt(), e.Position()) + return err +} + +func (mig *ExecutionHandlerStart) String() string { + return "55_execution_handler_start" +} diff --git a/cmd/setup/55.sql b/cmd/setup/55.sql new file mode 100644 index 0000000000..60c45d5f94 --- /dev/null +++ b/cmd/setup/55.sql @@ -0,0 +1,22 @@ +INSERT INTO projections.current_states AS cs ( instance_id + , projection_name + , last_updated + , sequence + , event_date + , position + , filter_offset) +SELECT instance_id + , 'projections.execution_handler' + , now() + , $1 + , $2 + , $3 + , 0 +FROM eventstore.events2 AS e +WHERE aggregate_type = 'instance' + AND event_type = 'instance.added' +ON CONFLICT (instance_id, projection_name) DO UPDATE SET last_updated = EXCLUDED.last_updated, + sequence = EXCLUDED.sequence, + event_date = EXCLUDED.event_date, + position = EXCLUDED.position, + filter_offset = EXCLUDED.filter_offset; \ No newline at end of file diff --git a/cmd/setup/config.go b/cmd/setup/config.go index 127f9d7599..5e5c842b14 100644 --- a/cmd/setup/config.go +++ b/cmd/setup/config.go @@ -151,6 +151,7 @@ type Steps struct { s52IDPTemplate6LDAP2 *IDPTemplate6LDAP2 s53InitPermittedOrgsFunction *InitPermittedOrgsFunction53 s54InstancePositionIndex *InstancePositionIndex + s55ExecutionHandlerStart *ExecutionHandlerStart } func MustNewSteps(v *viper.Viper) *Steps { diff --git a/cmd/setup/setup.go b/cmd/setup/setup.go index eead1980ed..58bc89d2e4 100644 --- a/cmd/setup/setup.go +++ b/cmd/setup/setup.go @@ -198,7 +198,7 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string) steps.s35AddPositionToIndexEsWm = &AddPositionToIndexEsWm{dbClient: dbClient} steps.s36FillV2Milestones = &FillV3Milestones{dbClient: dbClient, eventstore: eventstoreClient} steps.s37Apps7OIDConfigsBackChannelLogoutURI = &Apps7OIDConfigsBackChannelLogoutURI{dbClient: dbClient} - steps.s38BackChannelLogoutNotificationStart = &BackChannelLogoutNotificationStart{dbClient: dbClient, esClient: eventstoreClient} + steps.s38BackChannelLogoutNotificationStart = &BackChannelLogoutNotificationStart{dbClient: dbClient} steps.s40InitPushFunc = &InitPushFunc{dbClient: dbClient} steps.s42Apps7OIDCConfigsLoginVersion = &Apps7OIDCConfigsLoginVersion{dbClient: dbClient} steps.s43CreateFieldsDomainIndex = &CreateFieldsDomainIndex{dbClient: dbClient} @@ -213,6 +213,7 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string) steps.s52IDPTemplate6LDAP2 = &IDPTemplate6LDAP2{dbClient: dbClient} steps.s53InitPermittedOrgsFunction = &InitPermittedOrgsFunction53{dbClient: dbClient} steps.s54InstancePositionIndex = &InstancePositionIndex{dbClient: dbClient} + steps.s55ExecutionHandlerStart = &ExecutionHandlerStart{dbClient: dbClient} err = projection.Create(ctx, dbClient, eventstoreClient, config.Projections, nil, nil, nil) logging.OnError(err).Fatal("unable to start projections") @@ -256,6 +257,7 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string) steps.s52IDPTemplate6LDAP2, steps.s53InitPermittedOrgsFunction, steps.s54InstancePositionIndex, + steps.s55ExecutionHandlerStart, } { setupErr = executeMigration(ctx, eventstoreClient, step, "migration failed") if setupErr != nil { diff --git a/cmd/start/start.go b/cmd/start/start.go index e3d84625b4..52d9c6fba8 100644 --- a/cmd/start/start.go +++ b/cmd/start/start.go @@ -304,7 +304,7 @@ func startZitadel(ctx context.Context, config *Config, masterKey string, server execution.Register( ctx, - config.Projections.Customizations["executions"], + config.Projections.Customizations["execution_handler"], config.Executions, queries, eventstoreClient.EventTypes(),