diff --git a/cmd/defaults.yaml b/cmd/defaults.yaml index 21ed1a5e53..a983c7125a 100644 --- a/cmd/defaults.yaml +++ b/cmd/defaults.yaml @@ -456,13 +456,13 @@ Notifications: # The amount of events a single worker will process in a run. BulkLimit: 10 # ZITADEL_NOTIFIACATIONS_BULKLIMIT # Time interval between scheduled notifications for request events - RequeueEvery: 2s # ZITADEL_NOTIFIACATIONS_REQUEUEEVERY + RequeueEvery: 5s # ZITADEL_NOTIFIACATIONS_REQUEUEEVERY # The amount of workers processing the notification retry events. # If set to 0, no notification retry events will be handled. This can be useful when running in # multi binary / pod setup and allowing only certain executables to process the events. RetryWorkers: 1 # ZITADEL_NOTIFIACATIONS_RETRYWORKERS # Time interval between scheduled notifications for retry events - RetryRequeueEvery: 2s # ZITADEL_NOTIFIACATIONS_RETRYREQUEUEEVERY + RetryRequeueEvery: 5s # ZITADEL_NOTIFIACATIONS_RETRYREQUEUEEVERY # Only instances are projected, for which at least a projection-relevant event exists within the timeframe # from HandleActiveInstances duration in the past until the projection's current time # If set to 0 (default), every instance is always considered active @@ -470,15 +470,15 @@ Notifications: # The maximum duration a transaction remains open # before it spots left folding additional events # and updates the table. - TransactionDuration: 1m # ZITADEL_NOTIFIACATIONS_TRANSACTIONDURATION + TransactionDuration: 10s # ZITADEL_NOTIFIACATIONS_TRANSACTIONDURATION # Automatically cancel the notification after the amount of failed attempts MaxAttempts: 3 # ZITADEL_NOTIFIACATIONS_MAXATTEMPTS # Automatically cancel the notification if it cannot be handled within a specific time MaxTtl: 5m # ZITADEL_NOTIFIACATIONS_MAXTTL # Failed attempts are retried after a confogired delay (with exponential backoff). 
# Set a minimum and maximum delay and a factor for the backoff - MinRetryDelay: 1s # ZITADEL_NOTIFIACATIONS_MINRETRYDELAY - MaxRetryDelay: 20s # ZITADEL_NOTIFIACATIONS_MAXRETRYDELAY + MinRetryDelay: 5s # ZITADEL_NOTIFIACATIONS_MINRETRYDELAY + MaxRetryDelay: 1m # ZITADEL_NOTIFIACATIONS_MAXRETRYDELAY # Any factor below 1 will be set to 1 RetryDelayFactor: 1.5 # ZITADEL_NOTIFIACATIONS_RETRYDELAYFACTOR diff --git a/internal/notification/handlers/notification_worker.go b/internal/notification/handlers/notification_worker.go index 96ecd755dd..6d90b2acb4 100644 --- a/internal/notification/handlers/notification_worker.go +++ b/internal/notification/handlers/notification_worker.go @@ -27,9 +27,8 @@ import ( ) const ( - Domain = "Domain" - Code = "Code" - OTP = "OTP" + Code = "Code" + OTP = "OTP" ) type NotificationWorker struct { @@ -106,17 +105,19 @@ func (w *NotificationWorker) Start(ctx context.Context) { } } -func (w *NotificationWorker) reduceNotificationRequested(ctx context.Context, tx *sql.Tx, event *notification.RequestedEvent) (err error) { +func (w *NotificationWorker) reduceNotificationRequested(ctx, txCtx context.Context, tx *sql.Tx, event *notification.RequestedEvent) (err error) { ctx = ContextWithNotifier(ctx, event.Aggregate()) // if the notification is too old, we can directly cancel if event.CreatedAt().Add(w.config.MaxTtl).Before(w.now()) { - return w.commands.NotificationCanceled(ctx, tx, event.Aggregate().ID, event.Aggregate().ResourceOwner, nil) + return w.commands.NotificationCanceled(txCtx, tx, event.Aggregate().ID, event.Aggregate().ResourceOwner, nil) } // Get the notify user first, so if anything fails afterward we have the current state of the user // and can pass that to the retry request. - notifyUser, err := w.queries.GetNotifyUserByID(ctx, true, event.UserID) + // We do not trigger the projection to reduce load on the database. By the time the notification is processed, + // the user should be projected anyway. 
If not, it will just wait for the next run. + notifyUser, err := w.queries.GetNotifyUserByID(ctx, false, event.UserID) if err != nil { return err } @@ -128,17 +129,17 @@ func (w *NotificationWorker) reduceNotificationRequested(ctx context.Context, tx event.Request.Args.Domain = notifyUser.LastEmail[index+1:] } - err = w.sendNotification(ctx, tx, event.Request, notifyUser, event) + err = w.sendNotification(ctx, txCtx, tx, event.Request, notifyUser, event) if err == nil { return nil } // if retries are disabled or if the error explicitly specifies, we cancel the notification if w.config.MaxAttempts <= 1 || errors.Is(err, &channels.CancelError{}) { - return w.commands.NotificationCanceled(ctx, tx, event.Aggregate().ID, event.Aggregate().ResourceOwner, err) + return w.commands.NotificationCanceled(txCtx, tx, event.Aggregate().ID, event.Aggregate().ResourceOwner, err) } // otherwise we retry after a backoff delay return w.commands.NotificationRetryRequested( - ctx, + txCtx, tx, event.Aggregate().ID, event.Aggregate().ResourceOwner, @@ -147,49 +148,44 @@ func (w *NotificationWorker) reduceNotificationRequested(ctx context.Context, tx ) } -func (w *NotificationWorker) reduceNotificationRetry(ctx context.Context, tx *sql.Tx, event *notification.RetryRequestedEvent) (err error) { +func (w *NotificationWorker) reduceNotificationRetry(ctx, txCtx context.Context, tx *sql.Tx, event *notification.RetryRequestedEvent) (err error) { ctx = ContextWithNotifier(ctx, event.Aggregate()) // if the notification is too old, we can directly cancel if event.CreatedAt().Add(w.config.MaxTtl).Before(w.now()) { - return w.commands.NotificationCanceled(ctx, tx, event.Aggregate().ID, event.Aggregate().ResourceOwner, err) + return w.commands.NotificationCanceled(txCtx, tx, event.Aggregate().ID, event.Aggregate().ResourceOwner, err) } if event.CreatedAt().Add(event.BackOff).After(w.now()) { return nil } - err = w.sendNotification(ctx, tx, event.Request, event.NotifyUser, event) + err = 
w.sendNotification(ctx, txCtx, tx, event.Request, event.NotifyUser, event) if err == nil { return nil } // if the max attempts are reached or if the error explicitly specifies, we cancel the notification if event.Sequence() >= uint64(w.config.MaxAttempts) || errors.Is(err, &channels.CancelError{}) { - return w.commands.NotificationCanceled(ctx, tx, event.Aggregate().ID, event.Aggregate().ResourceOwner, err) + return w.commands.NotificationCanceled(txCtx, tx, event.Aggregate().ID, event.Aggregate().ResourceOwner, err) } // otherwise we retry after a backoff delay - return w.commands.NotificationRetryRequested(ctx, tx, event.Aggregate().ID, event.Aggregate().ResourceOwner, notificationEventToRequest( + return w.commands.NotificationRetryRequested(txCtx, tx, event.Aggregate().ID, event.Aggregate().ResourceOwner, notificationEventToRequest( event.Request, event.NotifyUser, w.backOff(event.BackOff), ), err) } -func (w *NotificationWorker) sendNotification(ctx context.Context, tx *sql.Tx, request notification.Request, notifyUser *query.NotifyUser, e eventstore.Event) error { +func (w *NotificationWorker) sendNotification(ctx, txCtx context.Context, tx *sql.Tx, request notification.Request, notifyUser *query.NotifyUser, e eventstore.Event) error { ctx, err := enrichCtx(ctx, request.TriggeredAtOrigin) if err != nil { - err := w.commands.NotificationCanceled(ctx, tx, e.Aggregate().ID, e.Aggregate().ResourceOwner, err) - logging.WithFields("instanceID", authz.GetInstance(ctx).InstanceID(), "notification", e.Aggregate().ID). - OnError(err).Error("could not cancel notification") - return nil + return channels.NewCancelError(err) } // check early that a "sent" handler exists, otherwise we can cancel early sentHandler, ok := sentHandlers[request.EventType] if !ok { - err := w.commands.NotificationCanceled(ctx, tx, e.Aggregate().ID, e.Aggregate().ResourceOwner, err) - logging.WithFields("instanceID", authz.GetInstance(ctx).InstanceID(), "notification", e.Aggregate().ID). 
- OnError(err).Errorf(`no "sent" handler registered for %s`, request.EventType) - return nil + logging.Errorf(`no "sent" handler registered for %s`, request.EventType) + return channels.NewCancelError(err) } var code string @@ -233,7 +229,7 @@ func (w *NotificationWorker) sendNotification(ctx context.Context, tx *sql.Tx, r if err := notify(request.URLTemplate, args, request.MessageType, request.UnverifiedNotificationChannel); err != nil { return err } - err = w.commands.NotificationSent(ctx, tx, e.Aggregate().ID, e.Aggregate().ResourceOwner) + err = w.commands.NotificationSent(txCtx, tx, e.Aggregate().ID, e.Aggregate().ResourceOwner) if err != nil { // In case the notification event cannot be pushed, we most likely cannot create a retry or cancel event. // Therefore, we'll only log the error and also do not need to try to push to the user / session. @@ -241,7 +237,7 @@ func (w *NotificationWorker) sendNotification(ctx context.Context, tx *sql.Tx, r OnError(err).Error("could not set sent notification event") return nil } - err = sentHandler(ctx, w.commands, request.NotificationAggregateID(), request.NotificationAggregateResourceOwner(), generatorInfo, args) + err = sentHandler(txCtx, w.commands, request.NotificationAggregateID(), request.NotificationAggregateResourceOwner(), generatorInfo, args) logging.WithFields("instanceID", authz.GetInstance(ctx).InstanceID(), "notification", e.Aggregate().ID). 
OnError(err).Error("could not set notification event on aggregate") return nil @@ -363,7 +359,7 @@ func (w *NotificationWorker) trigger(ctx context.Context, workerID int, retry bo err = database.CloseTransaction(tx, err) }() - events, err := w.searchEvents(ctx, tx, retry) + events, err := w.searchEvents(txCtx, tx, retry) if err != nil { return err } @@ -382,11 +378,11 @@ func (w *NotificationWorker) trigger(ctx context.Context, workerID int, retry bo var err error switch e := event.(type) { case *notification.RequestedEvent: - w.createSavepoint(ctx, tx, event, workerID, retry) - err = w.reduceNotificationRequested(ctx, tx, e) + w.createSavepoint(txCtx, tx, event, workerID, retry) + err = w.reduceNotificationRequested(ctx, txCtx, tx, e) case *notification.RetryRequestedEvent: - w.createSavepoint(ctx, tx, event, workerID, retry) - err = w.reduceNotificationRetry(ctx, tx, e) + w.createSavepoint(txCtx, tx, event, workerID, retry) + err = w.reduceNotificationRetry(ctx, txCtx, tx, e) } if err != nil { w.log(workerID, retry).OnError(err). @@ -394,14 +390,20 @@ func (w *NotificationWorker) trigger(ctx context.Context, workerID int, retry bo WithField("notificationID", event.Aggregate().ID). WithField("sequence", event.Sequence()). WithField("type", event.Type()). 
- Error("could not push notification event") - w.rollbackToSavepoint(ctx, tx, event, workerID, retry) + Error("could not handle notification event") + // if we have an error, we roll back to the savepoint and continue with the next event + // we use the txCtx to make sure we can roll back the transaction in case the ctx is canceled + w.rollbackToSavepoint(txCtx, tx, event, workerID, retry) + } + // if the context is canceled, we stop the processing + if ctx.Err() != nil { + return nil } } return nil } -func (w *NotificationWorker) latestRetries(events []eventstore.Event) { +func (w *NotificationWorker) latestRetries(events []eventstore.Event) []eventstore.Event { for i := len(events) - 1; i > 0; i-- { // since we delete during the iteration, we need to make sure we don't panic if len(events) <= i { @@ -413,6 +415,7 @@ func (w *NotificationWorker) latestRetries(events []eventstore.Event) { e.Sequence() < events[i].Sequence() }) } + return events } func (w *NotificationWorker) createSavepoint(ctx context.Context, tx *sql.Tx, event eventstore.Event, workerID int, retry bool) { @@ -476,8 +479,7 @@ func (w *NotificationWorker) searchRetryEvents(ctx context.Context, tx *sql.Tx) if err != nil { return nil, err } - w.latestRetries(events) - return events, nil + return w.latestRetries(events), nil } type existingInstances []string diff --git a/internal/notification/handlers/notification_worker_test.go b/internal/notification/handlers/notification_worker_test.go index 03de5201fc..4ffd33005b 100644 --- a/internal/notification/handlers/notification_worker_test.go +++ b/internal/notification/handlers/notification_worker_test.go @@ -420,6 +420,7 @@ func Test_userNotifier_reduceNotificationRequested(t *testing.T) { commands := mock.NewMockCommands(ctrl) f, a, w := tt.test(ctrl, queries, commands) err := newNotificationWorker(t, ctrl, queries, f, a, w).reduceNotificationRequested( + authz.WithInstanceID(context.Background(), instanceID), authz.WithInstanceID(context.Background(), 
instanceID), &sql.Tx{}, a.event.(*notification.RequestedEvent)) @@ -798,9 +799,11 @@ func Test_userNotifier_reduceNotificationRetry(t *testing.T) { commands := mock.NewMockCommands(ctrl) f, a, w := tt.test(ctrl, queries, commands) err := newNotificationWorker(t, ctrl, queries, f, a, w).reduceNotificationRetry( + authz.WithInstanceID(context.Background(), instanceID), authz.WithInstanceID(context.Background(), instanceID), &sql.Tx{}, - a.event.(*notification.RetryRequestedEvent)) + a.event.(*notification.RetryRequestedEvent), + ) if w.err != nil { w.err(t, err) } else {