fix: add current state for execution handler into setup (#9863)

# Which Problems Are Solved

The execution handler projection handles all events to check if an
execution has to be provided to the worker to execute.
In this logic all events would be processed from the beginning which is
not necessary.

# How the Problems Are Solved

Add the current state to the execution handler projection, to avoid
processing all existing events.

# Additional Changes

Add custom configuration to the default, so that the transactions are
limited to some events.

# Additional Context

None
This commit is contained in:
Stefan Benz 2025-05-07 16:26:53 +02:00 committed by GitHub
parent c6aa6385b6
commit 21167a4bba
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 56 additions and 4 deletions

View File

@ -387,7 +387,8 @@ Projections:
org_domain_verified_fields:
TransactionDuration: 0s
BulkLimit: 2000
execution_handler:
BulkLimit: 10
# The Notifications projection is used for preparing the messages (emails and SMS) to be sent to users
Notifications:
# As notification projections don't result in database statements, retries don't have an effect

View File

@ -15,7 +15,6 @@ var (
type BackChannelLogoutNotificationStart struct {
dbClient *database.DB
esClient *eventstore.Eventstore
}
func (mig *BackChannelLogoutNotificationStart) Execute(ctx context.Context, e eventstore.Event) error {

27
cmd/setup/55.go Normal file
View File

@ -0,0 +1,27 @@
package setup
import (
"context"
_ "embed"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/eventstore"
)
var (
//go:embed 55.sql
executionHandlerCurrentState string
)
type ExecutionHandlerStart struct {
dbClient *database.DB
}
func (mig *ExecutionHandlerStart) Execute(ctx context.Context, e eventstore.Event) error {
_, err := mig.dbClient.ExecContext(ctx, executionHandlerCurrentState, e.Sequence(), e.CreatedAt(), e.Position())
return err
}
func (mig *ExecutionHandlerStart) String() string {
return "55_execution_handler_start"
}

22
cmd/setup/55.sql Normal file
View File

@ -0,0 +1,22 @@
INSERT INTO projections.current_states AS cs ( instance_id
, projection_name
, last_updated
, sequence
, event_date
, position
, filter_offset)
SELECT instance_id
, 'projections.execution_handler'
, now()
, $1
, $2
, $3
, 0
FROM eventstore.events2 AS e
WHERE aggregate_type = 'instance'
AND event_type = 'instance.added'
ON CONFLICT (instance_id, projection_name) DO UPDATE SET last_updated = EXCLUDED.last_updated,
sequence = EXCLUDED.sequence,
event_date = EXCLUDED.event_date,
position = EXCLUDED.position,
filter_offset = EXCLUDED.filter_offset;

View File

@ -151,6 +151,7 @@ type Steps struct {
s52IDPTemplate6LDAP2 *IDPTemplate6LDAP2
s53InitPermittedOrgsFunction *InitPermittedOrgsFunction53
s54InstancePositionIndex *InstancePositionIndex
s55ExecutionHandlerStart *ExecutionHandlerStart
}
func MustNewSteps(v *viper.Viper) *Steps {

View File

@ -198,7 +198,7 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string)
steps.s35AddPositionToIndexEsWm = &AddPositionToIndexEsWm{dbClient: dbClient}
steps.s36FillV2Milestones = &FillV3Milestones{dbClient: dbClient, eventstore: eventstoreClient}
steps.s37Apps7OIDConfigsBackChannelLogoutURI = &Apps7OIDConfigsBackChannelLogoutURI{dbClient: dbClient}
steps.s38BackChannelLogoutNotificationStart = &BackChannelLogoutNotificationStart{dbClient: dbClient, esClient: eventstoreClient}
steps.s38BackChannelLogoutNotificationStart = &BackChannelLogoutNotificationStart{dbClient: dbClient}
steps.s40InitPushFunc = &InitPushFunc{dbClient: dbClient}
steps.s42Apps7OIDCConfigsLoginVersion = &Apps7OIDCConfigsLoginVersion{dbClient: dbClient}
steps.s43CreateFieldsDomainIndex = &CreateFieldsDomainIndex{dbClient: dbClient}
@ -213,6 +213,7 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string)
steps.s52IDPTemplate6LDAP2 = &IDPTemplate6LDAP2{dbClient: dbClient}
steps.s53InitPermittedOrgsFunction = &InitPermittedOrgsFunction53{dbClient: dbClient}
steps.s54InstancePositionIndex = &InstancePositionIndex{dbClient: dbClient}
steps.s55ExecutionHandlerStart = &ExecutionHandlerStart{dbClient: dbClient}
err = projection.Create(ctx, dbClient, eventstoreClient, config.Projections, nil, nil, nil)
logging.OnError(err).Fatal("unable to start projections")
@ -256,6 +257,7 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string)
steps.s52IDPTemplate6LDAP2,
steps.s53InitPermittedOrgsFunction,
steps.s54InstancePositionIndex,
steps.s55ExecutionHandlerStart,
} {
setupErr = executeMigration(ctx, eventstoreClient, step, "migration failed")
if setupErr != nil {

View File

@ -304,7 +304,7 @@ func startZitadel(ctx context.Context, config *Config, masterKey string, server
execution.Register(
ctx,
config.Projections.Customizations["executions"],
config.Projections.Customizations["execution_handler"],
config.Executions,
queries,
eventstoreClient.EventTypes(),