feat: call webhooks at least once (#5454)

* feat: call webhooks at least once

* self review

* feat: improve notification observability

* feat: add notification tracing

* test(e2e): test at-least-once webhook delivery

* fix webhook notifications

* dedicated quota notifications handler

* fix linting

* fix e2e test

* wait less in e2e test

* fix: don't ignore failed events in handlers

* fix: don't ignore failed events in handlers

* faster requeues

* question

* fix retries

* fix retries

* retry

* don't instance ids query

* revert handler_projection

* statements can be nil

* cleanup

* make unit tests pass

* add comments

* add comments

* lint

* spool only active instances

* feat(config): handle inactive instances

* customizable HandleInactiveInstances

* call inactive instances quota webhooks

* test: handling with and w/o inactive instances

* omit retrying noop statements

* docs: describe projection options

* enable global handling of inactive instances

* self review

* requeue quota notifications every 5m

* remove caos_errors reference

* fix comment styles

* make handlers package flat

* fix linting

* fix repeating quota notifications

* test with more usage

* debug log channel init failures
This commit is contained in:
Elio Bischof 2023-03-29 00:09:06 +02:00 committed by GitHub
parent 3c3e51045b
commit cccccd005c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
52 changed files with 1776 additions and 893 deletions

View File

@ -171,6 +171,14 @@ Projections:
Notifications:
# As notification projections don't result in database statements, retries don't have an effect
MaxFailureCount: 0
# The NotificationsQuotas projection is used for calling quota webhooks
NotificationsQuotas:
# Delivery guarantee requirements are probably higher for quota webhooks
HandleInactiveInstances: true
# As quota notification projections don't result in database statements, retries don't have an effect
MaxFailureCount: 0
# Quota notifications are not so time critical. Setting RequeueEvery every five minutes doesn't annoy the db too much.
RequeueEvery: 300s
Auth:
SearchLimit: 1000

View File

@ -167,7 +167,7 @@ func startZitadel(config *Config, masterKey string) error {
}
actions.SetLogstoreService(actionsLogstoreSvc)
notification.Start(ctx, config.Projections.Customizations["notifications"], config.ExternalPort, config.ExternalSecure, commands, queries, eventstoreClient, assets.AssetAPIFromDomain(config.ExternalSecure, config.ExternalPort), config.SystemDefaults.Notifications.FileSystemPath, keys.User, keys.SMTP, keys.SMS)
notification.Start(ctx, config.Projections.Customizations["notifications"], config.Projections.Customizations["notificationsquotas"], config.ExternalPort, config.ExternalSecure, commands, queries, eventstoreClient, assets.AssetAPIFromDomain(config.ExternalSecure, config.ExternalPort), config.SystemDefaults.Notifications.FileSystemPath, keys.User, keys.SMTP, keys.SMS)
router := mux.NewRouter()
tlsConfig, err := config.TLS.Config()

View File

@ -106,6 +106,8 @@ Projections:
HandleInactiveInstances: true
# As quota notification projections don't result in database statements, retries don't have an effect
MaxFailureCount: 0
# Quota notifications are not so time critical. Setting RequeueEvery every five minutes doesn't annoy the db too much.
RequeueEvery: 300s
```
### Manage your Data

View File

@ -1,3 +1,6 @@
Log:
Level: debug
ExternalDomain: host.docker.internal
ExternalSecure: false
@ -32,6 +35,11 @@ Quotas:
ExhaustedCookieKey: "zitadel.quota.limiting"
ExhaustedCookieMaxAge: "60s"
Projections:
Customizations:
NotificationsQuotas:
RequeueEvery: 1s
DefaultInstance:
LoginPolicy:
MfaInitSkipLifetime: "0"

View File

@ -27,4 +27,5 @@ services:
retries: 5
start_period: '20s'
ports:
- "26257:26257"
- "26257:26257"
- "9090:9090"

View File

@ -1,3 +1,6 @@
Log:
Level: debug
ExternalDomain: localhost
ExternalSecure: false
@ -32,6 +35,11 @@ Quotas:
ExhaustedCookieKey: "zitadel.quota.limiting"
ExhaustedCookieMaxAge: "60s"
Projections:
Customizations:
NotificationsQuotas:
RequeueEvery: 1s
DefaultInstance:
LoginPolicy:
MfaInitSkipLifetime: "0"

View File

@ -34,8 +34,8 @@ YkTaa1AFLstnf348ZjuvBN3USUYZo3X3mxnS+uluVuRSGwIKsN0a
-----END RSA PRIVATE KEY-----`
let tokensCache = new Map<string,string>()
let webhookEvents = new Array<ZITADELWebhookEvent>()
let failWebhookEventsCount = 0
export default defineConfig({
reporter: 'mochawesome',
@ -98,10 +98,15 @@ export default defineConfig({
},
resetWebhookEvents() {
webhookEvents = []
failWebhookEventsCount = 0
return null
},
handledWebhookEvents(){
return webhookEvents
},
failWebhookEvents(count: number){
failWebhookEventsCount = count
return null
}
})
},
@ -127,11 +132,17 @@ function startWebhookEventHandler() {
req.on("data", (chunk) => {
chunks.push(chunk);
});
const sendStatus = failWebhookEventsCount ? 500 : 200
req.on("end", () => {
webhookEvents.push(JSON.parse(Buffer.concat(chunks).toString()));
webhookEvents.push({
sentStatus: sendStatus,
payload: JSON.parse(Buffer.concat(chunks).toString())
});
});
res.writeHead(200);
if (failWebhookEventsCount > 0){
failWebhookEventsCount--
}
res.writeHead(sendStatus);
res.end()
});

View File

@ -2,6 +2,7 @@ import { addQuota, ensureQuotaIsAdded, ensureQuotaIsRemoved, removeQuota, Unit }
import { createHumanUser, ensureUserDoesntExist } from 'support/api/users';
import { Context } from 'support/commands';
import { ZITADELWebhookEvent } from 'support/types';
import { textChangeRangeIsUnchanged } from 'typescript';
beforeEach(() => {
cy.context().as('ctx');
@ -144,7 +145,7 @@ describe('quotas', () => {
const amount = 100;
const percent = 10;
const usage = 25;
const usage = 35;
describe('without repetition', () => {
beforeEach(() => {
@ -160,7 +161,7 @@ describe('quotas', () => {
});
});
it('fires once with the expected payload', () => {
it('fires at least once with the expected payload', () => {
cy.get<Array<string>>('@authenticatedUrls').then((urls) => {
cy.get<Context>('@ctx').then((ctx) => {
for (let i = 0; i < usage; i++) {
@ -175,19 +176,71 @@ describe('quotas', () => {
});
cy.waitUntil(() =>
cy.task<Array<ZITADELWebhookEvent>>('handledWebhookEvents').then((events) => {
if (events.length != 1) {
if (events.length < 1) {
return false;
}
return Cypress._.matches(<ZITADELWebhookEvent>{
callURL: callURL,
threshold: percent,
unit: 1,
usage: percent,
sentStatus: 200,
payload: {
callURL: callURL,
threshold: percent,
unit: 1,
usage: percent,
},
})(events[0]);
}),
);
});
});
it('fires until the webhook returns a successful message', () => {
cy.task('failWebhookEvents', 8);
cy.get<Array<string>>('@authenticatedUrls').then((urls) => {
cy.get<Context>('@ctx').then((ctx) => {
for (let i = 0; i < usage; i++) {
cy.request({
url: urls[0],
method: 'GET',
auth: {
bearer: ctx.api.token,
},
});
}
});
cy.waitUntil(
() =>
cy.task<Array<ZITADELWebhookEvent>>('handledWebhookEvents').then((events) => {
if (events.length != 9) {
return false;
}
return events.reduce<boolean>((a, b, i) => {
return !a
? a
: i < 8
? Cypress._.matches(<ZITADELWebhookEvent>{
sentStatus: 500,
payload: {
callURL: callURL,
threshold: percent,
unit: 1,
usage: percent,
},
})(b)
: Cypress._.matches(<ZITADELWebhookEvent>{
sentStatus: 200,
payload: {
callURL: callURL,
threshold: percent,
unit: 1,
usage: percent,
},
})(b);
}, true);
}),
{ timeout: 60_000 },
);
});
});
});
describe('with repetition', () => {
@ -222,23 +275,25 @@ describe('quotas', () => {
});
cy.waitUntil(() =>
cy.task<Array<ZITADELWebhookEvent>>('handledWebhookEvents').then((events) => {
if (events.length != 1) {
return false;
}
let foundExpected = 0;
for (let i = 0; i < events.length; i++) {
const threshold = percent * (i + 1);
if (
!Cypress._.matches(<ZITADELWebhookEvent>{
callURL: callURL,
threshold: threshold,
unit: 1,
usage: threshold,
})(events[i])
) {
return false;
for (let expect = 10; expect <= 30; expect += 10) {
if (
Cypress._.matches(<ZITADELWebhookEvent>{
sentStatus: 200,
payload: {
callURL: callURL,
threshold: expect,
unit: 1,
usage: expect,
},
})(events[i])
) {
foundExpected++;
}
}
}
return true;
return foundExpected >= 3;
}),
);
});

View File

@ -1,10 +1,13 @@
let webhookEventSchema = {
unit: 0,
id: '',
callURL: '',
periodStart: new Date(),
threshold: 0,
usage: 0,
sentStatus: 0,
payload: {
unit: 0,
id: '',
callURL: '',
periodStart: new Date(),
threshold: 0,
usage: 0,
},
};
export type ZITADELWebhookEvent = typeof webhookEventSchema;

View File

@ -1,58 +1,31 @@
package command
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/repository/quota"
)
// ReportUsage calls notification hooks and emits the notified events
func (c *Commands) ReportUsage(ctx context.Context, dueNotifications []*quota.NotifiedEvent) error {
for _, notification := range dueNotifications {
if err := notify(ctx, notification); err != nil {
if err != nil {
return err
}
}
if _, err := c.eventstore.Push(ctx, notification); err != nil {
return err
}
func (c *Commands) ReportUsage(ctx context.Context, dueNotifications []*quota.NotificationDueEvent) error {
cmds := make([]eventstore.Command, len(dueNotifications))
for idx, notification := range dueNotifications {
cmds[idx] = notification
}
return nil
_, err := c.eventstore.Push(ctx, cmds...)
return err
}
func notify(ctx context.Context, notification *quota.NotifiedEvent) error {
payload, err := json.Marshal(notification)
func (c *Commands) UsageNotificationSent(ctx context.Context, dueEvent *quota.NotificationDueEvent) error {
id, err := c.idGenerator.Next()
if err != nil {
return err
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, notification.CallURL, bytes.NewReader(payload))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
if err = resp.Body.Close(); err != nil {
return err
}
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return fmt.Errorf("calling url %s returned %s", notification.CallURL, resp.Status)
}
return nil
_, err = c.eventstore.Push(
ctx,
quota.NewNotifiedEvent(ctx, id, dueEvent),
)
return err
}

View File

@ -23,6 +23,6 @@ func (i *inmemReporter) GetCurrentQuotaPeriod(context.Context, string, quota.Uni
return i.config, i.startPeriod, nil
}
func (*inmemReporter) GetDueQuotaNotifications(context.Context, *quota.AddedEvent, time.Time, uint64) ([]*quota.NotifiedEvent, error) {
func (*inmemReporter) GetDueQuotaNotifications(context.Context, *quota.AddedEvent, time.Time, uint64) ([]*quota.NotificationDueEvent, error) {
return nil, nil
}

View File

@ -15,7 +15,7 @@ const handleThresholdTimeout = time.Minute
type QuotaQuerier interface {
GetCurrentQuotaPeriod(ctx context.Context, instanceID string, unit quota.Unit) (config *quota.AddedEvent, periodStart time.Time, err error)
GetDueQuotaNotifications(ctx context.Context, config *quota.AddedEvent, periodStart time.Time, used uint64) ([]*quota.NotifiedEvent, error)
GetDueQuotaNotifications(ctx context.Context, config *quota.AddedEvent, periodStart time.Time, used uint64) ([]*quota.NotificationDueEvent, error)
}
type UsageQuerier interface {
@ -25,12 +25,12 @@ type UsageQuerier interface {
}
type UsageReporter interface {
Report(ctx context.Context, notifications []*quota.NotifiedEvent) (err error)
Report(ctx context.Context, notifications []*quota.NotificationDueEvent) (err error)
}
type UsageReporterFunc func(context.Context, []*quota.NotifiedEvent) (err error)
type UsageReporterFunc func(context.Context, []*quota.NotificationDueEvent) (err error)
func (u UsageReporterFunc) Report(ctx context.Context, notifications []*quota.NotifiedEvent) (err error) {
func (u UsageReporterFunc) Report(ctx context.Context, notifications []*quota.NotificationDueEvent) (err error) {
return u(ctx, notifications)
}

View File

@ -1,7 +1,10 @@
package channels
import "github.com/zitadel/zitadel/internal/eventstore"
type Message interface {
GetContent() string
GetTriggeringEvent() eventstore.Event
GetContent() (string, error)
}
type NotificationChannel interface {

View File

@ -2,19 +2,16 @@ package fs
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sort"
"strings"
"time"
"github.com/k3a/html2text"
"github.com/zitadel/logging"
caos_errors "github.com/zitadel/zitadel/internal/errors"
"github.com/k3a/html2text"
"github.com/zitadel/zitadel/internal/errors"
"github.com/zitadel/zitadel/internal/notification/channels"
"github.com/zitadel/zitadel/internal/notification/messages"
)
@ -29,7 +26,10 @@ func InitFSChannel(config Config) (channels.NotificationChannel, error) {
return channels.HandleMessageFunc(func(message channels.Message) error {
fileName := fmt.Sprintf("%d_", time.Now().Unix())
content := message.GetContent()
content, err := message.GetContent()
if err != nil {
return err
}
switch msg := message.(type) {
case *messages.Email:
recipients := make([]string, len(msg.Recipients))
@ -41,10 +41,12 @@ func InitFSChannel(config Config) (channels.NotificationChannel, error) {
}
case *messages.SMS:
fileName = fileName + "sms_to_" + msg.RecipientPhoneNumber + ".txt"
case *messages.JSON:
fileName = "message.json"
default:
return caos_errors.ThrowUnimplementedf(nil, "NOTIF-6f9a1", "filesystem provider doesn't support message type %T", message)
return errors.ThrowUnimplementedf(nil, "NOTIF-6f9a1", "filesystem provider doesn't support message type %T", message)
}
return ioutil.WriteFile(filepath.Join(config.Path, fileName), []byte(content), 0666)
return os.WriteFile(filepath.Join(config.Path, fileName), []byte(content), 0666)
}), nil
}

View File

@ -0,0 +1,26 @@
package instrumenting
import (
"context"
"github.com/zitadel/zitadel/internal/notification/channels"
)
func Wrap(
ctx context.Context,
channel channels.NotificationChannel,
traceSpanName,
successMetricName,
failureMetricName string,
) channels.NotificationChannel {
return traceMessages(
ctx,
countMessages(
ctx,
logMessages(ctx, channel),
successMetricName,
failureMetricName,
),
traceSpanName,
)
}

View File

@ -0,0 +1,24 @@
package instrumenting
import (
"context"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/notification/channels"
)
func logMessages(ctx context.Context, channel channels.NotificationChannel) channels.NotificationChannel {
return channels.HandleMessageFunc(func(message channels.Message) error {
logEntry := logging.WithFields(
"instance", authz.GetInstance(ctx).InstanceID(),
"triggering_event_type", message.GetTriggeringEvent().Type(),
)
logEntry.Debug("sending notification")
err := channel.HandleMessage(message)
logEntry.OnError(err).Warn("sending notification failed")
logEntry.Debug("notification sent")
return err
})
}

View File

@ -0,0 +1,36 @@
package instrumenting
import (
"context"
"github.com/zitadel/logging"
"go.opentelemetry.io/otel/attribute"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/notification/channels"
"github.com/zitadel/zitadel/internal/telemetry/metrics"
)
func countMessages(ctx context.Context, channel channels.NotificationChannel, successMetricName, errorMetricName string) channels.NotificationChannel {
return channels.HandleMessageFunc(func(message channels.Message) error {
err := channel.HandleMessage(message)
metricName := successMetricName
if err != nil {
metricName = errorMetricName
}
addCount(ctx, metricName, message, err)
return err
})
}
func addCount(ctx context.Context, metricName string, message channels.Message, err error) {
labels := map[string]attribute.Value{
"triggering_event_typey": attribute.StringValue(string(message.GetTriggeringEvent().Type())),
"instance": attribute.StringValue(authz.GetInstance(ctx).InstanceID()),
}
if err != nil {
labels["error"] = attribute.StringValue(err.Error())
}
addCountErr := metrics.AddCount(ctx, metricName, 1, labels)
logging.WithFields("name", metricName, "labels", labels).OnError(addCountErr).Error("incrementing counter metric failed")
}

View File

@ -0,0 +1,16 @@
package instrumenting
import (
"context"
"github.com/zitadel/zitadel/internal/notification/channels"
"github.com/zitadel/zitadel/internal/telemetry/tracing"
)
func traceMessages(ctx context.Context, channel channels.NotificationChannel, spanName string) channels.NotificationChannel {
return channels.HandleMessageFunc(func(message channels.Message) (err error) {
_, span := tracing.NewNamedSpan(ctx, spanName)
defer func() { span.EndWithError(err) }()
return channel.HandleMessage(message)
})
}

View File

@ -15,7 +15,10 @@ func InitStdoutChannel(config Config) channels.NotificationChannel {
return channels.HandleMessageFunc(func(message channels.Message) error {
content := message.GetContent()
content, err := message.GetContent()
if err != nil {
return err
}
if config.Compact {
content = html2text.HTML2Text(content)
}

View File

@ -22,7 +22,7 @@ type Email struct {
senderName string
}
func InitSMTPChannel(ctx context.Context, getSMTPConfig func(ctx context.Context) (*Config, error)) (*Email, error) {
func InitChannel(ctx context.Context, getSMTPConfig func(ctx context.Context) (*Config, error)) (*Email, error) {
smtpConfig, err := getSMTPConfig(ctx)
if err != nil {
return nil, err
@ -70,7 +70,12 @@ func (email *Email) HandleMessage(message channels.Message) error {
return err
}
_, err = w.Write([]byte(emailMsg.GetContent()))
content, err := emailMsg.GetContent()
if err != nil {
return err
}
_, err = w.Write([]byte(content))
if err != nil {
return err
}
@ -80,7 +85,6 @@ func (email *Email) HandleMessage(message channels.Message) error {
return err
}
defer logging.LogWithFields("EMAI-a1c87ec8").Debug("email sent")
return email.smtpClient.Quit()
}
@ -154,6 +158,8 @@ func (smtpConfig SMTP) smtpAuth(client *smtp.Client, host string) error {
// Auth
auth := smtp.PlainAuth("", smtpConfig.User, smtpConfig.Password, host)
err := client.Auth(auth)
logging.Log("EMAIL-s9kfs").WithField("smtp user", smtpConfig.User).OnError(err).Debug("could not add smtp auth")
return err
if err != nil {
return caos_errs.ThrowInternalf(err, "EMAIL-s9kfs", "could not add smtp auth for user %s", smtpConfig.User)
}
return nil
}

View File

@ -9,7 +9,7 @@ import (
"github.com/zitadel/zitadel/internal/notification/messages"
)
func InitTwilioChannel(config Config) channels.NotificationChannel {
func InitChannel(config Config) channels.NotificationChannel {
client := twilio.NewClient(config.SID, config.Token, nil)
logging.Debug("successfully initialized twilio sms channel")
@ -19,7 +19,11 @@ func InitTwilioChannel(config Config) channels.NotificationChannel {
if !ok {
return caos_errs.ThrowInternal(nil, "TWILI-s0pLc", "message is not SMS")
}
m, err := client.Messages.SendMessage(twilioMsg.SenderPhoneNumber, twilioMsg.RecipientPhoneNumber, twilioMsg.GetContent(), nil)
content, err := twilioMsg.GetContent()
if err != nil {
return err
}
m, err := client.Messages.SendMessage(twilioMsg.SenderPhoneNumber, twilioMsg.RecipientPhoneNumber, content, nil)
if err != nil {
return caos_errs.ThrowInternal(err, "TWILI-osk3S", "could not send message")
}

View File

@ -0,0 +1,60 @@
package webhook
import (
"context"
"fmt"
"net/http"
"strings"
"time"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/errors"
"github.com/zitadel/zitadel/internal/notification/channels"
"github.com/zitadel/zitadel/internal/notification/messages"
)
func InitChannel(ctx context.Context, cfg Config) (channels.NotificationChannel, error) {
if err := cfg.Validate(); err != nil {
return nil, err
}
logging.Debug("successfully initialized webhook json channel")
return channels.HandleMessageFunc(func(message channels.Message) error {
requestCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
msg, ok := message.(*messages.JSON)
if !ok {
return errors.ThrowInternal(nil, "WEBH-K686U", "message is not JSON")
}
payload, err := msg.GetContent()
if err != nil {
return err
}
req, err := http.NewRequestWithContext(requestCtx, cfg.Method, cfg.CallURL, strings.NewReader(payload))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
if err = resp.Body.Close(); err != nil {
return err
}
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return errors.ThrowUnknown(fmt.Errorf("calling url %s returned %s", cfg.CallURL, resp.Status), "WEBH-LBxU0", "webhook didn't return a success status")
}
logging.WithFields("calling_url", cfg.CallURL, "method", cfg.Method).Debug("webhook called")
return nil
}), nil
}

View File

@ -0,0 +1,15 @@
package webhook
import (
"net/url"
)
type Config struct {
CallURL string
Method string
}
func (w *Config) Validate() error {
_, err := url.Parse(w.CallURL)
return err
}

View File

@ -0,0 +1,27 @@
package handlers
import (
"context"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/repository/user"
)
func (n *NotificationQueries) IsAlreadyHandled(ctx context.Context, event eventstore.Event, data map[string]interface{}, eventTypes ...eventstore.EventType) (bool, error) {
events, err := n.es.Filter(
ctx,
eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
InstanceID(event.Aggregate().InstanceID).
AddQuery().
AggregateTypes(user.AggregateType).
AggregateIDs(event.Aggregate().ID).
SequenceGreater(event.Sequence()).
EventTypes(eventTypes...).
EventData(data).
Builder(),
)
if err != nil {
return false, err
}
return len(events) > 0, nil
}

View File

@ -0,0 +1,21 @@
package handlers
import (
"context"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/domain"
"github.com/zitadel/zitadel/internal/notification/channels/fs"
)
// GetFileSystemProvider reads the iam filesystem provider config
func (n *NotificationQueries) GetFileSystemProvider(ctx context.Context) (*fs.Config, error) {
config, err := n.NotificationProviderByIDAndType(ctx, authz.GetInstance(ctx).InstanceID(), domain.NotificationProviderTypeFile)
if err != nil {
return nil, err
}
return &fs.Config{
Compact: config.Compact,
Path: n.fileSystemPath,
}, nil
}

View File

@ -0,0 +1,20 @@
package handlers
import (
"context"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/domain"
"github.com/zitadel/zitadel/internal/notification/channels/log"
)
// GetLogProvider reads the iam log provider config
func (n *NotificationQueries) GetLogProvider(ctx context.Context) (*log.Config, error) {
config, err := n.NotificationProviderByIDAndType(ctx, authz.GetInstance(ctx).InstanceID(), domain.NotificationProviderTypeLog)
if err != nil {
return nil, err
}
return &log.Config{
Compact: config.Compact,
}, nil
}

View File

@ -0,0 +1,31 @@
package handlers
import (
"context"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/crypto"
"github.com/zitadel/zitadel/internal/notification/channels/smtp"
)
// GetSMTPConfig reads the iam SMTP provider config
func (n *NotificationQueries) GetSMTPConfig(ctx context.Context) (*smtp.Config, error) {
config, err := n.SMTPConfigByAggregateID(ctx, authz.GetInstance(ctx).InstanceID())
if err != nil {
return nil, err
}
password, err := crypto.DecryptString(config.Password, n.SMTPPasswordCrypto)
if err != nil {
return nil, err
}
return &smtp.Config{
From: config.SenderAddress,
FromName: config.SenderName,
Tls: config.TLS,
SMTP: smtp.SMTP{
Host: config.Host,
User: config.User,
Password: password,
},
}, nil
}

View File

@ -0,0 +1,35 @@
package handlers
import (
"context"
"github.com/zitadel/zitadel/internal/crypto"
"github.com/zitadel/zitadel/internal/domain"
"github.com/zitadel/zitadel/internal/errors"
"github.com/zitadel/zitadel/internal/notification/channels/twilio"
"github.com/zitadel/zitadel/internal/query"
)
// GetTwilioConfig reads the iam Twilio provider config
func (n *NotificationQueries) GetTwilioConfig(ctx context.Context) (*twilio.Config, error) {
active, err := query.NewSMSProviderStateQuery(domain.SMSConfigStateActive)
if err != nil {
return nil, err
}
config, err := n.SMSProviderConfig(ctx, active)
if err != nil {
return nil, err
}
if config.TwilioConfig == nil {
return nil, errors.ThrowNotFound(nil, "HANDLER-8nfow", "Errors.SMS.Twilio.NotFound")
}
token, err := crypto.DecryptString(config.TwilioConfig.Token, n.SMSTokenCrypto)
if err != nil {
return nil, err
}
return &twilio.Config{
SID: config.TwilioConfig.SID,
Token: token,
SenderNumber: config.TwilioConfig.SenderNumber,
}, nil
}

View File

@ -0,0 +1,15 @@
package handlers
import (
"context"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/eventstore"
)
const NotifyUserID = "NOTIFICATION" //TODO: system?
func HandlerContext(event eventstore.Aggregate) context.Context {
ctx := authz.WithInstanceID(context.Background(), event.InstanceID)
return authz.SetCtxData(ctx, authz.CtxData{UserID: NotifyUserID, OrgID: event.ResourceOwner})
}

View File

@ -0,0 +1,28 @@
package handlers
import (
"context"
"github.com/zitadel/zitadel/internal/api/authz"
http_utils "github.com/zitadel/zitadel/internal/api/http"
"github.com/zitadel/zitadel/internal/errors"
"github.com/zitadel/zitadel/internal/query"
)
func (n *NotificationQueries) Origin(ctx context.Context) (context.Context, string, error) {
primary, err := query.NewInstanceDomainPrimarySearchQuery(true)
if err != nil {
return ctx, "", err
}
domains, err := n.SearchInstanceDomains(ctx, &query.InstanceDomainSearchQueries{
Queries: []query.SearchQuery{primary},
})
if err != nil {
return ctx, "", err
}
if len(domains.Domains) < 1 {
return ctx, "", errors.ThrowInternal(nil, "NOTIF-Ef3r1", "Errors.Notification.NoDomain")
}
ctx = authz.WithRequestedDomain(ctx, domains.Domains[0].Domain)
return ctx, http_utils.BuildHTTP(domains.Domains[0].Domain, n.externalPort, n.externalSecure), nil
}

View File

@ -0,0 +1,46 @@
package handlers
import (
"net/http"
"github.com/zitadel/zitadel/internal/crypto"
"github.com/zitadel/zitadel/internal/eventstore"
_ "github.com/zitadel/zitadel/internal/notification/statik"
"github.com/zitadel/zitadel/internal/query"
)
type NotificationQueries struct {
*query.Queries
es *eventstore.Eventstore
externalPort uint16
externalSecure bool
fileSystemPath string
UserDataCrypto crypto.EncryptionAlgorithm
SMTPPasswordCrypto crypto.EncryptionAlgorithm
SMSTokenCrypto crypto.EncryptionAlgorithm
statikDir http.FileSystem
}
func NewNotificationQueries(
baseQueries *query.Queries,
es *eventstore.Eventstore,
externalPort uint16,
externalSecure bool,
fileSystemPath string,
userDataCrypto crypto.EncryptionAlgorithm,
smtpPasswordCrypto crypto.EncryptionAlgorithm,
smsTokenCrypto crypto.EncryptionAlgorithm,
statikDir http.FileSystem,
) *NotificationQueries {
return &NotificationQueries{
Queries: baseQueries,
es: es,
externalPort: externalPort,
externalSecure: externalSecure,
fileSystemPath: fileSystemPath,
UserDataCrypto: userDataCrypto,
SMTPPasswordCrypto: smtpPasswordCrypto,
SMSTokenCrypto: smsTokenCrypto,
statikDir: statikDir,
}
}

View File

@ -0,0 +1,99 @@
package handlers
import (
"context"
"net/http"
"github.com/zitadel/zitadel/internal/command"
"github.com/zitadel/zitadel/internal/errors"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/eventstore/handler"
"github.com/zitadel/zitadel/internal/eventstore/handler/crdb"
"github.com/zitadel/zitadel/internal/notification/channels/webhook"
_ "github.com/zitadel/zitadel/internal/notification/statik"
"github.com/zitadel/zitadel/internal/notification/types"
"github.com/zitadel/zitadel/internal/query/projection"
"github.com/zitadel/zitadel/internal/repository/quota"
)
const (
QuotaNotificationsProjectionTable = "projections.notifications_quota"
)
type quotaNotifier struct {
crdb.StatementHandler
commands *command.Commands
queries *NotificationQueries
metricSuccessfulDeliveriesJSON string
metricFailedDeliveriesJSON string
}
func NewQuotaNotifier(
ctx context.Context,
config crdb.StatementHandlerConfig,
commands *command.Commands,
queries *NotificationQueries,
metricSuccessfulDeliveriesJSON,
metricFailedDeliveriesJSON string,
) *quotaNotifier {
p := new(quotaNotifier)
config.ProjectionName = QuotaNotificationsProjectionTable
config.Reducers = p.reducers()
p.StatementHandler = crdb.NewStatementHandler(ctx, config)
p.commands = commands
p.queries = queries
p.metricSuccessfulDeliveriesJSON = metricSuccessfulDeliveriesJSON
p.metricFailedDeliveriesJSON = metricFailedDeliveriesJSON
projection.NotificationsQuotaProjection = p
return p
}
func (u *quotaNotifier) reducers() []handler.AggregateReducer {
return []handler.AggregateReducer{
{
Aggregate: quota.AggregateType,
EventRedusers: []handler.EventReducer{
{
Event: quota.NotificationDueEventType,
Reduce: u.reduceNotificationDue,
},
},
},
}
}
func (u *quotaNotifier) reduceNotificationDue(event eventstore.Event) (*handler.Statement, error) {
e, ok := event.(*quota.NotificationDueEvent)
if !ok {
return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-DLxdE", "reduce.wrong.event.type %s", quota.NotificationDueEventType)
}
ctx := HandlerContext(event.Aggregate())
alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, map[string]interface{}{"dueEventID": e.ID}, quota.NotifiedEventType)
if err != nil {
return nil, err
}
if alreadyHandled {
return crdb.NewNoOpStatement(e), nil
}
err = types.SendJSON(
ctx,
webhook.Config{
CallURL: e.CallURL,
Method: http.MethodPost,
},
u.queries.GetFileSystemProvider,
u.queries.GetLogProvider,
e,
e,
u.metricSuccessfulDeliveriesJSON,
u.metricFailedDeliveriesJSON,
).WithoutTemplate()
if err != nil {
return nil, err
}
err = u.commands.UsageNotificationSent(ctx, e)
if err != nil {
return nil, err
}
return crdb.NewNoOpStatement(e), nil
}

View File

@ -0,0 +1,39 @@
package handlers
import (
"context"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/i18n"
)
func (n *NotificationQueries) GetTranslatorWithOrgTexts(ctx context.Context, orgID, textType string) (*i18n.Translator, error) {
translator, err := i18n.NewTranslator(n.statikDir, n.GetDefaultLanguage(ctx), "")
if err != nil {
return nil, err
}
allCustomTexts, err := n.CustomTextListByTemplate(ctx, authz.GetInstance(ctx).InstanceID(), textType, false)
if err != nil {
return translator, nil
}
customTexts, err := n.CustomTextListByTemplate(ctx, orgID, textType, false)
if err != nil {
return translator, nil
}
allCustomTexts.CustomTexts = append(allCustomTexts.CustomTexts, customTexts.CustomTexts...)
for _, text := range allCustomTexts.CustomTexts {
msg := i18n.Message{
ID: text.Template + "." + text.Key,
Text: text.Text,
}
err = translator.AddMessages(text.Language, msg)
logging.WithFields("instanceID", authz.GetInstance(ctx).InstanceID(), "orgID", orgID, "messageType", textType, "messageID", msg.ID).
OnError(err).
Warn("could not add translation message")
}
return translator, nil
}

View File

@ -0,0 +1,589 @@
package handlers
import (
"context"
"time"
"github.com/zitadel/zitadel/internal/command"
"github.com/zitadel/zitadel/internal/crypto"
"github.com/zitadel/zitadel/internal/domain"
"github.com/zitadel/zitadel/internal/errors"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/eventstore/handler"
"github.com/zitadel/zitadel/internal/eventstore/handler/crdb"
"github.com/zitadel/zitadel/internal/notification/types"
"github.com/zitadel/zitadel/internal/query/projection"
"github.com/zitadel/zitadel/internal/repository/user"
)
const (
UserNotificationsProjectionTable = "projections.notifications"
)
type userNotifier struct {
crdb.StatementHandler
commands *command.Commands
queries *NotificationQueries
assetsPrefix func(context.Context) string
metricSuccessfulDeliveriesEmail,
metricFailedDeliveriesEmail,
metricSuccessfulDeliveriesSMS,
metricFailedDeliveriesSMS string
}
func NewUserNotifier(
ctx context.Context,
config crdb.StatementHandlerConfig,
commands *command.Commands,
queries *NotificationQueries,
assetsPrefix func(context.Context) string,
metricSuccessfulDeliveriesEmail,
metricFailedDeliveriesEmail,
metricSuccessfulDeliveriesSMS,
metricFailedDeliveriesSMS string,
) *userNotifier {
p := new(userNotifier)
config.ProjectionName = UserNotificationsProjectionTable
config.Reducers = p.reducers()
p.StatementHandler = crdb.NewStatementHandler(ctx, config)
p.commands = commands
p.queries = queries
p.assetsPrefix = assetsPrefix
p.metricSuccessfulDeliveriesEmail = metricSuccessfulDeliveriesEmail
p.metricFailedDeliveriesEmail = metricFailedDeliveriesEmail
p.metricSuccessfulDeliveriesSMS = metricSuccessfulDeliveriesSMS
p.metricFailedDeliveriesSMS = metricFailedDeliveriesSMS
projection.NotificationsProjection = p
return p
}
func (u *userNotifier) reducers() []handler.AggregateReducer {
return []handler.AggregateReducer{
{
Aggregate: user.AggregateType,
EventRedusers: []handler.EventReducer{
{
Event: user.UserV1InitialCodeAddedType,
Reduce: u.reduceInitCodeAdded,
},
{
Event: user.HumanInitialCodeAddedType,
Reduce: u.reduceInitCodeAdded,
},
{
Event: user.UserV1EmailCodeAddedType,
Reduce: u.reduceEmailCodeAdded,
},
{
Event: user.HumanEmailCodeAddedType,
Reduce: u.reduceEmailCodeAdded,
},
{
Event: user.UserV1PasswordCodeAddedType,
Reduce: u.reducePasswordCodeAdded,
},
{
Event: user.HumanPasswordCodeAddedType,
Reduce: u.reducePasswordCodeAdded,
},
{
Event: user.UserDomainClaimedType,
Reduce: u.reduceDomainClaimed,
},
{
Event: user.HumanPasswordlessInitCodeRequestedType,
Reduce: u.reducePasswordlessCodeRequested,
},
{
Event: user.UserV1PhoneCodeAddedType,
Reduce: u.reducePhoneCodeAdded,
},
{
Event: user.HumanPhoneCodeAddedType,
Reduce: u.reducePhoneCodeAdded,
},
{
Event: user.HumanPasswordChangedType,
Reduce: u.reducePasswordChanged,
},
},
},
}
}
func (u *userNotifier) reduceInitCodeAdded(event eventstore.Event) (*handler.Statement, error) {
e, ok := event.(*user.HumanInitialCodeAddedEvent)
if !ok {
return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-EFe2f", "reduce.wrong.event.type %s", user.HumanInitialCodeAddedType)
}
ctx := HandlerContext(event.Aggregate())
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
user.UserV1InitialCodeAddedType, user.UserV1InitialCodeSentType,
user.HumanInitialCodeAddedType, user.HumanInitialCodeSentType)
if err != nil {
return nil, err
}
if alreadyHandled {
return crdb.NewNoOpStatement(e), nil
}
code, err := crypto.DecryptString(e.Code, u.queries.UserDataCrypto)
if err != nil {
return nil, err
}
colors, err := u.queries.ActiveLabelPolicyByOrg(ctx, e.Aggregate().ResourceOwner, false)
if err != nil {
return nil, err
}
template, err := u.queries.MailTemplateByOrg(ctx, e.Aggregate().ResourceOwner, false)
if err != nil {
return nil, err
}
notifyUser, err := u.queries.GetNotifyUserByID(ctx, true, e.Aggregate().ID, false)
if err != nil {
return nil, err
}
translator, err := u.queries.GetTranslatorWithOrgTexts(ctx, notifyUser.ResourceOwner, domain.InitCodeMessageType)
if err != nil {
return nil, err
}
ctx, origin, err := u.queries.Origin(ctx)
if err != nil {
return nil, err
}
err = types.SendEmail(
ctx,
string(template.Template),
translator,
notifyUser,
u.queries.GetSMTPConfig,
u.queries.GetFileSystemProvider,
u.queries.GetLogProvider,
colors,
u.assetsPrefix(ctx),
e,
u.metricSuccessfulDeliveriesEmail,
u.metricFailedDeliveriesEmail,
).SendUserInitCode(notifyUser, origin, code)
if err != nil {
return nil, err
}
err = u.commands.HumanInitCodeSent(ctx, e.Aggregate().ResourceOwner, e.Aggregate().ID)
if err != nil {
return nil, err
}
return crdb.NewNoOpStatement(e), nil
}
func (u *userNotifier) reduceEmailCodeAdded(event eventstore.Event) (*handler.Statement, error) {
e, ok := event.(*user.HumanEmailCodeAddedEvent)
if !ok {
return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-SWf3g", "reduce.wrong.event.type %s", user.HumanEmailCodeAddedType)
}
ctx := HandlerContext(event.Aggregate())
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
user.UserV1EmailCodeAddedType, user.UserV1EmailCodeSentType,
user.HumanEmailCodeAddedType, user.HumanEmailCodeSentType)
if err != nil {
return nil, err
}
if alreadyHandled {
return crdb.NewNoOpStatement(e), nil
}
code, err := crypto.DecryptString(e.Code, u.queries.UserDataCrypto)
if err != nil {
return nil, err
}
colors, err := u.queries.ActiveLabelPolicyByOrg(ctx, e.Aggregate().ResourceOwner, false)
if err != nil {
return nil, err
}
template, err := u.queries.MailTemplateByOrg(ctx, e.Aggregate().ResourceOwner, false)
if err != nil {
return nil, err
}
notifyUser, err := u.queries.GetNotifyUserByID(ctx, true, e.Aggregate().ID, false)
if err != nil {
return nil, err
}
translator, err := u.queries.GetTranslatorWithOrgTexts(ctx, notifyUser.ResourceOwner, domain.VerifyEmailMessageType)
if err != nil {
return nil, err
}
ctx, origin, err := u.queries.Origin(ctx)
if err != nil {
return nil, err
}
err = types.SendEmail(
ctx,
string(template.Template),
translator,
notifyUser,
u.queries.GetSMTPConfig,
u.queries.GetFileSystemProvider,
u.queries.GetLogProvider,
colors,
u.assetsPrefix(ctx),
e,
u.metricSuccessfulDeliveriesEmail,
u.metricFailedDeliveriesEmail,
).SendEmailVerificationCode(notifyUser, origin, code)
if err != nil {
return nil, err
}
err = u.commands.HumanEmailVerificationCodeSent(ctx, e.Aggregate().ResourceOwner, e.Aggregate().ID)
if err != nil {
return nil, err
}
return crdb.NewNoOpStatement(e), nil
}
func (u *userNotifier) reducePasswordCodeAdded(event eventstore.Event) (*handler.Statement, error) {
e, ok := event.(*user.HumanPasswordCodeAddedEvent)
if !ok {
return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-Eeg3s", "reduce.wrong.event.type %s", user.HumanPasswordCodeAddedType)
}
ctx := HandlerContext(event.Aggregate())
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
user.UserV1PasswordCodeAddedType, user.UserV1PasswordCodeSentType,
user.HumanPasswordCodeAddedType, user.HumanPasswordCodeSentType)
if err != nil {
return nil, err
}
if alreadyHandled {
return crdb.NewNoOpStatement(e), nil
}
code, err := crypto.DecryptString(e.Code, u.queries.UserDataCrypto)
if err != nil {
return nil, err
}
colors, err := u.queries.ActiveLabelPolicyByOrg(ctx, e.Aggregate().ResourceOwner, false)
if err != nil {
return nil, err
}
template, err := u.queries.MailTemplateByOrg(ctx, e.Aggregate().ResourceOwner, false)
if err != nil {
return nil, err
}
notifyUser, err := u.queries.GetNotifyUserByID(ctx, true, e.Aggregate().ID, false)
if err != nil {
return nil, err
}
translator, err := u.queries.GetTranslatorWithOrgTexts(ctx, notifyUser.ResourceOwner, domain.PasswordResetMessageType)
if err != nil {
return nil, err
}
ctx, origin, err := u.queries.Origin(ctx)
if err != nil {
return nil, err
}
notify := types.SendEmail(
ctx,
string(template.Template),
translator,
notifyUser,
u.queries.GetSMTPConfig,
u.queries.GetFileSystemProvider,
u.queries.GetLogProvider,
colors,
u.assetsPrefix(ctx),
e,
u.metricSuccessfulDeliveriesEmail,
u.metricFailedDeliveriesEmail,
)
if e.NotificationType == domain.NotificationTypeSms {
notify = types.SendSMSTwilio(
ctx,
translator,
notifyUser,
u.queries.GetTwilioConfig,
u.queries.GetFileSystemProvider,
u.queries.GetLogProvider,
colors,
u.assetsPrefix(ctx),
e,
u.metricSuccessfulDeliveriesSMS,
u.metricFailedDeliveriesSMS,
)
}
err = notify.SendPasswordCode(notifyUser, origin, code)
if err != nil {
return nil, err
}
err = u.commands.PasswordCodeSent(ctx, e.Aggregate().ResourceOwner, e.Aggregate().ID)
if err != nil {
return nil, err
}
return crdb.NewNoOpStatement(e), nil
}
func (u *userNotifier) reduceDomainClaimed(event eventstore.Event) (*handler.Statement, error) {
e, ok := event.(*user.DomainClaimedEvent)
if !ok {
return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-Drh5w", "reduce.wrong.event.type %s", user.UserDomainClaimedType)
}
ctx := HandlerContext(event.Aggregate())
alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, nil,
user.UserDomainClaimedType, user.UserDomainClaimedSentType)
if err != nil {
return nil, err
}
if alreadyHandled {
return crdb.NewNoOpStatement(e), nil
}
colors, err := u.queries.ActiveLabelPolicyByOrg(ctx, e.Aggregate().ResourceOwner, false)
if err != nil {
return nil, err
}
template, err := u.queries.MailTemplateByOrg(ctx, e.Aggregate().ResourceOwner, false)
if err != nil {
return nil, err
}
notifyUser, err := u.queries.GetNotifyUserByID(ctx, true, e.Aggregate().ID, false)
if err != nil {
return nil, err
}
translator, err := u.queries.GetTranslatorWithOrgTexts(ctx, notifyUser.ResourceOwner, domain.DomainClaimedMessageType)
if err != nil {
return nil, err
}
ctx, origin, err := u.queries.Origin(ctx)
if err != nil {
return nil, err
}
err = types.SendEmail(
ctx,
string(template.Template),
translator,
notifyUser,
u.queries.GetSMTPConfig,
u.queries.GetFileSystemProvider,
u.queries.GetLogProvider,
colors,
u.assetsPrefix(ctx),
e,
u.metricSuccessfulDeliveriesEmail,
u.metricFailedDeliveriesEmail,
).SendDomainClaimed(notifyUser, origin, e.UserName)
if err != nil {
return nil, err
}
err = u.commands.UserDomainClaimedSent(ctx, e.Aggregate().ResourceOwner, e.Aggregate().ID)
if err != nil {
return nil, err
}
return crdb.NewNoOpStatement(e), nil
}
func (u *userNotifier) reducePasswordlessCodeRequested(event eventstore.Event) (*handler.Statement, error) {
e, ok := event.(*user.HumanPasswordlessInitCodeRequestedEvent)
if !ok {
return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-EDtjd", "reduce.wrong.event.type %s", user.HumanPasswordlessInitCodeAddedType)
}
ctx := HandlerContext(event.Aggregate())
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, map[string]interface{}{"id": e.ID}, user.HumanPasswordlessInitCodeSentType)
if err != nil {
return nil, err
}
if alreadyHandled {
return crdb.NewNoOpStatement(e), nil
}
code, err := crypto.DecryptString(e.Code, u.queries.UserDataCrypto)
if err != nil {
return nil, err
}
colors, err := u.queries.ActiveLabelPolicyByOrg(ctx, e.Aggregate().ResourceOwner, false)
if err != nil {
return nil, err
}
template, err := u.queries.MailTemplateByOrg(ctx, e.Aggregate().ResourceOwner, false)
if err != nil {
return nil, err
}
notifyUser, err := u.queries.GetNotifyUserByID(ctx, true, e.Aggregate().ID, false)
if err != nil {
return nil, err
}
translator, err := u.queries.GetTranslatorWithOrgTexts(ctx, notifyUser.ResourceOwner, domain.PasswordlessRegistrationMessageType)
if err != nil {
return nil, err
}
ctx, origin, err := u.queries.Origin(ctx)
if err != nil {
return nil, err
}
err = types.SendEmail(
ctx,
string(template.Template),
translator,
notifyUser,
u.queries.GetSMTPConfig,
u.queries.GetFileSystemProvider,
u.queries.GetLogProvider,
colors,
u.assetsPrefix(ctx),
e,
u.metricSuccessfulDeliveriesEmail,
u.metricFailedDeliveriesEmail,
).SendPasswordlessRegistrationLink(notifyUser, origin, code, e.ID)
if err != nil {
return nil, err
}
err = u.commands.HumanPasswordlessInitCodeSent(ctx, e.Aggregate().ID, e.Aggregate().ResourceOwner, e.ID)
if err != nil {
return nil, err
}
return crdb.NewNoOpStatement(e), nil
}
func (u *userNotifier) reducePasswordChanged(event eventstore.Event) (*handler.Statement, error) {
e, ok := event.(*user.HumanPasswordChangedEvent)
if !ok {
return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-Yko2z8", "reduce.wrong.event.type %s", user.HumanPasswordChangedType)
}
ctx := HandlerContext(event.Aggregate())
alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, nil, user.HumanPasswordChangeSentType)
if err != nil {
return nil, err
}
if alreadyHandled {
return crdb.NewNoOpStatement(e), nil
}
notificationPolicy, err := u.queries.NotificationPolicyByOrg(ctx, true, e.Aggregate().ResourceOwner, false)
if errors.IsNotFound(err) {
return crdb.NewNoOpStatement(e), nil
}
if err != nil {
return nil, err
}
if notificationPolicy.PasswordChange {
colors, err := u.queries.ActiveLabelPolicyByOrg(ctx, e.Aggregate().ResourceOwner, false)
if err != nil {
return nil, err
}
template, err := u.queries.MailTemplateByOrg(ctx, e.Aggregate().ResourceOwner, false)
if err != nil {
return nil, err
}
notifyUser, err := u.queries.GetNotifyUserByID(ctx, true, e.Aggregate().ID, false)
if err != nil {
return nil, err
}
translator, err := u.queries.GetTranslatorWithOrgTexts(ctx, notifyUser.ResourceOwner, domain.PasswordChangeMessageType)
if err != nil {
return nil, err
}
ctx, origin, err := u.queries.Origin(ctx)
if err != nil {
return nil, err
}
err = types.SendEmail(
ctx,
string(template.Template),
translator,
notifyUser,
u.queries.GetSMTPConfig,
u.queries.GetFileSystemProvider,
u.queries.GetLogProvider,
colors,
u.assetsPrefix(ctx),
e,
u.metricSuccessfulDeliveriesEmail,
u.metricFailedDeliveriesEmail,
).SendPasswordChange(notifyUser, origin)
if err != nil {
return nil, err
}
err = u.commands.PasswordChangeSent(ctx, e.Aggregate().ResourceOwner, e.Aggregate().ID)
if err != nil {
return nil, err
}
}
return crdb.NewNoOpStatement(e), nil
}
func (u *userNotifier) reducePhoneCodeAdded(event eventstore.Event) (*handler.Statement, error) {
e, ok := event.(*user.HumanPhoneCodeAddedEvent)
if !ok {
return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-He83g", "reduce.wrong.event.type %s", user.HumanPhoneCodeAddedType)
}
ctx := HandlerContext(event.Aggregate())
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
user.UserV1PhoneCodeAddedType, user.UserV1PhoneCodeSentType,
user.HumanPhoneCodeAddedType, user.HumanPhoneCodeSentType)
if err != nil {
return nil, err
}
if alreadyHandled {
return crdb.NewNoOpStatement(e), nil
}
code, err := crypto.DecryptString(e.Code, u.queries.UserDataCrypto)
if err != nil {
return nil, err
}
colors, err := u.queries.ActiveLabelPolicyByOrg(ctx, e.Aggregate().ResourceOwner, false)
if err != nil {
return nil, err
}
notifyUser, err := u.queries.GetNotifyUserByID(ctx, true, e.Aggregate().ID, false)
if err != nil {
return nil, err
}
translator, err := u.queries.GetTranslatorWithOrgTexts(ctx, notifyUser.ResourceOwner, domain.VerifyPhoneMessageType)
if err != nil {
return nil, err
}
ctx, origin, err := u.queries.Origin(ctx)
if err != nil {
return nil, err
}
err = types.SendSMSTwilio(
ctx,
translator,
notifyUser,
u.queries.GetTwilioConfig,
u.queries.GetFileSystemProvider,
u.queries.GetLogProvider,
colors,
u.assetsPrefix(ctx),
e,
u.metricSuccessfulDeliveriesSMS,
u.metricFailedDeliveriesSMS,
).SendPhoneVerificationCode(notifyUser, origin, code)
if err != nil {
return nil, err
}
err = u.commands.HumanPhoneVerificationCodeSent(ctx, e.Aggregate().ResourceOwner, e.Aggregate().ID)
if err != nil {
return nil, err
}
return crdb.NewNoOpStatement(e), nil
}
func (u *userNotifier) checkIfCodeAlreadyHandledOrExpired(ctx context.Context, event eventstore.Event, expiry time.Duration, data map[string]interface{}, eventTypes ...eventstore.EventType) (bool, error) {
if event.CreationDate().Add(expiry).Before(time.Now().UTC()) {
return true, nil
}
return u.queries.IsAlreadyHandled(ctx, event, data, eventTypes...)
}

View File

@ -5,6 +5,7 @@ import (
"regexp"
"strings"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/notification/channels"
)
@ -16,16 +17,17 @@ var (
var _ channels.Message = (*Email)(nil)
type Email struct {
Recipients []string
BCC []string
CC []string
SenderEmail string
SenderName string
Subject string
Content string
Recipients []string
BCC []string
CC []string
SenderEmail string
SenderName string
Subject string
Content string
TriggeringEvent eventstore.Event
}
func (msg *Email) GetContent() string {
func (msg *Email) GetContent() (string, error) {
headers := make(map[string]string)
from := msg.SenderEmail
if msg.SenderName != "" {
@ -49,7 +51,11 @@ func (msg *Email) GetContent() string {
subject := "Subject: " + msg.Subject + lineBreak
message += subject + mime + lineBreak + msg.Content
return message
return message, nil
}
func (msg *Email) GetTriggeringEvent() eventstore.Event {
return msg.TriggeringEvent
}
func isHTML(input string) bool {

View File

@ -0,0 +1,24 @@
package messages
import (
"encoding/json"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/notification/channels"
)
var _ channels.Message = (*JSON)(nil)
type JSON struct {
Serializable interface{}
TriggeringEvent eventstore.Event
}
func (msg *JSON) GetContent() (string, error) {
bytes, err := json.Marshal(msg.Serializable)
return string(bytes), err
}
func (msg *JSON) GetTriggeringEvent() eventstore.Event {
return msg.TriggeringEvent
}

View File

@ -1,6 +1,9 @@
package messages
import "github.com/zitadel/zitadel/internal/notification/channels"
import (
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/notification/channels"
)
var _ channels.Message = (*SMS)(nil)
@ -8,8 +11,13 @@ type SMS struct {
SenderPhoneNumber string
RecipientPhoneNumber string
Content string
TriggeringEvent eventstore.Event
}
func (msg *SMS) GetContent() string {
return msg.Content
func (msg *SMS) GetContent() (string, error) {
return msg.Content, nil
}
func (msg *SMS) GetTriggeringEvent() eventstore.Event {
return msg.TriggeringEvent
}

View File

@ -1,738 +0,0 @@
package notification
import (
"context"
"net/http"
"time"
statik_fs "github.com/rakyll/statik/fs"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/api/authz"
http_utils "github.com/zitadel/zitadel/internal/api/http"
"github.com/zitadel/zitadel/internal/command"
"github.com/zitadel/zitadel/internal/crypto"
"github.com/zitadel/zitadel/internal/domain"
"github.com/zitadel/zitadel/internal/errors"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/eventstore/handler"
"github.com/zitadel/zitadel/internal/eventstore/handler/crdb"
"github.com/zitadel/zitadel/internal/i18n"
"github.com/zitadel/zitadel/internal/notification/channels/fs"
"github.com/zitadel/zitadel/internal/notification/channels/log"
"github.com/zitadel/zitadel/internal/notification/channels/smtp"
"github.com/zitadel/zitadel/internal/notification/channels/twilio"
_ "github.com/zitadel/zitadel/internal/notification/statik"
"github.com/zitadel/zitadel/internal/notification/types"
"github.com/zitadel/zitadel/internal/query"
"github.com/zitadel/zitadel/internal/query/projection"
"github.com/zitadel/zitadel/internal/repository/user"
)
const (
NotificationsProjectionTable = "projections.notifications"
NotifyUserID = "NOTIFICATION" //TODO: system?
)
func Start(ctx context.Context, customConfig projection.CustomConfig, externalPort uint16, externalSecure bool, commands *command.Commands, queries *query.Queries, es *eventstore.Eventstore, assetsPrefix func(context.Context) string, fileSystemPath string, userEncryption, smtpEncryption, smsEncryption crypto.EncryptionAlgorithm) {
statikFS, err := statik_fs.NewWithNamespace("notification")
logging.OnError(err).Panic("unable to start listener")
projection.NotificationsProjection = newNotificationsProjection(ctx, projection.ApplyCustomConfig(customConfig), commands, queries, es, userEncryption, smtpEncryption, smsEncryption, externalSecure, externalPort, fileSystemPath, assetsPrefix, statikFS)
}
type notificationsProjection struct {
crdb.StatementHandler
commands *command.Commands
queries *query.Queries
es *eventstore.Eventstore
userDataCrypto crypto.EncryptionAlgorithm
smtpPasswordCrypto crypto.EncryptionAlgorithm
smsTokenCrypto crypto.EncryptionAlgorithm
assetsPrefix func(context.Context) string
fileSystemPath string
externalPort uint16
externalSecure bool
statikDir http.FileSystem
}
func newNotificationsProjection(
ctx context.Context,
config crdb.StatementHandlerConfig,
commands *command.Commands,
queries *query.Queries,
es *eventstore.Eventstore,
userDataCrypto,
smtpPasswordCrypto,
smsTokenCrypto crypto.EncryptionAlgorithm,
externalSecure bool,
externalPort uint16,
fileSystemPath string,
assetsPrefix func(context.Context) string,
statikDir http.FileSystem,
) *notificationsProjection {
p := new(notificationsProjection)
config.ProjectionName = NotificationsProjectionTable
config.Reducers = p.reducers()
p.StatementHandler = crdb.NewStatementHandler(ctx, config)
p.commands = commands
p.queries = queries
p.es = es
p.userDataCrypto = userDataCrypto
p.smtpPasswordCrypto = smtpPasswordCrypto
p.smsTokenCrypto = smsTokenCrypto
p.assetsPrefix = assetsPrefix
p.externalPort = externalPort
p.externalSecure = externalSecure
p.fileSystemPath = fileSystemPath
p.statikDir = statikDir
// needs to be started here as it is not part of the projection.projections / projection.newProjectionsList()
p.Start()
return p
}
func (p *notificationsProjection) reducers() []handler.AggregateReducer {
return []handler.AggregateReducer{
{
Aggregate: user.AggregateType,
EventRedusers: []handler.EventReducer{
{
Event: user.UserV1InitialCodeAddedType,
Reduce: p.reduceInitCodeAdded,
},
{
Event: user.HumanInitialCodeAddedType,
Reduce: p.reduceInitCodeAdded,
},
{
Event: user.UserV1EmailCodeAddedType,
Reduce: p.reduceEmailCodeAdded,
},
{
Event: user.HumanEmailCodeAddedType,
Reduce: p.reduceEmailCodeAdded,
},
{
Event: user.UserV1PasswordCodeAddedType,
Reduce: p.reducePasswordCodeAdded,
},
{
Event: user.HumanPasswordCodeAddedType,
Reduce: p.reducePasswordCodeAdded,
},
{
Event: user.UserDomainClaimedType,
Reduce: p.reduceDomainClaimed,
},
{
Event: user.HumanPasswordlessInitCodeRequestedType,
Reduce: p.reducePasswordlessCodeRequested,
},
{
Event: user.UserV1PhoneCodeAddedType,
Reduce: p.reducePhoneCodeAdded,
},
{
Event: user.HumanPhoneCodeAddedType,
Reduce: p.reducePhoneCodeAdded,
},
{
Event: user.HumanPasswordChangedType,
Reduce: p.reducePasswordChanged,
},
},
},
}
}
func (p *notificationsProjection) reduceInitCodeAdded(event eventstore.Event) (*handler.Statement, error) {
e, ok := event.(*user.HumanInitialCodeAddedEvent)
if !ok {
return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-EFe2f", "reduce.wrong.event.type %s", user.HumanInitialCodeAddedType)
}
ctx := setNotificationContext(event.Aggregate())
alreadyHandled, err := p.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
user.UserV1InitialCodeAddedType, user.UserV1InitialCodeSentType,
user.HumanInitialCodeAddedType, user.HumanInitialCodeSentType)
if err != nil {
return nil, err
}
if alreadyHandled {
return crdb.NewNoOpStatement(e), nil
}
code, err := crypto.DecryptString(e.Code, p.userDataCrypto)
if err != nil {
return nil, err
}
colors, err := p.queries.ActiveLabelPolicyByOrg(ctx, e.Aggregate().ResourceOwner, false)
if err != nil {
return nil, err
}
template, err := p.queries.MailTemplateByOrg(ctx, e.Aggregate().ResourceOwner, false)
if err != nil {
return nil, err
}
notifyUser, err := p.queries.GetNotifyUserByID(ctx, true, e.Aggregate().ID, false)
if err != nil {
return nil, err
}
translator, err := p.getTranslatorWithOrgTexts(ctx, notifyUser.ResourceOwner, domain.InitCodeMessageType)
if err != nil {
return nil, err
}
ctx, origin, err := p.origin(ctx)
if err != nil {
return nil, err
}
err = types.SendEmail(
ctx,
string(template.Template),
translator,
notifyUser,
p.getSMTPConfig,
p.getFileSystemProvider,
p.getLogProvider,
colors,
p.assetsPrefix(ctx),
).SendUserInitCode(notifyUser, origin, code)
if err != nil {
return nil, err
}
err = p.commands.HumanInitCodeSent(ctx, e.Aggregate().ResourceOwner, e.Aggregate().ID)
if err != nil {
return nil, err
}
return crdb.NewNoOpStatement(e), nil
}
func (p *notificationsProjection) reduceEmailCodeAdded(event eventstore.Event) (*handler.Statement, error) {
e, ok := event.(*user.HumanEmailCodeAddedEvent)
if !ok {
return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-SWf3g", "reduce.wrong.event.type %s", user.HumanEmailCodeAddedType)
}
ctx := setNotificationContext(event.Aggregate())
alreadyHandled, err := p.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
user.UserV1EmailCodeAddedType, user.UserV1EmailCodeSentType,
user.HumanEmailCodeAddedType, user.HumanEmailCodeSentType)
if err != nil {
return nil, err
}
if alreadyHandled {
return crdb.NewNoOpStatement(e), nil
}
code, err := crypto.DecryptString(e.Code, p.userDataCrypto)
if err != nil {
return nil, err
}
colors, err := p.queries.ActiveLabelPolicyByOrg(ctx, e.Aggregate().ResourceOwner, false)
if err != nil {
return nil, err
}
template, err := p.queries.MailTemplateByOrg(ctx, e.Aggregate().ResourceOwner, false)
if err != nil {
return nil, err
}
notifyUser, err := p.queries.GetNotifyUserByID(ctx, true, e.Aggregate().ID, false)
if err != nil {
return nil, err
}
translator, err := p.getTranslatorWithOrgTexts(ctx, notifyUser.ResourceOwner, domain.VerifyEmailMessageType)
if err != nil {
return nil, err
}
ctx, origin, err := p.origin(ctx)
if err != nil {
return nil, err
}
err = types.SendEmail(
ctx,
string(template.Template),
translator,
notifyUser,
p.getSMTPConfig,
p.getFileSystemProvider,
p.getLogProvider,
colors,
p.assetsPrefix(ctx),
).SendEmailVerificationCode(notifyUser, origin, code)
if err != nil {
return nil, err
}
err = p.commands.HumanEmailVerificationCodeSent(ctx, e.Aggregate().ResourceOwner, e.Aggregate().ID)
if err != nil {
return nil, err
}
return crdb.NewNoOpStatement(e), nil
}
func (p *notificationsProjection) reducePasswordCodeAdded(event eventstore.Event) (*handler.Statement, error) {
e, ok := event.(*user.HumanPasswordCodeAddedEvent)
if !ok {
return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-Eeg3s", "reduce.wrong.event.type %s", user.HumanPasswordCodeAddedType)
}
ctx := setNotificationContext(event.Aggregate())
alreadyHandled, err := p.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
user.UserV1PasswordCodeAddedType, user.UserV1PasswordCodeSentType,
user.HumanPasswordCodeAddedType, user.HumanPasswordCodeSentType)
if err != nil {
return nil, err
}
if alreadyHandled {
return crdb.NewNoOpStatement(e), nil
}
code, err := crypto.DecryptString(e.Code, p.userDataCrypto)
if err != nil {
return nil, err
}
colors, err := p.queries.ActiveLabelPolicyByOrg(ctx, e.Aggregate().ResourceOwner, false)
if err != nil {
return nil, err
}
template, err := p.queries.MailTemplateByOrg(ctx, e.Aggregate().ResourceOwner, false)
if err != nil {
return nil, err
}
notifyUser, err := p.queries.GetNotifyUserByID(ctx, true, e.Aggregate().ID, false)
if err != nil {
return nil, err
}
translator, err := p.getTranslatorWithOrgTexts(ctx, notifyUser.ResourceOwner, domain.PasswordResetMessageType)
if err != nil {
return nil, err
}
ctx, origin, err := p.origin(ctx)
if err != nil {
return nil, err
}
notify := types.SendEmail(
ctx,
string(template.Template),
translator,
notifyUser,
p.getSMTPConfig,
p.getFileSystemProvider,
p.getLogProvider,
colors,
p.assetsPrefix(ctx),
)
if e.NotificationType == domain.NotificationTypeSms {
notify = types.SendSMSTwilio(
ctx,
translator,
notifyUser,
p.getTwilioConfig,
p.getFileSystemProvider,
p.getLogProvider,
colors,
p.assetsPrefix(ctx),
)
}
err = notify.SendPasswordCode(notifyUser, origin, code)
if err != nil {
return nil, err
}
err = p.commands.PasswordCodeSent(ctx, e.Aggregate().ResourceOwner, e.Aggregate().ID)
if err != nil {
return nil, err
}
return crdb.NewNoOpStatement(e), nil
}
func (p *notificationsProjection) reduceDomainClaimed(event eventstore.Event) (*handler.Statement, error) {
e, ok := event.(*user.DomainClaimedEvent)
if !ok {
return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-Drh5w", "reduce.wrong.event.type %s", user.UserDomainClaimedType)
}
ctx := setNotificationContext(event.Aggregate())
alreadyHandled, err := p.checkIfAlreadyHandled(ctx, event, nil,
user.UserDomainClaimedType, user.UserDomainClaimedSentType)
if err != nil {
return nil, err
}
if alreadyHandled {
return crdb.NewNoOpStatement(e), nil
}
colors, err := p.queries.ActiveLabelPolicyByOrg(ctx, e.Aggregate().ResourceOwner, false)
if err != nil {
return nil, err
}
template, err := p.queries.MailTemplateByOrg(ctx, e.Aggregate().ResourceOwner, false)
if err != nil {
return nil, err
}
notifyUser, err := p.queries.GetNotifyUserByID(ctx, true, e.Aggregate().ID, false)
if err != nil {
return nil, err
}
translator, err := p.getTranslatorWithOrgTexts(ctx, notifyUser.ResourceOwner, domain.DomainClaimedMessageType)
if err != nil {
return nil, err
}
ctx, origin, err := p.origin(ctx)
if err != nil {
return nil, err
}
err = types.SendEmail(
ctx,
string(template.Template),
translator,
notifyUser,
p.getSMTPConfig,
p.getFileSystemProvider,
p.getLogProvider,
colors,
p.assetsPrefix(ctx),
).SendDomainClaimed(notifyUser, origin, e.UserName)
if err != nil {
return nil, err
}
err = p.commands.UserDomainClaimedSent(ctx, e.Aggregate().ResourceOwner, e.Aggregate().ID)
if err != nil {
return nil, err
}
return crdb.NewNoOpStatement(e), nil
}
func (p *notificationsProjection) reducePasswordlessCodeRequested(event eventstore.Event) (*handler.Statement, error) {
e, ok := event.(*user.HumanPasswordlessInitCodeRequestedEvent)
if !ok {
return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-EDtjd", "reduce.wrong.event.type %s", user.HumanPasswordlessInitCodeAddedType)
}
ctx := setNotificationContext(event.Aggregate())
alreadyHandled, err := p.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, map[string]interface{}{"id": e.ID}, user.HumanPasswordlessInitCodeSentType)
if err != nil {
return nil, err
}
if alreadyHandled {
return crdb.NewNoOpStatement(e), nil
}
code, err := crypto.DecryptString(e.Code, p.userDataCrypto)
if err != nil {
return nil, err
}
colors, err := p.queries.ActiveLabelPolicyByOrg(ctx, e.Aggregate().ResourceOwner, false)
if err != nil {
return nil, err
}
template, err := p.queries.MailTemplateByOrg(ctx, e.Aggregate().ResourceOwner, false)
if err != nil {
return nil, err
}
notifyUser, err := p.queries.GetNotifyUserByID(ctx, true, e.Aggregate().ID, false)
if err != nil {
return nil, err
}
translator, err := p.getTranslatorWithOrgTexts(ctx, notifyUser.ResourceOwner, domain.PasswordlessRegistrationMessageType)
if err != nil {
return nil, err
}
ctx, origin, err := p.origin(ctx)
if err != nil {
return nil, err
}
err = types.SendEmail(
ctx,
string(template.Template),
translator,
notifyUser,
p.getSMTPConfig,
p.getFileSystemProvider,
p.getLogProvider,
colors,
p.assetsPrefix(ctx),
).SendPasswordlessRegistrationLink(notifyUser, origin, code, e.ID)
if err != nil {
return nil, err
}
err = p.commands.HumanPasswordlessInitCodeSent(ctx, e.Aggregate().ID, e.Aggregate().ResourceOwner, e.ID)
if err != nil {
return nil, err
}
return crdb.NewNoOpStatement(e), nil
}
func (p *notificationsProjection) reducePasswordChanged(event eventstore.Event) (*handler.Statement, error) {
e, ok := event.(*user.HumanPasswordChangedEvent)
if !ok {
return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-Yko2z8", "reduce.wrong.event.type %s", user.HumanPasswordChangedType)
}
ctx := setNotificationContext(event.Aggregate())
alreadyHandled, err := p.checkIfAlreadyHandled(ctx, event, nil, user.HumanPasswordChangeSentType)
if err != nil {
return nil, err
}
if alreadyHandled {
return crdb.NewNoOpStatement(e), nil
}
notificationPolicy, err := p.queries.NotificationPolicyByOrg(ctx, true, e.Aggregate().ResourceOwner, false)
if errors.IsNotFound(err) {
return crdb.NewNoOpStatement(e), nil
}
if err != nil {
return nil, err
}
if notificationPolicy.PasswordChange {
colors, err := p.queries.ActiveLabelPolicyByOrg(ctx, e.Aggregate().ResourceOwner, false)
if err != nil {
return nil, err
}
template, err := p.queries.MailTemplateByOrg(ctx, e.Aggregate().ResourceOwner, false)
if err != nil {
return nil, err
}
notifyUser, err := p.queries.GetNotifyUserByID(ctx, true, e.Aggregate().ID, false)
if err != nil {
return nil, err
}
translator, err := p.getTranslatorWithOrgTexts(ctx, notifyUser.ResourceOwner, domain.PasswordChangeMessageType)
if err != nil {
return nil, err
}
ctx, origin, err := p.origin(ctx)
if err != nil {
return nil, err
}
err = types.SendEmail(
ctx,
string(template.Template),
translator,
notifyUser,
p.getSMTPConfig,
p.getFileSystemProvider,
p.getLogProvider,
colors,
p.assetsPrefix(ctx),
).SendPasswordChange(notifyUser, origin)
if err != nil {
return nil, err
}
err = p.commands.PasswordChangeSent(ctx, e.Aggregate().ResourceOwner, e.Aggregate().ID)
if err != nil {
return nil, err
}
}
return crdb.NewNoOpStatement(e), nil
}
func (p *notificationsProjection) reducePhoneCodeAdded(event eventstore.Event) (*handler.Statement, error) {
e, ok := event.(*user.HumanPhoneCodeAddedEvent)
if !ok {
return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-He83g", "reduce.wrong.event.type %s", user.HumanPhoneCodeAddedType)
}
ctx := setNotificationContext(event.Aggregate())
alreadyHandled, err := p.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
user.UserV1PhoneCodeAddedType, user.UserV1PhoneCodeSentType,
user.HumanPhoneCodeAddedType, user.HumanPhoneCodeSentType)
if err != nil {
return nil, err
}
if alreadyHandled {
return crdb.NewNoOpStatement(e), nil
}
code, err := crypto.DecryptString(e.Code, p.userDataCrypto)
if err != nil {
return nil, err
}
colors, err := p.queries.ActiveLabelPolicyByOrg(ctx, e.Aggregate().ResourceOwner, false)
if err != nil {
return nil, err
}
notifyUser, err := p.queries.GetNotifyUserByID(ctx, true, e.Aggregate().ID, false)
if err != nil {
return nil, err
}
translator, err := p.getTranslatorWithOrgTexts(ctx, notifyUser.ResourceOwner, domain.VerifyPhoneMessageType)
if err != nil {
return nil, err
}
ctx, origin, err := p.origin(ctx)
if err != nil {
return nil, err
}
err = types.SendSMSTwilio(
ctx,
translator,
notifyUser,
p.getTwilioConfig,
p.getFileSystemProvider,
p.getLogProvider,
colors,
p.assetsPrefix(ctx),
).SendPhoneVerificationCode(notifyUser, origin, code)
if err != nil {
return nil, err
}
err = p.commands.HumanPhoneVerificationCodeSent(ctx, e.Aggregate().ResourceOwner, e.Aggregate().ID)
if err != nil {
return nil, err
}
return crdb.NewNoOpStatement(e), nil
}
func (p *notificationsProjection) checkIfCodeAlreadyHandledOrExpired(ctx context.Context, event eventstore.Event, expiry time.Duration, data map[string]interface{}, eventTypes ...eventstore.EventType) (bool, error) {
if event.CreationDate().Add(expiry).Before(time.Now().UTC()) {
return true, nil
}
return p.checkIfAlreadyHandled(ctx, event, data, eventTypes...)
}
func (p *notificationsProjection) checkIfAlreadyHandled(ctx context.Context, event eventstore.Event, data map[string]interface{}, eventTypes ...eventstore.EventType) (bool, error) {
events, err := p.es.Filter(
ctx,
eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
InstanceID(event.Aggregate().InstanceID).
AddQuery().
AggregateTypes(user.AggregateType).
AggregateIDs(event.Aggregate().ID).
SequenceGreater(event.Sequence()).
EventTypes(eventTypes...).
EventData(data).
Builder(),
)
if err != nil {
return false, err
}
return len(events) > 0, nil
}
func (p *notificationsProjection) getSMTPConfig(ctx context.Context) (*smtp.Config, error) {
config, err := p.queries.SMTPConfigByAggregateID(ctx, authz.GetInstance(ctx).InstanceID())
if err != nil {
return nil, err
}
password, err := crypto.DecryptString(config.Password, p.smtpPasswordCrypto)
if err != nil {
return nil, err
}
return &smtp.Config{
From: config.SenderAddress,
FromName: config.SenderName,
Tls: config.TLS,
SMTP: smtp.SMTP{
Host: config.Host,
User: config.User,
Password: password,
},
}, nil
}
// Read iam twilio config
func (p *notificationsProjection) getTwilioConfig(ctx context.Context) (*twilio.Config, error) {
active, err := query.NewSMSProviderStateQuery(domain.SMSConfigStateActive)
if err != nil {
return nil, err
}
config, err := p.queries.SMSProviderConfig(ctx, active)
if err != nil {
return nil, err
}
if config.TwilioConfig == nil {
return nil, errors.ThrowNotFound(nil, "HANDLER-8nfow", "Errors.SMS.Twilio.NotFound")
}
token, err := crypto.DecryptString(config.TwilioConfig.Token, p.smsTokenCrypto)
if err != nil {
return nil, err
}
return &twilio.Config{
SID: config.TwilioConfig.SID,
Token: token,
SenderNumber: config.TwilioConfig.SenderNumber,
}, nil
}
// Read iam filesystem provider config
func (p *notificationsProjection) getFileSystemProvider(ctx context.Context) (*fs.Config, error) {
config, err := p.queries.NotificationProviderByIDAndType(ctx, authz.GetInstance(ctx).InstanceID(), domain.NotificationProviderTypeFile)
if err != nil {
return nil, err
}
return &fs.Config{
Compact: config.Compact,
Path: p.fileSystemPath,
}, nil
}
// Read iam log provider config
func (p *notificationsProjection) getLogProvider(ctx context.Context) (*log.Config, error) {
config, err := p.queries.NotificationProviderByIDAndType(ctx, authz.GetInstance(ctx).InstanceID(), domain.NotificationProviderTypeLog)
if err != nil {
return nil, err
}
return &log.Config{
Compact: config.Compact,
}, nil
}
func (p *notificationsProjection) getTranslatorWithOrgTexts(ctx context.Context, orgID, textType string) (*i18n.Translator, error) {
translator, err := i18n.NewTranslator(p.statikDir, p.queries.GetDefaultLanguage(ctx), "")
if err != nil {
return nil, err
}
allCustomTexts, err := p.queries.CustomTextListByTemplate(ctx, authz.GetInstance(ctx).InstanceID(), textType, false)
if err != nil {
return translator, nil
}
customTexts, err := p.queries.CustomTextListByTemplate(ctx, orgID, textType, false)
if err != nil {
return translator, nil
}
allCustomTexts.CustomTexts = append(allCustomTexts.CustomTexts, customTexts.CustomTexts...)
for _, text := range allCustomTexts.CustomTexts {
msg := i18n.Message{
ID: text.Template + "." + text.Key,
Text: text.Text,
}
err = translator.AddMessages(text.Language, msg)
logging.WithFields("instanceID", authz.GetInstance(ctx).InstanceID(), "orgID", orgID, "messageType", textType, "messageID", msg.ID).
OnError(err).
Warn("could not add translation message")
}
return translator, nil
}
func (p *notificationsProjection) origin(ctx context.Context) (context.Context, string, error) {
primary, err := query.NewInstanceDomainPrimarySearchQuery(true)
if err != nil {
return ctx, "", err
}
domains, err := p.queries.SearchInstanceDomains(ctx, &query.InstanceDomainSearchQueries{
Queries: []query.SearchQuery{primary},
})
if err != nil {
return ctx, "", err
}
if len(domains.Domains) < 1 {
return ctx, "", errors.ThrowInternal(nil, "NOTIF-Ef3r1", "Errors.Notification.NoDomain")
}
ctx = authz.WithRequestedDomain(ctx, domains.Domains[0].Domain)
return ctx, http_utils.BuildHTTP(domains.Domains[0].Domain, p.externalPort, p.externalSecure), nil
}
func setNotificationContext(event eventstore.Aggregate) context.Context {
ctx := authz.WithInstanceID(context.Background(), event.InstanceID)
return authz.SetCtxData(ctx, authz.CtxData{UserID: NotifyUserID, OrgID: event.ResourceOwner})
}

View File

@ -0,0 +1,77 @@
package notification
import (
"context"
statik_fs "github.com/rakyll/statik/fs"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/command"
"github.com/zitadel/zitadel/internal/crypto"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/notification/handlers"
_ "github.com/zitadel/zitadel/internal/notification/statik"
"github.com/zitadel/zitadel/internal/query"
"github.com/zitadel/zitadel/internal/query/projection"
"github.com/zitadel/zitadel/internal/telemetry/metrics"
)
const (
metricSuccessfulDeliveriesEmail = "successful_deliveries_email"
metricFailedDeliveriesEmail = "failed_deliveries_email"
metricSuccessfulDeliveriesSMS = "successful_deliveries_sms"
metricFailedDeliveriesSMS = "failed_deliveries_sms"
metricSuccessfulDeliveriesJSON = "successful_deliveries_json"
metricFailedDeliveriesJSON = "failed_deliveries_json"
)
func Start(
ctx context.Context,
userHandlerCustomConfig projection.CustomConfig,
quotaHandlerCustomConfig projection.CustomConfig,
externalPort uint16,
externalSecure bool,
commands *command.Commands,
queries *query.Queries,
es *eventstore.Eventstore,
assetsPrefix func(context.Context) string,
fileSystemPath string,
userEncryption,
smtpEncryption,
smsEncryption crypto.EncryptionAlgorithm,
) {
statikFS, err := statik_fs.NewWithNamespace("notification")
logging.OnError(err).Panic("unable to start listener")
err = metrics.RegisterCounter(metricSuccessfulDeliveriesEmail, "Successfully delivered emails")
logging.WithFields("metric", metricSuccessfulDeliveriesEmail).OnError(err).Panic("unable to register counter")
err = metrics.RegisterCounter(metricFailedDeliveriesEmail, "Failed email deliveries")
logging.WithFields("metric", metricFailedDeliveriesEmail).OnError(err).Panic("unable to register counter")
err = metrics.RegisterCounter(metricSuccessfulDeliveriesSMS, "Successfully delivered SMS")
logging.WithFields("metric", metricSuccessfulDeliveriesSMS).OnError(err).Panic("unable to register counter")
err = metrics.RegisterCounter(metricFailedDeliveriesSMS, "Failed SMS deliveries")
logging.WithFields("metric", metricFailedDeliveriesSMS).OnError(err).Panic("unable to register counter")
err = metrics.RegisterCounter(metricSuccessfulDeliveriesJSON, "Successfully delivered JSON messages")
logging.WithFields("metric", metricSuccessfulDeliveriesJSON).OnError(err).Panic("unable to register counter")
err = metrics.RegisterCounter(metricFailedDeliveriesJSON, "Failed JSON message deliveries")
logging.WithFields("metric", metricFailedDeliveriesJSON).OnError(err).Panic("unable to register counter")
q := handlers.NewNotificationQueries(queries, es, externalPort, externalSecure, fileSystemPath, userEncryption, smtpEncryption, smsEncryption, statikFS)
handlers.NewUserNotifier(
ctx,
projection.ApplyCustomConfig(userHandlerCustomConfig),
commands,
q,
assetsPrefix,
metricSuccessfulDeliveriesEmail,
metricFailedDeliveriesEmail,
metricSuccessfulDeliveriesSMS,
metricFailedDeliveriesSMS,
).Start()
handlers.NewQuotaNotifier(
ctx,
projection.ApplyCustomConfig(quotaHandlerCustomConfig),
commands,
q,
metricSuccessfulDeliveriesJSON,
metricFailedDeliveriesJSON,
).Start()
}

View File

@ -3,17 +3,42 @@ package senders
import (
"context"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/notification/channels"
"github.com/zitadel/zitadel/internal/notification/channels/fs"
"github.com/zitadel/zitadel/internal/notification/channels/instrumenting"
"github.com/zitadel/zitadel/internal/notification/channels/log"
"github.com/zitadel/zitadel/internal/notification/channels/smtp"
)
func EmailChannels(ctx context.Context, emailConfig func(ctx context.Context) (*smtp.Config, error), getFileSystemProvider func(ctx context.Context) (*fs.Config, error), getLogProvider func(ctx context.Context) (*log.Config, error)) (chain *Chain, err error) {
const smtpSpanName = "smtp.NotificationChannel"
func EmailChannels(
ctx context.Context,
emailConfig func(ctx context.Context) (*smtp.Config, error),
getFileSystemProvider func(ctx context.Context) (*fs.Config, error),
getLogProvider func(ctx context.Context) (*log.Config, error),
successMetricName,
failureMetricName string,
) (chain *Chain, err error) {
channels := make([]channels.NotificationChannel, 0, 3)
p, err := smtp.InitSMTPChannel(ctx, emailConfig)
p, err := smtp.InitChannel(ctx, emailConfig)
logging.WithFields(
"instance", authz.GetInstance(ctx).InstanceID(),
).OnError(err).Debug("initializing SMTP channel failed")
if err == nil {
channels = append(channels, p)
channels = append(
channels,
instrumenting.Wrap(
ctx,
p,
smtpSpanName,
successMetricName,
failureMetricName,
),
)
}
channels = append(channels, debugChannels(ctx, getFileSystemProvider, getLogProvider)...)
return chainChannels(channels...), nil

View File

@ -0,0 +1,49 @@
package senders
import (
"context"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/notification/channels"
"github.com/zitadel/zitadel/internal/notification/channels/fs"
"github.com/zitadel/zitadel/internal/notification/channels/instrumenting"
"github.com/zitadel/zitadel/internal/notification/channels/log"
"github.com/zitadel/zitadel/internal/notification/channels/webhook"
)
const webhookSpanName = "webhook.NotificationChannel"
func JSONChannels(
ctx context.Context,
webhookConfig webhook.Config,
getFileSystemProvider func(ctx context.Context) (*fs.Config, error),
getLogProvider func(ctx context.Context) (*log.Config, error),
successMetricName,
failureMetricName string,
) (*Chain, error) {
if err := webhookConfig.Validate(); err != nil {
return nil, err
}
channels := make([]channels.NotificationChannel, 0, 3)
webhookChannel, err := webhook.InitChannel(ctx, webhookConfig)
logging.WithFields(
"instance", authz.GetInstance(ctx).InstanceID(),
"callurl", webhookConfig.CallURL,
).OnError(err).Debug("initializing JSON channel failed")
if err == nil {
channels = append(
channels,
instrumenting.Wrap(
ctx,
webhookChannel,
webhookSpanName,
successMetricName,
failureMetricName,
),
)
}
channels = append(channels, debugChannels(ctx, getFileSystemProvider, getLogProvider)...)
return chainChannels(channels...), nil
}

View File

@ -5,14 +5,33 @@ import (
"github.com/zitadel/zitadel/internal/notification/channels"
"github.com/zitadel/zitadel/internal/notification/channels/fs"
"github.com/zitadel/zitadel/internal/notification/channels/instrumenting"
"github.com/zitadel/zitadel/internal/notification/channels/log"
"github.com/zitadel/zitadel/internal/notification/channels/twilio"
)
func SMSChannels(ctx context.Context, twilioConfig *twilio.Config, getFileSystemProvider func(ctx context.Context) (*fs.Config, error), getLogProvider func(ctx context.Context) (*log.Config, error)) (chain *Chain, err error) {
const twilioSpanName = "twilio.NotificationChannel"
func SMSChannels(
ctx context.Context,
twilioConfig *twilio.Config,
getFileSystemProvider func(ctx context.Context) (*fs.Config, error),
getLogProvider func(ctx context.Context) (*log.Config, error),
successMetricName,
failureMetricName string,
) (chain *Chain, err error) {
channels := make([]channels.NotificationChannel, 0, 3)
if twilioConfig != nil {
channels = append(channels, twilio.InitTwilioChannel(*twilioConfig))
channels = append(
channels,
instrumenting.Wrap(
ctx,
twilio.InitChannel(*twilioConfig),
twilioSpanName,
successMetricName,
failureMetricName,
),
)
}
channels = append(channels, debugChannels(ctx, getFileSystemProvider, getLogProvider)...)
return chainChannels(channels...), nil

View File

@ -0,0 +1,40 @@
package types
import (
"context"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/notification/channels/fs"
"github.com/zitadel/zitadel/internal/notification/channels/log"
"github.com/zitadel/zitadel/internal/notification/channels/webhook"
"github.com/zitadel/zitadel/internal/notification/messages"
"github.com/zitadel/zitadel/internal/notification/senders"
)
func handleJSON(
ctx context.Context,
webhookConfig webhook.Config,
getFileSystemProvider func(ctx context.Context) (*fs.Config, error),
getLogProvider func(ctx context.Context) (*log.Config, error),
serializable interface{},
triggeringEvent eventstore.Event,
successMetricName,
failureMetricName string,
) error {
message := &messages.JSON{
Serializable: serializable,
TriggeringEvent: triggeringEvent,
}
channelChain, err := senders.JSONChannels(
ctx,
webhookConfig,
getFileSystemProvider,
getLogProvider,
successMetricName,
failureMetricName,
)
if err != nil {
return err
}
return channelChain.HandleMessage(message)
}

View File

@ -3,11 +3,13 @@ package types
import (
"context"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/i18n"
"github.com/zitadel/zitadel/internal/notification/channels/fs"
"github.com/zitadel/zitadel/internal/notification/channels/log"
"github.com/zitadel/zitadel/internal/notification/channels/smtp"
"github.com/zitadel/zitadel/internal/notification/channels/twilio"
"github.com/zitadel/zitadel/internal/notification/channels/webhook"
"github.com/zitadel/zitadel/internal/notification/templates"
"github.com/zitadel/zitadel/internal/query"
)
@ -29,6 +31,9 @@ func SendEmail(
getLogProvider func(ctx context.Context) (*log.Config, error),
colors *query.LabelPolicy,
assetsPrefix string,
triggeringEvent eventstore.Event,
successMetricName,
failureMetricName string,
) Notify {
return func(
url string,
@ -42,7 +47,19 @@ func SendEmail(
if err != nil {
return err
}
return generateEmail(ctx, user, data.Subject, template, emailConfig, getFileSystemProvider, getLogProvider, allowUnverifiedNotificationChannel)
return generateEmail(
ctx,
user,
data.Subject,
template,
emailConfig,
getFileSystemProvider,
getLogProvider,
allowUnverifiedNotificationChannel,
triggeringEvent,
successMetricName,
failureMetricName,
)
}
}
@ -55,6 +72,9 @@ func SendSMSTwilio(
getLogProvider func(ctx context.Context) (*log.Config, error),
colors *query.LabelPolicy,
assetsPrefix string,
triggeringEvent eventstore.Event,
successMetricName,
failureMetricName string,
) Notify {
return func(
url string,
@ -64,10 +84,41 @@ func SendSMSTwilio(
) error {
args = mapNotifyUserToArgs(user, args)
data := GetTemplateData(translator, args, assetsPrefix, url, messageType, user.PreferredLanguage.String(), colors)
return generateSms(ctx, user, data.Text, twilioConfig, getFileSystemProvider, getLogProvider, allowUnverifiedNotificationChannel)
return generateSms(
ctx,
user,
data.Text,
twilioConfig,
getFileSystemProvider,
getLogProvider,
allowUnverifiedNotificationChannel,
triggeringEvent,
successMetricName,
failureMetricName,
)
}
}
func externalLink(origin string) string {
return origin + "/ui/login"
func SendJSON(
ctx context.Context,
webhookConfig webhook.Config,
getFileSystemProvider func(ctx context.Context) (*fs.Config, error),
getLogProvider func(ctx context.Context) (*log.Config, error),
serializable interface{},
triggeringEvent eventstore.Event,
successMetricName,
failureMetricName string,
) Notify {
return func(_ string, _ map[string]interface{}, _ string, _ bool) error {
return handleJSON(
ctx,
webhookConfig,
getFileSystemProvider,
getLogProvider,
serializable,
triggeringEvent,
successMetricName,
failureMetricName,
)
}
}

View File

@ -4,7 +4,8 @@ import (
"context"
"html"
caos_errors "github.com/zitadel/zitadel/internal/errors"
"github.com/zitadel/zitadel/internal/errors"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/notification/channels/fs"
"github.com/zitadel/zitadel/internal/notification/channels/log"
"github.com/zitadel/zitadel/internal/notification/channels/smtp"
@ -13,24 +14,44 @@ import (
"github.com/zitadel/zitadel/internal/query"
)
func generateEmail(ctx context.Context, user *query.NotifyUser, subject, content string, smtpConfig func(ctx context.Context) (*smtp.Config, error), getFileSystemProvider func(ctx context.Context) (*fs.Config, error), getLogProvider func(ctx context.Context) (*log.Config, error), lastEmail bool) error {
func generateEmail(
ctx context.Context,
user *query.NotifyUser,
subject,
content string,
smtpConfig func(ctx context.Context) (*smtp.Config, error),
getFileSystemProvider func(ctx context.Context) (*fs.Config, error),
getLogProvider func(ctx context.Context) (*log.Config, error),
lastEmail bool,
triggeringEvent eventstore.Event,
successMetricName,
failureMetricName string,
) error {
content = html.UnescapeString(content)
message := &messages.Email{
Recipients: []string{user.VerifiedEmail},
Subject: subject,
Content: content,
Recipients: []string{user.VerifiedEmail},
Subject: subject,
Content: content,
TriggeringEvent: triggeringEvent,
}
if lastEmail {
message.Recipients = []string{user.LastEmail}
}
channelChain, err := senders.EmailChannels(ctx, smtpConfig, getFileSystemProvider, getLogProvider)
channelChain, err := senders.EmailChannels(
ctx,
smtpConfig,
getFileSystemProvider,
getLogProvider,
successMetricName,
failureMetricName,
)
if err != nil {
return err
}
if channelChain.Len() == 0 {
return caos_errors.ThrowPreconditionFailed(nil, "MAIL-83nof", "Errors.Notification.Channels.NotPresent")
return errors.ThrowPreconditionFailed(nil, "MAIL-83nof", "Errors.Notification.Channels.NotPresent")
}
return channelChain.HandleMessage(message)
}

View File

@ -5,7 +5,8 @@ import (
"github.com/zitadel/logging"
caos_errors "github.com/zitadel/zitadel/internal/errors"
"github.com/zitadel/zitadel/internal/errors"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/notification/channels/fs"
"github.com/zitadel/zitadel/internal/notification/channels/log"
"github.com/zitadel/zitadel/internal/notification/channels/twilio"
@ -14,7 +15,18 @@ import (
"github.com/zitadel/zitadel/internal/query"
)
func generateSms(ctx context.Context, user *query.NotifyUser, content string, getTwilioProvider func(ctx context.Context) (*twilio.Config, error), getFileSystemProvider func(ctx context.Context) (*fs.Config, error), getLogProvider func(ctx context.Context) (*log.Config, error), lastPhone bool) error {
func generateSms(
ctx context.Context,
user *query.NotifyUser,
content string,
getTwilioProvider func(ctx context.Context) (*twilio.Config, error),
getFileSystemProvider func(ctx context.Context) (*fs.Config, error),
getLogProvider func(ctx context.Context) (*log.Config, error),
lastPhone bool,
triggeringEvent eventstore.Event,
successMetricName,
failureMetricName string,
) error {
number := ""
twilioConfig, err := getTwilioProvider(ctx)
if err == nil {
@ -24,16 +36,24 @@ func generateSms(ctx context.Context, user *query.NotifyUser, content string, ge
SenderPhoneNumber: number,
RecipientPhoneNumber: user.VerifiedPhone,
Content: content,
TriggeringEvent: triggeringEvent,
}
if lastPhone {
message.RecipientPhoneNumber = user.LastPhone
}
channelChain, err := senders.SMSChannels(ctx, twilioConfig, getFileSystemProvider, getLogProvider)
channelChain, err := senders.SMSChannels(
ctx,
twilioConfig,
getFileSystemProvider,
getLogProvider,
successMetricName,
failureMetricName,
)
logging.OnError(err).Error("could not create sms channel")
if channelChain.Len() == 0 {
return caos_errors.ThrowPreconditionFailed(nil, "PHONE-w8nfow", "Errors.Notification.Channels.NotPresent")
return errors.ThrowPreconditionFailed(nil, "PHONE-w8nfow", "Errors.Notification.Channels.NotPresent")
}
return channelChain.HandleMessage(message)
}

View File

@ -0,0 +1,5 @@
package types
func (notify Notify) WithoutTemplate() error {
return notify("", nil, "", false)
}

View File

@ -63,6 +63,7 @@ var (
SecurityPolicyProjection *securityPolicyProjection
NotificationPolicyProjection *notificationPolicyProjection
NotificationsProjection interface{}
NotificationsQuotaProjection interface{}
)
type projection interface {

View File

@ -9,7 +9,7 @@ import (
"github.com/zitadel/zitadel/internal/repository/quota"
)
func (q *Queries) GetDueQuotaNotifications(ctx context.Context, config *quota.AddedEvent, periodStart time.Time, usedAbs uint64) ([]*quota.NotifiedEvent, error) {
func (q *Queries) GetDueQuotaNotifications(ctx context.Context, config *quota.AddedEvent, periodStart time.Time, usedAbs uint64) ([]*quota.NotificationDueEvent, error) {
if len(config.Notifications) == 0 {
return nil, nil
}
@ -22,7 +22,7 @@ func (q *Queries) GetDueQuotaNotifications(ctx context.Context, config *quota.Ad
usedRel := uint16(math.Floor(float64(usedAbs*100) / float64(config.Amount)))
var dueNotifications []*quota.NotifiedEvent
var dueNotifications []*quota.NotificationDueEvent
for _, notification := range config.Notifications {
if notification.Percent > usedRel {
continue
@ -30,13 +30,13 @@ func (q *Queries) GetDueQuotaNotifications(ctx context.Context, config *quota.Ad
threshold := notification.Percent
if notification.Repeat {
threshold = uint16(math.Min(1, math.Floor(float64(usedRel)/float64(notification.Percent)))) * notification.Percent
threshold = uint16(math.Max(1, math.Floor(float64(usedRel)/float64(notification.Percent)))) * notification.Percent
}
if wm.latestNotifiedThresholds[notification.ID] < threshold {
if wm.latestDueThresholds[notification.ID] < threshold {
dueNotifications = append(
dueNotifications,
quota.NewNotifiedEvent(
quota.NewNotificationDueEvent(
ctx,
&aggregate,
config.Unit,

View File

@ -9,8 +9,8 @@ import (
type quotaNotificationsReadModel struct {
eventstore.ReadModel
periodStart time.Time
latestNotifiedThresholds map[string]uint16
periodStart time.Time
latestDueThresholds map[string]uint16
}
func newQuotaNotificationsReadModel(aggregateId, instanceId, resourceOwner string, periodStart time.Time) *quotaNotificationsReadModel {
@ -20,8 +20,8 @@ func newQuotaNotificationsReadModel(aggregateId, instanceId, resourceOwner strin
InstanceID: instanceId,
ResourceOwner: resourceOwner,
},
periodStart: periodStart,
latestNotifiedThresholds: make(map[string]uint16),
periodStart: periodStart,
latestDueThresholds: make(map[string]uint16),
}
}
@ -34,13 +34,13 @@ func (rm *quotaNotificationsReadModel) Query() *eventstore.SearchQueryBuilder {
AggregateTypes(quota.AggregateType).
AggregateIDs(rm.AggregateID).
CreationDateAfter(rm.periodStart).
EventTypes(quota.NotifiedEventType).Builder()
EventTypes(quota.NotificationDueEventType).Builder()
}
func (rm *quotaNotificationsReadModel) Reduce() error {
for _, event := range rm.Events {
e := event.(*quota.NotifiedEvent)
rm.latestNotifiedThresholds[e.ID] = e.Threshold
e := event.(*quota.NotificationDueEvent)
rm.latestDueThresholds[e.ID] = e.Threshold
}
return rm.ReadModel.Reduce()
}

View File

@ -19,6 +19,7 @@ const (
eventTypePrefix = eventstore.EventType("quota.")
AddedEventType = eventTypePrefix + "added"
NotifiedEventType = eventTypePrefix + "notified"
NotificationDueEventType = eventTypePrefix + "notificationdue"
RemovedEventType = eventTypePrefix + "removed"
)
@ -107,6 +108,62 @@ func AddedEventMapper(event *repository.Event) (eventstore.Event, error) {
return e, nil
}
type NotificationDueEvent struct {
eventstore.BaseEvent `json:"-"`
Unit Unit `json:"unit"`
ID string `json:"id"`
CallURL string `json:"callURL"`
PeriodStart time.Time `json:"periodStart"`
Threshold uint16 `json:"threshold"`
Usage uint64 `json:"usage"`
}
func (n *NotificationDueEvent) Data() interface{} {
return n
}
func (n *NotificationDueEvent) UniqueConstraints() []*eventstore.EventUniqueConstraint {
return nil
}
func NewNotificationDueEvent(
ctx context.Context,
aggregate *eventstore.Aggregate,
unit Unit,
id string,
callURL string,
periodStart time.Time,
threshold uint16,
usage uint64,
) *NotificationDueEvent {
return &NotificationDueEvent{
BaseEvent: *eventstore.NewBaseEventForPush(
ctx,
aggregate,
NotificationDueEventType,
),
Unit: unit,
ID: id,
CallURL: callURL,
PeriodStart: periodStart,
Threshold: threshold,
Usage: usage,
}
}
func NotificationDueEventMapper(event *repository.Event) (eventstore.Event, error) {
e := &NotificationDueEvent{
BaseEvent: *eventstore.BaseEventFromRepo(event),
}
err := json.Unmarshal(event.Data, e)
if err != nil {
return nil, errors.ThrowInternal(err, "QUOTA-k56rT", "unable to unmarshal notification due")
}
return e, nil
}
type NotifiedEvent struct {
eventstore.BaseEvent `json:"-"`
Unit Unit `json:"unit"`
@ -115,6 +172,7 @@ type NotifiedEvent struct {
PeriodStart time.Time `json:"periodStart"`
Threshold uint16 `json:"threshold"`
Usage uint64 `json:"usage"`
DueEventID string `json:"dueEventID"`
}
func (e *NotifiedEvent) Data() interface{} {
@ -127,26 +185,28 @@ func (e *NotifiedEvent) UniqueConstraints() []*eventstore.EventUniqueConstraint
func NewNotifiedEvent(
ctx context.Context,
aggregate *eventstore.Aggregate,
unit Unit,
id string,
callURL string,
periodStart time.Time,
threshold uint16,
usage uint64,
dueEvent *NotificationDueEvent,
) *NotifiedEvent {
aggregate := dueEvent.Aggregate()
return &NotifiedEvent{
BaseEvent: *eventstore.NewBaseEventForPush(
ctx,
aggregate,
&aggregate,
NotifiedEventType,
),
Unit: unit,
ID: id,
CallURL: callURL,
PeriodStart: periodStart,
Threshold: threshold,
Usage: usage,
ID: id,
DueEventID: dueEvent.ID,
// Deprecated: dereference the NotificationDueEvent
Unit: dueEvent.Unit,
// Deprecated: dereference the NotificationDueEvent
CallURL: dueEvent.CallURL,
// Deprecated: dereference the NotificationDueEvent
PeriodStart: dueEvent.PeriodStart,
// Deprecated: dereference the NotificationDueEvent
Threshold: dueEvent.Threshold,
// Deprecated: dereference the NotificationDueEvent
Usage: dueEvent.Usage,
}
}

View File

@ -6,6 +6,7 @@ import (
func RegisterEventMappers(es *eventstore.Eventstore) {
es.RegisterFilterEventMapper(AggregateType, AddedEventType, AddedEventMapper).
RegisterFilterEventMapper(AggregateType, NotifiedEventType, NotifiedEventMapper).
RegisterFilterEventMapper(AggregateType, RemovedEventType, RemovedEventMapper)
RegisterFilterEventMapper(AggregateType, RemovedEventType, RemovedEventMapper).
RegisterFilterEventMapper(AggregateType, NotificationDueEventType, NotificationDueEventMapper).
RegisterFilterEventMapper(AggregateType, NotifiedEventType, NotifiedEventMapper)
}