diff --git a/cmd/defaults.yaml b/cmd/defaults.yaml index 0d71b4d817..c13d5337b1 100644 --- a/cmd/defaults.yaml +++ b/cmd/defaults.yaml @@ -387,7 +387,8 @@ Projections: org_domain_verified_fields: TransactionDuration: 0s BulkLimit: 2000 - + execution_handler: + BulkLimit: 10 # The Notifications projection is used for preparing the messages (emails and SMS) to be sent to users Notifications: # As notification projections don't result in database statements, retries don't have an effect diff --git a/cmd/setup/38.go b/cmd/setup/38.go index 0a102c9d12..810510bfdd 100644 --- a/cmd/setup/38.go +++ b/cmd/setup/38.go @@ -15,7 +15,6 @@ var ( type BackChannelLogoutNotificationStart struct { dbClient *database.DB - esClient *eventstore.Eventstore } func (mig *BackChannelLogoutNotificationStart) Execute(ctx context.Context, e eventstore.Event) error { diff --git a/cmd/setup/55.go b/cmd/setup/55.go new file mode 100644 index 0000000000..19083515e5 --- /dev/null +++ b/cmd/setup/55.go @@ -0,0 +1,27 @@ +package setup + +import ( + "context" + _ "embed" + + "github.com/zitadel/zitadel/internal/database" + "github.com/zitadel/zitadel/internal/eventstore" +) + +var ( + //go:embed 55.sql + executionHandlerCurrentState string +) + +type ExecutionHandlerStart struct { + dbClient *database.DB +} + +func (mig *ExecutionHandlerStart) Execute(ctx context.Context, e eventstore.Event) error { + _, err := mig.dbClient.ExecContext(ctx, executionHandlerCurrentState, e.Sequence(), e.CreatedAt(), e.Position()) + return err +} + +func (mig *ExecutionHandlerStart) String() string { + return "55_execution_handler_start" +} diff --git a/cmd/setup/55.sql b/cmd/setup/55.sql new file mode 100644 index 0000000000..60c45d5f94 --- /dev/null +++ b/cmd/setup/55.sql @@ -0,0 +1,22 @@ +INSERT INTO projections.current_states AS cs ( instance_id + , projection_name + , last_updated + , sequence + , event_date + , position + , filter_offset) +SELECT instance_id + , 'projections.execution_handler' + , now() + , $1 + , $2 + , $3 + , 0 +FROM eventstore.events2 AS e +WHERE aggregate_type = 'instance' + AND event_type = 'instance.added' +ON CONFLICT (instance_id, projection_name) DO UPDATE SET last_updated = EXCLUDED.last_updated, + sequence = EXCLUDED.sequence, + event_date = EXCLUDED.event_date, + position = EXCLUDED.position, + filter_offset = EXCLUDED.filter_offset; \ No newline at end of file diff --git a/cmd/setup/config.go b/cmd/setup/config.go index 127f9d7599..5e5c842b14 100644 --- a/cmd/setup/config.go +++ b/cmd/setup/config.go @@ -151,6 +151,7 @@ type Steps struct { s52IDPTemplate6LDAP2 *IDPTemplate6LDAP2 s53InitPermittedOrgsFunction *InitPermittedOrgsFunction53 s54InstancePositionIndex *InstancePositionIndex + s55ExecutionHandlerStart *ExecutionHandlerStart } func MustNewSteps(v *viper.Viper) *Steps { diff --git a/cmd/setup/setup.go b/cmd/setup/setup.go index eead1980ed..58bc89d2e4 100644 --- a/cmd/setup/setup.go +++ b/cmd/setup/setup.go @@ -198,7 +198,7 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string) steps.s35AddPositionToIndexEsWm = &AddPositionToIndexEsWm{dbClient: dbClient} steps.s36FillV2Milestones = &FillV3Milestones{dbClient: dbClient, eventstore: eventstoreClient} steps.s37Apps7OIDConfigsBackChannelLogoutURI = &Apps7OIDConfigsBackChannelLogoutURI{dbClient: dbClient} - steps.s38BackChannelLogoutNotificationStart = &BackChannelLogoutNotificationStart{dbClient: dbClient, esClient: eventstoreClient} + steps.s38BackChannelLogoutNotificationStart = &BackChannelLogoutNotificationStart{dbClient: dbClient} steps.s40InitPushFunc = &InitPushFunc{dbClient: dbClient} steps.s42Apps7OIDCConfigsLoginVersion = &Apps7OIDCConfigsLoginVersion{dbClient: dbClient} steps.s43CreateFieldsDomainIndex = &CreateFieldsDomainIndex{dbClient: dbClient} @@ -213,6 +213,7 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string) steps.s52IDPTemplate6LDAP2 = &IDPTemplate6LDAP2{dbClient: dbClient} steps.s53InitPermittedOrgsFunction = &InitPermittedOrgsFunction53{dbClient: dbClient} steps.s54InstancePositionIndex = &InstancePositionIndex{dbClient: dbClient} + steps.s55ExecutionHandlerStart = &ExecutionHandlerStart{dbClient: dbClient} err = projection.Create(ctx, dbClient, eventstoreClient, config.Projections, nil, nil, nil) logging.OnError(err).Fatal("unable to start projections") @@ -256,6 +257,7 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string) steps.s52IDPTemplate6LDAP2, steps.s53InitPermittedOrgsFunction, steps.s54InstancePositionIndex, + steps.s55ExecutionHandlerStart, } { setupErr = executeMigration(ctx, eventstoreClient, step, "migration failed") if setupErr != nil { diff --git a/cmd/start/start.go b/cmd/start/start.go index e3d84625b4..52d9c6fba8 100644 --- a/cmd/start/start.go +++ b/cmd/start/start.go @@ -304,7 +304,7 @@ func startZitadel(ctx context.Context, config *Config, masterKey string, server execution.Register( ctx, - config.Projections.Customizations["executions"], + config.Projections.Customizations["execution_handler"], config.Executions, queries, eventstoreClient.EventTypes(),