mirror of
https://github.com/zitadel/zitadel.git
synced 2025-07-19 14:28:35 +00:00

This PR summarizes multiple changes specifically only available with ZITADEL v3: - feat: Web Keys management (https://github.com/zitadel/zitadel/pull/9526) - fix(cmd): ensure proper working of mirror (https://github.com/zitadel/zitadel/pull/9509) - feat(Authz): system user support for permission check v2 (https://github.com/zitadel/zitadel/pull/9640) - chore(license): change from Apache to AGPL (https://github.com/zitadel/zitadel/pull/9597) - feat(console): list v2 sessions (https://github.com/zitadel/zitadel/pull/9539) - fix(console): add loginV2 feature flag (https://github.com/zitadel/zitadel/pull/9682) - fix(feature flags): allow reading "own" flags (https://github.com/zitadel/zitadel/pull/9649) - feat(console): add Actions V2 UI (https://github.com/zitadel/zitadel/pull/9591) BREAKING CHANGE - feat(webkey): migrate to v2beta API (https://github.com/zitadel/zitadel/pull/9445) - chore!: remove CockroachDB Support (https://github.com/zitadel/zitadel/pull/9444) - feat(actions): migrate to v2beta API (https://github.com/zitadel/zitadel/pull/9489) --------- Co-authored-by: Livio Spring <livio.a@gmail.com> Co-authored-by: Stefan Benz <46600784+stebenz@users.noreply.github.com> Co-authored-by: Silvan <27845747+adlerhurst@users.noreply.github.com> Co-authored-by: Ramon <mail@conblem.me> Co-authored-by: Elio Bischof <elio@zitadel.com> Co-authored-by: Kenta Yamaguchi <56732734+KEY60228@users.noreply.github.com> Co-authored-by: Harsha Reddy <harsha.reddy@klaviyo.com> Co-authored-by: Livio Spring <livio@zitadel.com> Co-authored-by: Max Peintner <max@caos.ch> Co-authored-by: Iraq <66622793+kkrime@users.noreply.github.com> Co-authored-by: Florian Forster <florian@zitadel.com> Co-authored-by: Tim Möhlmann <tim+github@zitadel.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: Max Peintner <peintnerm@gmail.com>
187 lines
5.9 KiB
Go
187 lines
5.9 KiB
Go
package handlers
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/riverqueue/river"
|
|
"github.com/zitadel/logging"
|
|
|
|
"github.com/zitadel/zitadel/internal/api/authz"
|
|
"github.com/zitadel/zitadel/internal/crypto"
|
|
"github.com/zitadel/zitadel/internal/domain"
|
|
"github.com/zitadel/zitadel/internal/eventstore"
|
|
"github.com/zitadel/zitadel/internal/notification/channels"
|
|
"github.com/zitadel/zitadel/internal/notification/senders"
|
|
"github.com/zitadel/zitadel/internal/notification/types"
|
|
"github.com/zitadel/zitadel/internal/query"
|
|
"github.com/zitadel/zitadel/internal/repository/notification"
|
|
)
|
|
|
|
const (
|
|
Code = "Code"
|
|
OTP = "OTP"
|
|
)
|
|
|
|
type NotificationWorker struct {
|
|
river.WorkerDefaults[*notification.Request]
|
|
|
|
commands Commands
|
|
queries *NotificationQueries
|
|
channels types.ChannelChains
|
|
config WorkerConfig
|
|
now nowFunc
|
|
}
|
|
|
|
// Timeout implements the Timeout-function of [river.Worker].
|
|
// Maximum time a job can run before the context gets cancelled.
|
|
func (w *NotificationWorker) Timeout(*river.Job[*notification.Request]) time.Duration {
|
|
return w.config.TransactionDuration
|
|
}
|
|
|
|
// Work implements [river.Worker].
|
|
func (w *NotificationWorker) Work(ctx context.Context, job *river.Job[*notification.Request]) error {
|
|
ctx = ContextWithNotifier(ctx, job.Args.Aggregate)
|
|
|
|
// if the notification is too old, we can directly cancel
|
|
if job.CreatedAt.Add(w.config.MaxTtl).Before(w.now()) {
|
|
return river.JobCancel(errors.New("notification is too old"))
|
|
}
|
|
|
|
// We do not trigger the projection to reduce load on the database. By the time the notification is processed,
|
|
// the user should be projected anyway. If not, it will just wait for the next run.
|
|
// We are aware that the user can change during the time the notification is in the queue.
|
|
notifyUser, err := w.queries.GetNotifyUserByID(ctx, false, job.Args.UserID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// The domain claimed event requires the domain as argument, but lacks the user when creating the request event.
|
|
// Since we set it into the request arguments, it will be passed into a potential retry event.
|
|
if job.Args.RequiresPreviousDomain && job.Args.Args != nil && job.Args.Args.Domain == "" {
|
|
index := strings.LastIndex(notifyUser.LastEmail, "@")
|
|
job.Args.Args.Domain = notifyUser.LastEmail[index+1:]
|
|
}
|
|
|
|
err = w.sendNotificationQueue(ctx, job.Args, strconv.Itoa(int(job.ID)), notifyUser)
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
// if the error explicitly specifies, we cancel the notification
|
|
if errors.Is(err, &channels.CancelError{}) {
|
|
return river.JobCancel(err)
|
|
}
|
|
return err
|
|
}
|
|
|
|
type WorkerConfig struct {
|
|
LegacyEnabled bool
|
|
Workers uint8
|
|
TransactionDuration time.Duration
|
|
MaxTtl time.Duration
|
|
MaxAttempts uint8
|
|
}
|
|
|
|
// nowFunc makes [time.Now] mockable
|
|
type nowFunc func() time.Time
|
|
|
|
type Sent func(ctx context.Context, commands Commands, id, orgID string, generatorInfo *senders.CodeGeneratorInfo, args map[string]any) error
|
|
|
|
var sentHandlers map[eventstore.EventType]Sent
|
|
|
|
func RegisterSentHandler(eventType eventstore.EventType, sent Sent) {
|
|
if sentHandlers == nil {
|
|
sentHandlers = make(map[eventstore.EventType]Sent)
|
|
}
|
|
sentHandlers[eventType] = sent
|
|
}
|
|
|
|
func NewNotificationWorker(
|
|
config WorkerConfig,
|
|
commands Commands,
|
|
queries *NotificationQueries,
|
|
channels types.ChannelChains,
|
|
) *NotificationWorker {
|
|
return &NotificationWorker{
|
|
config: config,
|
|
commands: commands,
|
|
queries: queries,
|
|
channels: channels,
|
|
now: time.Now,
|
|
}
|
|
}
|
|
|
|
var _ river.Worker[*notification.Request] = (*NotificationWorker)(nil)
|
|
|
|
func (w *NotificationWorker) Register(workers *river.Workers, queues map[string]river.QueueConfig) {
|
|
river.AddWorker(workers, w)
|
|
queues[notification.QueueName] = river.QueueConfig{
|
|
MaxWorkers: int(w.config.Workers),
|
|
}
|
|
}
|
|
|
|
func (w *NotificationWorker) sendNotificationQueue(ctx context.Context, request *notification.Request, jobID string, notifyUser *query.NotifyUser) error {
|
|
// check early that a "sent" handler exists, otherwise we can cancel early
|
|
sentHandler, ok := sentHandlers[request.EventType]
|
|
if !ok {
|
|
logging.Errorf(`no "sent" handler registered for %s`, request.EventType)
|
|
return channels.NewCancelError(fmt.Errorf("no sent handler registered for %s", request.EventType))
|
|
}
|
|
|
|
ctx, err := enrichCtx(ctx, request.TriggeredAtOrigin)
|
|
if err != nil {
|
|
return channels.NewCancelError(err)
|
|
}
|
|
|
|
var code string
|
|
if request.Code != nil {
|
|
code, err = crypto.DecryptString(request.Code, w.queries.UserDataCrypto)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
colors, err := w.queries.ActiveLabelPolicyByOrg(ctx, request.UserResourceOwner, false)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
translator, err := w.queries.GetTranslatorWithOrgTexts(ctx, request.UserResourceOwner, request.MessageType)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
generatorInfo := new(senders.CodeGeneratorInfo)
|
|
var notify types.Notify
|
|
switch request.NotificationType {
|
|
case domain.NotificationTypeEmail:
|
|
template, err := w.queries.MailTemplateByOrg(ctx, notifyUser.ResourceOwner, false)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
notify = types.SendEmail(ctx, w.channels, string(template.Template), translator, notifyUser, colors, request.EventType)
|
|
case domain.NotificationTypeSms:
|
|
notify = types.SendSMS(ctx, w.channels, translator, notifyUser, colors, request.EventType, request.Aggregate.InstanceID, jobID, generatorInfo)
|
|
}
|
|
|
|
args := request.Args.ToMap()
|
|
args[Code] = code
|
|
// existing notifications use `OTP` as argument for the code
|
|
if request.IsOTP {
|
|
args[OTP] = code
|
|
}
|
|
|
|
if err = notify(request.URLTemplate, args, request.MessageType, request.UnverifiedNotificationChannel); err != nil {
|
|
return err
|
|
}
|
|
|
|
err = sentHandler(authz.WithInstanceID(ctx, request.Aggregate.InstanceID), w.commands, request.Aggregate.ID, request.Aggregate.ResourceOwner, generatorInfo, args)
|
|
logging.WithFields("instanceID", request.Aggregate.InstanceID, "notification", request.Aggregate.ID).
|
|
OnError(err).Error("could not set notification event on aggregate")
|
|
return nil
|
|
}
|