zitadel/internal/notification/handlers/commands.go
Livio Spring 8537805ea5
feat(notification): use event worker pool (#8962)
# Which Problems Are Solved

Notifications are currently handled following the same pattern as all
other projections:
Created events are processed sequentially (based on their "position") by a
handler. During processing, a lot of information is aggregated (user,
texts, templates, ...).
This leads to back pressure on the projection, since handling an event
might take longer than the time until the next event (to be handled) is
created.

# How the Problems Are Solved

- The current user notification handler creates separate notification
events based on the user / session events.
- These events contain all information that is present and required,
including the userID.
- These notification events are processed by notification workers, which
gather the necessary information (recipient address, texts, templates)
to send out the notifications.
- If a notification fails, a retry event is created based on the current
notification request, including the current state of the user. This
prevents race conditions where the user is changed in the meantime and
the notification would already pick up the new state.
- The retry event is handled after a backoff delay, which increases with
every attempt (see the sketch after the configuration block below).
- If the configured number of attempts is reached or the message has
expired (based on the configuration), a cancel event is created, letting
the workers know that the notification must no longer be handled.
- In case of a successful send, a sent event is created for the
notification aggregate and the existing "sent" events for the user /
session objects are stored.
- The following is added to the defaults.yaml to allow configuration of
the notification workers:
```yaml
Notifications:
  # The number of workers processing the notification request events.
  # If set to 0, no notification request events will be handled. This can be useful when running in
  # a multi-binary / pod setup and allowing only certain executables to process the events.
  Workers: 1 # ZITADEL_NOTIFICATIONS_WORKERS
  # The number of events a single worker will process in one run.
  BulkLimit: 10 # ZITADEL_NOTIFICATIONS_BULKLIMIT
  # Time interval between scheduled notifications for request events
  RequeueEvery: 2s # ZITADEL_NOTIFICATIONS_REQUEUEEVERY
  # The number of workers processing the notification retry events.
  # If set to 0, no notification retry events will be handled. This can be useful when running in
  # a multi-binary / pod setup and allowing only certain executables to process the events.
  RetryWorkers: 1 # ZITADEL_NOTIFICATIONS_RETRYWORKERS
  # Time interval between scheduled notifications for retry events
  RetryRequeueEvery: 2s # ZITADEL_NOTIFICATIONS_RETRYREQUEUEEVERY
  # Only instances for which at least one projection-relevant event exists within the timeframe
  # from the HandleActiveInstances duration in the past until the projection's current time are projected.
  # If set to 0 (default), every instance is always considered active.
  HandleActiveInstances: 0s # ZITADEL_NOTIFICATIONS_HANDLEACTIVEINSTANCES
  # The maximum duration a transaction remains open
  # before it stops left-folding additional events
  # and updates the table.
  TransactionDuration: 1m # ZITADEL_NOTIFICATIONS_TRANSACTIONDURATION
  # Automatically cancel the notification after this number of failed attempts
  MaxAttempts: 3 # ZITADEL_NOTIFICATIONS_MAXATTEMPTS
  # Automatically cancel the notification if it cannot be handled within this time
  MaxTtl: 5m  # ZITADEL_NOTIFICATIONS_MAXTTL
  # Failed attempts are retried after a configured delay (with exponential backoff).
  # Set a minimum and maximum delay and a factor for the backoff.
  MinRetryDelay: 1s  # ZITADEL_NOTIFICATIONS_MINRETRYDELAY
  MaxRetryDelay: 20s # ZITADEL_NOTIFICATIONS_MAXRETRYDELAY
  # Any factor below 1 will be set to 1
  RetryDelayFactor: 1.5 # ZITADEL_NOTIFICATIONS_RETRYDELAYFACTOR
```
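
For illustration, here is a minimal sketch of how a bounded exponential backoff could be derived from these settings. The function name and exact rounding are assumptions for this example, not the actual ZITADEL implementation:

```go
package main

import (
	"fmt"
	"time"
)

// retryDelay returns the delay before the given attempt, growing the
// minimum delay by the configured factor per attempt and capping it
// at the maximum delay.
func retryDelay(attempt int, minDelay, maxDelay time.Duration, factor float64) time.Duration {
	if factor < 1 {
		factor = 1 // mirrors "Any factor below 1 will be set to 1"
	}
	delay := minDelay
	for i := 1; i < attempt; i++ {
		delay = time.Duration(float64(delay) * factor)
		if delay >= maxDelay {
			return maxDelay
		}
	}
	return delay
}

func main() {
	// With the defaults above (1s min, 20s max, factor 1.5), the delays
	// grow as 1s, 1.5s, 2.25s, 3.375s, ... until capped at 20s.
	for attempt := 1; attempt <= 5; attempt++ {
		fmt.Printf("attempt %d: retry in %s\n",
			attempt, retryDelay(attempt, time.Second, 20*time.Second, 1.5))
	}
}
```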


# Additional Changes

None

# Additional Context

- closes #8931
2024-11-27 15:01:17 +00:00


package handlers

import (
	"context"
	"database/sql"

	"github.com/zitadel/zitadel/internal/command"
	"github.com/zitadel/zitadel/internal/notification/senders"
	"github.com/zitadel/zitadel/internal/repository/milestone"
	"github.com/zitadel/zitadel/internal/repository/quota"
)

// Commands abstracts the command-side operations the notification handlers
// and workers rely on: requesting, retrying, canceling and acknowledging
// notifications, as well as recording the "sent" events on the
// user / session objects.
type Commands interface {
	RequestNotification(ctx context.Context, instanceID string, request *command.NotificationRequest) error
	NotificationCanceled(ctx context.Context, tx *sql.Tx, id, resourceOwner string, err error) error
	NotificationRetryRequested(ctx context.Context, tx *sql.Tx, id, resourceOwner string, request *command.NotificationRetryRequest, err error) error
	NotificationSent(ctx context.Context, tx *sql.Tx, id, instanceID string) error
	HumanInitCodeSent(ctx context.Context, orgID, userID string) error
	HumanEmailVerificationCodeSent(ctx context.Context, orgID, userID string) error
	PasswordCodeSent(ctx context.Context, orgID, userID string, generatorInfo *senders.CodeGeneratorInfo) error
	HumanOTPSMSCodeSent(ctx context.Context, userID, resourceOwner string, generatorInfo *senders.CodeGeneratorInfo) error
	HumanOTPEmailCodeSent(ctx context.Context, userID, resourceOwner string) error
	OTPSMSSent(ctx context.Context, sessionID, resourceOwner string, generatorInfo *senders.CodeGeneratorInfo) error
	OTPEmailSent(ctx context.Context, sessionID, resourceOwner string) error
	UserDomainClaimedSent(ctx context.Context, orgID, userID string) error
	HumanPasswordlessInitCodeSent(ctx context.Context, userID, resourceOwner, codeID string) error
	PasswordChangeSent(ctx context.Context, orgID, userID string) error
	HumanPhoneVerificationCodeSent(ctx context.Context, orgID, userID string, generatorInfo *senders.CodeGeneratorInfo) error
	InviteCodeSent(ctx context.Context, orgID, userID string) error
	UsageNotificationSent(ctx context.Context, dueEvent *quota.NotificationDueEvent) error
	MilestonePushed(ctx context.Context, instanceID string, msType milestone.Type, endpoints []string) error
}
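
Below is a hypothetical sketch of how a notification worker could drive this interface for the success / retry / cancel flow described in the commit message. The `worker` and `notificationEvent` types, their fields, and the `send` function are assumptions made for illustration; only the `Commands` methods come from the file above.

```go
package workers

import (
	"context"
	"database/sql"
	"time"

	"github.com/zitadel/zitadel/internal/command"
	"github.com/zitadel/zitadel/internal/notification/handlers"
)

// notificationEvent bundles the fields a worker would need from a
// request or retry event (hypothetical shape, for illustration only).
type notificationEvent struct {
	ID            string
	InstanceID    string
	ResourceOwner string
	Attempt       int
	CreatedAt     time.Time
	Retry         *command.NotificationRetryRequest
}

// worker holds the dependencies for processing notification events;
// the actual delivery is abstracted behind the send function.
type worker struct {
	commands    handlers.Commands
	send        func(context.Context, *notificationEvent) error
	maxAttempts int
	maxTTL      time.Duration
}

// handle sends one notification and records the outcome as an event:
// a sent event on success, a retry event while attempts and TTL remain,
// and a cancel event once the limits are exceeded.
func (w *worker) handle(ctx context.Context, tx *sql.Tx, e *notificationEvent) error {
	err := w.send(ctx, e)
	if err == nil {
		return w.commands.NotificationSent(ctx, tx, e.ID, e.InstanceID)
	}
	if e.Attempt >= w.maxAttempts || time.Since(e.CreatedAt) > w.maxTTL {
		return w.commands.NotificationCanceled(ctx, tx, e.ID, e.ResourceOwner, err)
	}
	// The retry request carries the user state captured at request time,
	// so concurrent user changes cannot leak into the notification.
	return w.commands.NotificationRetryRequested(ctx, tx, e.ID, e.ResourceOwner, e.Retry, err)
}
```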