mirror of
https://github.com/zitadel/zitadel.git
synced 2025-12-06 16:52:18 +00:00
## Which problems are solved
The execution of statements of projections did not have the context
present.
## How the problems were solved
Pass the context to the execute function
## Additional info
This change is required to use the repositories of the relational tables
in projections.
(cherry picked from commit 20e7807ee5)
832 lines
28 KiB
Go
832 lines
28 KiB
Go
package handlers
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
http_util "github.com/zitadel/zitadel/internal/api/http"
|
|
"github.com/zitadel/zitadel/internal/api/ui/console"
|
|
"github.com/zitadel/zitadel/internal/api/ui/login"
|
|
"github.com/zitadel/zitadel/internal/command"
|
|
"github.com/zitadel/zitadel/internal/domain"
|
|
"github.com/zitadel/zitadel/internal/eventstore"
|
|
"github.com/zitadel/zitadel/internal/eventstore/handler/v2"
|
|
"github.com/zitadel/zitadel/internal/notification/senders"
|
|
"github.com/zitadel/zitadel/internal/notification/types"
|
|
"github.com/zitadel/zitadel/internal/queue"
|
|
"github.com/zitadel/zitadel/internal/repository/notification"
|
|
"github.com/zitadel/zitadel/internal/repository/session"
|
|
"github.com/zitadel/zitadel/internal/repository/user"
|
|
"github.com/zitadel/zitadel/internal/zerrors"
|
|
)
|
|
|
|
func init() {
|
|
RegisterSentHandler(user.HumanInitialCodeAddedType,
|
|
func(ctx context.Context, commands Commands, id, orgID string, _ *senders.CodeGeneratorInfo, args map[string]any) error {
|
|
return commands.HumanInitCodeSent(ctx, orgID, id)
|
|
},
|
|
)
|
|
RegisterSentHandler(user.HumanEmailCodeAddedType,
|
|
func(ctx context.Context, commands Commands, id, orgID string, generatorInfo *senders.CodeGeneratorInfo, args map[string]any) error {
|
|
return commands.HumanEmailVerificationCodeSent(ctx, orgID, id)
|
|
},
|
|
)
|
|
RegisterSentHandler(user.HumanPasswordCodeAddedType,
|
|
func(ctx context.Context, commands Commands, id, orgID string, generatorInfo *senders.CodeGeneratorInfo, args map[string]any) error {
|
|
return commands.PasswordCodeSent(ctx, orgID, id, generatorInfo)
|
|
},
|
|
)
|
|
RegisterSentHandler(user.HumanOTPSMSCodeAddedType,
|
|
func(ctx context.Context, commands Commands, id, orgID string, generatorInfo *senders.CodeGeneratorInfo, args map[string]any) error {
|
|
return commands.HumanOTPSMSCodeSent(ctx, id, orgID, generatorInfo)
|
|
},
|
|
)
|
|
RegisterSentHandler(session.OTPSMSChallengedType,
|
|
func(ctx context.Context, commands Commands, id, orgID string, generatorInfo *senders.CodeGeneratorInfo, args map[string]any) error {
|
|
return commands.OTPSMSSent(ctx, id, orgID, generatorInfo)
|
|
},
|
|
)
|
|
RegisterSentHandler(user.HumanOTPEmailCodeAddedType,
|
|
func(ctx context.Context, commands Commands, id, orgID string, generatorInfo *senders.CodeGeneratorInfo, args map[string]any) error {
|
|
return commands.HumanOTPEmailCodeSent(ctx, id, orgID)
|
|
},
|
|
)
|
|
RegisterSentHandler(session.OTPEmailChallengedType,
|
|
func(ctx context.Context, commands Commands, id, orgID string, generatorInfo *senders.CodeGeneratorInfo, args map[string]any) error {
|
|
return commands.OTPEmailSent(ctx, id, orgID)
|
|
},
|
|
)
|
|
RegisterSentHandler(user.UserDomainClaimedType,
|
|
func(ctx context.Context, commands Commands, id, orgID string, generatorInfo *senders.CodeGeneratorInfo, args map[string]any) error {
|
|
return commands.UserDomainClaimedSent(ctx, orgID, id)
|
|
},
|
|
)
|
|
RegisterSentHandler(user.HumanPasswordlessInitCodeRequestedType,
|
|
func(ctx context.Context, commands Commands, id, orgID string, generatorInfo *senders.CodeGeneratorInfo, args map[string]any) error {
|
|
return commands.HumanPasswordlessInitCodeSent(ctx, id, orgID, args["CodeID"].(string))
|
|
},
|
|
)
|
|
RegisterSentHandler(user.HumanPasswordChangedType,
|
|
func(ctx context.Context, commands Commands, id, orgID string, generatorInfo *senders.CodeGeneratorInfo, args map[string]any) error {
|
|
return commands.PasswordChangeSent(ctx, orgID, id)
|
|
},
|
|
)
|
|
RegisterSentHandler(user.HumanPhoneCodeAddedType,
|
|
func(ctx context.Context, commands Commands, id, orgID string, generatorInfo *senders.CodeGeneratorInfo, args map[string]any) error {
|
|
return commands.HumanPhoneVerificationCodeSent(ctx, orgID, id, generatorInfo)
|
|
},
|
|
)
|
|
RegisterSentHandler(user.HumanInviteCodeAddedType,
|
|
func(ctx context.Context, commands Commands, id, orgID string, _ *senders.CodeGeneratorInfo, args map[string]any) error {
|
|
return commands.InviteCodeSent(ctx, orgID, id)
|
|
},
|
|
)
|
|
}
|
|
|
|
const (
|
|
UserNotificationsProjectionTable = "projections.notifications"
|
|
)
|
|
|
|
type userNotifier struct {
|
|
queries *NotificationQueries
|
|
otpEmailTmpl string
|
|
|
|
queue Queue
|
|
maxAttempts uint8
|
|
}
|
|
|
|
func NewUserNotifier(
|
|
ctx context.Context,
|
|
config handler.Config,
|
|
commands Commands,
|
|
queries *NotificationQueries,
|
|
channels types.ChannelChains,
|
|
otpEmailTmpl string,
|
|
workerConfig WorkerConfig,
|
|
queue Queue,
|
|
) *handler.Handler {
|
|
if workerConfig.LegacyEnabled {
|
|
return NewUserNotifierLegacy(ctx, config, commands, queries, channels, otpEmailTmpl)
|
|
}
|
|
return handler.NewHandler(ctx, &config, &userNotifier{
|
|
queries: queries,
|
|
otpEmailTmpl: otpEmailTmpl,
|
|
queue: queue,
|
|
maxAttempts: workerConfig.MaxAttempts,
|
|
})
|
|
}
|
|
|
|
func (u *userNotifier) Name() string {
|
|
return UserNotificationsProjectionTable
|
|
}
|
|
|
|
func (u *userNotifier) Reducers() []handler.AggregateReducer {
|
|
return []handler.AggregateReducer{
|
|
{
|
|
Aggregate: user.AggregateType,
|
|
EventReducers: []handler.EventReducer{
|
|
{
|
|
Event: user.UserV1InitialCodeAddedType,
|
|
Reduce: u.reduceInitCodeAdded,
|
|
},
|
|
{
|
|
Event: user.HumanInitialCodeAddedType,
|
|
Reduce: u.reduceInitCodeAdded,
|
|
},
|
|
{
|
|
Event: user.UserV1EmailCodeAddedType,
|
|
Reduce: u.reduceEmailCodeAdded,
|
|
},
|
|
{
|
|
Event: user.HumanEmailCodeAddedType,
|
|
Reduce: u.reduceEmailCodeAdded,
|
|
},
|
|
{
|
|
Event: user.UserV1PasswordCodeAddedType,
|
|
Reduce: u.reducePasswordCodeAdded,
|
|
},
|
|
{
|
|
Event: user.HumanPasswordCodeAddedType,
|
|
Reduce: u.reducePasswordCodeAdded,
|
|
},
|
|
{
|
|
Event: user.UserDomainClaimedType,
|
|
Reduce: u.reduceDomainClaimed,
|
|
},
|
|
{
|
|
Event: user.HumanPasswordlessInitCodeRequestedType,
|
|
Reduce: u.reducePasswordlessCodeRequested,
|
|
},
|
|
{
|
|
Event: user.UserV1PhoneCodeAddedType,
|
|
Reduce: u.reducePhoneCodeAdded,
|
|
},
|
|
{
|
|
Event: user.HumanPhoneCodeAddedType,
|
|
Reduce: u.reducePhoneCodeAdded,
|
|
},
|
|
{
|
|
Event: user.HumanPasswordChangedType,
|
|
Reduce: u.reducePasswordChanged,
|
|
},
|
|
{
|
|
Event: user.HumanOTPSMSCodeAddedType,
|
|
Reduce: u.reduceOTPSMSCodeAdded,
|
|
},
|
|
{
|
|
Event: user.HumanOTPEmailCodeAddedType,
|
|
Reduce: u.reduceOTPEmailCodeAdded,
|
|
},
|
|
{
|
|
Event: user.HumanInviteCodeAddedType,
|
|
Reduce: u.reduceInviteCodeAdded,
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Aggregate: session.AggregateType,
|
|
EventReducers: []handler.EventReducer{
|
|
{
|
|
Event: session.OTPSMSChallengedType,
|
|
Reduce: u.reduceSessionOTPSMSChallenged,
|
|
},
|
|
{
|
|
Event: session.OTPEmailChallengedType,
|
|
Reduce: u.reduceSessionOTPEmailChallenged,
|
|
},
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
func (u *userNotifier) reduceInitCodeAdded(event eventstore.Event) (*handler.Statement, error) {
|
|
e, ok := event.(*user.HumanInitialCodeAddedEvent)
|
|
if !ok {
|
|
return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-EFe2f", "reduce.wrong.event.type %s", user.HumanInitialCodeAddedType)
|
|
}
|
|
return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error {
|
|
ctx = HandlerContext(ctx, event.Aggregate())
|
|
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
|
|
user.UserV1InitialCodeAddedType, user.UserV1InitialCodeSentType,
|
|
user.HumanInitialCodeAddedType, user.HumanInitialCodeSentType)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if alreadyHandled {
|
|
return nil
|
|
}
|
|
ctx, err = u.queries.Origin(ctx, e)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
origin := http_util.DomainContext(ctx).Origin()
|
|
return u.queue.Insert(ctx,
|
|
¬ification.Request{
|
|
Aggregate: e.Aggregate(),
|
|
UserID: e.Aggregate().ID,
|
|
UserResourceOwner: e.Aggregate().ResourceOwner,
|
|
TriggeredAtOrigin: origin,
|
|
EventType: e.EventType,
|
|
NotificationType: domain.NotificationTypeEmail,
|
|
MessageType: domain.InitCodeMessageType,
|
|
Code: e.Code,
|
|
CodeExpiry: e.Expiry,
|
|
IsOTP: false,
|
|
UnverifiedNotificationChannel: true,
|
|
URLTemplate: login.InitUserLinkTemplate(origin, e.Aggregate().ID, e.Aggregate().ResourceOwner, e.AuthRequestID),
|
|
Args: &domain.NotificationArguments{
|
|
AuthRequestID: e.AuthRequestID,
|
|
},
|
|
},
|
|
queue.WithQueueName(notification.QueueName),
|
|
queue.WithMaxAttempts(u.maxAttempts),
|
|
)
|
|
}), nil
|
|
}
|
|
|
|
func (u *userNotifier) reduceEmailCodeAdded(event eventstore.Event) (*handler.Statement, error) {
|
|
e, ok := event.(*user.HumanEmailCodeAddedEvent)
|
|
if !ok {
|
|
return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-SWf3g", "reduce.wrong.event.type %s", user.HumanEmailCodeAddedType)
|
|
}
|
|
|
|
if e.CodeReturned {
|
|
return handler.NewNoOpStatement(e), nil
|
|
}
|
|
|
|
return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error {
|
|
ctx = HandlerContext(ctx, event.Aggregate())
|
|
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
|
|
user.UserV1EmailCodeAddedType, user.UserV1EmailCodeSentType,
|
|
user.HumanEmailCodeAddedType, user.HumanEmailCodeSentType)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if alreadyHandled {
|
|
return nil
|
|
}
|
|
ctx, err = u.queries.Origin(ctx, e)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
origin := http_util.DomainContext(ctx).Origin()
|
|
return u.queue.Insert(ctx,
|
|
¬ification.Request{
|
|
Aggregate: e.Aggregate(),
|
|
UserID: e.Aggregate().ID,
|
|
UserResourceOwner: e.Aggregate().ResourceOwner,
|
|
TriggeredAtOrigin: origin,
|
|
EventType: e.EventType,
|
|
NotificationType: domain.NotificationTypeEmail,
|
|
MessageType: domain.VerifyEmailMessageType,
|
|
Code: e.Code,
|
|
CodeExpiry: e.Expiry,
|
|
IsOTP: false,
|
|
UnverifiedNotificationChannel: true,
|
|
URLTemplate: u.emailCodeTemplate(origin, e),
|
|
Args: &domain.NotificationArguments{
|
|
AuthRequestID: e.AuthRequestID,
|
|
},
|
|
},
|
|
queue.WithQueueName(notification.QueueName),
|
|
queue.WithMaxAttempts(u.maxAttempts),
|
|
)
|
|
}), nil
|
|
}
|
|
|
|
func (u *userNotifier) emailCodeTemplate(origin string, e *user.HumanEmailCodeAddedEvent) string {
|
|
if e.URLTemplate != "" {
|
|
return e.URLTemplate
|
|
}
|
|
return login.MailVerificationLinkTemplate(origin, e.Aggregate().ID, e.Aggregate().ResourceOwner, e.AuthRequestID)
|
|
}
|
|
|
|
func (u *userNotifier) reducePasswordCodeAdded(event eventstore.Event) (*handler.Statement, error) {
|
|
e, ok := event.(*user.HumanPasswordCodeAddedEvent)
|
|
if !ok {
|
|
return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-Eeg3s", "reduce.wrong.event.type %s", user.HumanPasswordCodeAddedType)
|
|
}
|
|
if e.CodeReturned {
|
|
return handler.NewNoOpStatement(e), nil
|
|
}
|
|
|
|
return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error {
|
|
ctx = HandlerContext(ctx, event.Aggregate())
|
|
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
|
|
user.UserV1PasswordCodeAddedType, user.UserV1PasswordCodeSentType,
|
|
user.HumanPasswordCodeAddedType, user.HumanPasswordCodeSentType)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if alreadyHandled {
|
|
return nil
|
|
}
|
|
ctx, err = u.queries.Origin(ctx, e)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
origin := http_util.DomainContext(ctx).Origin()
|
|
return u.queue.Insert(ctx,
|
|
¬ification.Request{
|
|
Aggregate: e.Aggregate(),
|
|
UserID: e.Aggregate().ID,
|
|
UserResourceOwner: e.Aggregate().ResourceOwner,
|
|
TriggeredAtOrigin: origin,
|
|
EventType: e.EventType,
|
|
NotificationType: e.NotificationType,
|
|
MessageType: domain.PasswordResetMessageType,
|
|
Code: e.Code,
|
|
CodeExpiry: e.Expiry,
|
|
IsOTP: false,
|
|
UnverifiedNotificationChannel: true,
|
|
URLTemplate: u.passwordCodeTemplate(origin, e),
|
|
Args: &domain.NotificationArguments{
|
|
AuthRequestID: e.AuthRequestID,
|
|
},
|
|
},
|
|
queue.WithQueueName(notification.QueueName),
|
|
queue.WithMaxAttempts(u.maxAttempts),
|
|
)
|
|
}), nil
|
|
}
|
|
|
|
func (u *userNotifier) passwordCodeTemplate(origin string, e *user.HumanPasswordCodeAddedEvent) string {
|
|
if e.URLTemplate != "" {
|
|
return e.URLTemplate
|
|
}
|
|
return login.InitPasswordLinkTemplate(origin, e.Aggregate().ID, e.Aggregate().ResourceOwner, e.AuthRequestID)
|
|
}
|
|
|
|
func (u *userNotifier) reduceOTPSMSCodeAdded(event eventstore.Event) (*handler.Statement, error) {
|
|
e, ok := event.(*user.HumanOTPSMSCodeAddedEvent)
|
|
if !ok {
|
|
return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-ASF3g", "reduce.wrong.event.type %s", user.HumanOTPSMSCodeAddedType)
|
|
}
|
|
|
|
return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error {
|
|
ctx = HandlerContext(ctx, event.Aggregate())
|
|
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
|
|
user.HumanOTPSMSCodeAddedType,
|
|
user.HumanOTPSMSCodeSentType)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if alreadyHandled {
|
|
return nil
|
|
}
|
|
ctx, err = u.queries.Origin(ctx, e)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return u.queue.Insert(ctx,
|
|
¬ification.Request{
|
|
Aggregate: e.Aggregate(),
|
|
UserID: e.Aggregate().ID,
|
|
UserResourceOwner: e.Aggregate().ResourceOwner,
|
|
TriggeredAtOrigin: http_util.DomainContext(ctx).Origin(),
|
|
EventType: e.EventType,
|
|
NotificationType: domain.NotificationTypeSms,
|
|
MessageType: domain.VerifySMSOTPMessageType,
|
|
Code: e.Code,
|
|
CodeExpiry: e.Expiry,
|
|
IsOTP: true,
|
|
Args: otpArgs(ctx, e.Expiry),
|
|
},
|
|
queue.WithQueueName(notification.QueueName),
|
|
queue.WithMaxAttempts(u.maxAttempts),
|
|
)
|
|
}), nil
|
|
}
|
|
|
|
func (u *userNotifier) reduceSessionOTPSMSChallenged(event eventstore.Event) (*handler.Statement, error) {
|
|
e, ok := event.(*session.OTPSMSChallengedEvent)
|
|
if !ok {
|
|
return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-Sk32L", "reduce.wrong.event.type %s", session.OTPSMSChallengedType)
|
|
}
|
|
if e.CodeReturned {
|
|
return handler.NewNoOpStatement(e), nil
|
|
}
|
|
|
|
return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error {
|
|
ctx = HandlerContext(ctx, event.Aggregate())
|
|
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
|
|
session.OTPSMSChallengedType,
|
|
session.OTPSMSSentType)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if alreadyHandled {
|
|
return nil
|
|
}
|
|
|
|
ctx, err = u.queries.Origin(ctx, e)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
sessionWriteModel := command.NewSessionWriteModel(e.Aggregate().ID, e.Aggregate().InstanceID)
|
|
err = u.queries.es.FilterToQueryReducer(ctx, sessionWriteModel)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
args := otpArgs(ctx, e.Expiry)
|
|
args.SessionID = e.Aggregate().ID
|
|
return u.queue.Insert(ctx,
|
|
¬ification.Request{
|
|
Aggregate: e.Aggregate(),
|
|
UserID: sessionWriteModel.UserID,
|
|
UserResourceOwner: sessionWriteModel.UserResourceOwner,
|
|
TriggeredAtOrigin: http_util.DomainContext(ctx).Origin(),
|
|
EventType: e.EventType,
|
|
NotificationType: domain.NotificationTypeSms,
|
|
MessageType: domain.VerifySMSOTPMessageType,
|
|
Code: e.Code,
|
|
CodeExpiry: e.Expiry,
|
|
IsOTP: true,
|
|
Args: args,
|
|
},
|
|
queue.WithQueueName(notification.QueueName),
|
|
queue.WithMaxAttempts(u.maxAttempts),
|
|
)
|
|
}), nil
|
|
}
|
|
|
|
func (u *userNotifier) reduceOTPEmailCodeAdded(event eventstore.Event) (*handler.Statement, error) {
|
|
e, ok := event.(*user.HumanOTPEmailCodeAddedEvent)
|
|
if !ok {
|
|
return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-JL3hw", "reduce.wrong.event.type %s", user.HumanOTPEmailCodeAddedType)
|
|
}
|
|
|
|
return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error {
|
|
ctx = HandlerContext(ctx, event.Aggregate())
|
|
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
|
|
user.HumanOTPEmailCodeAddedType,
|
|
user.HumanOTPEmailCodeSentType)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if alreadyHandled {
|
|
return nil
|
|
}
|
|
|
|
ctx, err = u.queries.Origin(ctx, e)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
origin := http_util.DomainContext(ctx).Origin()
|
|
var authRequestID string
|
|
if e.AuthRequestInfo != nil {
|
|
authRequestID = e.AuthRequestInfo.ID
|
|
}
|
|
args := otpArgs(ctx, e.Expiry)
|
|
args.AuthRequestID = authRequestID
|
|
return u.queue.Insert(ctx,
|
|
¬ification.Request{
|
|
Aggregate: e.Aggregate(),
|
|
UserID: e.Aggregate().ID,
|
|
UserResourceOwner: e.Aggregate().ResourceOwner,
|
|
TriggeredAtOrigin: origin,
|
|
EventType: e.EventType,
|
|
NotificationType: domain.NotificationTypeEmail,
|
|
MessageType: domain.VerifyEmailOTPMessageType,
|
|
Code: e.Code,
|
|
CodeExpiry: e.Expiry,
|
|
IsOTP: true,
|
|
URLTemplate: login.OTPLinkTemplate(origin, authRequestID, domain.MFATypeOTPEmail),
|
|
Args: args,
|
|
},
|
|
queue.WithQueueName(notification.QueueName),
|
|
queue.WithMaxAttempts(u.maxAttempts),
|
|
)
|
|
}), nil
|
|
}
|
|
|
|
func (u *userNotifier) reduceSessionOTPEmailChallenged(event eventstore.Event) (*handler.Statement, error) {
|
|
e, ok := event.(*session.OTPEmailChallengedEvent)
|
|
if !ok {
|
|
return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-zbsgt", "reduce.wrong.event.type %s", session.OTPEmailChallengedType)
|
|
}
|
|
if e.ReturnCode {
|
|
return handler.NewNoOpStatement(e), nil
|
|
}
|
|
return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error {
|
|
ctx = HandlerContext(ctx, event.Aggregate())
|
|
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
|
|
session.OTPEmailChallengedType,
|
|
session.OTPEmailSentType)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if alreadyHandled {
|
|
return nil
|
|
}
|
|
s, err := u.queries.SessionByID(ctx, true, e.Aggregate().ID, "", nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
ctx, err = u.queries.Origin(ctx, e)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
origin := http_util.DomainContext(ctx).Origin()
|
|
|
|
args := otpArgs(ctx, e.Expiry)
|
|
args.SessionID = e.Aggregate().ID
|
|
return u.queue.Insert(ctx,
|
|
¬ification.Request{
|
|
Aggregate: e.Aggregate(),
|
|
UserID: s.UserFactor.UserID,
|
|
UserResourceOwner: s.UserFactor.ResourceOwner,
|
|
TriggeredAtOrigin: origin,
|
|
EventType: e.EventType,
|
|
NotificationType: domain.NotificationTypeEmail,
|
|
MessageType: domain.VerifyEmailOTPMessageType,
|
|
Code: e.Code,
|
|
CodeExpiry: e.Expiry,
|
|
IsOTP: true,
|
|
URLTemplate: u.otpEmailTemplate(origin, e),
|
|
Args: args,
|
|
},
|
|
queue.WithQueueName(notification.QueueName),
|
|
queue.WithMaxAttempts(u.maxAttempts),
|
|
)
|
|
}), nil
|
|
}
|
|
|
|
func (u *userNotifier) otpEmailTemplate(origin string, e *session.OTPEmailChallengedEvent) string {
|
|
if e.URLTmpl != "" {
|
|
return e.URLTmpl
|
|
}
|
|
return origin + u.otpEmailTmpl
|
|
}
|
|
|
|
func otpArgs(ctx context.Context, expiry time.Duration) *domain.NotificationArguments {
|
|
domainCtx := http_util.DomainContext(ctx)
|
|
return &domain.NotificationArguments{
|
|
Origin: domainCtx.Origin(),
|
|
Domain: domainCtx.RequestedDomain(),
|
|
Expiry: expiry,
|
|
}
|
|
}
|
|
|
|
func (u *userNotifier) reduceDomainClaimed(event eventstore.Event) (*handler.Statement, error) {
|
|
e, ok := event.(*user.DomainClaimedEvent)
|
|
if !ok {
|
|
return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-Drh5w", "reduce.wrong.event.type %s", user.UserDomainClaimedType)
|
|
}
|
|
return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error {
|
|
ctx = HandlerContext(ctx, event.Aggregate())
|
|
alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, nil,
|
|
user.UserDomainClaimedType, user.UserDomainClaimedSentType)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if alreadyHandled {
|
|
return nil
|
|
}
|
|
ctx, err = u.queries.Origin(ctx, e)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
origin := http_util.DomainContext(ctx).Origin()
|
|
return u.queue.Insert(ctx,
|
|
¬ification.Request{
|
|
Aggregate: e.Aggregate(),
|
|
UserID: e.Aggregate().ID,
|
|
UserResourceOwner: e.Aggregate().ResourceOwner,
|
|
TriggeredAtOrigin: origin,
|
|
EventType: e.EventType,
|
|
NotificationType: domain.NotificationTypeEmail,
|
|
MessageType: domain.DomainClaimedMessageType,
|
|
URLTemplate: login.LoginLink(origin, e.Aggregate().ResourceOwner),
|
|
UnverifiedNotificationChannel: true,
|
|
Args: &domain.NotificationArguments{
|
|
TempUsername: e.UserName,
|
|
},
|
|
RequiresPreviousDomain: true,
|
|
},
|
|
queue.WithQueueName(notification.QueueName),
|
|
queue.WithMaxAttempts(u.maxAttempts),
|
|
)
|
|
}), nil
|
|
}
|
|
|
|
func (u *userNotifier) reducePasswordlessCodeRequested(event eventstore.Event) (*handler.Statement, error) {
|
|
e, ok := event.(*user.HumanPasswordlessInitCodeRequestedEvent)
|
|
if !ok {
|
|
return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-EDtjd", "reduce.wrong.event.type %s", user.HumanPasswordlessInitCodeAddedType)
|
|
}
|
|
if e.CodeReturned {
|
|
return handler.NewNoOpStatement(e), nil
|
|
}
|
|
|
|
return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error {
|
|
ctx = HandlerContext(ctx, event.Aggregate())
|
|
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, map[string]interface{}{"id": e.ID}, user.HumanPasswordlessInitCodeSentType)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if alreadyHandled {
|
|
return nil
|
|
}
|
|
ctx, err = u.queries.Origin(ctx, e)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
origin := http_util.DomainContext(ctx).Origin()
|
|
return u.queue.Insert(ctx,
|
|
¬ification.Request{
|
|
Aggregate: e.Aggregate(),
|
|
UserID: e.Aggregate().ID,
|
|
UserResourceOwner: e.Aggregate().ResourceOwner,
|
|
TriggeredAtOrigin: origin,
|
|
EventType: e.EventType,
|
|
NotificationType: domain.NotificationTypeEmail,
|
|
MessageType: domain.PasswordlessRegistrationMessageType,
|
|
URLTemplate: u.passwordlessCodeTemplate(origin, e),
|
|
Args: &domain.NotificationArguments{
|
|
CodeID: e.ID,
|
|
},
|
|
CodeExpiry: e.Expiry,
|
|
Code: e.Code,
|
|
},
|
|
queue.WithQueueName(notification.QueueName),
|
|
queue.WithMaxAttempts(u.maxAttempts),
|
|
)
|
|
}), nil
|
|
}
|
|
|
|
func (u *userNotifier) passwordlessCodeTemplate(origin string, e *user.HumanPasswordlessInitCodeRequestedEvent) string {
|
|
if e.URLTemplate != "" {
|
|
return e.URLTemplate
|
|
}
|
|
return domain.PasswordlessInitCodeLinkTemplate(origin+login.HandlerPrefix+login.EndpointPasswordlessRegistration, e.Aggregate().ID, e.Aggregate().ResourceOwner, e.ID)
|
|
}
|
|
|
|
func (u *userNotifier) reducePasswordChanged(event eventstore.Event) (*handler.Statement, error) {
|
|
e, ok := event.(*user.HumanPasswordChangedEvent)
|
|
if !ok {
|
|
return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-Yko2z8", "reduce.wrong.event.type %s", user.HumanPasswordChangedType)
|
|
}
|
|
|
|
return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error {
|
|
ctx = HandlerContext(ctx, event.Aggregate())
|
|
alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, nil, user.HumanPasswordChangeSentType)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if alreadyHandled {
|
|
return nil
|
|
}
|
|
|
|
notificationPolicy, err := u.queries.NotificationPolicyByOrg(ctx, true, e.Aggregate().ResourceOwner, false)
|
|
if err != nil && !zerrors.IsNotFound(err) {
|
|
return err
|
|
}
|
|
|
|
if !notificationPolicy.PasswordChange {
|
|
return nil
|
|
}
|
|
|
|
ctx, err = u.queries.Origin(ctx, e)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
origin := http_util.DomainContext(ctx).Origin()
|
|
|
|
return u.queue.Insert(ctx,
|
|
¬ification.Request{
|
|
Aggregate: e.Aggregate(),
|
|
UserID: e.Aggregate().ID,
|
|
UserResourceOwner: e.Aggregate().ResourceOwner,
|
|
TriggeredAtOrigin: origin,
|
|
EventType: e.EventType,
|
|
NotificationType: domain.NotificationTypeEmail,
|
|
MessageType: domain.PasswordChangeMessageType,
|
|
URLTemplate: console.LoginHintLink(origin, "{{.PreferredLoginName}}"),
|
|
UnverifiedNotificationChannel: true,
|
|
},
|
|
queue.WithQueueName(notification.QueueName),
|
|
queue.WithMaxAttempts(u.maxAttempts),
|
|
)
|
|
}), nil
|
|
}
|
|
|
|
func (u *userNotifier) reducePhoneCodeAdded(event eventstore.Event) (*handler.Statement, error) {
|
|
e, ok := event.(*user.HumanPhoneCodeAddedEvent)
|
|
if !ok {
|
|
return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-He83g", "reduce.wrong.event.type %s", user.HumanPhoneCodeAddedType)
|
|
}
|
|
if e.CodeReturned {
|
|
return handler.NewNoOpStatement(e), nil
|
|
}
|
|
|
|
return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error {
|
|
ctx = HandlerContext(ctx, event.Aggregate())
|
|
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
|
|
user.UserV1PhoneCodeAddedType, user.UserV1PhoneCodeSentType,
|
|
user.HumanPhoneCodeAddedType, user.HumanPhoneCodeSentType)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if alreadyHandled {
|
|
return nil
|
|
}
|
|
|
|
ctx, err = u.queries.Origin(ctx, e)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return u.queue.Insert(ctx,
|
|
¬ification.Request{
|
|
Aggregate: e.Aggregate(),
|
|
UserID: e.Aggregate().ID,
|
|
UserResourceOwner: e.Aggregate().ResourceOwner,
|
|
TriggeredAtOrigin: http_util.DomainContext(ctx).Origin(),
|
|
EventType: e.EventType,
|
|
NotificationType: domain.NotificationTypeSms,
|
|
MessageType: domain.VerifyPhoneMessageType,
|
|
CodeExpiry: e.Expiry,
|
|
Code: e.Code,
|
|
UnverifiedNotificationChannel: true,
|
|
Args: &domain.NotificationArguments{
|
|
Domain: http_util.DomainContext(ctx).RequestedDomain(),
|
|
},
|
|
},
|
|
queue.WithQueueName(notification.QueueName),
|
|
queue.WithMaxAttempts(u.maxAttempts),
|
|
)
|
|
}), nil
|
|
}
|
|
|
|
func (u *userNotifier) reduceInviteCodeAdded(event eventstore.Event) (*handler.Statement, error) {
|
|
e, ok := event.(*user.HumanInviteCodeAddedEvent)
|
|
if !ok {
|
|
return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-Eeg3s", "reduce.wrong.event.type %s", user.HumanInviteCodeAddedType)
|
|
}
|
|
if e.CodeReturned {
|
|
return handler.NewNoOpStatement(e), nil
|
|
}
|
|
|
|
return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error {
|
|
ctx = HandlerContext(ctx, event.Aggregate())
|
|
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
|
|
user.HumanInviteCodeAddedType, user.HumanInviteCodeSentType)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if alreadyHandled {
|
|
return nil
|
|
}
|
|
|
|
ctx, err = u.queries.Origin(ctx, e)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
origin := http_util.DomainContext(ctx).Origin()
|
|
|
|
applicationName := e.ApplicationName
|
|
if applicationName == "" {
|
|
applicationName = "ZITADEL"
|
|
}
|
|
return u.queue.Insert(ctx,
|
|
¬ification.Request{
|
|
Aggregate: e.Aggregate(),
|
|
UserID: e.Aggregate().ID,
|
|
UserResourceOwner: e.Aggregate().ResourceOwner,
|
|
TriggeredAtOrigin: origin,
|
|
EventType: e.EventType,
|
|
NotificationType: domain.NotificationTypeEmail,
|
|
MessageType: domain.InviteUserMessageType,
|
|
CodeExpiry: e.Expiry,
|
|
Code: e.Code,
|
|
UnverifiedNotificationChannel: true,
|
|
URLTemplate: u.inviteCodeTemplate(origin, e),
|
|
Args: &domain.NotificationArguments{
|
|
AuthRequestID: e.AuthRequestID,
|
|
ApplicationName: applicationName,
|
|
},
|
|
},
|
|
queue.WithQueueName(notification.QueueName),
|
|
queue.WithMaxAttempts(u.maxAttempts),
|
|
)
|
|
}), nil
|
|
}
|
|
|
|
func (u *userNotifier) inviteCodeTemplate(origin string, e *user.HumanInviteCodeAddedEvent) string {
|
|
if e.URLTemplate != "" {
|
|
return e.URLTemplate
|
|
}
|
|
return login.InviteUserLinkTemplate(origin, e.Aggregate().ID, e.Aggregate().ResourceOwner, e.AuthRequestID)
|
|
}
|
|
|
|
func (u *userNotifier) checkIfCodeAlreadyHandledOrExpired(ctx context.Context, event eventstore.Event, expiry time.Duration, data map[string]interface{}, eventTypes ...eventstore.EventType) (bool, error) {
|
|
if expiry > 0 && event.CreatedAt().Add(expiry).Before(time.Now().UTC()) {
|
|
return true, nil
|
|
}
|
|
return u.queries.IsAlreadyHandled(ctx, event, data, eventTypes...)
|
|
}
|