fix(notifications): improve error handling (#8994)

# Which Problems Are Solved

While running the latest RC / main, we noticed some errors including
context timeouts and rollback issues.

# How the Problems Are Solved

- The transaction context is now passed along and used for any event being written
and for handling savepoints, so that context timeouts can be handled properly
(see the sketch after this list).
- The user projection is no longer triggered when loading the notify user. This
reduces unnecessary load and potential timeouts when many workers are running.
If a user has not been projected yet, an error is logged and the request event
is skipped / retried on the next run.
- Additionally, the context is checked for cancellation after each processed
event.
- `latestRetries` now correctly returns only the latest retry events to be
processed.
- Default values for notifications have been changed so that workers run less
often, with a longer retry delay but a shorter transaction duration.

# Additional Changes

None

# Additional Context

relates to #8931

---------

Co-authored-by: Tim Möhlmann <tim+github@zitadel.com>
Livio Spring 2024-12-04 21:17:49 +01:00 committed by GitHub
parent 6614aacf78
commit 7f0378636b
3 changed files with 46 additions and 41 deletions

```diff
@@ -456,13 +456,13 @@ Notifications:
   # The amount of events a single worker will process in a run.
   BulkLimit: 10 # ZITADEL_NOTIFIACATIONS_BULKLIMIT
   # Time interval between scheduled notifications for request events
-  RequeueEvery: 2s # ZITADEL_NOTIFIACATIONS_REQUEUEEVERY
+  RequeueEvery: 5s # ZITADEL_NOTIFIACATIONS_REQUEUEEVERY
   # The amount of workers processing the notification retry events.
   # If set to 0, no notification retry events will be handled. This can be useful when running in
   # multi binary / pod setup and allowing only certain executables to process the events.
   RetryWorkers: 1 # ZITADEL_NOTIFIACATIONS_RETRYWORKERS
   # Time interval between scheduled notifications for retry events
-  RetryRequeueEvery: 2s # ZITADEL_NOTIFIACATIONS_RETRYREQUEUEEVERY
+  RetryRequeueEvery: 5s # ZITADEL_NOTIFIACATIONS_RETRYREQUEUEEVERY
   # Only instances are projected, for which at least a projection-relevant event exists within the timeframe
   # from HandleActiveInstances duration in the past until the projection's current time
   # If set to 0 (default), every instance is always considered active
@@ -470,15 +470,15 @@ Notifications:
   # The maximum duration a transaction remains open
   # before it spots left folding additional events
   # and updates the table.
-  TransactionDuration: 1m # ZITADEL_NOTIFIACATIONS_TRANSACTIONDURATION
+  TransactionDuration: 10s # ZITADEL_NOTIFIACATIONS_TRANSACTIONDURATION
   # Automatically cancel the notification after the amount of failed attempts
   MaxAttempts: 3 # ZITADEL_NOTIFIACATIONS_MAXATTEMPTS
   # Automatically cancel the notification if it cannot be handled within a specific time
   MaxTtl: 5m # ZITADEL_NOTIFIACATIONS_MAXTTL
   # Failed attempts are retried after a confogired delay (with exponential backoff).
   # Set a minimum and maximum delay and a factor for the backoff
-  MinRetryDelay: 1s # ZITADEL_NOTIFIACATIONS_MINRETRYDELAY
-  MaxRetryDelay: 20s # ZITADEL_NOTIFIACATIONS_MAXRETRYDELAY
+  MinRetryDelay: 5s # ZITADEL_NOTIFIACATIONS_MINRETRYDELAY
+  MaxRetryDelay: 1m # ZITADEL_NOTIFIACATIONS_MAXRETRYDELAY
   # Any factor below 1 will be set to 1
   RetryDelayFactor: 1.5 # ZITADEL_NOTIFIACATIONS_RETRYDELAYFACTOR
```
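For context on the new defaults, here is a small, hedged sketch of a minimum/maximum/factor backoff as described by the config comments (`MinRetryDelay`, `MaxRetryDelay`, `RetryDelayFactor`). It is an assumption for illustration, not ZITADEL's actual `backOff` implementation.

```go
// Sketch only: min/max/factor backoff as described by the config comments.
// This is an assumption for illustration, not ZITADEL's actual backOff code.
package backoffsketch

import "time"

const (
	minRetryDelay    = 5 * time.Second // MinRetryDelay
	maxRetryDelay    = 1 * time.Minute // MaxRetryDelay
	retryDelayFactor = 1.5             // RetryDelayFactor
)

// nextBackOff returns the delay before the next retry, based on the delay
// used for the previous attempt (zero for the first retry).
func nextBackOff(previous time.Duration) time.Duration {
	if previous < minRetryDelay {
		return minRetryDelay
	}
	next := time.Duration(float64(previous) * retryDelayFactor)
	if next > maxRetryDelay {
		return maxRetryDelay
	}
	return next
}
```

With these defaults, retry delays would grow roughly 5s → 7.5s → 11.25s → … capped at 1m, while `MaxAttempts: 3` cancels the notification after the third failed attempt.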

```diff
@@ -27,9 +27,8 @@ import (
 )
 
 const (
-	Domain = "Domain"
-	Code   = "Code"
-	OTP    = "OTP"
+	Code = "Code"
+	OTP  = "OTP"
 )
 
 type NotificationWorker struct {
@@ -106,17 +105,19 @@ func (w *NotificationWorker) Start(ctx context.Context) {
 	}
 }
 
-func (w *NotificationWorker) reduceNotificationRequested(ctx context.Context, tx *sql.Tx, event *notification.RequestedEvent) (err error) {
+func (w *NotificationWorker) reduceNotificationRequested(ctx, txCtx context.Context, tx *sql.Tx, event *notification.RequestedEvent) (err error) {
 	ctx = ContextWithNotifier(ctx, event.Aggregate())
 
 	// if the notification is too old, we can directly cancel
 	if event.CreatedAt().Add(w.config.MaxTtl).Before(w.now()) {
-		return w.commands.NotificationCanceled(ctx, tx, event.Aggregate().ID, event.Aggregate().ResourceOwner, nil)
+		return w.commands.NotificationCanceled(txCtx, tx, event.Aggregate().ID, event.Aggregate().ResourceOwner, nil)
 	}
 
 	// Get the notify user first, so if anything fails afterward we have the current state of the user
 	// and can pass that to the retry request.
-	notifyUser, err := w.queries.GetNotifyUserByID(ctx, true, event.UserID)
+	// We do not trigger the projection to reduce load on the database. By the time the notification is processed,
+	// the user should be projected anyway. If not, it will just wait for the next run.
+	notifyUser, err := w.queries.GetNotifyUserByID(ctx, false, event.UserID)
 	if err != nil {
 		return err
 	}
@@ -128,17 +129,17 @@ func (w *NotificationWorker) reduceNotificationRequested(ctx context.Context, tx
 		event.Request.Args.Domain = notifyUser.LastEmail[index+1:]
 	}
 
-	err = w.sendNotification(ctx, tx, event.Request, notifyUser, event)
+	err = w.sendNotification(ctx, txCtx, tx, event.Request, notifyUser, event)
 	if err == nil {
 		return nil
 	}
 
 	// if retries are disabled or if the error explicitly specifies, we cancel the notification
 	if w.config.MaxAttempts <= 1 || errors.Is(err, &channels.CancelError{}) {
-		return w.commands.NotificationCanceled(ctx, tx, event.Aggregate().ID, event.Aggregate().ResourceOwner, err)
+		return w.commands.NotificationCanceled(txCtx, tx, event.Aggregate().ID, event.Aggregate().ResourceOwner, err)
 	}
 
 	// otherwise we retry after a backoff delay
 	return w.commands.NotificationRetryRequested(
-		ctx,
+		txCtx,
 		tx,
 		event.Aggregate().ID,
 		event.Aggregate().ResourceOwner,
@@ -147,49 +148,44 @@ func (w *NotificationWorker) reduceNotificationRequested(ctx context.Context, tx
 	)
 }
 
-func (w *NotificationWorker) reduceNotificationRetry(ctx context.Context, tx *sql.Tx, event *notification.RetryRequestedEvent) (err error) {
+func (w *NotificationWorker) reduceNotificationRetry(ctx, txCtx context.Context, tx *sql.Tx, event *notification.RetryRequestedEvent) (err error) {
 	ctx = ContextWithNotifier(ctx, event.Aggregate())
 
 	// if the notification is too old, we can directly cancel
 	if event.CreatedAt().Add(w.config.MaxTtl).Before(w.now()) {
-		return w.commands.NotificationCanceled(ctx, tx, event.Aggregate().ID, event.Aggregate().ResourceOwner, err)
+		return w.commands.NotificationCanceled(txCtx, tx, event.Aggregate().ID, event.Aggregate().ResourceOwner, err)
 	}
 
 	if event.CreatedAt().Add(event.BackOff).After(w.now()) {
 		return nil
 	}
-	err = w.sendNotification(ctx, tx, event.Request, event.NotifyUser, event)
+	err = w.sendNotification(ctx, txCtx, tx, event.Request, event.NotifyUser, event)
 	if err == nil {
 		return nil
 	}
 
 	// if the max attempts are reached or if the error explicitly specifies, we cancel the notification
 	if event.Sequence() >= uint64(w.config.MaxAttempts) || errors.Is(err, &channels.CancelError{}) {
-		return w.commands.NotificationCanceled(ctx, tx, event.Aggregate().ID, event.Aggregate().ResourceOwner, err)
+		return w.commands.NotificationCanceled(txCtx, tx, event.Aggregate().ID, event.Aggregate().ResourceOwner, err)
 	}
 
 	// otherwise we retry after a backoff delay
-	return w.commands.NotificationRetryRequested(ctx, tx, event.Aggregate().ID, event.Aggregate().ResourceOwner, notificationEventToRequest(
+	return w.commands.NotificationRetryRequested(txCtx, tx, event.Aggregate().ID, event.Aggregate().ResourceOwner, notificationEventToRequest(
 		event.Request,
 		event.NotifyUser,
 		w.backOff(event.BackOff),
 	), err)
 }
 
-func (w *NotificationWorker) sendNotification(ctx context.Context, tx *sql.Tx, request notification.Request, notifyUser *query.NotifyUser, e eventstore.Event) error {
+func (w *NotificationWorker) sendNotification(ctx, txCtx context.Context, tx *sql.Tx, request notification.Request, notifyUser *query.NotifyUser, e eventstore.Event) error {
 	ctx, err := enrichCtx(ctx, request.TriggeredAtOrigin)
 	if err != nil {
-		err := w.commands.NotificationCanceled(ctx, tx, e.Aggregate().ID, e.Aggregate().ResourceOwner, err)
-		logging.WithFields("instanceID", authz.GetInstance(ctx).InstanceID(), "notification", e.Aggregate().ID).
-			OnError(err).Error("could not cancel notification")
-		return nil
+		return channels.NewCancelError(err)
 	}
 
 	// check early that a "sent" handler exists, otherwise we can cancel early
 	sentHandler, ok := sentHandlers[request.EventType]
 	if !ok {
-		err := w.commands.NotificationCanceled(ctx, tx, e.Aggregate().ID, e.Aggregate().ResourceOwner, err)
-		logging.WithFields("instanceID", authz.GetInstance(ctx).InstanceID(), "notification", e.Aggregate().ID).
-			OnError(err).Errorf(`no "sent" handler registered for %s`, request.EventType)
-		return nil
+		logging.Errorf(`no "sent" handler registered for %s`, request.EventType)
+		return channels.NewCancelError(err)
 	}
 
 	var code string
@@ -233,7 +229,7 @@ func (w *NotificationWorker) sendNotification(ctx context.Context, tx *sql.Tx, r
 	if err := notify(request.URLTemplate, args, request.MessageType, request.UnverifiedNotificationChannel); err != nil {
 		return err
 	}
-	err = w.commands.NotificationSent(ctx, tx, e.Aggregate().ID, e.Aggregate().ResourceOwner)
+	err = w.commands.NotificationSent(txCtx, tx, e.Aggregate().ID, e.Aggregate().ResourceOwner)
 	if err != nil {
 		// In case the notification event cannot be pushed, we most likely cannot create a retry or cancel event.
 		// Therefore, we'll only log the error and also do not need to try to push to the user / session.
@@ -241,7 +237,7 @@ func (w *NotificationWorker) sendNotification(ctx context.Context, tx *sql.Tx, r
 			OnError(err).Error("could not set sent notification event")
 		return nil
 	}
-	err = sentHandler(ctx, w.commands, request.NotificationAggregateID(), request.NotificationAggregateResourceOwner(), generatorInfo, args)
+	err = sentHandler(txCtx, w.commands, request.NotificationAggregateID(), request.NotificationAggregateResourceOwner(), generatorInfo, args)
 	logging.WithFields("instanceID", authz.GetInstance(ctx).InstanceID(), "notification", e.Aggregate().ID).
 		OnError(err).Error("could not set notification event on aggregate")
 	return nil
@@ -363,7 +359,7 @@ func (w *NotificationWorker) trigger(ctx context.Context, workerID int, retry bo
 		err = database.CloseTransaction(tx, err)
 	}()
 
-	events, err := w.searchEvents(ctx, tx, retry)
+	events, err := w.searchEvents(txCtx, tx, retry)
 	if err != nil {
 		return err
 	}
@@ -382,11 +378,11 @@ func (w *NotificationWorker) trigger(ctx context.Context, workerID int, retry bo
 		var err error
 		switch e := event.(type) {
 		case *notification.RequestedEvent:
-			w.createSavepoint(ctx, tx, event, workerID, retry)
-			err = w.reduceNotificationRequested(ctx, tx, e)
+			w.createSavepoint(txCtx, tx, event, workerID, retry)
+			err = w.reduceNotificationRequested(ctx, txCtx, tx, e)
 		case *notification.RetryRequestedEvent:
-			w.createSavepoint(ctx, tx, event, workerID, retry)
-			err = w.reduceNotificationRetry(ctx, tx, e)
+			w.createSavepoint(txCtx, tx, event, workerID, retry)
+			err = w.reduceNotificationRetry(ctx, txCtx, tx, e)
 		}
 		if err != nil {
 			w.log(workerID, retry).OnError(err).
@@ -394,14 +390,20 @@ func (w *NotificationWorker) trigger(ctx context.Context, workerID int, retry bo
 				WithField("notificationID", event.Aggregate().ID).
 				WithField("sequence", event.Sequence()).
 				WithField("type", event.Type()).
-				Error("could not push notification event")
-			w.rollbackToSavepoint(ctx, tx, event, workerID, retry)
+				Error("could not handle notification event")
+			// if we have an error, we rollback to the savepoint and continue with the next event
+			// we use the txCtx to make sure we can rollback the transaction in case the ctx is canceled
+			w.rollbackToSavepoint(txCtx, tx, event, workerID, retry)
+		}
+		// if the context is canceled, we stop the processing
+		if ctx.Err() != nil {
+			return nil
 		}
 	}
 	return nil
 }
 
-func (w *NotificationWorker) latestRetries(events []eventstore.Event) {
+func (w *NotificationWorker) latestRetries(events []eventstore.Event) []eventstore.Event {
 	for i := len(events) - 1; i > 0; i-- {
 		// since we delete during the iteration, we need to make sure we don't panic
 		if len(events) <= i {
@@ -413,6 +415,7 @@ func (w *NotificationWorker) latestRetries(events []eventstore.Event) {
 				e.Sequence() < events[i].Sequence()
 		})
 	}
+	return events
 }
 
 func (w *NotificationWorker) createSavepoint(ctx context.Context, tx *sql.Tx, event eventstore.Event, workerID int, retry bool) {
@@ -476,8 +479,7 @@ func (w *NotificationWorker) searchRetryEvents(ctx context.Context, tx *sql.Tx)
 	if err != nil {
 		return nil, err
 	}
-	w.latestRetries(events)
-	return events, nil
+	return w.latestRetries(events), nil
 }
 
 type existingInstances []string
```
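As a hedged illustration of what the changed `latestRetries` is meant to do — return only the most recent retry event per notification aggregate — here is a simplified sketch using a plain struct instead of ZITADEL's `eventstore.Event` interface:

```go
// Sketch only: keeps the retry event with the highest sequence per aggregate ID.
// This mirrors the intent of latestRetries as described in the PR, using plain
// types instead of ZITADEL's eventstore.Event interface.
package retriessketch

import "slices"

type retryEvent struct {
	AggregateID string
	Sequence    uint64
}

// latestRetries returns only the latest retry event for each aggregate,
// preserving the relative order of the kept events.
func latestRetries(events []retryEvent) []retryEvent {
	latest := make(map[string]uint64, len(events))
	for _, e := range events {
		if e.Sequence > latest[e.AggregateID] {
			latest[e.AggregateID] = e.Sequence
		}
	}
	return slices.DeleteFunc(events, func(e retryEvent) bool {
		return e.Sequence < latest[e.AggregateID]
	})
}
```

Given two retry events with sequences 3 and 5 for the same notification, only the sequence-5 event would be returned and processed.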

```diff
@@ -420,6 +420,7 @@ func Test_userNotifier_reduceNotificationRequested(t *testing.T) {
 			commands := mock.NewMockCommands(ctrl)
 			f, a, w := tt.test(ctrl, queries, commands)
 			err := newNotificationWorker(t, ctrl, queries, f, a, w).reduceNotificationRequested(
+				authz.WithInstanceID(context.Background(), instanceID),
 				authz.WithInstanceID(context.Background(), instanceID),
 				&sql.Tx{},
 				a.event.(*notification.RequestedEvent))
@@ -798,9 +799,11 @@ func Test_userNotifier_reduceNotificationRetry(t *testing.T) {
 			commands := mock.NewMockCommands(ctrl)
 			f, a, w := tt.test(ctrl, queries, commands)
 			err := newNotificationWorker(t, ctrl, queries, f, a, w).reduceNotificationRetry(
+				authz.WithInstanceID(context.Background(), instanceID),
 				authz.WithInstanceID(context.Background(), instanceID),
 				&sql.Tx{},
-				a.event.(*notification.RetryRequestedEvent))
+				a.event.(*notification.RetryRequestedEvent),
+			)
 			if w.err != nil {
 				w.err(t, err)
 			} else {
```