refactor(notification): use new queue package (#9360)

# Which Problems Are Solved

The recently introduced notification queue have potential race conditions.

# How the Problems Are Solved

Current code is refactored to use the queue package, which is safe in
regards of concurrency.

# Additional Changes

- the queue is included in startup
- improved code quality of queue

# Additional Context

- closes https://github.com/zitadel/zitadel/issues/9278
This commit is contained in:
Silvan
2025-02-27 11:49:12 +01:00
committed by GitHub
parent 83614562a2
commit 444f682e25
45 changed files with 1936 additions and 2818 deletions

View File

@@ -7,12 +7,13 @@ import (
http_util "github.com/zitadel/zitadel/internal/api/http"
"github.com/zitadel/zitadel/internal/api/ui/console"
"github.com/zitadel/zitadel/internal/api/ui/login"
"github.com/zitadel/zitadel/internal/command"
"github.com/zitadel/zitadel/internal/domain"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/eventstore/handler/v2"
"github.com/zitadel/zitadel/internal/notification/senders"
"github.com/zitadel/zitadel/internal/notification/types"
"github.com/zitadel/zitadel/internal/queue"
"github.com/zitadel/zitadel/internal/repository/notification"
"github.com/zitadel/zitadel/internal/repository/session"
"github.com/zitadel/zitadel/internal/repository/user"
"github.com/zitadel/zitadel/internal/zerrors"
@@ -86,9 +87,11 @@ const (
)
type userNotifier struct {
commands Commands
queries *NotificationQueries
otpEmailTmpl string
queue Queue
maxAttempts uint8
}
func NewUserNotifier(
@@ -98,15 +101,17 @@ func NewUserNotifier(
queries *NotificationQueries,
channels types.ChannelChains,
otpEmailTmpl string,
legacyMode bool,
workerConfig WorkerConfig,
queue Queue,
) *handler.Handler {
if legacyMode {
if workerConfig.LegacyEnabled {
return NewUserNotifierLegacy(ctx, config, commands, queries, channels, otpEmailTmpl)
}
return handler.NewHandler(ctx, &config, &userNotifier{
commands: commands,
queries: queries,
otpEmailTmpl: otpEmailTmpl,
queue: queue,
maxAttempts: workerConfig.MaxAttempts,
})
}
@@ -198,7 +203,6 @@ func (u *userNotifier) reduceInitCodeAdded(event eventstore.Event) (*handler.Sta
if !ok {
return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-EFe2f", "reduce.wrong.event.type %s", user.HumanInitialCodeAddedType)
}
return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error {
ctx := HandlerContext(event.Aggregate())
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
@@ -215,23 +219,26 @@ func (u *userNotifier) reduceInitCodeAdded(event eventstore.Event) (*handler.Sta
return err
}
origin := http_util.DomainContext(ctx).Origin()
return u.commands.RequestNotification(
ctx,
e.Aggregate().ResourceOwner,
command.NewNotificationRequest(
e.Aggregate().ID,
e.Aggregate().ResourceOwner,
origin,
e.EventType,
domain.NotificationTypeEmail,
domain.InitCodeMessageType,
).
WithURLTemplate(login.InitUserLinkTemplate(origin, e.Aggregate().ID, e.Aggregate().ResourceOwner, e.AuthRequestID)).
WithCode(e.Code, e.Expiry).
WithArgs(&domain.NotificationArguments{
return u.queue.Insert(ctx,
&notification.Request{
Aggregate: e.Aggregate(),
UserID: e.Aggregate().ID,
UserResourceOwner: e.Aggregate().ResourceOwner,
TriggeredAtOrigin: origin,
EventType: e.EventType,
NotificationType: domain.NotificationTypeEmail,
MessageType: domain.InitCodeMessageType,
Code: e.Code,
CodeExpiry: e.Expiry,
IsOTP: false,
UnverifiedNotificationChannel: true,
URLTemplate: login.InitUserLinkTemplate(origin, e.Aggregate().ID, e.Aggregate().ResourceOwner, e.AuthRequestID),
Args: &domain.NotificationArguments{
AuthRequestID: e.AuthRequestID,
}).
WithUnverifiedChannel(),
},
},
queue.WithQueueName(notification.QueueName),
queue.WithMaxAttempts(u.maxAttempts),
)
}), nil
}
@@ -262,23 +269,26 @@ func (u *userNotifier) reduceEmailCodeAdded(event eventstore.Event) (*handler.St
return err
}
origin := http_util.DomainContext(ctx).Origin()
return u.commands.RequestNotification(ctx,
e.Aggregate().ResourceOwner,
command.NewNotificationRequest(
e.Aggregate().ID,
e.Aggregate().ResourceOwner,
origin,
e.EventType,
domain.NotificationTypeEmail,
domain.VerifyEmailMessageType,
).
WithURLTemplate(u.emailCodeTemplate(origin, e)).
WithCode(e.Code, e.Expiry).
WithArgs(&domain.NotificationArguments{
return u.queue.Insert(ctx,
&notification.Request{
Aggregate: e.Aggregate(),
UserID: e.Aggregate().ID,
UserResourceOwner: e.Aggregate().ResourceOwner,
TriggeredAtOrigin: origin,
EventType: e.EventType,
NotificationType: domain.NotificationTypeEmail,
MessageType: domain.VerifyEmailMessageType,
Code: e.Code,
CodeExpiry: e.Expiry,
IsOTP: false,
UnverifiedNotificationChannel: true,
URLTemplate: u.emailCodeTemplate(origin, e),
Args: &domain.NotificationArguments{
AuthRequestID: e.AuthRequestID,
}).
WithUnverifiedChannel(),
},
},
queue.WithQueueName(notification.QueueName),
queue.WithMaxAttempts(u.maxAttempts),
)
}), nil
}
@@ -315,22 +325,26 @@ func (u *userNotifier) reducePasswordCodeAdded(event eventstore.Event) (*handler
return err
}
origin := http_util.DomainContext(ctx).Origin()
return u.commands.RequestNotification(ctx,
e.Aggregate().ResourceOwner,
command.NewNotificationRequest(
e.Aggregate().ID,
e.Aggregate().ResourceOwner,
origin,
e.EventType,
e.NotificationType,
domain.PasswordResetMessageType,
).
WithURLTemplate(u.passwordCodeTemplate(origin, e)).
WithCode(e.Code, e.Expiry).
WithArgs(&domain.NotificationArguments{
return u.queue.Insert(ctx,
&notification.Request{
Aggregate: e.Aggregate(),
UserID: e.Aggregate().ID,
UserResourceOwner: e.Aggregate().ResourceOwner,
TriggeredAtOrigin: origin,
EventType: e.EventType,
NotificationType: e.NotificationType,
MessageType: domain.PasswordResetMessageType,
Code: e.Code,
CodeExpiry: e.Expiry,
IsOTP: false,
UnverifiedNotificationChannel: true,
URLTemplate: u.passwordCodeTemplate(origin, e),
Args: &domain.NotificationArguments{
AuthRequestID: e.AuthRequestID,
}).
WithUnverifiedChannel(),
},
},
queue.WithQueueName(notification.QueueName),
queue.WithMaxAttempts(u.maxAttempts),
)
}), nil
}
@@ -363,19 +377,22 @@ func (u *userNotifier) reduceOTPSMSCodeAdded(event eventstore.Event) (*handler.S
if err != nil {
return err
}
return u.commands.RequestNotification(ctx,
e.Aggregate().ResourceOwner,
command.NewNotificationRequest(
e.Aggregate().ID,
e.Aggregate().ResourceOwner,
http_util.DomainContext(ctx).Origin(),
e.EventType,
domain.NotificationTypeSms,
domain.VerifySMSOTPMessageType,
).
WithCode(e.Code, e.Expiry).
WithArgs(otpArgs(ctx, e.Expiry)).
WithOTP(),
return u.queue.Insert(ctx,
&notification.Request{
Aggregate: e.Aggregate(),
UserID: e.Aggregate().ID,
UserResourceOwner: e.Aggregate().ResourceOwner,
TriggeredAtOrigin: http_util.DomainContext(ctx).Origin(),
EventType: e.EventType,
NotificationType: domain.NotificationTypeSms,
MessageType: domain.VerifySMSOTPMessageType,
Code: e.Code,
CodeExpiry: e.Expiry,
IsOTP: true,
Args: otpArgs(ctx, e.Expiry),
},
queue.WithQueueName(notification.QueueName),
queue.WithMaxAttempts(u.maxAttempts),
)
}), nil
}
@@ -412,20 +429,22 @@ func (u *userNotifier) reduceSessionOTPSMSChallenged(event eventstore.Event) (*h
args := otpArgs(ctx, e.Expiry)
args.SessionID = e.Aggregate().ID
return u.commands.RequestNotification(ctx,
s.UserFactor.ResourceOwner,
command.NewNotificationRequest(
s.UserFactor.UserID,
s.UserFactor.ResourceOwner,
http_util.DomainContext(ctx).Origin(),
e.EventType,
domain.NotificationTypeSms,
domain.VerifySMSOTPMessageType,
).
WithAggregate(e.Aggregate().ID, e.Aggregate().ResourceOwner).
WithCode(e.Code, e.Expiry).
WithOTP().
WithArgs(args),
return u.queue.Insert(ctx,
&notification.Request{
Aggregate: e.Aggregate(),
UserID: s.UserFactor.UserID,
UserResourceOwner: s.UserFactor.ResourceOwner,
TriggeredAtOrigin: http_util.DomainContext(ctx).Origin(),
EventType: e.EventType,
NotificationType: domain.NotificationTypeSms,
MessageType: domain.VerifySMSOTPMessageType,
Code: e.Code,
CodeExpiry: e.Expiry,
IsOTP: true,
Args: args,
},
queue.WithQueueName(notification.QueueName),
queue.WithMaxAttempts(u.maxAttempts),
)
}), nil
}
@@ -459,20 +478,23 @@ func (u *userNotifier) reduceOTPEmailCodeAdded(event eventstore.Event) (*handler
}
args := otpArgs(ctx, e.Expiry)
args.AuthRequestID = authRequestID
return u.commands.RequestNotification(ctx,
e.Aggregate().ResourceOwner,
command.NewNotificationRequest(
e.Aggregate().ID,
e.Aggregate().ResourceOwner,
origin,
e.EventType,
domain.NotificationTypeEmail,
domain.VerifyEmailOTPMessageType,
).
WithURLTemplate(login.OTPLinkTemplate(origin, authRequestID, domain.MFATypeOTPEmail)).
WithCode(e.Code, e.Expiry).
WithOTP().
WithArgs(args),
return u.queue.Insert(ctx,
&notification.Request{
Aggregate: e.Aggregate(),
UserID: e.Aggregate().ID,
UserResourceOwner: e.Aggregate().ResourceOwner,
TriggeredAtOrigin: origin,
EventType: e.EventType,
NotificationType: domain.NotificationTypeEmail,
MessageType: domain.VerifyEmailOTPMessageType,
Code: e.Code,
CodeExpiry: e.Expiry,
IsOTP: true,
URLTemplate: login.OTPLinkTemplate(origin, authRequestID, domain.MFATypeOTPEmail),
Args: args,
},
queue.WithQueueName(notification.QueueName),
queue.WithMaxAttempts(u.maxAttempts),
)
}), nil
}
@@ -509,21 +531,23 @@ func (u *userNotifier) reduceSessionOTPEmailChallenged(event eventstore.Event) (
args := otpArgs(ctx, e.Expiry)
args.SessionID = e.Aggregate().ID
return u.commands.RequestNotification(ctx,
s.UserFactor.ResourceOwner,
command.NewNotificationRequest(
s.UserFactor.UserID,
s.UserFactor.ResourceOwner,
origin,
e.EventType,
domain.NotificationTypeEmail,
domain.VerifyEmailOTPMessageType,
).
WithAggregate(e.Aggregate().ID, e.Aggregate().ResourceOwner).
WithURLTemplate(u.otpEmailTemplate(origin, e)).
WithCode(e.Code, e.Expiry).
WithOTP().
WithArgs(args),
return u.queue.Insert(ctx,
&notification.Request{
Aggregate: e.Aggregate(),
UserID: s.UserFactor.UserID,
UserResourceOwner: s.UserFactor.ResourceOwner,
TriggeredAtOrigin: origin,
EventType: e.EventType,
NotificationType: domain.NotificationTypeEmail,
MessageType: domain.VerifyEmailOTPMessageType,
Code: e.Code,
CodeExpiry: e.Expiry,
IsOTP: true,
URLTemplate: u.otpEmailTemplate(origin, e),
Args: args,
},
queue.WithQueueName(notification.QueueName),
queue.WithMaxAttempts(u.maxAttempts),
)
}), nil
}
@@ -564,22 +588,24 @@ func (u *userNotifier) reduceDomainClaimed(event eventstore.Event) (*handler.Sta
return err
}
origin := http_util.DomainContext(ctx).Origin()
return u.commands.RequestNotification(ctx,
e.Aggregate().ResourceOwner,
command.NewNotificationRequest(
e.Aggregate().ID,
e.Aggregate().ResourceOwner,
origin,
e.EventType,
domain.NotificationTypeEmail,
domain.DomainClaimedMessageType,
).
WithURLTemplate(login.LoginLink(origin, e.Aggregate().ResourceOwner)).
WithUnverifiedChannel().
WithPreviousDomain().
WithArgs(&domain.NotificationArguments{
return u.queue.Insert(ctx,
&notification.Request{
Aggregate: e.Aggregate(),
UserID: e.Aggregate().ID,
UserResourceOwner: e.Aggregate().ResourceOwner,
TriggeredAtOrigin: origin,
EventType: e.EventType,
NotificationType: domain.NotificationTypeEmail,
MessageType: domain.DomainClaimedMessageType,
URLTemplate: login.LoginLink(origin, e.Aggregate().ResourceOwner),
UnverifiedNotificationChannel: true,
Args: &domain.NotificationArguments{
TempUsername: e.UserName,
}),
},
RequiresPreviousDomain: true,
},
queue.WithQueueName(notification.QueueName),
queue.WithMaxAttempts(u.maxAttempts),
)
}), nil
}
@@ -607,21 +633,24 @@ func (u *userNotifier) reducePasswordlessCodeRequested(event eventstore.Event) (
return err
}
origin := http_util.DomainContext(ctx).Origin()
return u.commands.RequestNotification(ctx,
e.Aggregate().ResourceOwner,
command.NewNotificationRequest(
e.Aggregate().ID,
e.Aggregate().ResourceOwner,
origin,
e.EventType,
domain.NotificationTypeEmail,
domain.PasswordlessRegistrationMessageType,
).
WithURLTemplate(u.passwordlessCodeTemplate(origin, e)).
WithCode(e.Code, e.Expiry).
WithArgs(&domain.NotificationArguments{
return u.queue.Insert(ctx,
&notification.Request{
Aggregate: e.Aggregate(),
UserID: e.Aggregate().ID,
UserResourceOwner: e.Aggregate().ResourceOwner,
TriggeredAtOrigin: origin,
EventType: e.EventType,
NotificationType: domain.NotificationTypeEmail,
MessageType: domain.PasswordlessRegistrationMessageType,
URLTemplate: u.passwordlessCodeTemplate(origin, e),
Args: &domain.NotificationArguments{
CodeID: e.ID,
}),
},
CodeExpiry: e.Expiry,
Code: e.Code,
},
queue.WithQueueName(notification.QueueName),
queue.WithMaxAttempts(u.maxAttempts),
)
}), nil
}
@@ -664,18 +693,20 @@ func (u *userNotifier) reducePasswordChanged(event eventstore.Event) (*handler.S
}
origin := http_util.DomainContext(ctx).Origin()
return u.commands.RequestNotification(ctx,
e.Aggregate().ResourceOwner,
command.NewNotificationRequest(
e.Aggregate().ID,
e.Aggregate().ResourceOwner,
origin,
e.EventType,
domain.NotificationTypeEmail,
domain.PasswordChangeMessageType,
).
WithURLTemplate(console.LoginHintLink(origin, "{{.PreferredLoginName}}")).
WithUnverifiedChannel(),
return u.queue.Insert(ctx,
&notification.Request{
Aggregate: e.Aggregate(),
UserID: e.Aggregate().ID,
UserResourceOwner: e.Aggregate().ResourceOwner,
TriggeredAtOrigin: origin,
EventType: e.EventType,
NotificationType: domain.NotificationTypeEmail,
MessageType: domain.PasswordChangeMessageType,
URLTemplate: console.LoginHintLink(origin, "{{.PreferredLoginName}}"),
UnverifiedNotificationChannel: true,
},
queue.WithQueueName(notification.QueueName),
queue.WithMaxAttempts(u.maxAttempts),
)
}), nil
}
@@ -706,21 +737,24 @@ func (u *userNotifier) reducePhoneCodeAdded(event eventstore.Event) (*handler.St
return err
}
return u.commands.RequestNotification(ctx,
e.Aggregate().ResourceOwner,
command.NewNotificationRequest(
e.Aggregate().ID,
e.Aggregate().ResourceOwner,
http_util.DomainContext(ctx).Origin(),
e.EventType,
domain.NotificationTypeSms,
domain.VerifyPhoneMessageType,
).
WithCode(e.Code, e.Expiry).
WithUnverifiedChannel().
WithArgs(&domain.NotificationArguments{
return u.queue.Insert(ctx,
&notification.Request{
Aggregate: e.Aggregate(),
UserID: e.Aggregate().ID,
UserResourceOwner: e.Aggregate().ResourceOwner,
TriggeredAtOrigin: http_util.DomainContext(ctx).Origin(),
EventType: e.EventType,
NotificationType: domain.NotificationTypeSms,
MessageType: domain.VerifyPhoneMessageType,
CodeExpiry: e.Expiry,
Code: e.Code,
UnverifiedNotificationChannel: true,
Args: &domain.NotificationArguments{
Domain: http_util.DomainContext(ctx).RequestedDomain(),
}),
},
},
queue.WithQueueName(notification.QueueName),
queue.WithMaxAttempts(u.maxAttempts),
)
}), nil
}
@@ -755,23 +789,26 @@ func (u *userNotifier) reduceInviteCodeAdded(event eventstore.Event) (*handler.S
if applicationName == "" {
applicationName = "ZITADEL"
}
return u.commands.RequestNotification(ctx,
e.Aggregate().ResourceOwner,
command.NewNotificationRequest(
e.Aggregate().ID,
e.Aggregate().ResourceOwner,
origin,
e.EventType,
domain.NotificationTypeEmail,
domain.InviteUserMessageType,
).
WithURLTemplate(u.inviteCodeTemplate(origin, e)).
WithCode(e.Code, e.Expiry).
WithUnverifiedChannel().
WithArgs(&domain.NotificationArguments{
return u.queue.Insert(ctx,
&notification.Request{
Aggregate: e.Aggregate(),
UserID: e.Aggregate().ID,
UserResourceOwner: e.Aggregate().ResourceOwner,
TriggeredAtOrigin: origin,
EventType: e.EventType,
NotificationType: domain.NotificationTypeEmail,
MessageType: domain.InviteUserMessageType,
CodeExpiry: e.Expiry,
Code: e.Code,
UnverifiedNotificationChannel: true,
URLTemplate: u.inviteCodeTemplate(origin, e),
Args: &domain.NotificationArguments{
AuthRequestID: e.AuthRequestID,
ApplicationName: applicationName,
}),
},
},
queue.WithQueueName(notification.QueueName),
queue.WithMaxAttempts(u.maxAttempts),
)
}), nil
}