use transaction context and do not trigger user projection

This commit is contained in:
Livio Spring
2024-12-02 07:12:24 +01:00
parent 8a969f6527
commit 7109fa9004
2 changed files with 31 additions and 25 deletions

View File

@@ -27,9 +27,8 @@ import (
) )
const ( const (
Domain = "Domain" Code = "Code"
Code = "Code" OTP = "OTP"
OTP = "OTP"
) )
type NotificationWorker struct { type NotificationWorker struct {
@@ -106,17 +105,19 @@ func (w *NotificationWorker) Start(ctx context.Context) {
} }
} }
func (w *NotificationWorker) reduceNotificationRequested(ctx context.Context, tx *sql.Tx, event *notification.RequestedEvent) (err error) { func (w *NotificationWorker) reduceNotificationRequested(ctx, txCtx context.Context, tx *sql.Tx, event *notification.RequestedEvent) (err error) {
ctx = ContextWithNotifier(ctx, event.Aggregate()) ctx = ContextWithNotifier(ctx, event.Aggregate())
// if the notification is too old, we can directly cancel // if the notification is too old, we can directly cancel
if event.CreatedAt().Add(w.config.MaxTtl).Before(w.now()) { if event.CreatedAt().Add(w.config.MaxTtl).Before(w.now()) {
return w.commands.NotificationCanceled(ctx, tx, event.Aggregate().ID, event.Aggregate().ResourceOwner, nil) return w.commands.NotificationCanceled(txCtx, tx, event.Aggregate().ID, event.Aggregate().ResourceOwner, nil)
} }
// Get the notify user first, so if anything fails afterward we have the current state of the user // Get the notify user first, so if anything fails afterward we have the current state of the user
// and can pass that to the retry request. // and can pass that to the retry request.
notifyUser, err := w.queries.GetNotifyUserByID(ctx, true, event.UserID) // We do not trigger the projection to reduce load on the database. By the time the notification is processed,
// the user should be projected anyway. If not, it will just wait for the next run.
notifyUser, err := w.queries.GetNotifyUserByID(ctx, false, event.UserID)
if err != nil { if err != nil {
return err return err
} }
@@ -128,17 +129,17 @@ func (w *NotificationWorker) reduceNotificationRequested(ctx context.Context, tx
event.Request.Args.Domain = notifyUser.LastEmail[index+1:] event.Request.Args.Domain = notifyUser.LastEmail[index+1:]
} }
err = w.sendNotification(ctx, tx, event.Request, notifyUser, event) err = w.sendNotification(ctx, txCtx, tx, event.Request, notifyUser, event)
if err == nil { if err == nil {
return nil return nil
} }
// if retries are disabled or if the error explicitly specifies, we cancel the notification // if retries are disabled or if the error explicitly specifies, we cancel the notification
if w.config.MaxAttempts <= 1 || errors.Is(err, &channels.CancelError{}) { if w.config.MaxAttempts <= 1 || errors.Is(err, &channels.CancelError{}) {
return w.commands.NotificationCanceled(ctx, tx, event.Aggregate().ID, event.Aggregate().ResourceOwner, err) return w.commands.NotificationCanceled(txCtx, tx, event.Aggregate().ID, event.Aggregate().ResourceOwner, err)
} }
// otherwise we retry after a backoff delay // otherwise we retry after a backoff delay
return w.commands.NotificationRetryRequested( return w.commands.NotificationRetryRequested(
ctx, txCtx,
tx, tx,
event.Aggregate().ID, event.Aggregate().ID,
event.Aggregate().ResourceOwner, event.Aggregate().ResourceOwner,
@@ -147,34 +148,34 @@ func (w *NotificationWorker) reduceNotificationRequested(ctx context.Context, tx
) )
} }
func (w *NotificationWorker) reduceNotificationRetry(ctx context.Context, tx *sql.Tx, event *notification.RetryRequestedEvent) (err error) { func (w *NotificationWorker) reduceNotificationRetry(ctx, txCtx context.Context, tx *sql.Tx, event *notification.RetryRequestedEvent) (err error) {
ctx = ContextWithNotifier(ctx, event.Aggregate()) ctx = ContextWithNotifier(ctx, event.Aggregate())
// if the notification is too old, we can directly cancel // if the notification is too old, we can directly cancel
if event.CreatedAt().Add(w.config.MaxTtl).Before(w.now()) { if event.CreatedAt().Add(w.config.MaxTtl).Before(w.now()) {
return w.commands.NotificationCanceled(ctx, tx, event.Aggregate().ID, event.Aggregate().ResourceOwner, err) return w.commands.NotificationCanceled(txCtx, tx, event.Aggregate().ID, event.Aggregate().ResourceOwner, err)
} }
if event.CreatedAt().Add(event.BackOff).After(w.now()) { if event.CreatedAt().Add(event.BackOff).After(w.now()) {
return nil return nil
} }
err = w.sendNotification(ctx, tx, event.Request, event.NotifyUser, event) err = w.sendNotification(ctx, txCtx, tx, event.Request, event.NotifyUser, event)
if err == nil { if err == nil {
return nil return nil
} }
// if the max attempts are reached or if the error explicitly specifies, we cancel the notification // if the max attempts are reached or if the error explicitly specifies, we cancel the notification
if event.Sequence() >= uint64(w.config.MaxAttempts) || errors.Is(err, &channels.CancelError{}) { if event.Sequence() >= uint64(w.config.MaxAttempts) || errors.Is(err, &channels.CancelError{}) {
return w.commands.NotificationCanceled(ctx, tx, event.Aggregate().ID, event.Aggregate().ResourceOwner, err) return w.commands.NotificationCanceled(txCtx, tx, event.Aggregate().ID, event.Aggregate().ResourceOwner, err)
} }
// otherwise we retry after a backoff delay // otherwise we retry after a backoff delay
return w.commands.NotificationRetryRequested(ctx, tx, event.Aggregate().ID, event.Aggregate().ResourceOwner, notificationEventToRequest( return w.commands.NotificationRetryRequested(txCtx, tx, event.Aggregate().ID, event.Aggregate().ResourceOwner, notificationEventToRequest(
event.Request, event.Request,
event.NotifyUser, event.NotifyUser,
w.backOff(event.BackOff), w.backOff(event.BackOff),
), err) ), err)
} }
func (w *NotificationWorker) sendNotification(ctx context.Context, tx *sql.Tx, request notification.Request, notifyUser *query.NotifyUser, e eventstore.Event) error { func (w *NotificationWorker) sendNotification(ctx, txCtx context.Context, tx *sql.Tx, request notification.Request, notifyUser *query.NotifyUser, e eventstore.Event) error {
ctx, err := enrichCtx(ctx, request.TriggeredAtOrigin) ctx, err := enrichCtx(ctx, request.TriggeredAtOrigin)
if err != nil { if err != nil {
err := w.commands.NotificationCanceled(ctx, tx, e.Aggregate().ID, e.Aggregate().ResourceOwner, err) err := w.commands.NotificationCanceled(ctx, tx, e.Aggregate().ID, e.Aggregate().ResourceOwner, err)
@@ -233,7 +234,7 @@ func (w *NotificationWorker) sendNotification(ctx context.Context, tx *sql.Tx, r
if err := notify(request.URLTemplate, args, request.MessageType, request.UnverifiedNotificationChannel); err != nil { if err := notify(request.URLTemplate, args, request.MessageType, request.UnverifiedNotificationChannel); err != nil {
return err return err
} }
err = w.commands.NotificationSent(ctx, tx, e.Aggregate().ID, e.Aggregate().ResourceOwner) err = w.commands.NotificationSent(txCtx, tx, e.Aggregate().ID, e.Aggregate().ResourceOwner)
if err != nil { if err != nil {
// In case the notification event cannot be pushed, we most likely cannot create a retry or cancel event. // In case the notification event cannot be pushed, we most likely cannot create a retry or cancel event.
// Therefore, we'll only log the error and also do not need to try to push to the user / session. // Therefore, we'll only log the error and also do not need to try to push to the user / session.
@@ -241,7 +242,7 @@ func (w *NotificationWorker) sendNotification(ctx context.Context, tx *sql.Tx, r
OnError(err).Error("could not set sent notification event") OnError(err).Error("could not set sent notification event")
return nil return nil
} }
err = sentHandler(ctx, w.commands, request.NotificationAggregateID(), request.NotificationAggregateResourceOwner(), generatorInfo, args) err = sentHandler(txCtx, w.commands, request.NotificationAggregateID(), request.NotificationAggregateResourceOwner(), generatorInfo, args)
logging.WithFields("instanceID", authz.GetInstance(ctx).InstanceID(), "notification", e.Aggregate().ID). logging.WithFields("instanceID", authz.GetInstance(ctx).InstanceID(), "notification", e.Aggregate().ID).
OnError(err).Error("could not set notification event on aggregate") OnError(err).Error("could not set notification event on aggregate")
return nil return nil
@@ -363,7 +364,7 @@ func (w *NotificationWorker) trigger(ctx context.Context, workerID int, retry bo
err = database.CloseTransaction(tx, err) err = database.CloseTransaction(tx, err)
}() }()
events, err := w.searchEvents(ctx, tx, retry) events, err := w.searchEvents(txCtx, tx, retry)
if err != nil { if err != nil {
return err return err
} }
@@ -382,11 +383,11 @@ func (w *NotificationWorker) trigger(ctx context.Context, workerID int, retry bo
var err error var err error
switch e := event.(type) { switch e := event.(type) {
case *notification.RequestedEvent: case *notification.RequestedEvent:
w.createSavepoint(ctx, tx, event, workerID, retry) w.createSavepoint(txCtx, tx, event, workerID, retry)
err = w.reduceNotificationRequested(ctx, tx, e) err = w.reduceNotificationRequested(ctx, txCtx, tx, e)
case *notification.RetryRequestedEvent: case *notification.RetryRequestedEvent:
w.createSavepoint(ctx, tx, event, workerID, retry) w.createSavepoint(txCtx, tx, event, workerID, retry)
err = w.reduceNotificationRetry(ctx, tx, e) err = w.reduceNotificationRetry(ctx, txCtx, tx, e)
} }
if err != nil { if err != nil {
w.log(workerID, retry).OnError(err). w.log(workerID, retry).OnError(err).
@@ -394,8 +395,10 @@ func (w *NotificationWorker) trigger(ctx context.Context, workerID int, retry bo
WithField("notificationID", event.Aggregate().ID). WithField("notificationID", event.Aggregate().ID).
WithField("sequence", event.Sequence()). WithField("sequence", event.Sequence()).
WithField("type", event.Type()). WithField("type", event.Type()).
Error("could not push notification event") Error("could not handle notification event")
w.rollbackToSavepoint(ctx, tx, event, workerID, retry) // if we have an error, we rollback to the savepoint and continue with the next event
// we use the txCtx to make sure we can rollback the transaction in case the ctx is canceled
w.rollbackToSavepoint(txCtx, tx, event, workerID, retry)
} }
} }
return nil return nil

View File

@@ -420,6 +420,7 @@ func Test_userNotifier_reduceNotificationRequested(t *testing.T) {
commands := mock.NewMockCommands(ctrl) commands := mock.NewMockCommands(ctrl)
f, a, w := tt.test(ctrl, queries, commands) f, a, w := tt.test(ctrl, queries, commands)
err := newNotificationWorker(t, ctrl, queries, f, a, w).reduceNotificationRequested( err := newNotificationWorker(t, ctrl, queries, f, a, w).reduceNotificationRequested(
authz.WithInstanceID(context.Background(), instanceID),
authz.WithInstanceID(context.Background(), instanceID), authz.WithInstanceID(context.Background(), instanceID),
&sql.Tx{}, &sql.Tx{},
a.event.(*notification.RequestedEvent)) a.event.(*notification.RequestedEvent))
@@ -798,9 +799,11 @@ func Test_userNotifier_reduceNotificationRetry(t *testing.T) {
commands := mock.NewMockCommands(ctrl) commands := mock.NewMockCommands(ctrl)
f, a, w := tt.test(ctrl, queries, commands) f, a, w := tt.test(ctrl, queries, commands)
err := newNotificationWorker(t, ctrl, queries, f, a, w).reduceNotificationRetry( err := newNotificationWorker(t, ctrl, queries, f, a, w).reduceNotificationRetry(
authz.WithInstanceID(context.Background(), instanceID),
authz.WithInstanceID(context.Background(), instanceID), authz.WithInstanceID(context.Background(), instanceID),
&sql.Tx{}, &sql.Tx{},
a.event.(*notification.RetryRequestedEvent)) a.event.(*notification.RetryRequestedEvent),
)
if w.err != nil { if w.err != nil {
w.err(t, err) w.err(t, err)
} else { } else {