diff --git a/cmd/defaults.yaml b/cmd/defaults.yaml index 0798c7dd22..3b2265d10e 100644 --- a/cmd/defaults.yaml +++ b/cmd/defaults.yaml @@ -171,6 +171,14 @@ Projections: Notifications: # As notification projections don't result in database statements, retries don't have an effect MaxFailureCount: 0 + # The NotificationsQuotas projection is used for calling quota webhooks + NotificationsQuotas: + # Delivery guarantee requirements are probably higher for quota webhooks + HandleInactiveInstances: true + # As quota notification projections don't result in database statements, retries don't have an effect + MaxFailureCount: 0 + # Quota notifications are not so time critical. Setting RequeueEvery every five minutes doesn't annoy the db too much. + RequeueEvery: 300s Auth: SearchLimit: 1000 diff --git a/cmd/start/start.go b/cmd/start/start.go index 10c60303ad..f01060bef4 100644 --- a/cmd/start/start.go +++ b/cmd/start/start.go @@ -167,7 +167,7 @@ func startZitadel(config *Config, masterKey string) error { } actions.SetLogstoreService(actionsLogstoreSvc) - notification.Start(ctx, config.Projections.Customizations["notifications"], config.ExternalPort, config.ExternalSecure, commands, queries, eventstoreClient, assets.AssetAPIFromDomain(config.ExternalSecure, config.ExternalPort), config.SystemDefaults.Notifications.FileSystemPath, keys.User, keys.SMTP, keys.SMS) + notification.Start(ctx, config.Projections.Customizations["notifications"], config.Projections.Customizations["notificationsquotas"], config.ExternalPort, config.ExternalSecure, commands, queries, eventstoreClient, assets.AssetAPIFromDomain(config.ExternalSecure, config.ExternalPort), config.SystemDefaults.Notifications.FileSystemPath, keys.User, keys.SMTP, keys.SMS) router := mux.NewRouter() tlsConfig, err := config.TLS.Config() diff --git a/docs/docs/self-hosting/manage/production.md b/docs/docs/self-hosting/manage/production.md index 211a419e08..5fbd7465ef 100644 --- a/docs/docs/self-hosting/manage/production.md +++ b/docs/docs/self-hosting/manage/production.md @@ -106,6 +106,8 @@ Projections: HandleInactiveInstances: true # As quota notification projections don't result in database statements, retries don't have an effect MaxFailureCount: 0 + # Quota notifications are not so time critical. Setting RequeueEvery every five minutes doesn't annoy the db too much. + RequeueEvery: 300s ``` ### Manage your Data diff --git a/e2e/config/host.docker.internal/zitadel.yaml b/e2e/config/host.docker.internal/zitadel.yaml index 70f98b7797..12979c9fe6 100644 --- a/e2e/config/host.docker.internal/zitadel.yaml +++ b/e2e/config/host.docker.internal/zitadel.yaml @@ -1,3 +1,6 @@ +Log: + Level: debug + ExternalDomain: host.docker.internal ExternalSecure: false @@ -32,6 +35,11 @@ Quotas: ExhaustedCookieKey: "zitadel.quota.limiting" ExhaustedCookieMaxAge: "60s" +Projections: + Customizations: + NotificationsQuotas: + RequeueEvery: 1s + DefaultInstance: LoginPolicy: MfaInitSkipLifetime: "0" diff --git a/e2e/config/localhost/docker-compose.yaml b/e2e/config/localhost/docker-compose.yaml index 24f4ff8a99..6471c367b4 100644 --- a/e2e/config/localhost/docker-compose.yaml +++ b/e2e/config/localhost/docker-compose.yaml @@ -27,4 +27,5 @@ services: retries: 5 start_period: '20s' ports: - - "26257:26257" \ No newline at end of file + - "26257:26257" + - "9090:9090" \ No newline at end of file diff --git a/e2e/config/localhost/zitadel.yaml b/e2e/config/localhost/zitadel.yaml index fc1c062460..7ba4182d92 100644 --- a/e2e/config/localhost/zitadel.yaml +++ b/e2e/config/localhost/zitadel.yaml @@ -1,3 +1,6 @@ +Log: + Level: debug + ExternalDomain: localhost ExternalSecure: false @@ -32,6 +35,11 @@ Quotas: ExhaustedCookieKey: "zitadel.quota.limiting" ExhaustedCookieMaxAge: "60s" +Projections: + Customizations: + NotificationsQuotas: + RequeueEvery: 1s + DefaultInstance: LoginPolicy: MfaInitSkipLifetime: "0" diff --git a/e2e/cypress.config.ts b/e2e/cypress.config.ts index 60dd5ee2be..720b429fc7 100644 --- a/e2e/cypress.config.ts +++ b/e2e/cypress.config.ts @@ -34,8 +34,8 @@ YkTaa1AFLstnf348ZjuvBN3USUYZo3X3mxnS+uluVuRSGwIKsN0a -----END RSA PRIVATE KEY-----` let tokensCache = new Map() - let webhookEvents = new Array() +let failWebhookEventsCount = 0 export default defineConfig({ reporter: 'mochawesome', @@ -98,10 +98,15 @@ export default defineConfig({ }, resetWebhookEvents() { webhookEvents = [] + failWebhookEventsCount = 0 return null }, handledWebhookEvents(){ return webhookEvents + }, + failWebhookEvents(count: number){ + failWebhookEventsCount = count + return null } }) }, @@ -127,11 +132,17 @@ function startWebhookEventHandler() { req.on("data", (chunk) => { chunks.push(chunk); }); + const sendStatus = failWebhookEventsCount ? 500 : 200 req.on("end", () => { - webhookEvents.push(JSON.parse(Buffer.concat(chunks).toString())); + webhookEvents.push({ + sentStatus: sendStatus, + payload: JSON.parse(Buffer.concat(chunks).toString()) + }); }); - - res.writeHead(200); + if (failWebhookEventsCount > 0){ + failWebhookEventsCount-- + } + res.writeHead(sendStatus); res.end() }); diff --git a/e2e/cypress/e2e/quotas/quotas.cy.ts b/e2e/cypress/e2e/quotas/quotas.cy.ts index 408752f829..14e5c50da9 100644 --- a/e2e/cypress/e2e/quotas/quotas.cy.ts +++ b/e2e/cypress/e2e/quotas/quotas.cy.ts @@ -2,6 +2,7 @@ import { addQuota, ensureQuotaIsAdded, ensureQuotaIsRemoved, removeQuota, Unit } import { createHumanUser, ensureUserDoesntExist } from 'support/api/users'; import { Context } from 'support/commands'; import { ZITADELWebhookEvent } from 'support/types'; +import { textChangeRangeIsUnchanged } from 'typescript'; beforeEach(() => { cy.context().as('ctx'); @@ -144,7 +145,7 @@ describe('quotas', () => { const amount = 100; const percent = 10; - const usage = 25; + const usage = 35; describe('without repetition', () => { beforeEach(() => { @@ -160,7 +161,7 @@ describe('quotas', () => { }); }); - it('fires once with the expected payload', () => { + it('fires at least once with the expected payload', () => { cy.get>('@authenticatedUrls').then((urls) => { cy.get('@ctx').then((ctx) => { for (let i = 0; i < usage; i++) { @@ -175,19 +176,71 @@ describe('quotas', () => { }); cy.waitUntil(() => cy.task>('handledWebhookEvents').then((events) => { - if (events.length != 1) { + if (events.length < 1) { return false; } return Cypress._.matches({ - callURL: callURL, - threshold: percent, - unit: 1, - usage: percent, + sentStatus: 200, + payload: { + callURL: callURL, + threshold: percent, + unit: 1, + usage: percent, + }, })(events[0]); }), ); }); }); + + it('fires until the webhook returns a successful message', () => { + cy.task('failWebhookEvents', 8); + cy.get>('@authenticatedUrls').then((urls) => { + cy.get('@ctx').then((ctx) => { + for (let i = 0; i < usage; i++) { + cy.request({ + url: urls[0], + method: 'GET', + auth: { + bearer: ctx.api.token, + }, + }); + } + }); + cy.waitUntil( + () => + cy.task>('handledWebhookEvents').then((events) => { + if (events.length != 9) { + return false; + } + return events.reduce((a, b, i) => { + return !a + ? a + : i < 8 + ? Cypress._.matches({ + sentStatus: 500, + payload: { + callURL: callURL, + threshold: percent, + unit: 1, + usage: percent, + }, + })(b) + : Cypress._.matches({ + sentStatus: 200, + payload: { + callURL: callURL, + threshold: percent, + unit: 1, + usage: percent, + }, + })(b); + }, true); + }), + { timeout: 60_000 }, + ); + }); + }); }); describe('with repetition', () => { @@ -222,23 +275,25 @@ describe('quotas', () => { }); cy.waitUntil(() => cy.task>('handledWebhookEvents').then((events) => { - if (events.length != 1) { - return false; - } + let foundExpected = 0; for (let i = 0; i < events.length; i++) { - const threshold = percent * (i + 1); - if ( - !Cypress._.matches({ - callURL: callURL, - threshold: threshold, - unit: 1, - usage: threshold, - })(events[i]) - ) { - return false; + for (let expect = 10; expect <= 30; expect += 10) { + if ( + Cypress._.matches({ + sentStatus: 200, + payload: { + callURL: callURL, + threshold: expect, + unit: 1, + usage: expect, + }, + })(events[i]) + ) { + foundExpected++; + } } } - return true; + return foundExpected >= 3; }), ); }); diff --git a/e2e/cypress/support/types.ts b/e2e/cypress/support/types.ts index 8fb29f5389..d2a88ee8c1 100644 --- a/e2e/cypress/support/types.ts +++ b/e2e/cypress/support/types.ts @@ -1,10 +1,13 @@ let webhookEventSchema = { - unit: 0, - id: '', - callURL: '', - periodStart: new Date(), - threshold: 0, - usage: 0, + sentStatus: 0, + payload: { + unit: 0, + id: '', + callURL: '', + periodStart: new Date(), + threshold: 0, + usage: 0, + }, }; export type ZITADELWebhookEvent = typeof webhookEventSchema; diff --git a/internal/command/quota_report.go b/internal/command/quota_report.go index 5228a7a4de..b18b9e9b51 100644 --- a/internal/command/quota_report.go +++ b/internal/command/quota_report.go @@ -1,58 +1,31 @@ package command import ( - "bytes" "context" - "encoding/json" - "fmt" - "net/http" + "github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/repository/quota" ) // ReportUsage calls notification hooks and emits the notified events -func (c *Commands) ReportUsage(ctx context.Context, dueNotifications []*quota.NotifiedEvent) error { - for _, notification := range dueNotifications { - - if err := notify(ctx, notification); err != nil { - if err != nil { - return err - } - } - - if _, err := c.eventstore.Push(ctx, notification); err != nil { - return err - } +func (c *Commands) ReportUsage(ctx context.Context, dueNotifications []*quota.NotificationDueEvent) error { + cmds := make([]eventstore.Command, len(dueNotifications)) + for idx, notification := range dueNotifications { + cmds[idx] = notification } - - return nil + _, err := c.eventstore.Push(ctx, cmds...) + return err } -func notify(ctx context.Context, notification *quota.NotifiedEvent) error { - payload, err := json.Marshal(notification) +func (c *Commands) UsageNotificationSent(ctx context.Context, dueEvent *quota.NotificationDueEvent) error { + id, err := c.idGenerator.Next() if err != nil { return err } - req, err := http.NewRequestWithContext(ctx, http.MethodPost, notification.CallURL, bytes.NewReader(payload)) - if err != nil { - return err - } - - req.Header.Set("Content-Type", "application/json") - - resp, err := http.DefaultClient.Do(req) - if err != nil { - return err - } - - if err = resp.Body.Close(); err != nil { - return err - } - - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - return fmt.Errorf("calling url %s returned %s", notification.CallURL, resp.Status) - } - - return nil + _, err = c.eventstore.Push( + ctx, + quota.NewNotifiedEvent(ctx, id, dueEvent), + ) + return err } diff --git a/internal/logstore/quotaqueriers/mock/noop.go b/internal/logstore/quotaqueriers/mock/noop.go index fd3b0aa814..4f4335077b 100644 --- a/internal/logstore/quotaqueriers/mock/noop.go +++ b/internal/logstore/quotaqueriers/mock/noop.go @@ -23,6 +23,6 @@ func (i *inmemReporter) GetCurrentQuotaPeriod(context.Context, string, quota.Uni return i.config, i.startPeriod, nil } -func (*inmemReporter) GetDueQuotaNotifications(context.Context, *quota.AddedEvent, time.Time, uint64) ([]*quota.NotifiedEvent, error) { +func (*inmemReporter) GetDueQuotaNotifications(context.Context, *quota.AddedEvent, time.Time, uint64) ([]*quota.NotificationDueEvent, error) { return nil, nil } diff --git a/internal/logstore/service.go b/internal/logstore/service.go index 80c459dc9d..542b50dd54 100644 --- a/internal/logstore/service.go +++ b/internal/logstore/service.go @@ -15,7 +15,7 @@ const handleThresholdTimeout = time.Minute type QuotaQuerier interface { GetCurrentQuotaPeriod(ctx context.Context, instanceID string, unit quota.Unit) (config *quota.AddedEvent, periodStart time.Time, err error) - GetDueQuotaNotifications(ctx context.Context, config *quota.AddedEvent, periodStart time.Time, used uint64) ([]*quota.NotifiedEvent, error) + GetDueQuotaNotifications(ctx context.Context, config *quota.AddedEvent, periodStart time.Time, used uint64) ([]*quota.NotificationDueEvent, error) } type UsageQuerier interface { @@ -25,12 +25,12 @@ type UsageQuerier interface { } type UsageReporter interface { - Report(ctx context.Context, notifications []*quota.NotifiedEvent) (err error) + Report(ctx context.Context, notifications []*quota.NotificationDueEvent) (err error) } -type UsageReporterFunc func(context.Context, []*quota.NotifiedEvent) (err error) +type UsageReporterFunc func(context.Context, []*quota.NotificationDueEvent) (err error) -func (u UsageReporterFunc) Report(ctx context.Context, notifications []*quota.NotifiedEvent) (err error) { +func (u UsageReporterFunc) Report(ctx context.Context, notifications []*quota.NotificationDueEvent) (err error) { return u(ctx, notifications) } diff --git a/internal/notification/channels/channel.go b/internal/notification/channels/channel.go index 41823e5274..5f4f8f4d88 100644 --- a/internal/notification/channels/channel.go +++ b/internal/notification/channels/channel.go @@ -1,7 +1,10 @@ package channels +import "github.com/zitadel/zitadel/internal/eventstore" + type Message interface { - GetContent() string + GetTriggeringEvent() eventstore.Event + GetContent() (string, error) } type NotificationChannel interface { diff --git a/internal/notification/channels/fs/channel.go b/internal/notification/channels/fs/channel.go index 61dffda873..723946c246 100644 --- a/internal/notification/channels/fs/channel.go +++ b/internal/notification/channels/fs/channel.go @@ -2,19 +2,16 @@ package fs import ( "fmt" - "io/ioutil" "os" "path/filepath" "sort" "strings" "time" + "github.com/k3a/html2text" "github.com/zitadel/logging" - caos_errors "github.com/zitadel/zitadel/internal/errors" - - "github.com/k3a/html2text" - + "github.com/zitadel/zitadel/internal/errors" "github.com/zitadel/zitadel/internal/notification/channels" "github.com/zitadel/zitadel/internal/notification/messages" ) @@ -29,7 +26,10 @@ func InitFSChannel(config Config) (channels.NotificationChannel, error) { return channels.HandleMessageFunc(func(message channels.Message) error { fileName := fmt.Sprintf("%d_", time.Now().Unix()) - content := message.GetContent() + content, err := message.GetContent() + if err != nil { + return err + } switch msg := message.(type) { case *messages.Email: recipients := make([]string, len(msg.Recipients)) @@ -41,10 +41,12 @@ func InitFSChannel(config Config) (channels.NotificationChannel, error) { } case *messages.SMS: fileName = fileName + "sms_to_" + msg.RecipientPhoneNumber + ".txt" + case *messages.JSON: + fileName = "message.json" default: - return caos_errors.ThrowUnimplementedf(nil, "NOTIF-6f9a1", "filesystem provider doesn't support message type %T", message) + return errors.ThrowUnimplementedf(nil, "NOTIF-6f9a1", "filesystem provider doesn't support message type %T", message) } - return ioutil.WriteFile(filepath.Join(config.Path, fileName), []byte(content), 0666) + return os.WriteFile(filepath.Join(config.Path, fileName), []byte(content), 0666) }), nil } diff --git a/internal/notification/channels/instrumenting/instrument.go b/internal/notification/channels/instrumenting/instrument.go new file mode 100644 index 0000000000..e638d934dd --- /dev/null +++ b/internal/notification/channels/instrumenting/instrument.go @@ -0,0 +1,26 @@ +package instrumenting + +import ( + "context" + + "github.com/zitadel/zitadel/internal/notification/channels" +) + +func Wrap( + ctx context.Context, + channel channels.NotificationChannel, + traceSpanName, + successMetricName, + failureMetricName string, +) channels.NotificationChannel { + return traceMessages( + ctx, + countMessages( + ctx, + logMessages(ctx, channel), + successMetricName, + failureMetricName, + ), + traceSpanName, + ) +} diff --git a/internal/notification/channels/instrumenting/logging.go b/internal/notification/channels/instrumenting/logging.go new file mode 100644 index 0000000000..6904f7c263 --- /dev/null +++ b/internal/notification/channels/instrumenting/logging.go @@ -0,0 +1,24 @@ +package instrumenting + +import ( + "context" + + "github.com/zitadel/logging" + + "github.com/zitadel/zitadel/internal/api/authz" + "github.com/zitadel/zitadel/internal/notification/channels" +) + +func logMessages(ctx context.Context, channel channels.NotificationChannel) channels.NotificationChannel { + return channels.HandleMessageFunc(func(message channels.Message) error { + logEntry := logging.WithFields( + "instance", authz.GetInstance(ctx).InstanceID(), + "triggering_event_type", message.GetTriggeringEvent().Type(), + ) + logEntry.Debug("sending notification") + err := channel.HandleMessage(message) + logEntry.OnError(err).Warn("sending notification failed") + logEntry.Debug("notification sent") + return err + }) +} diff --git a/internal/notification/channels/instrumenting/metrics.go b/internal/notification/channels/instrumenting/metrics.go new file mode 100644 index 0000000000..6b76c8f788 --- /dev/null +++ b/internal/notification/channels/instrumenting/metrics.go @@ -0,0 +1,36 @@ +package instrumenting + +import ( + "context" + + "github.com/zitadel/logging" + "go.opentelemetry.io/otel/attribute" + + "github.com/zitadel/zitadel/internal/api/authz" + "github.com/zitadel/zitadel/internal/notification/channels" + "github.com/zitadel/zitadel/internal/telemetry/metrics" +) + +func countMessages(ctx context.Context, channel channels.NotificationChannel, successMetricName, errorMetricName string) channels.NotificationChannel { + return channels.HandleMessageFunc(func(message channels.Message) error { + err := channel.HandleMessage(message) + metricName := successMetricName + if err != nil { + metricName = errorMetricName + } + addCount(ctx, metricName, message, err) + return err + }) +} + +func addCount(ctx context.Context, metricName string, message channels.Message, err error) { + labels := map[string]attribute.Value{ + "triggering_event_typey": attribute.StringValue(string(message.GetTriggeringEvent().Type())), + "instance": attribute.StringValue(authz.GetInstance(ctx).InstanceID()), + } + if err != nil { + labels["error"] = attribute.StringValue(err.Error()) + } + addCountErr := metrics.AddCount(ctx, metricName, 1, labels) + logging.WithFields("name", metricName, "labels", labels).OnError(addCountErr).Error("incrementing counter metric failed") +} diff --git a/internal/notification/channels/instrumenting/tracing.go b/internal/notification/channels/instrumenting/tracing.go new file mode 100644 index 0000000000..e5b49029b0 --- /dev/null +++ b/internal/notification/channels/instrumenting/tracing.go @@ -0,0 +1,16 @@ +package instrumenting + +import ( + "context" + + "github.com/zitadel/zitadel/internal/notification/channels" + "github.com/zitadel/zitadel/internal/telemetry/tracing" +) + +func traceMessages(ctx context.Context, channel channels.NotificationChannel, spanName string) channels.NotificationChannel { + return channels.HandleMessageFunc(func(message channels.Message) (err error) { + _, span := tracing.NewNamedSpan(ctx, spanName) + defer func() { span.EndWithError(err) }() + return channel.HandleMessage(message) + }) +} diff --git a/internal/notification/channels/log/channel.go b/internal/notification/channels/log/channel.go index e0f981d205..a435ca1b41 100644 --- a/internal/notification/channels/log/channel.go +++ b/internal/notification/channels/log/channel.go @@ -15,7 +15,10 @@ func InitStdoutChannel(config Config) channels.NotificationChannel { return channels.HandleMessageFunc(func(message channels.Message) error { - content := message.GetContent() + content, err := message.GetContent() + if err != nil { + return err + } if config.Compact { content = html2text.HTML2Text(content) } diff --git a/internal/notification/channels/smtp/channel.go b/internal/notification/channels/smtp/channel.go index 18a4634ebb..9b1b8ecf98 100644 --- a/internal/notification/channels/smtp/channel.go +++ b/internal/notification/channels/smtp/channel.go @@ -22,7 +22,7 @@ type Email struct { senderName string } -func InitSMTPChannel(ctx context.Context, getSMTPConfig func(ctx context.Context) (*Config, error)) (*Email, error) { +func InitChannel(ctx context.Context, getSMTPConfig func(ctx context.Context) (*Config, error)) (*Email, error) { smtpConfig, err := getSMTPConfig(ctx) if err != nil { return nil, err @@ -70,7 +70,12 @@ func (email *Email) HandleMessage(message channels.Message) error { return err } - _, err = w.Write([]byte(emailMsg.GetContent())) + content, err := emailMsg.GetContent() + if err != nil { + return err + } + + _, err = w.Write([]byte(content)) if err != nil { return err } @@ -80,7 +85,6 @@ func (email *Email) HandleMessage(message channels.Message) error { return err } - defer logging.LogWithFields("EMAI-a1c87ec8").Debug("email sent") return email.smtpClient.Quit() } @@ -154,6 +158,8 @@ func (smtpConfig SMTP) smtpAuth(client *smtp.Client, host string) error { // Auth auth := smtp.PlainAuth("", smtpConfig.User, smtpConfig.Password, host) err := client.Auth(auth) - logging.Log("EMAIL-s9kfs").WithField("smtp user", smtpConfig.User).OnError(err).Debug("could not add smtp auth") - return err + if err != nil { + return caos_errs.ThrowInternalf(err, "EMAIL-s9kfs", "could not add smtp auth for user %s", smtpConfig.User) + } + return nil } diff --git a/internal/notification/channels/twilio/channel.go b/internal/notification/channels/twilio/channel.go index 8c4a33e7e5..c25ceb842a 100644 --- a/internal/notification/channels/twilio/channel.go +++ b/internal/notification/channels/twilio/channel.go @@ -9,7 +9,7 @@ import ( "github.com/zitadel/zitadel/internal/notification/messages" ) -func InitTwilioChannel(config Config) channels.NotificationChannel { +func InitChannel(config Config) channels.NotificationChannel { client := twilio.NewClient(config.SID, config.Token, nil) logging.Debug("successfully initialized twilio sms channel") @@ -19,7 +19,11 @@ func InitTwilioChannel(config Config) channels.NotificationChannel { if !ok { return caos_errs.ThrowInternal(nil, "TWILI-s0pLc", "message is not SMS") } - m, err := client.Messages.SendMessage(twilioMsg.SenderPhoneNumber, twilioMsg.RecipientPhoneNumber, twilioMsg.GetContent(), nil) + content, err := twilioMsg.GetContent() + if err != nil { + return err + } + m, err := client.Messages.SendMessage(twilioMsg.SenderPhoneNumber, twilioMsg.RecipientPhoneNumber, content, nil) if err != nil { return caos_errs.ThrowInternal(err, "TWILI-osk3S", "could not send message") } diff --git a/internal/notification/channels/webhook/channel.go b/internal/notification/channels/webhook/channel.go new file mode 100644 index 0000000000..6dbed74eb8 --- /dev/null +++ b/internal/notification/channels/webhook/channel.go @@ -0,0 +1,60 @@ +package webhook + +import ( + "context" + "fmt" + "net/http" + "strings" + "time" + + "github.com/zitadel/logging" + + "github.com/zitadel/zitadel/internal/errors" + "github.com/zitadel/zitadel/internal/notification/channels" + "github.com/zitadel/zitadel/internal/notification/messages" +) + +func InitChannel(ctx context.Context, cfg Config) (channels.NotificationChannel, error) { + if err := cfg.Validate(); err != nil { + return nil, err + } + + logging.Debug("successfully initialized webhook json channel") + return channels.HandleMessageFunc(func(message channels.Message) error { + + requestCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + msg, ok := message.(*messages.JSON) + if !ok { + return errors.ThrowInternal(nil, "WEBH-K686U", "message is not JSON") + } + payload, err := msg.GetContent() + if err != nil { + return err + } + + req, err := http.NewRequestWithContext(requestCtx, cfg.Method, cfg.CallURL, strings.NewReader(payload)) + if err != nil { + return err + } + + req.Header.Set("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + + if err = resp.Body.Close(); err != nil { + return err + } + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return errors.ThrowUnknown(fmt.Errorf("calling url %s returned %s", cfg.CallURL, resp.Status), "WEBH-LBxU0", "webhook didn't return a success status") + } + + logging.WithFields("calling_url", cfg.CallURL, "method", cfg.Method).Debug("webhook called") + return nil + }), nil +} diff --git a/internal/notification/channels/webhook/config.go b/internal/notification/channels/webhook/config.go new file mode 100644 index 0000000000..4af0c15402 --- /dev/null +++ b/internal/notification/channels/webhook/config.go @@ -0,0 +1,15 @@ +package webhook + +import ( + "net/url" +) + +type Config struct { + CallURL string + Method string +} + +func (w *Config) Validate() error { + _, err := url.Parse(w.CallURL) + return err +} diff --git a/internal/notification/handlers/already_handled.go b/internal/notification/handlers/already_handled.go new file mode 100644 index 0000000000..abf7eb901b --- /dev/null +++ b/internal/notification/handlers/already_handled.go @@ -0,0 +1,27 @@ +package handlers + +import ( + "context" + + "github.com/zitadel/zitadel/internal/eventstore" + "github.com/zitadel/zitadel/internal/repository/user" +) + +func (n *NotificationQueries) IsAlreadyHandled(ctx context.Context, event eventstore.Event, data map[string]interface{}, eventTypes ...eventstore.EventType) (bool, error) { + events, err := n.es.Filter( + ctx, + eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent). + InstanceID(event.Aggregate().InstanceID). + AddQuery(). + AggregateTypes(user.AggregateType). + AggregateIDs(event.Aggregate().ID). + SequenceGreater(event.Sequence()). + EventTypes(eventTypes...). + EventData(data). + Builder(), + ) + if err != nil { + return false, err + } + return len(events) > 0, nil +} diff --git a/internal/notification/handlers/config_filesystem.go b/internal/notification/handlers/config_filesystem.go new file mode 100644 index 0000000000..9f274057dd --- /dev/null +++ b/internal/notification/handlers/config_filesystem.go @@ -0,0 +1,21 @@ +package handlers + +import ( + "context" + + "github.com/zitadel/zitadel/internal/api/authz" + "github.com/zitadel/zitadel/internal/domain" + "github.com/zitadel/zitadel/internal/notification/channels/fs" +) + +// GetFileSystemProvider reads the iam filesystem provider config +func (n *NotificationQueries) GetFileSystemProvider(ctx context.Context) (*fs.Config, error) { + config, err := n.NotificationProviderByIDAndType(ctx, authz.GetInstance(ctx).InstanceID(), domain.NotificationProviderTypeFile) + if err != nil { + return nil, err + } + return &fs.Config{ + Compact: config.Compact, + Path: n.fileSystemPath, + }, nil +} diff --git a/internal/notification/handlers/config_log.go b/internal/notification/handlers/config_log.go new file mode 100644 index 0000000000..bf4cfde19e --- /dev/null +++ b/internal/notification/handlers/config_log.go @@ -0,0 +1,20 @@ +package handlers + +import ( + "context" + + "github.com/zitadel/zitadel/internal/api/authz" + "github.com/zitadel/zitadel/internal/domain" + "github.com/zitadel/zitadel/internal/notification/channels/log" +) + +// GetLogProvider reads the iam log provider config +func (n *NotificationQueries) GetLogProvider(ctx context.Context) (*log.Config, error) { + config, err := n.NotificationProviderByIDAndType(ctx, authz.GetInstance(ctx).InstanceID(), domain.NotificationProviderTypeLog) + if err != nil { + return nil, err + } + return &log.Config{ + Compact: config.Compact, + }, nil +} diff --git a/internal/notification/handlers/config_smtp.go b/internal/notification/handlers/config_smtp.go new file mode 100644 index 0000000000..898e9b659b --- /dev/null +++ b/internal/notification/handlers/config_smtp.go @@ -0,0 +1,31 @@ +package handlers + +import ( + "context" + + "github.com/zitadel/zitadel/internal/api/authz" + "github.com/zitadel/zitadel/internal/crypto" + "github.com/zitadel/zitadel/internal/notification/channels/smtp" +) + +// GetSMTPConfig reads the iam SMTP provider config +func (n *NotificationQueries) GetSMTPConfig(ctx context.Context) (*smtp.Config, error) { + config, err := n.SMTPConfigByAggregateID(ctx, authz.GetInstance(ctx).InstanceID()) + if err != nil { + return nil, err + } + password, err := crypto.DecryptString(config.Password, n.SMTPPasswordCrypto) + if err != nil { + return nil, err + } + return &smtp.Config{ + From: config.SenderAddress, + FromName: config.SenderName, + Tls: config.TLS, + SMTP: smtp.SMTP{ + Host: config.Host, + User: config.User, + Password: password, + }, + }, nil +} diff --git a/internal/notification/handlers/config_twilio.go b/internal/notification/handlers/config_twilio.go new file mode 100644 index 0000000000..29a79415de --- /dev/null +++ b/internal/notification/handlers/config_twilio.go @@ -0,0 +1,35 @@ +package handlers + +import ( + "context" + + "github.com/zitadel/zitadel/internal/crypto" + "github.com/zitadel/zitadel/internal/domain" + "github.com/zitadel/zitadel/internal/errors" + "github.com/zitadel/zitadel/internal/notification/channels/twilio" + "github.com/zitadel/zitadel/internal/query" +) + +// GetTwilioConfig reads the iam Twilio provider config +func (n *NotificationQueries) GetTwilioConfig(ctx context.Context) (*twilio.Config, error) { + active, err := query.NewSMSProviderStateQuery(domain.SMSConfigStateActive) + if err != nil { + return nil, err + } + config, err := n.SMSProviderConfig(ctx, active) + if err != nil { + return nil, err + } + if config.TwilioConfig == nil { + return nil, errors.ThrowNotFound(nil, "HANDLER-8nfow", "Errors.SMS.Twilio.NotFound") + } + token, err := crypto.DecryptString(config.TwilioConfig.Token, n.SMSTokenCrypto) + if err != nil { + return nil, err + } + return &twilio.Config{ + SID: config.TwilioConfig.SID, + Token: token, + SenderNumber: config.TwilioConfig.SenderNumber, + }, nil +} diff --git a/internal/notification/handlers/ctx.go b/internal/notification/handlers/ctx.go new file mode 100644 index 0000000000..eb74e558a1 --- /dev/null +++ b/internal/notification/handlers/ctx.go @@ -0,0 +1,15 @@ +package handlers + +import ( + "context" + + "github.com/zitadel/zitadel/internal/api/authz" + "github.com/zitadel/zitadel/internal/eventstore" +) + +const NotifyUserID = "NOTIFICATION" //TODO: system? + +func HandlerContext(event eventstore.Aggregate) context.Context { + ctx := authz.WithInstanceID(context.Background(), event.InstanceID) + return authz.SetCtxData(ctx, authz.CtxData{UserID: NotifyUserID, OrgID: event.ResourceOwner}) +} diff --git a/internal/notification/handlers/origin.go b/internal/notification/handlers/origin.go new file mode 100644 index 0000000000..2e8549a18b --- /dev/null +++ b/internal/notification/handlers/origin.go @@ -0,0 +1,28 @@ +package handlers + +import ( + "context" + + "github.com/zitadel/zitadel/internal/api/authz" + http_utils "github.com/zitadel/zitadel/internal/api/http" + "github.com/zitadel/zitadel/internal/errors" + "github.com/zitadel/zitadel/internal/query" +) + +func (n *NotificationQueries) Origin(ctx context.Context) (context.Context, string, error) { + primary, err := query.NewInstanceDomainPrimarySearchQuery(true) + if err != nil { + return ctx, "", err + } + domains, err := n.SearchInstanceDomains(ctx, &query.InstanceDomainSearchQueries{ + Queries: []query.SearchQuery{primary}, + }) + if err != nil { + return ctx, "", err + } + if len(domains.Domains) < 1 { + return ctx, "", errors.ThrowInternal(nil, "NOTIF-Ef3r1", "Errors.Notification.NoDomain") + } + ctx = authz.WithRequestedDomain(ctx, domains.Domains[0].Domain) + return ctx, http_utils.BuildHTTP(domains.Domains[0].Domain, n.externalPort, n.externalSecure), nil +} diff --git a/internal/notification/handlers/queries.go b/internal/notification/handlers/queries.go new file mode 100644 index 0000000000..63e0ab4d41 --- /dev/null +++ b/internal/notification/handlers/queries.go @@ -0,0 +1,46 @@ +package handlers + +import ( + "net/http" + + "github.com/zitadel/zitadel/internal/crypto" + "github.com/zitadel/zitadel/internal/eventstore" + _ "github.com/zitadel/zitadel/internal/notification/statik" + "github.com/zitadel/zitadel/internal/query" +) + +type NotificationQueries struct { + *query.Queries + es *eventstore.Eventstore + externalPort uint16 + externalSecure bool + fileSystemPath string + UserDataCrypto crypto.EncryptionAlgorithm + SMTPPasswordCrypto crypto.EncryptionAlgorithm + SMSTokenCrypto crypto.EncryptionAlgorithm + statikDir http.FileSystem +} + +func NewNotificationQueries( + baseQueries *query.Queries, + es *eventstore.Eventstore, + externalPort uint16, + externalSecure bool, + fileSystemPath string, + userDataCrypto crypto.EncryptionAlgorithm, + smtpPasswordCrypto crypto.EncryptionAlgorithm, + smsTokenCrypto crypto.EncryptionAlgorithm, + statikDir http.FileSystem, +) *NotificationQueries { + return &NotificationQueries{ + Queries: baseQueries, + es: es, + externalPort: externalPort, + externalSecure: externalSecure, + fileSystemPath: fileSystemPath, + UserDataCrypto: userDataCrypto, + SMTPPasswordCrypto: smtpPasswordCrypto, + SMSTokenCrypto: smsTokenCrypto, + statikDir: statikDir, + } +} diff --git a/internal/notification/handlers/quotanotifier.go b/internal/notification/handlers/quotanotifier.go new file mode 100644 index 0000000000..e53998490a --- /dev/null +++ b/internal/notification/handlers/quotanotifier.go @@ -0,0 +1,99 @@ +package handlers + +import ( + "context" + "net/http" + + "github.com/zitadel/zitadel/internal/command" + "github.com/zitadel/zitadel/internal/errors" + "github.com/zitadel/zitadel/internal/eventstore" + "github.com/zitadel/zitadel/internal/eventstore/handler" + "github.com/zitadel/zitadel/internal/eventstore/handler/crdb" + "github.com/zitadel/zitadel/internal/notification/channels/webhook" + _ "github.com/zitadel/zitadel/internal/notification/statik" + "github.com/zitadel/zitadel/internal/notification/types" + "github.com/zitadel/zitadel/internal/query/projection" + "github.com/zitadel/zitadel/internal/repository/quota" +) + +const ( + QuotaNotificationsProjectionTable = "projections.notifications_quota" +) + +type quotaNotifier struct { + crdb.StatementHandler + commands *command.Commands + queries *NotificationQueries + metricSuccessfulDeliveriesJSON string + metricFailedDeliveriesJSON string +} + +func NewQuotaNotifier( + ctx context.Context, + config crdb.StatementHandlerConfig, + commands *command.Commands, + queries *NotificationQueries, + metricSuccessfulDeliveriesJSON, + metricFailedDeliveriesJSON string, +) *quotaNotifier { + p := new(quotaNotifier) + config.ProjectionName = QuotaNotificationsProjectionTable + config.Reducers = p.reducers() + p.StatementHandler = crdb.NewStatementHandler(ctx, config) + p.commands = commands + p.queries = queries + p.metricSuccessfulDeliveriesJSON = metricSuccessfulDeliveriesJSON + p.metricFailedDeliveriesJSON = metricFailedDeliveriesJSON + projection.NotificationsQuotaProjection = p + return p +} + +func (u *quotaNotifier) reducers() []handler.AggregateReducer { + return []handler.AggregateReducer{ + { + Aggregate: quota.AggregateType, + EventRedusers: []handler.EventReducer{ + { + Event: quota.NotificationDueEventType, + Reduce: u.reduceNotificationDue, + }, + }, + }, + } +} + +func (u *quotaNotifier) reduceNotificationDue(event eventstore.Event) (*handler.Statement, error) { + e, ok := event.(*quota.NotificationDueEvent) + if !ok { + return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-DLxdE", "reduce.wrong.event.type %s", quota.NotificationDueEventType) + } + ctx := HandlerContext(event.Aggregate()) + alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, map[string]interface{}{"dueEventID": e.ID}, quota.NotifiedEventType) + if err != nil { + return nil, err + } + if alreadyHandled { + return crdb.NewNoOpStatement(e), nil + } + err = types.SendJSON( + ctx, + webhook.Config{ + CallURL: e.CallURL, + Method: http.MethodPost, + }, + u.queries.GetFileSystemProvider, + u.queries.GetLogProvider, + e, + e, + u.metricSuccessfulDeliveriesJSON, + u.metricFailedDeliveriesJSON, + ).WithoutTemplate() + if err != nil { + return nil, err + } + err = u.commands.UsageNotificationSent(ctx, e) + if err != nil { + return nil, err + } + return crdb.NewNoOpStatement(e), nil +} diff --git a/internal/notification/handlers/translator.go b/internal/notification/handlers/translator.go new file mode 100644 index 0000000000..627bb42a27 --- /dev/null +++ b/internal/notification/handlers/translator.go @@ -0,0 +1,39 @@ +package handlers + +import ( + "context" + + "github.com/zitadel/logging" + + "github.com/zitadel/zitadel/internal/api/authz" + "github.com/zitadel/zitadel/internal/i18n" +) + +func (n *NotificationQueries) GetTranslatorWithOrgTexts(ctx context.Context, orgID, textType string) (*i18n.Translator, error) { + translator, err := i18n.NewTranslator(n.statikDir, n.GetDefaultLanguage(ctx), "") + if err != nil { + return nil, err + } + + allCustomTexts, err := n.CustomTextListByTemplate(ctx, authz.GetInstance(ctx).InstanceID(), textType, false) + if err != nil { + return translator, nil + } + customTexts, err := n.CustomTextListByTemplate(ctx, orgID, textType, false) + if err != nil { + return translator, nil + } + allCustomTexts.CustomTexts = append(allCustomTexts.CustomTexts, customTexts.CustomTexts...) + + for _, text := range allCustomTexts.CustomTexts { + msg := i18n.Message{ + ID: text.Template + "." + text.Key, + Text: text.Text, + } + err = translator.AddMessages(text.Language, msg) + logging.WithFields("instanceID", authz.GetInstance(ctx).InstanceID(), "orgID", orgID, "messageType", textType, "messageID", msg.ID). + OnError(err). + Warn("could not add translation message") + } + return translator, nil +} diff --git a/internal/notification/handlers/usernotifier.go b/internal/notification/handlers/usernotifier.go new file mode 100644 index 0000000000..542e9287bc --- /dev/null +++ b/internal/notification/handlers/usernotifier.go @@ -0,0 +1,589 @@ +package handlers + +import ( + "context" + "time" + + "github.com/zitadel/zitadel/internal/command" + "github.com/zitadel/zitadel/internal/crypto" + "github.com/zitadel/zitadel/internal/domain" + "github.com/zitadel/zitadel/internal/errors" + "github.com/zitadel/zitadel/internal/eventstore" + "github.com/zitadel/zitadel/internal/eventstore/handler" + "github.com/zitadel/zitadel/internal/eventstore/handler/crdb" + "github.com/zitadel/zitadel/internal/notification/types" + "github.com/zitadel/zitadel/internal/query/projection" + "github.com/zitadel/zitadel/internal/repository/user" +) + +const ( + UserNotificationsProjectionTable = "projections.notifications" +) + +type userNotifier struct { + crdb.StatementHandler + commands *command.Commands + queries *NotificationQueries + assetsPrefix func(context.Context) string + metricSuccessfulDeliveriesEmail, + metricFailedDeliveriesEmail, + metricSuccessfulDeliveriesSMS, + metricFailedDeliveriesSMS string +} + +func NewUserNotifier( + ctx context.Context, + config crdb.StatementHandlerConfig, + commands *command.Commands, + queries *NotificationQueries, + assetsPrefix func(context.Context) string, + metricSuccessfulDeliveriesEmail, + metricFailedDeliveriesEmail, + metricSuccessfulDeliveriesSMS, + metricFailedDeliveriesSMS string, +) *userNotifier { + p := new(userNotifier) + config.ProjectionName = UserNotificationsProjectionTable + config.Reducers = p.reducers() + p.StatementHandler = crdb.NewStatementHandler(ctx, config) + p.commands = commands + p.queries = queries + p.assetsPrefix = assetsPrefix + p.metricSuccessfulDeliveriesEmail = metricSuccessfulDeliveriesEmail + p.metricFailedDeliveriesEmail = metricFailedDeliveriesEmail + p.metricSuccessfulDeliveriesSMS = metricSuccessfulDeliveriesSMS + p.metricFailedDeliveriesSMS = metricFailedDeliveriesSMS + projection.NotificationsProjection = p + return p +} + +func (u *userNotifier) reducers() []handler.AggregateReducer { + return []handler.AggregateReducer{ + { + Aggregate: user.AggregateType, + EventRedusers: []handler.EventReducer{ + { + Event: user.UserV1InitialCodeAddedType, + Reduce: u.reduceInitCodeAdded, + }, + { + Event: user.HumanInitialCodeAddedType, + Reduce: u.reduceInitCodeAdded, + }, + { + Event: user.UserV1EmailCodeAddedType, + Reduce: u.reduceEmailCodeAdded, + }, + { + Event: user.HumanEmailCodeAddedType, + Reduce: u.reduceEmailCodeAdded, + }, + { + Event: user.UserV1PasswordCodeAddedType, + Reduce: u.reducePasswordCodeAdded, + }, + { + Event: user.HumanPasswordCodeAddedType, + Reduce: u.reducePasswordCodeAdded, + }, + { + Event: user.UserDomainClaimedType, + Reduce: u.reduceDomainClaimed, + }, + { + Event: user.HumanPasswordlessInitCodeRequestedType, + Reduce: u.reducePasswordlessCodeRequested, + }, + { + Event: user.UserV1PhoneCodeAddedType, + Reduce: u.reducePhoneCodeAdded, + }, + { + Event: user.HumanPhoneCodeAddedType, + Reduce: u.reducePhoneCodeAdded, + }, + { + Event: user.HumanPasswordChangedType, + Reduce: u.reducePasswordChanged, + }, + }, + }, + } +} + +func (u *userNotifier) reduceInitCodeAdded(event eventstore.Event) (*handler.Statement, error) { + e, ok := event.(*user.HumanInitialCodeAddedEvent) + if !ok { + return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-EFe2f", "reduce.wrong.event.type %s", user.HumanInitialCodeAddedType) + } + ctx := HandlerContext(event.Aggregate()) + alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil, + user.UserV1InitialCodeAddedType, user.UserV1InitialCodeSentType, + user.HumanInitialCodeAddedType, user.HumanInitialCodeSentType) + if err != nil { + return nil, err + } + if alreadyHandled { + return crdb.NewNoOpStatement(e), nil + } + code, err := crypto.DecryptString(e.Code, u.queries.UserDataCrypto) + if err != nil { + return nil, err + } + colors, err := u.queries.ActiveLabelPolicyByOrg(ctx, e.Aggregate().ResourceOwner, false) + if err != nil { + return nil, err + } + + template, err := u.queries.MailTemplateByOrg(ctx, e.Aggregate().ResourceOwner, false) + if err != nil { + return nil, err + } + + notifyUser, err := u.queries.GetNotifyUserByID(ctx, true, e.Aggregate().ID, false) + if err != nil { + return nil, err + } + translator, err := u.queries.GetTranslatorWithOrgTexts(ctx, notifyUser.ResourceOwner, domain.InitCodeMessageType) + if err != nil { + return nil, err + } + + ctx, origin, err := u.queries.Origin(ctx) + if err != nil { + return nil, err + } + err = types.SendEmail( + ctx, + string(template.Template), + translator, + notifyUser, + u.queries.GetSMTPConfig, + u.queries.GetFileSystemProvider, + u.queries.GetLogProvider, + colors, + u.assetsPrefix(ctx), + e, + u.metricSuccessfulDeliveriesEmail, + u.metricFailedDeliveriesEmail, + ).SendUserInitCode(notifyUser, origin, code) + if err != nil { + return nil, err + } + err = u.commands.HumanInitCodeSent(ctx, e.Aggregate().ResourceOwner, e.Aggregate().ID) + if err != nil { + return nil, err + } + return crdb.NewNoOpStatement(e), nil +} + +func (u *userNotifier) reduceEmailCodeAdded(event eventstore.Event) (*handler.Statement, error) { + e, ok := event.(*user.HumanEmailCodeAddedEvent) + if !ok { + return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-SWf3g", "reduce.wrong.event.type %s", user.HumanEmailCodeAddedType) + } + ctx := HandlerContext(event.Aggregate()) + alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil, + user.UserV1EmailCodeAddedType, user.UserV1EmailCodeSentType, + user.HumanEmailCodeAddedType, user.HumanEmailCodeSentType) + if err != nil { + return nil, err + } + if alreadyHandled { + return crdb.NewNoOpStatement(e), nil + } + code, err := crypto.DecryptString(e.Code, u.queries.UserDataCrypto) + if err != nil { + return nil, err + } + colors, err := u.queries.ActiveLabelPolicyByOrg(ctx, e.Aggregate().ResourceOwner, false) + if err != nil { + return nil, err + } + + template, err := u.queries.MailTemplateByOrg(ctx, e.Aggregate().ResourceOwner, false) + if err != nil { + return nil, err + } + + notifyUser, err := u.queries.GetNotifyUserByID(ctx, true, e.Aggregate().ID, false) + if err != nil { + return nil, err + } + translator, err := u.queries.GetTranslatorWithOrgTexts(ctx, notifyUser.ResourceOwner, domain.VerifyEmailMessageType) + if err != nil { + return nil, err + } + + ctx, origin, err := u.queries.Origin(ctx) + if err != nil { + return nil, err + } + err = types.SendEmail( + ctx, + string(template.Template), + translator, + notifyUser, + u.queries.GetSMTPConfig, + u.queries.GetFileSystemProvider, + u.queries.GetLogProvider, + colors, + u.assetsPrefix(ctx), + e, + u.metricSuccessfulDeliveriesEmail, + u.metricFailedDeliveriesEmail, + ).SendEmailVerificationCode(notifyUser, origin, code) + if err != nil { + return nil, err + } + err = u.commands.HumanEmailVerificationCodeSent(ctx, e.Aggregate().ResourceOwner, e.Aggregate().ID) + if err != nil { + return nil, err + } + return crdb.NewNoOpStatement(e), nil +} + +func (u *userNotifier) reducePasswordCodeAdded(event eventstore.Event) (*handler.Statement, error) { + e, ok := event.(*user.HumanPasswordCodeAddedEvent) + if !ok { + return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-Eeg3s", "reduce.wrong.event.type %s", user.HumanPasswordCodeAddedType) + } + ctx := HandlerContext(event.Aggregate()) + alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil, + user.UserV1PasswordCodeAddedType, user.UserV1PasswordCodeSentType, + user.HumanPasswordCodeAddedType, user.HumanPasswordCodeSentType) + if err != nil { + return nil, err + } + if alreadyHandled { + return crdb.NewNoOpStatement(e), nil + } + code, err := crypto.DecryptString(e.Code, u.queries.UserDataCrypto) + if err != nil { + return nil, err + } + colors, err := u.queries.ActiveLabelPolicyByOrg(ctx, e.Aggregate().ResourceOwner, false) + if err != nil { + return nil, err + } + + template, err := u.queries.MailTemplateByOrg(ctx, e.Aggregate().ResourceOwner, false) + if err != nil { + return nil, err + } + + notifyUser, err := u.queries.GetNotifyUserByID(ctx, true, e.Aggregate().ID, false) + if err != nil { + return nil, err + } + translator, err := u.queries.GetTranslatorWithOrgTexts(ctx, notifyUser.ResourceOwner, domain.PasswordResetMessageType) + if err != nil { + return nil, err + } + + ctx, origin, err := u.queries.Origin(ctx) + if err != nil { + return nil, err + } + notify := types.SendEmail( + ctx, + string(template.Template), + translator, + notifyUser, + u.queries.GetSMTPConfig, + u.queries.GetFileSystemProvider, + u.queries.GetLogProvider, + colors, + u.assetsPrefix(ctx), + e, + u.metricSuccessfulDeliveriesEmail, + u.metricFailedDeliveriesEmail, + ) + if e.NotificationType == domain.NotificationTypeSms { + notify = types.SendSMSTwilio( + ctx, + translator, + notifyUser, + u.queries.GetTwilioConfig, + u.queries.GetFileSystemProvider, + u.queries.GetLogProvider, + colors, + u.assetsPrefix(ctx), + e, + u.metricSuccessfulDeliveriesSMS, + u.metricFailedDeliveriesSMS, + ) + } + err = notify.SendPasswordCode(notifyUser, origin, code) + if err != nil { + return nil, err + } + err = u.commands.PasswordCodeSent(ctx, e.Aggregate().ResourceOwner, e.Aggregate().ID) + if err != nil { + return nil, err + } + return crdb.NewNoOpStatement(e), nil +} + +func (u *userNotifier) reduceDomainClaimed(event eventstore.Event) (*handler.Statement, error) { + e, ok := event.(*user.DomainClaimedEvent) + if !ok { + return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-Drh5w", "reduce.wrong.event.type %s", user.UserDomainClaimedType) + } + ctx := HandlerContext(event.Aggregate()) + alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, nil, + user.UserDomainClaimedType, user.UserDomainClaimedSentType) + if err != nil { + return nil, err + } + if alreadyHandled { + return crdb.NewNoOpStatement(e), nil + } + colors, err := u.queries.ActiveLabelPolicyByOrg(ctx, e.Aggregate().ResourceOwner, false) + if err != nil { + return nil, err + } + + template, err := u.queries.MailTemplateByOrg(ctx, e.Aggregate().ResourceOwner, false) + if err != nil { + return nil, err + } + + notifyUser, err := u.queries.GetNotifyUserByID(ctx, true, e.Aggregate().ID, false) + if err != nil { + return nil, err + } + translator, err := u.queries.GetTranslatorWithOrgTexts(ctx, notifyUser.ResourceOwner, domain.DomainClaimedMessageType) + if err != nil { + return nil, err + } + + ctx, origin, err := u.queries.Origin(ctx) + if err != nil { + return nil, err + } + err = types.SendEmail( + ctx, + string(template.Template), + translator, + notifyUser, + u.queries.GetSMTPConfig, + u.queries.GetFileSystemProvider, + u.queries.GetLogProvider, + colors, + u.assetsPrefix(ctx), + e, + u.metricSuccessfulDeliveriesEmail, + u.metricFailedDeliveriesEmail, + ).SendDomainClaimed(notifyUser, origin, e.UserName) + if err != nil { + return nil, err + } + err = u.commands.UserDomainClaimedSent(ctx, e.Aggregate().ResourceOwner, e.Aggregate().ID) + if err != nil { + return nil, err + } + return crdb.NewNoOpStatement(e), nil +} + +func (u *userNotifier) reducePasswordlessCodeRequested(event eventstore.Event) (*handler.Statement, error) { + e, ok := event.(*user.HumanPasswordlessInitCodeRequestedEvent) + if !ok { + return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-EDtjd", "reduce.wrong.event.type %s", user.HumanPasswordlessInitCodeAddedType) + } + ctx := HandlerContext(event.Aggregate()) + alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, map[string]interface{}{"id": e.ID}, user.HumanPasswordlessInitCodeSentType) + if err != nil { + return nil, err + } + if alreadyHandled { + return crdb.NewNoOpStatement(e), nil + } + code, err := crypto.DecryptString(e.Code, u.queries.UserDataCrypto) + if err != nil { + return nil, err + } + colors, err := u.queries.ActiveLabelPolicyByOrg(ctx, e.Aggregate().ResourceOwner, false) + if err != nil { + return nil, err + } + + template, err := u.queries.MailTemplateByOrg(ctx, e.Aggregate().ResourceOwner, false) + if err != nil { + return nil, err + } + + notifyUser, err := u.queries.GetNotifyUserByID(ctx, true, e.Aggregate().ID, false) + if err != nil { + return nil, err + } + translator, err := u.queries.GetTranslatorWithOrgTexts(ctx, notifyUser.ResourceOwner, domain.PasswordlessRegistrationMessageType) + if err != nil { + return nil, err + } + + ctx, origin, err := u.queries.Origin(ctx) + if err != nil { + return nil, err + } + err = types.SendEmail( + ctx, + string(template.Template), + translator, + notifyUser, + u.queries.GetSMTPConfig, + u.queries.GetFileSystemProvider, + u.queries.GetLogProvider, + colors, + u.assetsPrefix(ctx), + e, + u.metricSuccessfulDeliveriesEmail, + u.metricFailedDeliveriesEmail, + ).SendPasswordlessRegistrationLink(notifyUser, origin, code, e.ID) + if err != nil { + return nil, err + } + err = u.commands.HumanPasswordlessInitCodeSent(ctx, e.Aggregate().ID, e.Aggregate().ResourceOwner, e.ID) + if err != nil { + return nil, err + } + return crdb.NewNoOpStatement(e), nil +} + +func (u *userNotifier) reducePasswordChanged(event eventstore.Event) (*handler.Statement, error) { + e, ok := event.(*user.HumanPasswordChangedEvent) + if !ok { + return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-Yko2z8", "reduce.wrong.event.type %s", user.HumanPasswordChangedType) + } + ctx := HandlerContext(event.Aggregate()) + alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, nil, user.HumanPasswordChangeSentType) + if err != nil { + return nil, err + } + if alreadyHandled { + return crdb.NewNoOpStatement(e), nil + } + + notificationPolicy, err := u.queries.NotificationPolicyByOrg(ctx, true, e.Aggregate().ResourceOwner, false) + if errors.IsNotFound(err) { + return crdb.NewNoOpStatement(e), nil + } + if err != nil { + return nil, err + } + + if notificationPolicy.PasswordChange { + colors, err := u.queries.ActiveLabelPolicyByOrg(ctx, e.Aggregate().ResourceOwner, false) + if err != nil { + return nil, err + } + + template, err := u.queries.MailTemplateByOrg(ctx, e.Aggregate().ResourceOwner, false) + if err != nil { + return nil, err + } + + notifyUser, err := u.queries.GetNotifyUserByID(ctx, true, e.Aggregate().ID, false) + if err != nil { + return nil, err + } + translator, err := u.queries.GetTranslatorWithOrgTexts(ctx, notifyUser.ResourceOwner, domain.PasswordChangeMessageType) + if err != nil { + return nil, err + } + + ctx, origin, err := u.queries.Origin(ctx) + if err != nil { + return nil, err + } + err = types.SendEmail( + ctx, + string(template.Template), + translator, + notifyUser, + u.queries.GetSMTPConfig, + u.queries.GetFileSystemProvider, + u.queries.GetLogProvider, + colors, + u.assetsPrefix(ctx), + e, + u.metricSuccessfulDeliveriesEmail, + u.metricFailedDeliveriesEmail, + ).SendPasswordChange(notifyUser, origin) + if err != nil { + return nil, err + } + err = u.commands.PasswordChangeSent(ctx, e.Aggregate().ResourceOwner, e.Aggregate().ID) + if err != nil { + return nil, err + } + } + return crdb.NewNoOpStatement(e), nil +} + +func (u *userNotifier) reducePhoneCodeAdded(event eventstore.Event) (*handler.Statement, error) { + e, ok := event.(*user.HumanPhoneCodeAddedEvent) + if !ok { + return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-He83g", "reduce.wrong.event.type %s", user.HumanPhoneCodeAddedType) + } + ctx := HandlerContext(event.Aggregate()) + alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil, + user.UserV1PhoneCodeAddedType, user.UserV1PhoneCodeSentType, + user.HumanPhoneCodeAddedType, user.HumanPhoneCodeSentType) + if err != nil { + return nil, err + } + if alreadyHandled { + return crdb.NewNoOpStatement(e), nil + } + code, err := crypto.DecryptString(e.Code, u.queries.UserDataCrypto) + if err != nil { + return nil, err + } + colors, err := u.queries.ActiveLabelPolicyByOrg(ctx, e.Aggregate().ResourceOwner, false) + if err != nil { + return nil, err + } + + notifyUser, err := u.queries.GetNotifyUserByID(ctx, true, e.Aggregate().ID, false) + if err != nil { + return nil, err + } + translator, err := u.queries.GetTranslatorWithOrgTexts(ctx, notifyUser.ResourceOwner, domain.VerifyPhoneMessageType) + if err != nil { + return nil, err + } + + ctx, origin, err := u.queries.Origin(ctx) + if err != nil { + return nil, err + } + err = types.SendSMSTwilio( + ctx, + translator, + notifyUser, + u.queries.GetTwilioConfig, + u.queries.GetFileSystemProvider, + u.queries.GetLogProvider, + colors, + u.assetsPrefix(ctx), + e, + u.metricSuccessfulDeliveriesSMS, + u.metricFailedDeliveriesSMS, + ).SendPhoneVerificationCode(notifyUser, origin, code) + if err != nil { + return nil, err + } + err = u.commands.HumanPhoneVerificationCodeSent(ctx, e.Aggregate().ResourceOwner, e.Aggregate().ID) + if err != nil { + return nil, err + } + return crdb.NewNoOpStatement(e), nil +} + +func (u *userNotifier) checkIfCodeAlreadyHandledOrExpired(ctx context.Context, event eventstore.Event, expiry time.Duration, data map[string]interface{}, eventTypes ...eventstore.EventType) (bool, error) { + if event.CreationDate().Add(expiry).Before(time.Now().UTC()) { + return true, nil + } + return u.queries.IsAlreadyHandled(ctx, event, data, eventTypes...) +} diff --git a/internal/notification/messages/email.go b/internal/notification/messages/email.go index b0483dc568..f19196a297 100644 --- a/internal/notification/messages/email.go +++ b/internal/notification/messages/email.go @@ -5,6 +5,7 @@ import ( "regexp" "strings" + "github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/notification/channels" ) @@ -16,16 +17,17 @@ var ( var _ channels.Message = (*Email)(nil) type Email struct { - Recipients []string - BCC []string - CC []string - SenderEmail string - SenderName string - Subject string - Content string + Recipients []string + BCC []string + CC []string + SenderEmail string + SenderName string + Subject string + Content string + TriggeringEvent eventstore.Event } -func (msg *Email) GetContent() string { +func (msg *Email) GetContent() (string, error) { headers := make(map[string]string) from := msg.SenderEmail if msg.SenderName != "" { @@ -49,7 +51,11 @@ func (msg *Email) GetContent() string { subject := "Subject: " + msg.Subject + lineBreak message += subject + mime + lineBreak + msg.Content - return message + return message, nil +} + +func (msg *Email) GetTriggeringEvent() eventstore.Event { + return msg.TriggeringEvent } func isHTML(input string) bool { diff --git a/internal/notification/messages/json.go b/internal/notification/messages/json.go new file mode 100644 index 0000000000..be092f430b --- /dev/null +++ b/internal/notification/messages/json.go @@ -0,0 +1,24 @@ +package messages + +import ( + "encoding/json" + + "github.com/zitadel/zitadel/internal/eventstore" + "github.com/zitadel/zitadel/internal/notification/channels" +) + +var _ channels.Message = (*JSON)(nil) + +type JSON struct { + Serializable interface{} + TriggeringEvent eventstore.Event +} + +func (msg *JSON) GetContent() (string, error) { + bytes, err := json.Marshal(msg.Serializable) + return string(bytes), err +} + +func (msg *JSON) GetTriggeringEvent() eventstore.Event { + return msg.TriggeringEvent +} diff --git a/internal/notification/messages/sms.go b/internal/notification/messages/sms.go index 4623b81fcd..72c377b337 100644 --- a/internal/notification/messages/sms.go +++ b/internal/notification/messages/sms.go @@ -1,6 +1,9 @@ package messages -import "github.com/zitadel/zitadel/internal/notification/channels" +import ( + "github.com/zitadel/zitadel/internal/eventstore" + "github.com/zitadel/zitadel/internal/notification/channels" +) var _ channels.Message = (*SMS)(nil) @@ -8,8 +11,13 @@ type SMS struct { SenderPhoneNumber string RecipientPhoneNumber string Content string + TriggeringEvent eventstore.Event } -func (msg *SMS) GetContent() string { - return msg.Content +func (msg *SMS) GetContent() (string, error) { + return msg.Content, nil +} + +func (msg *SMS) GetTriggeringEvent() eventstore.Event { + return msg.TriggeringEvent } diff --git a/internal/notification/projection.go b/internal/notification/projection.go deleted file mode 100644 index 5534dfa09c..0000000000 --- a/internal/notification/projection.go +++ /dev/null @@ -1,738 +0,0 @@ -package notification - -import ( - "context" - "net/http" - "time" - - statik_fs "github.com/rakyll/statik/fs" - "github.com/zitadel/logging" - - "github.com/zitadel/zitadel/internal/api/authz" - http_utils "github.com/zitadel/zitadel/internal/api/http" - "github.com/zitadel/zitadel/internal/command" - "github.com/zitadel/zitadel/internal/crypto" - "github.com/zitadel/zitadel/internal/domain" - "github.com/zitadel/zitadel/internal/errors" - "github.com/zitadel/zitadel/internal/eventstore" - "github.com/zitadel/zitadel/internal/eventstore/handler" - "github.com/zitadel/zitadel/internal/eventstore/handler/crdb" - "github.com/zitadel/zitadel/internal/i18n" - "github.com/zitadel/zitadel/internal/notification/channels/fs" - "github.com/zitadel/zitadel/internal/notification/channels/log" - "github.com/zitadel/zitadel/internal/notification/channels/smtp" - "github.com/zitadel/zitadel/internal/notification/channels/twilio" - _ "github.com/zitadel/zitadel/internal/notification/statik" - "github.com/zitadel/zitadel/internal/notification/types" - "github.com/zitadel/zitadel/internal/query" - "github.com/zitadel/zitadel/internal/query/projection" - "github.com/zitadel/zitadel/internal/repository/user" -) - -const ( - NotificationsProjectionTable = "projections.notifications" - NotifyUserID = "NOTIFICATION" //TODO: system? -) - -func Start(ctx context.Context, customConfig projection.CustomConfig, externalPort uint16, externalSecure bool, commands *command.Commands, queries *query.Queries, es *eventstore.Eventstore, assetsPrefix func(context.Context) string, fileSystemPath string, userEncryption, smtpEncryption, smsEncryption crypto.EncryptionAlgorithm) { - statikFS, err := statik_fs.NewWithNamespace("notification") - logging.OnError(err).Panic("unable to start listener") - - projection.NotificationsProjection = newNotificationsProjection(ctx, projection.ApplyCustomConfig(customConfig), commands, queries, es, userEncryption, smtpEncryption, smsEncryption, externalSecure, externalPort, fileSystemPath, assetsPrefix, statikFS) -} - -type notificationsProjection struct { - crdb.StatementHandler - commands *command.Commands - queries *query.Queries - es *eventstore.Eventstore - userDataCrypto crypto.EncryptionAlgorithm - smtpPasswordCrypto crypto.EncryptionAlgorithm - smsTokenCrypto crypto.EncryptionAlgorithm - assetsPrefix func(context.Context) string - fileSystemPath string - externalPort uint16 - externalSecure bool - statikDir http.FileSystem -} - -func newNotificationsProjection( - ctx context.Context, - config crdb.StatementHandlerConfig, - commands *command.Commands, - queries *query.Queries, - es *eventstore.Eventstore, - userDataCrypto, - smtpPasswordCrypto, - smsTokenCrypto crypto.EncryptionAlgorithm, - externalSecure bool, - externalPort uint16, - fileSystemPath string, - assetsPrefix func(context.Context) string, - statikDir http.FileSystem, -) *notificationsProjection { - p := new(notificationsProjection) - config.ProjectionName = NotificationsProjectionTable - config.Reducers = p.reducers() - p.StatementHandler = crdb.NewStatementHandler(ctx, config) - p.commands = commands - p.queries = queries - p.es = es - p.userDataCrypto = userDataCrypto - p.smtpPasswordCrypto = smtpPasswordCrypto - p.smsTokenCrypto = smsTokenCrypto - p.assetsPrefix = assetsPrefix - p.externalPort = externalPort - p.externalSecure = externalSecure - p.fileSystemPath = fileSystemPath - p.statikDir = statikDir - - // needs to be started here as it is not part of the projection.projections / projection.newProjectionsList() - p.Start() - return p -} - -func (p *notificationsProjection) reducers() []handler.AggregateReducer { - return []handler.AggregateReducer{ - { - Aggregate: user.AggregateType, - EventRedusers: []handler.EventReducer{ - { - Event: user.UserV1InitialCodeAddedType, - Reduce: p.reduceInitCodeAdded, - }, - { - Event: user.HumanInitialCodeAddedType, - Reduce: p.reduceInitCodeAdded, - }, - { - Event: user.UserV1EmailCodeAddedType, - Reduce: p.reduceEmailCodeAdded, - }, - { - Event: user.HumanEmailCodeAddedType, - Reduce: p.reduceEmailCodeAdded, - }, - { - Event: user.UserV1PasswordCodeAddedType, - Reduce: p.reducePasswordCodeAdded, - }, - { - Event: user.HumanPasswordCodeAddedType, - Reduce: p.reducePasswordCodeAdded, - }, - { - Event: user.UserDomainClaimedType, - Reduce: p.reduceDomainClaimed, - }, - { - Event: user.HumanPasswordlessInitCodeRequestedType, - Reduce: p.reducePasswordlessCodeRequested, - }, - { - Event: user.UserV1PhoneCodeAddedType, - Reduce: p.reducePhoneCodeAdded, - }, - { - Event: user.HumanPhoneCodeAddedType, - Reduce: p.reducePhoneCodeAdded, - }, - { - Event: user.HumanPasswordChangedType, - Reduce: p.reducePasswordChanged, - }, - }, - }, - } -} - -func (p *notificationsProjection) reduceInitCodeAdded(event eventstore.Event) (*handler.Statement, error) { - e, ok := event.(*user.HumanInitialCodeAddedEvent) - if !ok { - return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-EFe2f", "reduce.wrong.event.type %s", user.HumanInitialCodeAddedType) - } - ctx := setNotificationContext(event.Aggregate()) - alreadyHandled, err := p.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil, - user.UserV1InitialCodeAddedType, user.UserV1InitialCodeSentType, - user.HumanInitialCodeAddedType, user.HumanInitialCodeSentType) - if err != nil { - return nil, err - } - if alreadyHandled { - return crdb.NewNoOpStatement(e), nil - } - code, err := crypto.DecryptString(e.Code, p.userDataCrypto) - if err != nil { - return nil, err - } - colors, err := p.queries.ActiveLabelPolicyByOrg(ctx, e.Aggregate().ResourceOwner, false) - if err != nil { - return nil, err - } - - template, err := p.queries.MailTemplateByOrg(ctx, e.Aggregate().ResourceOwner, false) - if err != nil { - return nil, err - } - - notifyUser, err := p.queries.GetNotifyUserByID(ctx, true, e.Aggregate().ID, false) - if err != nil { - return nil, err - } - translator, err := p.getTranslatorWithOrgTexts(ctx, notifyUser.ResourceOwner, domain.InitCodeMessageType) - if err != nil { - return nil, err - } - - ctx, origin, err := p.origin(ctx) - if err != nil { - return nil, err - } - err = types.SendEmail( - ctx, - string(template.Template), - translator, - notifyUser, - p.getSMTPConfig, - p.getFileSystemProvider, - p.getLogProvider, - colors, - p.assetsPrefix(ctx), - ).SendUserInitCode(notifyUser, origin, code) - if err != nil { - return nil, err - } - err = p.commands.HumanInitCodeSent(ctx, e.Aggregate().ResourceOwner, e.Aggregate().ID) - if err != nil { - return nil, err - } - return crdb.NewNoOpStatement(e), nil -} - -func (p *notificationsProjection) reduceEmailCodeAdded(event eventstore.Event) (*handler.Statement, error) { - e, ok := event.(*user.HumanEmailCodeAddedEvent) - if !ok { - return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-SWf3g", "reduce.wrong.event.type %s", user.HumanEmailCodeAddedType) - } - ctx := setNotificationContext(event.Aggregate()) - alreadyHandled, err := p.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil, - user.UserV1EmailCodeAddedType, user.UserV1EmailCodeSentType, - user.HumanEmailCodeAddedType, user.HumanEmailCodeSentType) - if err != nil { - return nil, err - } - if alreadyHandled { - return crdb.NewNoOpStatement(e), nil - } - code, err := crypto.DecryptString(e.Code, p.userDataCrypto) - if err != nil { - return nil, err - } - colors, err := p.queries.ActiveLabelPolicyByOrg(ctx, e.Aggregate().ResourceOwner, false) - if err != nil { - return nil, err - } - - template, err := p.queries.MailTemplateByOrg(ctx, e.Aggregate().ResourceOwner, false) - if err != nil { - return nil, err - } - - notifyUser, err := p.queries.GetNotifyUserByID(ctx, true, e.Aggregate().ID, false) - if err != nil { - return nil, err - } - translator, err := p.getTranslatorWithOrgTexts(ctx, notifyUser.ResourceOwner, domain.VerifyEmailMessageType) - if err != nil { - return nil, err - } - - ctx, origin, err := p.origin(ctx) - if err != nil { - return nil, err - } - err = types.SendEmail( - ctx, - string(template.Template), - translator, - notifyUser, - p.getSMTPConfig, - p.getFileSystemProvider, - p.getLogProvider, - colors, - p.assetsPrefix(ctx), - ).SendEmailVerificationCode(notifyUser, origin, code) - if err != nil { - return nil, err - } - err = p.commands.HumanEmailVerificationCodeSent(ctx, e.Aggregate().ResourceOwner, e.Aggregate().ID) - if err != nil { - return nil, err - } - return crdb.NewNoOpStatement(e), nil -} - -func (p *notificationsProjection) reducePasswordCodeAdded(event eventstore.Event) (*handler.Statement, error) { - e, ok := event.(*user.HumanPasswordCodeAddedEvent) - if !ok { - return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-Eeg3s", "reduce.wrong.event.type %s", user.HumanPasswordCodeAddedType) - } - ctx := setNotificationContext(event.Aggregate()) - alreadyHandled, err := p.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil, - user.UserV1PasswordCodeAddedType, user.UserV1PasswordCodeSentType, - user.HumanPasswordCodeAddedType, user.HumanPasswordCodeSentType) - if err != nil { - return nil, err - } - if alreadyHandled { - return crdb.NewNoOpStatement(e), nil - } - code, err := crypto.DecryptString(e.Code, p.userDataCrypto) - if err != nil { - return nil, err - } - colors, err := p.queries.ActiveLabelPolicyByOrg(ctx, e.Aggregate().ResourceOwner, false) - if err != nil { - return nil, err - } - - template, err := p.queries.MailTemplateByOrg(ctx, e.Aggregate().ResourceOwner, false) - if err != nil { - return nil, err - } - - notifyUser, err := p.queries.GetNotifyUserByID(ctx, true, e.Aggregate().ID, false) - if err != nil { - return nil, err - } - translator, err := p.getTranslatorWithOrgTexts(ctx, notifyUser.ResourceOwner, domain.PasswordResetMessageType) - if err != nil { - return nil, err - } - - ctx, origin, err := p.origin(ctx) - if err != nil { - return nil, err - } - notify := types.SendEmail( - ctx, - string(template.Template), - translator, - notifyUser, - p.getSMTPConfig, - p.getFileSystemProvider, - p.getLogProvider, - colors, - p.assetsPrefix(ctx), - ) - if e.NotificationType == domain.NotificationTypeSms { - notify = types.SendSMSTwilio( - ctx, - translator, - notifyUser, - p.getTwilioConfig, - p.getFileSystemProvider, - p.getLogProvider, - colors, - p.assetsPrefix(ctx), - ) - } - err = notify.SendPasswordCode(notifyUser, origin, code) - if err != nil { - return nil, err - } - err = p.commands.PasswordCodeSent(ctx, e.Aggregate().ResourceOwner, e.Aggregate().ID) - if err != nil { - return nil, err - } - return crdb.NewNoOpStatement(e), nil -} - -func (p *notificationsProjection) reduceDomainClaimed(event eventstore.Event) (*handler.Statement, error) { - e, ok := event.(*user.DomainClaimedEvent) - if !ok { - return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-Drh5w", "reduce.wrong.event.type %s", user.UserDomainClaimedType) - } - ctx := setNotificationContext(event.Aggregate()) - alreadyHandled, err := p.checkIfAlreadyHandled(ctx, event, nil, - user.UserDomainClaimedType, user.UserDomainClaimedSentType) - if err != nil { - return nil, err - } - if alreadyHandled { - return crdb.NewNoOpStatement(e), nil - } - colors, err := p.queries.ActiveLabelPolicyByOrg(ctx, e.Aggregate().ResourceOwner, false) - if err != nil { - return nil, err - } - - template, err := p.queries.MailTemplateByOrg(ctx, e.Aggregate().ResourceOwner, false) - if err != nil { - return nil, err - } - - notifyUser, err := p.queries.GetNotifyUserByID(ctx, true, e.Aggregate().ID, false) - if err != nil { - return nil, err - } - translator, err := p.getTranslatorWithOrgTexts(ctx, notifyUser.ResourceOwner, domain.DomainClaimedMessageType) - if err != nil { - return nil, err - } - - ctx, origin, err := p.origin(ctx) - if err != nil { - return nil, err - } - err = types.SendEmail( - ctx, - string(template.Template), - translator, - notifyUser, - p.getSMTPConfig, - p.getFileSystemProvider, - p.getLogProvider, - colors, - p.assetsPrefix(ctx), - ).SendDomainClaimed(notifyUser, origin, e.UserName) - if err != nil { - return nil, err - } - err = p.commands.UserDomainClaimedSent(ctx, e.Aggregate().ResourceOwner, e.Aggregate().ID) - if err != nil { - return nil, err - } - return crdb.NewNoOpStatement(e), nil -} - -func (p *notificationsProjection) reducePasswordlessCodeRequested(event eventstore.Event) (*handler.Statement, error) { - e, ok := event.(*user.HumanPasswordlessInitCodeRequestedEvent) - if !ok { - return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-EDtjd", "reduce.wrong.event.type %s", user.HumanPasswordlessInitCodeAddedType) - } - ctx := setNotificationContext(event.Aggregate()) - alreadyHandled, err := p.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, map[string]interface{}{"id": e.ID}, user.HumanPasswordlessInitCodeSentType) - if err != nil { - return nil, err - } - if alreadyHandled { - return crdb.NewNoOpStatement(e), nil - } - code, err := crypto.DecryptString(e.Code, p.userDataCrypto) - if err != nil { - return nil, err - } - colors, err := p.queries.ActiveLabelPolicyByOrg(ctx, e.Aggregate().ResourceOwner, false) - if err != nil { - return nil, err - } - - template, err := p.queries.MailTemplateByOrg(ctx, e.Aggregate().ResourceOwner, false) - if err != nil { - return nil, err - } - - notifyUser, err := p.queries.GetNotifyUserByID(ctx, true, e.Aggregate().ID, false) - if err != nil { - return nil, err - } - translator, err := p.getTranslatorWithOrgTexts(ctx, notifyUser.ResourceOwner, domain.PasswordlessRegistrationMessageType) - if err != nil { - return nil, err - } - - ctx, origin, err := p.origin(ctx) - if err != nil { - return nil, err - } - err = types.SendEmail( - ctx, - string(template.Template), - translator, - notifyUser, - p.getSMTPConfig, - p.getFileSystemProvider, - p.getLogProvider, - colors, - p.assetsPrefix(ctx), - ).SendPasswordlessRegistrationLink(notifyUser, origin, code, e.ID) - if err != nil { - return nil, err - } - err = p.commands.HumanPasswordlessInitCodeSent(ctx, e.Aggregate().ID, e.Aggregate().ResourceOwner, e.ID) - if err != nil { - return nil, err - } - return crdb.NewNoOpStatement(e), nil -} - -func (p *notificationsProjection) reducePasswordChanged(event eventstore.Event) (*handler.Statement, error) { - e, ok := event.(*user.HumanPasswordChangedEvent) - if !ok { - return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-Yko2z8", "reduce.wrong.event.type %s", user.HumanPasswordChangedType) - } - ctx := setNotificationContext(event.Aggregate()) - alreadyHandled, err := p.checkIfAlreadyHandled(ctx, event, nil, user.HumanPasswordChangeSentType) - if err != nil { - return nil, err - } - if alreadyHandled { - return crdb.NewNoOpStatement(e), nil - } - - notificationPolicy, err := p.queries.NotificationPolicyByOrg(ctx, true, e.Aggregate().ResourceOwner, false) - if errors.IsNotFound(err) { - return crdb.NewNoOpStatement(e), nil - } - if err != nil { - return nil, err - } - - if notificationPolicy.PasswordChange { - colors, err := p.queries.ActiveLabelPolicyByOrg(ctx, e.Aggregate().ResourceOwner, false) - if err != nil { - return nil, err - } - - template, err := p.queries.MailTemplateByOrg(ctx, e.Aggregate().ResourceOwner, false) - if err != nil { - return nil, err - } - - notifyUser, err := p.queries.GetNotifyUserByID(ctx, true, e.Aggregate().ID, false) - if err != nil { - return nil, err - } - translator, err := p.getTranslatorWithOrgTexts(ctx, notifyUser.ResourceOwner, domain.PasswordChangeMessageType) - if err != nil { - return nil, err - } - - ctx, origin, err := p.origin(ctx) - if err != nil { - return nil, err - } - err = types.SendEmail( - ctx, - string(template.Template), - translator, - notifyUser, - p.getSMTPConfig, - p.getFileSystemProvider, - p.getLogProvider, - colors, - p.assetsPrefix(ctx), - ).SendPasswordChange(notifyUser, origin) - if err != nil { - return nil, err - } - err = p.commands.PasswordChangeSent(ctx, e.Aggregate().ResourceOwner, e.Aggregate().ID) - if err != nil { - return nil, err - } - } - return crdb.NewNoOpStatement(e), nil -} - -func (p *notificationsProjection) reducePhoneCodeAdded(event eventstore.Event) (*handler.Statement, error) { - e, ok := event.(*user.HumanPhoneCodeAddedEvent) - if !ok { - return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-He83g", "reduce.wrong.event.type %s", user.HumanPhoneCodeAddedType) - } - ctx := setNotificationContext(event.Aggregate()) - alreadyHandled, err := p.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil, - user.UserV1PhoneCodeAddedType, user.UserV1PhoneCodeSentType, - user.HumanPhoneCodeAddedType, user.HumanPhoneCodeSentType) - if err != nil { - return nil, err - } - if alreadyHandled { - return crdb.NewNoOpStatement(e), nil - } - code, err := crypto.DecryptString(e.Code, p.userDataCrypto) - if err != nil { - return nil, err - } - colors, err := p.queries.ActiveLabelPolicyByOrg(ctx, e.Aggregate().ResourceOwner, false) - if err != nil { - return nil, err - } - - notifyUser, err := p.queries.GetNotifyUserByID(ctx, true, e.Aggregate().ID, false) - if err != nil { - return nil, err - } - translator, err := p.getTranslatorWithOrgTexts(ctx, notifyUser.ResourceOwner, domain.VerifyPhoneMessageType) - if err != nil { - return nil, err - } - - ctx, origin, err := p.origin(ctx) - if err != nil { - return nil, err - } - err = types.SendSMSTwilio( - ctx, - translator, - notifyUser, - p.getTwilioConfig, - p.getFileSystemProvider, - p.getLogProvider, - colors, - p.assetsPrefix(ctx), - ).SendPhoneVerificationCode(notifyUser, origin, code) - if err != nil { - return nil, err - } - err = p.commands.HumanPhoneVerificationCodeSent(ctx, e.Aggregate().ResourceOwner, e.Aggregate().ID) - if err != nil { - return nil, err - } - return crdb.NewNoOpStatement(e), nil -} - -func (p *notificationsProjection) checkIfCodeAlreadyHandledOrExpired(ctx context.Context, event eventstore.Event, expiry time.Duration, data map[string]interface{}, eventTypes ...eventstore.EventType) (bool, error) { - if event.CreationDate().Add(expiry).Before(time.Now().UTC()) { - return true, nil - } - return p.checkIfAlreadyHandled(ctx, event, data, eventTypes...) -} - -func (p *notificationsProjection) checkIfAlreadyHandled(ctx context.Context, event eventstore.Event, data map[string]interface{}, eventTypes ...eventstore.EventType) (bool, error) { - events, err := p.es.Filter( - ctx, - eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent). - InstanceID(event.Aggregate().InstanceID). - AddQuery(). - AggregateTypes(user.AggregateType). - AggregateIDs(event.Aggregate().ID). - SequenceGreater(event.Sequence()). - EventTypes(eventTypes...). - EventData(data). - Builder(), - ) - if err != nil { - return false, err - } - return len(events) > 0, nil -} -func (p *notificationsProjection) getSMTPConfig(ctx context.Context) (*smtp.Config, error) { - config, err := p.queries.SMTPConfigByAggregateID(ctx, authz.GetInstance(ctx).InstanceID()) - if err != nil { - return nil, err - } - password, err := crypto.DecryptString(config.Password, p.smtpPasswordCrypto) - if err != nil { - return nil, err - } - return &smtp.Config{ - From: config.SenderAddress, - FromName: config.SenderName, - Tls: config.TLS, - SMTP: smtp.SMTP{ - Host: config.Host, - User: config.User, - Password: password, - }, - }, nil -} - -// Read iam twilio config -func (p *notificationsProjection) getTwilioConfig(ctx context.Context) (*twilio.Config, error) { - active, err := query.NewSMSProviderStateQuery(domain.SMSConfigStateActive) - if err != nil { - return nil, err - } - config, err := p.queries.SMSProviderConfig(ctx, active) - if err != nil { - return nil, err - } - if config.TwilioConfig == nil { - return nil, errors.ThrowNotFound(nil, "HANDLER-8nfow", "Errors.SMS.Twilio.NotFound") - } - token, err := crypto.DecryptString(config.TwilioConfig.Token, p.smsTokenCrypto) - if err != nil { - return nil, err - } - return &twilio.Config{ - SID: config.TwilioConfig.SID, - Token: token, - SenderNumber: config.TwilioConfig.SenderNumber, - }, nil -} - -// Read iam filesystem provider config -func (p *notificationsProjection) getFileSystemProvider(ctx context.Context) (*fs.Config, error) { - config, err := p.queries.NotificationProviderByIDAndType(ctx, authz.GetInstance(ctx).InstanceID(), domain.NotificationProviderTypeFile) - if err != nil { - return nil, err - } - return &fs.Config{ - Compact: config.Compact, - Path: p.fileSystemPath, - }, nil -} - -// Read iam log provider config -func (p *notificationsProjection) getLogProvider(ctx context.Context) (*log.Config, error) { - config, err := p.queries.NotificationProviderByIDAndType(ctx, authz.GetInstance(ctx).InstanceID(), domain.NotificationProviderTypeLog) - if err != nil { - return nil, err - } - return &log.Config{ - Compact: config.Compact, - }, nil -} - -func (p *notificationsProjection) getTranslatorWithOrgTexts(ctx context.Context, orgID, textType string) (*i18n.Translator, error) { - translator, err := i18n.NewTranslator(p.statikDir, p.queries.GetDefaultLanguage(ctx), "") - if err != nil { - return nil, err - } - - allCustomTexts, err := p.queries.CustomTextListByTemplate(ctx, authz.GetInstance(ctx).InstanceID(), textType, false) - if err != nil { - return translator, nil - } - customTexts, err := p.queries.CustomTextListByTemplate(ctx, orgID, textType, false) - if err != nil { - return translator, nil - } - allCustomTexts.CustomTexts = append(allCustomTexts.CustomTexts, customTexts.CustomTexts...) - - for _, text := range allCustomTexts.CustomTexts { - msg := i18n.Message{ - ID: text.Template + "." + text.Key, - Text: text.Text, - } - err = translator.AddMessages(text.Language, msg) - logging.WithFields("instanceID", authz.GetInstance(ctx).InstanceID(), "orgID", orgID, "messageType", textType, "messageID", msg.ID). - OnError(err). - Warn("could not add translation message") - } - return translator, nil -} - -func (p *notificationsProjection) origin(ctx context.Context) (context.Context, string, error) { - primary, err := query.NewInstanceDomainPrimarySearchQuery(true) - if err != nil { - return ctx, "", err - } - domains, err := p.queries.SearchInstanceDomains(ctx, &query.InstanceDomainSearchQueries{ - Queries: []query.SearchQuery{primary}, - }) - if err != nil { - return ctx, "", err - } - if len(domains.Domains) < 1 { - return ctx, "", errors.ThrowInternal(nil, "NOTIF-Ef3r1", "Errors.Notification.NoDomain") - } - ctx = authz.WithRequestedDomain(ctx, domains.Domains[0].Domain) - return ctx, http_utils.BuildHTTP(domains.Domains[0].Domain, p.externalPort, p.externalSecure), nil -} - -func setNotificationContext(event eventstore.Aggregate) context.Context { - ctx := authz.WithInstanceID(context.Background(), event.InstanceID) - return authz.SetCtxData(ctx, authz.CtxData{UserID: NotifyUserID, OrgID: event.ResourceOwner}) -} diff --git a/internal/notification/projections.go b/internal/notification/projections.go new file mode 100644 index 0000000000..f4abf3b7bc --- /dev/null +++ b/internal/notification/projections.go @@ -0,0 +1,77 @@ +package notification + +import ( + "context" + + statik_fs "github.com/rakyll/statik/fs" + "github.com/zitadel/logging" + + "github.com/zitadel/zitadel/internal/command" + "github.com/zitadel/zitadel/internal/crypto" + "github.com/zitadel/zitadel/internal/eventstore" + "github.com/zitadel/zitadel/internal/notification/handlers" + _ "github.com/zitadel/zitadel/internal/notification/statik" + "github.com/zitadel/zitadel/internal/query" + "github.com/zitadel/zitadel/internal/query/projection" + "github.com/zitadel/zitadel/internal/telemetry/metrics" +) + +const ( + metricSuccessfulDeliveriesEmail = "successful_deliveries_email" + metricFailedDeliveriesEmail = "failed_deliveries_email" + metricSuccessfulDeliveriesSMS = "successful_deliveries_sms" + metricFailedDeliveriesSMS = "failed_deliveries_sms" + metricSuccessfulDeliveriesJSON = "successful_deliveries_json" + metricFailedDeliveriesJSON = "failed_deliveries_json" +) + +func Start( + ctx context.Context, + userHandlerCustomConfig projection.CustomConfig, + quotaHandlerCustomConfig projection.CustomConfig, + externalPort uint16, + externalSecure bool, + commands *command.Commands, + queries *query.Queries, + es *eventstore.Eventstore, + assetsPrefix func(context.Context) string, + fileSystemPath string, + userEncryption, + smtpEncryption, + smsEncryption crypto.EncryptionAlgorithm, +) { + statikFS, err := statik_fs.NewWithNamespace("notification") + logging.OnError(err).Panic("unable to start listener") + err = metrics.RegisterCounter(metricSuccessfulDeliveriesEmail, "Successfully delivered emails") + logging.WithFields("metric", metricSuccessfulDeliveriesEmail).OnError(err).Panic("unable to register counter") + err = metrics.RegisterCounter(metricFailedDeliveriesEmail, "Failed email deliveries") + logging.WithFields("metric", metricFailedDeliveriesEmail).OnError(err).Panic("unable to register counter") + err = metrics.RegisterCounter(metricSuccessfulDeliveriesSMS, "Successfully delivered SMS") + logging.WithFields("metric", metricSuccessfulDeliveriesSMS).OnError(err).Panic("unable to register counter") + err = metrics.RegisterCounter(metricFailedDeliveriesSMS, "Failed SMS deliveries") + logging.WithFields("metric", metricFailedDeliveriesSMS).OnError(err).Panic("unable to register counter") + err = metrics.RegisterCounter(metricSuccessfulDeliveriesJSON, "Successfully delivered JSON messages") + logging.WithFields("metric", metricSuccessfulDeliveriesJSON).OnError(err).Panic("unable to register counter") + err = metrics.RegisterCounter(metricFailedDeliveriesJSON, "Failed JSON message deliveries") + logging.WithFields("metric", metricFailedDeliveriesJSON).OnError(err).Panic("unable to register counter") + q := handlers.NewNotificationQueries(queries, es, externalPort, externalSecure, fileSystemPath, userEncryption, smtpEncryption, smsEncryption, statikFS) + handlers.NewUserNotifier( + ctx, + projection.ApplyCustomConfig(userHandlerCustomConfig), + commands, + q, + assetsPrefix, + metricSuccessfulDeliveriesEmail, + metricFailedDeliveriesEmail, + metricSuccessfulDeliveriesSMS, + metricFailedDeliveriesSMS, + ).Start() + handlers.NewQuotaNotifier( + ctx, + projection.ApplyCustomConfig(quotaHandlerCustomConfig), + commands, + q, + metricSuccessfulDeliveriesJSON, + metricFailedDeliveriesJSON, + ).Start() +} diff --git a/internal/notification/senders/email.go b/internal/notification/senders/email.go index 8f5103506a..ddaa5a5c2c 100644 --- a/internal/notification/senders/email.go +++ b/internal/notification/senders/email.go @@ -3,17 +3,42 @@ package senders import ( "context" + "github.com/zitadel/logging" + + "github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/notification/channels" "github.com/zitadel/zitadel/internal/notification/channels/fs" + "github.com/zitadel/zitadel/internal/notification/channels/instrumenting" "github.com/zitadel/zitadel/internal/notification/channels/log" "github.com/zitadel/zitadel/internal/notification/channels/smtp" ) -func EmailChannels(ctx context.Context, emailConfig func(ctx context.Context) (*smtp.Config, error), getFileSystemProvider func(ctx context.Context) (*fs.Config, error), getLogProvider func(ctx context.Context) (*log.Config, error)) (chain *Chain, err error) { +const smtpSpanName = "smtp.NotificationChannel" + +func EmailChannels( + ctx context.Context, + emailConfig func(ctx context.Context) (*smtp.Config, error), + getFileSystemProvider func(ctx context.Context) (*fs.Config, error), + getLogProvider func(ctx context.Context) (*log.Config, error), + successMetricName, + failureMetricName string, +) (chain *Chain, err error) { channels := make([]channels.NotificationChannel, 0, 3) - p, err := smtp.InitSMTPChannel(ctx, emailConfig) + p, err := smtp.InitChannel(ctx, emailConfig) + logging.WithFields( + "instance", authz.GetInstance(ctx).InstanceID(), + ).OnError(err).Debug("initializing SMTP channel failed") if err == nil { - channels = append(channels, p) + channels = append( + channels, + instrumenting.Wrap( + ctx, + p, + smtpSpanName, + successMetricName, + failureMetricName, + ), + ) } channels = append(channels, debugChannels(ctx, getFileSystemProvider, getLogProvider)...) return chainChannels(channels...), nil diff --git a/internal/notification/senders/json.go b/internal/notification/senders/json.go new file mode 100644 index 0000000000..73f6a92ac8 --- /dev/null +++ b/internal/notification/senders/json.go @@ -0,0 +1,49 @@ +package senders + +import ( + "context" + + "github.com/zitadel/logging" + + "github.com/zitadel/zitadel/internal/api/authz" + "github.com/zitadel/zitadel/internal/notification/channels" + "github.com/zitadel/zitadel/internal/notification/channels/fs" + "github.com/zitadel/zitadel/internal/notification/channels/instrumenting" + "github.com/zitadel/zitadel/internal/notification/channels/log" + "github.com/zitadel/zitadel/internal/notification/channels/webhook" +) + +const webhookSpanName = "webhook.NotificationChannel" + +func JSONChannels( + ctx context.Context, + webhookConfig webhook.Config, + getFileSystemProvider func(ctx context.Context) (*fs.Config, error), + getLogProvider func(ctx context.Context) (*log.Config, error), + successMetricName, + failureMetricName string, +) (*Chain, error) { + if err := webhookConfig.Validate(); err != nil { + return nil, err + } + channels := make([]channels.NotificationChannel, 0, 3) + webhookChannel, err := webhook.InitChannel(ctx, webhookConfig) + logging.WithFields( + "instance", authz.GetInstance(ctx).InstanceID(), + "callurl", webhookConfig.CallURL, + ).OnError(err).Debug("initializing JSON channel failed") + if err == nil { + channels = append( + channels, + instrumenting.Wrap( + ctx, + webhookChannel, + webhookSpanName, + successMetricName, + failureMetricName, + ), + ) + } + channels = append(channels, debugChannels(ctx, getFileSystemProvider, getLogProvider)...) + return chainChannels(channels...), nil +} diff --git a/internal/notification/senders/sms.go b/internal/notification/senders/sms.go index 0b2cdbfb74..22d4a47202 100644 --- a/internal/notification/senders/sms.go +++ b/internal/notification/senders/sms.go @@ -5,14 +5,33 @@ import ( "github.com/zitadel/zitadel/internal/notification/channels" "github.com/zitadel/zitadel/internal/notification/channels/fs" + "github.com/zitadel/zitadel/internal/notification/channels/instrumenting" "github.com/zitadel/zitadel/internal/notification/channels/log" "github.com/zitadel/zitadel/internal/notification/channels/twilio" ) -func SMSChannels(ctx context.Context, twilioConfig *twilio.Config, getFileSystemProvider func(ctx context.Context) (*fs.Config, error), getLogProvider func(ctx context.Context) (*log.Config, error)) (chain *Chain, err error) { +const twilioSpanName = "twilio.NotificationChannel" + +func SMSChannels( + ctx context.Context, + twilioConfig *twilio.Config, + getFileSystemProvider func(ctx context.Context) (*fs.Config, error), + getLogProvider func(ctx context.Context) (*log.Config, error), + successMetricName, + failureMetricName string, +) (chain *Chain, err error) { channels := make([]channels.NotificationChannel, 0, 3) if twilioConfig != nil { - channels = append(channels, twilio.InitTwilioChannel(*twilioConfig)) + channels = append( + channels, + instrumenting.Wrap( + ctx, + twilio.InitChannel(*twilioConfig), + twilioSpanName, + successMetricName, + failureMetricName, + ), + ) } channels = append(channels, debugChannels(ctx, getFileSystemProvider, getLogProvider)...) return chainChannels(channels...), nil diff --git a/internal/notification/types/json.go b/internal/notification/types/json.go new file mode 100644 index 0000000000..8c5e181661 --- /dev/null +++ b/internal/notification/types/json.go @@ -0,0 +1,40 @@ +package types + +import ( + "context" + + "github.com/zitadel/zitadel/internal/eventstore" + "github.com/zitadel/zitadel/internal/notification/channels/fs" + "github.com/zitadel/zitadel/internal/notification/channels/log" + "github.com/zitadel/zitadel/internal/notification/channels/webhook" + "github.com/zitadel/zitadel/internal/notification/messages" + "github.com/zitadel/zitadel/internal/notification/senders" +) + +func handleJSON( + ctx context.Context, + webhookConfig webhook.Config, + getFileSystemProvider func(ctx context.Context) (*fs.Config, error), + getLogProvider func(ctx context.Context) (*log.Config, error), + serializable interface{}, + triggeringEvent eventstore.Event, + successMetricName, + failureMetricName string, +) error { + message := &messages.JSON{ + Serializable: serializable, + TriggeringEvent: triggeringEvent, + } + channelChain, err := senders.JSONChannels( + ctx, + webhookConfig, + getFileSystemProvider, + getLogProvider, + successMetricName, + failureMetricName, + ) + if err != nil { + return err + } + return channelChain.HandleMessage(message) +} diff --git a/internal/notification/types/notification.go b/internal/notification/types/notification.go index e990626782..7c4fc3c14b 100644 --- a/internal/notification/types/notification.go +++ b/internal/notification/types/notification.go @@ -3,11 +3,13 @@ package types import ( "context" + "github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/i18n" "github.com/zitadel/zitadel/internal/notification/channels/fs" "github.com/zitadel/zitadel/internal/notification/channels/log" "github.com/zitadel/zitadel/internal/notification/channels/smtp" "github.com/zitadel/zitadel/internal/notification/channels/twilio" + "github.com/zitadel/zitadel/internal/notification/channels/webhook" "github.com/zitadel/zitadel/internal/notification/templates" "github.com/zitadel/zitadel/internal/query" ) @@ -29,6 +31,9 @@ func SendEmail( getLogProvider func(ctx context.Context) (*log.Config, error), colors *query.LabelPolicy, assetsPrefix string, + triggeringEvent eventstore.Event, + successMetricName, + failureMetricName string, ) Notify { return func( url string, @@ -42,7 +47,19 @@ func SendEmail( if err != nil { return err } - return generateEmail(ctx, user, data.Subject, template, emailConfig, getFileSystemProvider, getLogProvider, allowUnverifiedNotificationChannel) + return generateEmail( + ctx, + user, + data.Subject, + template, + emailConfig, + getFileSystemProvider, + getLogProvider, + allowUnverifiedNotificationChannel, + triggeringEvent, + successMetricName, + failureMetricName, + ) } } @@ -55,6 +72,9 @@ func SendSMSTwilio( getLogProvider func(ctx context.Context) (*log.Config, error), colors *query.LabelPolicy, assetsPrefix string, + triggeringEvent eventstore.Event, + successMetricName, + failureMetricName string, ) Notify { return func( url string, @@ -64,10 +84,41 @@ func SendSMSTwilio( ) error { args = mapNotifyUserToArgs(user, args) data := GetTemplateData(translator, args, assetsPrefix, url, messageType, user.PreferredLanguage.String(), colors) - return generateSms(ctx, user, data.Text, twilioConfig, getFileSystemProvider, getLogProvider, allowUnverifiedNotificationChannel) + return generateSms( + ctx, + user, + data.Text, + twilioConfig, + getFileSystemProvider, + getLogProvider, + allowUnverifiedNotificationChannel, + triggeringEvent, + successMetricName, + failureMetricName, + ) } } -func externalLink(origin string) string { - return origin + "/ui/login" +func SendJSON( + ctx context.Context, + webhookConfig webhook.Config, + getFileSystemProvider func(ctx context.Context) (*fs.Config, error), + getLogProvider func(ctx context.Context) (*log.Config, error), + serializable interface{}, + triggeringEvent eventstore.Event, + successMetricName, + failureMetricName string, +) Notify { + return func(_ string, _ map[string]interface{}, _ string, _ bool) error { + return handleJSON( + ctx, + webhookConfig, + getFileSystemProvider, + getLogProvider, + serializable, + triggeringEvent, + successMetricName, + failureMetricName, + ) + } } diff --git a/internal/notification/types/user_email.go b/internal/notification/types/user_email.go index 6f211996f1..916844d76e 100644 --- a/internal/notification/types/user_email.go +++ b/internal/notification/types/user_email.go @@ -4,7 +4,8 @@ import ( "context" "html" - caos_errors "github.com/zitadel/zitadel/internal/errors" + "github.com/zitadel/zitadel/internal/errors" + "github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/notification/channels/fs" "github.com/zitadel/zitadel/internal/notification/channels/log" "github.com/zitadel/zitadel/internal/notification/channels/smtp" @@ -13,24 +14,44 @@ import ( "github.com/zitadel/zitadel/internal/query" ) -func generateEmail(ctx context.Context, user *query.NotifyUser, subject, content string, smtpConfig func(ctx context.Context) (*smtp.Config, error), getFileSystemProvider func(ctx context.Context) (*fs.Config, error), getLogProvider func(ctx context.Context) (*log.Config, error), lastEmail bool) error { +func generateEmail( + ctx context.Context, + user *query.NotifyUser, + subject, + content string, + smtpConfig func(ctx context.Context) (*smtp.Config, error), + getFileSystemProvider func(ctx context.Context) (*fs.Config, error), + getLogProvider func(ctx context.Context) (*log.Config, error), + lastEmail bool, + triggeringEvent eventstore.Event, + successMetricName, + failureMetricName string, +) error { content = html.UnescapeString(content) message := &messages.Email{ - Recipients: []string{user.VerifiedEmail}, - Subject: subject, - Content: content, + Recipients: []string{user.VerifiedEmail}, + Subject: subject, + Content: content, + TriggeringEvent: triggeringEvent, } if lastEmail { message.Recipients = []string{user.LastEmail} } - channelChain, err := senders.EmailChannels(ctx, smtpConfig, getFileSystemProvider, getLogProvider) + channelChain, err := senders.EmailChannels( + ctx, + smtpConfig, + getFileSystemProvider, + getLogProvider, + successMetricName, + failureMetricName, + ) if err != nil { return err } if channelChain.Len() == 0 { - return caos_errors.ThrowPreconditionFailed(nil, "MAIL-83nof", "Errors.Notification.Channels.NotPresent") + return errors.ThrowPreconditionFailed(nil, "MAIL-83nof", "Errors.Notification.Channels.NotPresent") } return channelChain.HandleMessage(message) } diff --git a/internal/notification/types/user_phone.go b/internal/notification/types/user_phone.go index b66cf48e0e..a3407119aa 100644 --- a/internal/notification/types/user_phone.go +++ b/internal/notification/types/user_phone.go @@ -5,7 +5,8 @@ import ( "github.com/zitadel/logging" - caos_errors "github.com/zitadel/zitadel/internal/errors" + "github.com/zitadel/zitadel/internal/errors" + "github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/notification/channels/fs" "github.com/zitadel/zitadel/internal/notification/channels/log" "github.com/zitadel/zitadel/internal/notification/channels/twilio" @@ -14,7 +15,18 @@ import ( "github.com/zitadel/zitadel/internal/query" ) -func generateSms(ctx context.Context, user *query.NotifyUser, content string, getTwilioProvider func(ctx context.Context) (*twilio.Config, error), getFileSystemProvider func(ctx context.Context) (*fs.Config, error), getLogProvider func(ctx context.Context) (*log.Config, error), lastPhone bool) error { +func generateSms( + ctx context.Context, + user *query.NotifyUser, + content string, + getTwilioProvider func(ctx context.Context) (*twilio.Config, error), + getFileSystemProvider func(ctx context.Context) (*fs.Config, error), + getLogProvider func(ctx context.Context) (*log.Config, error), + lastPhone bool, + triggeringEvent eventstore.Event, + successMetricName, + failureMetricName string, +) error { number := "" twilioConfig, err := getTwilioProvider(ctx) if err == nil { @@ -24,16 +36,24 @@ func generateSms(ctx context.Context, user *query.NotifyUser, content string, ge SenderPhoneNumber: number, RecipientPhoneNumber: user.VerifiedPhone, Content: content, + TriggeringEvent: triggeringEvent, } if lastPhone { message.RecipientPhoneNumber = user.LastPhone } - channelChain, err := senders.SMSChannels(ctx, twilioConfig, getFileSystemProvider, getLogProvider) + channelChain, err := senders.SMSChannels( + ctx, + twilioConfig, + getFileSystemProvider, + getLogProvider, + successMetricName, + failureMetricName, + ) logging.OnError(err).Error("could not create sms channel") if channelChain.Len() == 0 { - return caos_errors.ThrowPreconditionFailed(nil, "PHONE-w8nfow", "Errors.Notification.Channels.NotPresent") + return errors.ThrowPreconditionFailed(nil, "PHONE-w8nfow", "Errors.Notification.Channels.NotPresent") } return channelChain.HandleMessage(message) } diff --git a/internal/notification/types/without_template.go b/internal/notification/types/without_template.go new file mode 100644 index 0000000000..45ded1b598 --- /dev/null +++ b/internal/notification/types/without_template.go @@ -0,0 +1,5 @@ +package types + +func (notify Notify) WithoutTemplate() error { + return notify("", nil, "", false) +} diff --git a/internal/query/projection/projection.go b/internal/query/projection/projection.go index 15faad03e6..049c6a80df 100644 --- a/internal/query/projection/projection.go +++ b/internal/query/projection/projection.go @@ -63,6 +63,7 @@ var ( SecurityPolicyProjection *securityPolicyProjection NotificationPolicyProjection *notificationPolicyProjection NotificationsProjection interface{} + NotificationsQuotaProjection interface{} ) type projection interface { diff --git a/internal/query/quota_notifications.go b/internal/query/quota_notifications.go index f5ac6f2522..63930dbe1b 100644 --- a/internal/query/quota_notifications.go +++ b/internal/query/quota_notifications.go @@ -9,7 +9,7 @@ import ( "github.com/zitadel/zitadel/internal/repository/quota" ) -func (q *Queries) GetDueQuotaNotifications(ctx context.Context, config *quota.AddedEvent, periodStart time.Time, usedAbs uint64) ([]*quota.NotifiedEvent, error) { +func (q *Queries) GetDueQuotaNotifications(ctx context.Context, config *quota.AddedEvent, periodStart time.Time, usedAbs uint64) ([]*quota.NotificationDueEvent, error) { if len(config.Notifications) == 0 { return nil, nil } @@ -22,7 +22,7 @@ func (q *Queries) GetDueQuotaNotifications(ctx context.Context, config *quota.Ad usedRel := uint16(math.Floor(float64(usedAbs*100) / float64(config.Amount))) - var dueNotifications []*quota.NotifiedEvent + var dueNotifications []*quota.NotificationDueEvent for _, notification := range config.Notifications { if notification.Percent > usedRel { continue @@ -30,13 +30,13 @@ func (q *Queries) GetDueQuotaNotifications(ctx context.Context, config *quota.Ad threshold := notification.Percent if notification.Repeat { - threshold = uint16(math.Min(1, math.Floor(float64(usedRel)/float64(notification.Percent)))) * notification.Percent + threshold = uint16(math.Max(1, math.Floor(float64(usedRel)/float64(notification.Percent)))) * notification.Percent } - if wm.latestNotifiedThresholds[notification.ID] < threshold { + if wm.latestDueThresholds[notification.ID] < threshold { dueNotifications = append( dueNotifications, - quota.NewNotifiedEvent( + quota.NewNotificationDueEvent( ctx, &aggregate, config.Unit, diff --git a/internal/query/quota_notifications_model.go b/internal/query/quota_notifications_model.go index a64aa05b06..af809c7047 100644 --- a/internal/query/quota_notifications_model.go +++ b/internal/query/quota_notifications_model.go @@ -9,8 +9,8 @@ import ( type quotaNotificationsReadModel struct { eventstore.ReadModel - periodStart time.Time - latestNotifiedThresholds map[string]uint16 + periodStart time.Time + latestDueThresholds map[string]uint16 } func newQuotaNotificationsReadModel(aggregateId, instanceId, resourceOwner string, periodStart time.Time) *quotaNotificationsReadModel { @@ -20,8 +20,8 @@ func newQuotaNotificationsReadModel(aggregateId, instanceId, resourceOwner strin InstanceID: instanceId, ResourceOwner: resourceOwner, }, - periodStart: periodStart, - latestNotifiedThresholds: make(map[string]uint16), + periodStart: periodStart, + latestDueThresholds: make(map[string]uint16), } } @@ -34,13 +34,13 @@ func (rm *quotaNotificationsReadModel) Query() *eventstore.SearchQueryBuilder { AggregateTypes(quota.AggregateType). AggregateIDs(rm.AggregateID). CreationDateAfter(rm.periodStart). - EventTypes(quota.NotifiedEventType).Builder() + EventTypes(quota.NotificationDueEventType).Builder() } func (rm *quotaNotificationsReadModel) Reduce() error { for _, event := range rm.Events { - e := event.(*quota.NotifiedEvent) - rm.latestNotifiedThresholds[e.ID] = e.Threshold + e := event.(*quota.NotificationDueEvent) + rm.latestDueThresholds[e.ID] = e.Threshold } return rm.ReadModel.Reduce() } diff --git a/internal/repository/quota/events.go b/internal/repository/quota/events.go index 3f889a01da..4d635e2bd3 100644 --- a/internal/repository/quota/events.go +++ b/internal/repository/quota/events.go @@ -19,6 +19,7 @@ const ( eventTypePrefix = eventstore.EventType("quota.") AddedEventType = eventTypePrefix + "added" NotifiedEventType = eventTypePrefix + "notified" + NotificationDueEventType = eventTypePrefix + "notificationdue" RemovedEventType = eventTypePrefix + "removed" ) @@ -107,6 +108,62 @@ func AddedEventMapper(event *repository.Event) (eventstore.Event, error) { return e, nil } +type NotificationDueEvent struct { + eventstore.BaseEvent `json:"-"` + Unit Unit `json:"unit"` + ID string `json:"id"` + CallURL string `json:"callURL"` + PeriodStart time.Time `json:"periodStart"` + Threshold uint16 `json:"threshold"` + Usage uint64 `json:"usage"` +} + +func (n *NotificationDueEvent) Data() interface{} { + return n +} + +func (n *NotificationDueEvent) UniqueConstraints() []*eventstore.EventUniqueConstraint { + return nil +} + +func NewNotificationDueEvent( + ctx context.Context, + aggregate *eventstore.Aggregate, + unit Unit, + id string, + callURL string, + periodStart time.Time, + threshold uint16, + usage uint64, +) *NotificationDueEvent { + return &NotificationDueEvent{ + BaseEvent: *eventstore.NewBaseEventForPush( + ctx, + aggregate, + NotificationDueEventType, + ), + Unit: unit, + ID: id, + CallURL: callURL, + PeriodStart: periodStart, + Threshold: threshold, + Usage: usage, + } +} + +func NotificationDueEventMapper(event *repository.Event) (eventstore.Event, error) { + e := &NotificationDueEvent{ + BaseEvent: *eventstore.BaseEventFromRepo(event), + } + + err := json.Unmarshal(event.Data, e) + if err != nil { + return nil, errors.ThrowInternal(err, "QUOTA-k56rT", "unable to unmarshal notification due") + } + + return e, nil +} + type NotifiedEvent struct { eventstore.BaseEvent `json:"-"` Unit Unit `json:"unit"` @@ -115,6 +172,7 @@ type NotifiedEvent struct { PeriodStart time.Time `json:"periodStart"` Threshold uint16 `json:"threshold"` Usage uint64 `json:"usage"` + DueEventID string `json:"dueEventID"` } func (e *NotifiedEvent) Data() interface{} { @@ -127,26 +185,28 @@ func (e *NotifiedEvent) UniqueConstraints() []*eventstore.EventUniqueConstraint func NewNotifiedEvent( ctx context.Context, - aggregate *eventstore.Aggregate, - unit Unit, id string, - callURL string, - periodStart time.Time, - threshold uint16, - usage uint64, + dueEvent *NotificationDueEvent, ) *NotifiedEvent { + aggregate := dueEvent.Aggregate() return &NotifiedEvent{ BaseEvent: *eventstore.NewBaseEventForPush( ctx, - aggregate, + &aggregate, NotifiedEventType, ), - Unit: unit, - ID: id, - CallURL: callURL, - PeriodStart: periodStart, - Threshold: threshold, - Usage: usage, + ID: id, + DueEventID: dueEvent.ID, + // Deprecated: dereference the NotificationDueEvent + Unit: dueEvent.Unit, + // Deprecated: dereference the NotificationDueEvent + CallURL: dueEvent.CallURL, + // Deprecated: dereference the NotificationDueEvent + PeriodStart: dueEvent.PeriodStart, + // Deprecated: dereference the NotificationDueEvent + Threshold: dueEvent.Threshold, + // Deprecated: dereference the NotificationDueEvent + Usage: dueEvent.Usage, } } diff --git a/internal/repository/quota/eventstore.go b/internal/repository/quota/eventstore.go index 1725b2bc48..2c87bb0658 100644 --- a/internal/repository/quota/eventstore.go +++ b/internal/repository/quota/eventstore.go @@ -6,6 +6,7 @@ import ( func RegisterEventMappers(es *eventstore.Eventstore) { es.RegisterFilterEventMapper(AggregateType, AddedEventType, AddedEventMapper). - RegisterFilterEventMapper(AggregateType, NotifiedEventType, NotifiedEventMapper). - RegisterFilterEventMapper(AggregateType, RemovedEventType, RemovedEventMapper) + RegisterFilterEventMapper(AggregateType, RemovedEventType, RemovedEventMapper). + RegisterFilterEventMapper(AggregateType, NotificationDueEventType, NotificationDueEventMapper). + RegisterFilterEventMapper(AggregateType, NotifiedEventType, NotifiedEventMapper) }