improve cancel and context handling and fix latestRetries

This commit is contained in:
Livio Spring
2024-12-02 07:13:51 +01:00
parent 7109fa9004
commit b034ec12f8

View File

@@ -178,19 +178,14 @@ func (w *NotificationWorker) reduceNotificationRetry(ctx, txCtx context.Context,
func (w *NotificationWorker) sendNotification(ctx, txCtx context.Context, tx *sql.Tx, request notification.Request, notifyUser *query.NotifyUser, e eventstore.Event) error { func (w *NotificationWorker) sendNotification(ctx, txCtx context.Context, tx *sql.Tx, request notification.Request, notifyUser *query.NotifyUser, e eventstore.Event) error {
ctx, err := enrichCtx(ctx, request.TriggeredAtOrigin) ctx, err := enrichCtx(ctx, request.TriggeredAtOrigin)
if err != nil { if err != nil {
err := w.commands.NotificationCanceled(ctx, tx, e.Aggregate().ID, e.Aggregate().ResourceOwner, err) return channels.NewCancelError(err)
logging.WithFields("instanceID", authz.GetInstance(ctx).InstanceID(), "notification", e.Aggregate().ID).
OnError(err).Error("could not cancel notification")
return nil
} }
// check early that a "sent" handler exists, otherwise we can cancel early // check early that a "sent" handler exists, otherwise we can cancel early
sentHandler, ok := sentHandlers[request.EventType] sentHandler, ok := sentHandlers[request.EventType]
if !ok { if !ok {
err := w.commands.NotificationCanceled(ctx, tx, e.Aggregate().ID, e.Aggregate().ResourceOwner, err) logging.Errorf(`no "sent" handler registered for %s`, request.EventType)
logging.WithFields("instanceID", authz.GetInstance(ctx).InstanceID(), "notification", e.Aggregate().ID). return channels.NewCancelError(err)
OnError(err).Errorf(`no "sent" handler registered for %s`, request.EventType)
return nil
} }
var code string var code string
@@ -400,11 +395,15 @@ func (w *NotificationWorker) trigger(ctx context.Context, workerID int, retry bo
// we use the txCtx to make sure we can rollback the transaction in case the ctx is canceled // we use the txCtx to make sure we can rollback the transaction in case the ctx is canceled
w.rollbackToSavepoint(txCtx, tx, event, workerID, retry) w.rollbackToSavepoint(txCtx, tx, event, workerID, retry)
} }
// if the context is canceled, we stop the processing
if ctx.Err() != nil {
return nil
}
} }
return nil return nil
} }
func (w *NotificationWorker) latestRetries(events []eventstore.Event) { func (w *NotificationWorker) latestRetries(events []eventstore.Event) []eventstore.Event {
for i := len(events) - 1; i > 0; i-- { for i := len(events) - 1; i > 0; i-- {
// since we delete during the iteration, we need to make sure we don't panic // since we delete during the iteration, we need to make sure we don't panic
if len(events) <= i { if len(events) <= i {
@@ -416,6 +415,7 @@ func (w *NotificationWorker) latestRetries(events []eventstore.Event) {
e.Sequence() < events[i].Sequence() e.Sequence() < events[i].Sequence()
}) })
} }
return events
} }
func (w *NotificationWorker) createSavepoint(ctx context.Context, tx *sql.Tx, event eventstore.Event, workerID int, retry bool) { func (w *NotificationWorker) createSavepoint(ctx context.Context, tx *sql.Tx, event eventstore.Event, workerID int, retry bool) {
@@ -479,8 +479,7 @@ func (w *NotificationWorker) searchRetryEvents(ctx context.Context, tx *sql.Tx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
w.latestRetries(events) return w.latestRetries(events), nil
return events, nil
} }
type existingInstances []string type existingInstances []string