feat: notifications (#109)

* implement notification providers

* email provider

* notification handler

* notify users

* implement code sent on user eventstore

* send email implementation

* send init code

* handle codes

* fix project member handler

* add some logs for debug

* send emails

* text changes

* send sms

* notification process

* send password code

* format phone number

* test format phone

* remove fmts

* remove unused code

* rename files

* add mocks

* merge master

* Update internal/notification/providers/email/message.go

Co-authored-by: Silvan <silvan.reusser@gmail.com>

* Update internal/notification/repository/eventsourcing/handler/notification.go

Co-authored-by: Silvan <silvan.reusser@gmail.com>

* Update internal/notification/repository/eventsourcing/handler/notification.go

Co-authored-by: Silvan <silvan.reusser@gmail.com>

* Update internal/notification/providers/email/provider.go

Co-authored-by: Silvan <silvan.reusser@gmail.com>

* requested changes of mr

* move locker to eventstore pkg

* Update internal/notification/providers/chat/message.go

Co-authored-by: Livio Amstutz <livio.a@gmail.com>

* move locker to eventstore pkg

* linebreak

* Update internal/notification/providers/email/provider.go

Co-authored-by: Silvan <silvan.reusser@gmail.com>

* Update internal/notification/repository/eventsourcing/handler/notification.go

Co-authored-by: Silvan <silvan.reusser@gmail.com>

* Update internal/notification/repository/eventsourcing/handler/notification.go

Co-authored-by: Silvan <silvan.reusser@gmail.com>

Co-authored-by: Silvan <silvan.reusser@gmail.com>
Co-authored-by: Livio Amstutz <livio.a@gmail.com>
This commit is contained in:
Fabi
2020-05-20 14:28:08 +02:00
committed by GitHub
parent c365a98cc8
commit e318139b37
67 changed files with 3278 additions and 119 deletions

View File

@@ -0,0 +1,17 @@
package notification
import (
"context"
"github.com/caos/logging"
sd "github.com/caos/zitadel/internal/config/systemdefaults"
"github.com/caos/zitadel/internal/notification/repository/eventsourcing"
)
type Config struct {
Repository eventsourcing.Config
}
func Start(ctx context.Context, config Config, systemDefaults sd.SystemDefaults) {
_, err := eventsourcing.Start(config.Repository, systemDefaults)
logging.Log("MAIN-9uBxp").OnError(err).Panic("unable to start app")
}

View File

@@ -0,0 +1,6 @@
package chat
type ChatConfig struct {
Url string
SplitCount int
}

View File

@@ -0,0 +1,9 @@
package chat
type ChatMessage struct {
Text string `json:"text"`
}
func (msg *ChatMessage) GetContent() string {
return msg.Text
}

View File

@@ -0,0 +1,75 @@
package chat
import (
"bytes"
"encoding/json"
caos_errs "github.com/caos/zitadel/internal/errors"
"github.com/caos/zitadel/internal/notification/providers"
"net/http"
"net/url"
"unicode/utf8"
)
type Chat struct {
URL *url.URL
SplitCount int
}
func InitChatProvider(config ChatConfig) (*Chat, error) {
url, err := url.Parse(config.Url)
if err != nil {
return nil, err
}
return &Chat{
URL: url,
SplitCount: config.SplitCount,
}, nil
}
func (chat *Chat) CanHandleMessage(_ providers.Message) bool {
return true
}
func (chat *Chat) HandleMessage(message providers.Message) error {
contentText := message.GetContent()
for _, splittedMsg := range splitMessage(contentText, chat.SplitCount) {
chatMsg := &ChatMessage{Text: splittedMsg}
if err := chat.SendMessage(chatMsg); err != nil {
return err
}
}
return nil
}
func (chat *Chat) SendMessage(message providers.Message) error {
chatMsg, ok := message.(*ChatMessage)
if !ok {
return caos_errs.ThrowInternal(nil, "EMAIL-s8JLs", "message is not ChatMessage")
}
req, err := json.Marshal(chatMsg)
if err != nil {
return caos_errs.ThrowInternal(err, "PROVI-s8uie", "Could not unmarshal content")
}
_, err = http.Post(chat.URL.String(), "application/json; charset=UTF-8", bytes.NewReader(req))
if err != nil {
return caos_errs.ThrowInternal(err, "PROVI-si93s", "unable to send message")
}
return nil
}
func splitMessage(message string, count int) []string {
if count == 0 {
return []string{message}
}
var splits []string
var l, r int
for l, r = 0, count; r < len(message); l, r = r, r+count {
for !utf8.RuneStart(message[r]) {
r--
}
splits = append(splits, message[l:r])
}
splits = append(splits, message[l:])
return splits
}

View File

@@ -0,0 +1,18 @@
package email
type EmailConfig struct {
SMTP SMTP
Tls bool
From string
FromName string
}
type SMTP struct {
Host string
User string
Password string
}
func (smtp *SMTP) HasAuth() bool {
return smtp.User != "" && smtp.Password != ""
}

View File

@@ -0,0 +1,47 @@
package email
import (
"fmt"
"regexp"
"strings"
)
var (
isHTMLRgx = regexp.MustCompile(`.*<html.*>.*`)
lineBreak = "\r\n"
)
type EmailMessage struct {
Recipients []string
BCC []string
CC []string
SenderEmail string
Subject string
Content string
}
func (msg *EmailMessage) GetContent() string {
headers := make(map[string]string)
headers["From"] = msg.SenderEmail
headers["To"] = strings.Join(msg.Recipients, ", ")
headers["Cc"] = strings.Join(msg.CC, ", ")
message := ""
for k, v := range headers {
message += fmt.Sprintf("%s: %s"+lineBreak, k, v)
}
//default mime-type is html
mime := "MIME-version: 1.0;" + lineBreak + "Content-Type: text/html; charset=\"UTF-8\";" + lineBreak + lineBreak
if !isHTML(msg.Content) {
mime = "MIME-version: 1.0;" + lineBreak + "Content-Type: text/plain; charset=\"UTF-8\";" + lineBreak + lineBreak
}
subject := "Subject: " + msg.Subject + lineBreak
message += subject + mime + lineBreak + msg.Content
return message
}
func isHTML(input string) bool {
return isHTMLRgx.MatchString(input)
}

View File

@@ -0,0 +1,123 @@
package email
import (
"crypto/tls"
"github.com/caos/logging"
caos_errs "github.com/caos/zitadel/internal/errors"
"github.com/caos/zitadel/internal/notification/providers"
"net"
"net/smtp"
)
type Email struct {
smtpClient *smtp.Client
}
func InitEmailProvider(config EmailConfig) (*Email, error) {
client, err := config.SMTP.connectToSMTP(config.Tls)
if err != nil {
return nil, err
}
return &Email{
smtpClient: client,
}, nil
}
func (email *Email) CanHandleMessage(message providers.Message) bool {
msg, ok := message.(*EmailMessage)
if !ok {
return false
}
return msg.Content != "" && msg.Subject != "" && len(msg.Recipients) > 0
}
func (email *Email) HandleMessage(message providers.Message) error {
defer email.smtpClient.Close()
emailMsg, ok := message.(*EmailMessage)
if !ok {
return caos_errs.ThrowInternal(nil, "EMAIL-s8JLs", "message is not EmailMessage")
}
// To && From
if err := email.smtpClient.Mail(emailMsg.SenderEmail); err != nil {
return caos_errs.ThrowInternalf(err, "EMAIL-s3is3", "could not set sender: %v", emailMsg.SenderEmail)
}
for _, recp := range append(append(emailMsg.Recipients, emailMsg.CC...), emailMsg.BCC...) {
if err := email.smtpClient.Rcpt(recp); err != nil {
return caos_errs.ThrowInternalf(err, "EMAIL-s4is4", "could not set recipient: %v", recp)
}
}
// Data
w, err := email.smtpClient.Data()
if err != nil {
return err
}
_, err = w.Write([]byte(emailMsg.GetContent()))
if err != nil {
return err
}
err = w.Close()
if err != nil {
return err
}
defer logging.LogWithFields("EMAI-a1c87ec8").Debug("email sent")
return email.smtpClient.Quit()
}
func (smtpConfig SMTP) connectToSMTP(tlsRequired bool) (client *smtp.Client, err error) {
host, _, err := net.SplitHostPort(smtpConfig.Host)
if err != nil {
return nil, caos_errs.ThrowInternal(err, "EMAIL-spR56", "could not split host and port for connect to smtp")
}
if !tlsRequired {
client, err = smtpConfig.getSMPTClient()
} else {
client, err = smtpConfig.getSMPTClientWithTls(host)
}
if err != nil {
return nil, err
}
err = smtpConfig.smtpAuth(client, host)
if err != nil {
return nil, err
}
return client, nil
}
func (smtpConfig SMTP) getSMPTClient() (*smtp.Client, error) {
client, err := smtp.Dial(smtpConfig.Host)
if err != nil {
return nil, caos_errs.ThrowInternal(err, "EMAIL-skwos", "Could not make smtp dial")
}
return client, nil
}
func (smtpConfig SMTP) getSMPTClientWithTls(host string) (*smtp.Client, error) {
conn, err := tls.Dial("tcp", smtpConfig.Host, &tls.Config{})
if err != nil {
return nil, caos_errs.ThrowInternal(err, "EMAIL-sl39s", "Could not make tls dial")
}
client, err := smtp.NewClient(conn, host)
if err != nil {
return nil, caos_errs.ThrowInternal(err, "EMAIL-skwi4", "Could not create smtp client")
}
return client, err
}
func (smtpConfig SMTP) smtpAuth(client *smtp.Client, host string) error {
if !smtpConfig.HasAuth() {
return nil
}
// Auth
auth := smtp.PlainAuth("", smtpConfig.User, smtpConfig.Password, host)
err := client.Auth(auth)
logging.Log("EMAIL-s9kfs").WithField("smtp user", smtpConfig.User).OnError(err).Debug("Could not add smtp auth")
return err
}

View File

@@ -0,0 +1,4 @@
package providers
//go:generate mockgen -package mock -destination ./mock/provider.mock.go github.com/caos/zitadel/internal/notification/providers NotificationProvider
//go:generate mockgen -package mock -destination ./mock/message.mock.go github.com/caos/zitadel/internal/notification/providers Message

View File

@@ -0,0 +1,5 @@
package providers
type Message interface {
GetContent() string
}

View File

@@ -0,0 +1,47 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/caos/zitadel/internal/notification/providers (interfaces: Message)
// Package mock is a generated GoMock package.
package mock
import (
gomock "github.com/golang/mock/gomock"
reflect "reflect"
)
// MockMessage is a mock of Message interface
type MockMessage struct {
ctrl *gomock.Controller
recorder *MockMessageMockRecorder
}
// MockMessageMockRecorder is the mock recorder for MockMessage
type MockMessageMockRecorder struct {
mock *MockMessage
}
// NewMockMessage creates a new mock instance
func NewMockMessage(ctrl *gomock.Controller) *MockMessage {
mock := &MockMessage{ctrl: ctrl}
mock.recorder = &MockMessageMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use
func (m *MockMessage) EXPECT() *MockMessageMockRecorder {
return m.recorder
}
// GetContent mocks base method
func (m *MockMessage) GetContent() string {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetContent")
ret0, _ := ret[0].(string)
return ret0
}
// GetContent indicates an expected call of GetContent
func (mr *MockMessageMockRecorder) GetContent() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetContent", reflect.TypeOf((*MockMessage)(nil).GetContent))
}

View File

@@ -0,0 +1,61 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/caos/zitadel/internal/notification/providers (interfaces: NotificationProvider)
// Package mock is a generated GoMock package.
package mock
import (
gomock "github.com/golang/mock/gomock"
reflect "reflect"
)
// MockNotificationProvider is a mock of NotificationProvider interface
type MockNotificationProvider struct {
ctrl *gomock.Controller
recorder *MockNotificationProviderMockRecorder
}
// MockNotificationProviderMockRecorder is the mock recorder for MockNotificationProvider
type MockNotificationProviderMockRecorder struct {
mock *MockNotificationProvider
}
// NewMockNotificationProvider creates a new mock instance
func NewMockNotificationProvider(ctrl *gomock.Controller) *MockNotificationProvider {
mock := &MockNotificationProvider{ctrl: ctrl}
mock.recorder = &MockNotificationProviderMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use
func (m *MockNotificationProvider) EXPECT() *MockNotificationProviderMockRecorder {
return m.recorder
}
// CanHandleMessage mocks base method
func (m *MockNotificationProvider) CanHandleMessage() bool {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "CanHandleMessage")
ret0, _ := ret[0].(bool)
return ret0
}
// CanHandleMessage indicates an expected call of CanHandleMessage
func (mr *MockNotificationProviderMockRecorder) CanHandleMessage() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CanHandleMessage", reflect.TypeOf((*MockNotificationProvider)(nil).CanHandleMessage))
}
// HandleMessage mocks base method
func (m *MockNotificationProvider) HandleMessage() error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "HandleMessage")
ret0, _ := ret[0].(error)
return ret0
}
// HandleMessage indicates an expected call of HandleMessage
func (mr *MockNotificationProviderMockRecorder) HandleMessage() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HandleMessage", reflect.TypeOf((*MockNotificationProvider)(nil).HandleMessage))
}

View File

@@ -0,0 +1,6 @@
package providers
type NotificationProvider interface {
CanHandleMessage() bool
HandleMessage() error
}

View File

@@ -0,0 +1,7 @@
package twilio
type TwilioConfig struct {
SID string
Token string
From string
}

View File

@@ -0,0 +1,11 @@
package twilio
type TwilioMessage struct {
SenderPhoneNumber string
RecipientPhoneNumber string
Content string
}
func (msg *TwilioMessage) GetContent() string {
return msg.Content
}

View File

@@ -0,0 +1,39 @@
package twilio
import (
"github.com/caos/logging"
caos_errs "github.com/caos/zitadel/internal/errors"
"github.com/caos/zitadel/internal/notification/providers"
twilio "github.com/kevinburke/twilio-go"
)
type Twilio struct {
client *twilio.Client
}
func InitTwilioProvider(config TwilioConfig) *Twilio {
return &Twilio{
client: twilio.NewClient(config.SID, config.Token, nil),
}
}
func (t *Twilio) CanHandleMessage(message providers.Message) bool {
twilioMsg, ok := message.(*TwilioMessage)
if !ok {
return false
}
return twilioMsg.Content != "" && twilioMsg.RecipientPhoneNumber != "" && twilioMsg.SenderPhoneNumber != ""
}
func (t *Twilio) HandleMessage(message providers.Message) error {
twilioMsg, ok := message.(*TwilioMessage)
if !ok {
return caos_errs.ThrowInternal(nil, "TWILI-s0pLc", "message is not TwilioMessage")
}
m, err := t.client.Messages.SendMessage(twilioMsg.SenderPhoneNumber, twilioMsg.RecipientPhoneNumber, twilioMsg.GetContent(), nil)
if err != nil {
return caos_errs.ThrowInternal(err, "TWILI-osk3S", "could not send message")
}
logging.LogWithFields("SMS_-f335c523", "message_sid", m.Sid, "status", m.Status).Debug("sms sent")
return nil
}

View File

@@ -0,0 +1,49 @@
package handler
import (
"github.com/caos/logging"
sd "github.com/caos/zitadel/internal/config/systemdefaults"
"github.com/caos/zitadel/internal/config/types"
"github.com/caos/zitadel/internal/crypto"
"github.com/caos/zitadel/internal/eventstore"
"github.com/caos/zitadel/internal/eventstore/spooler"
"github.com/caos/zitadel/internal/notification/repository/eventsourcing/view"
usr_event "github.com/caos/zitadel/internal/user/repository/eventsourcing"
"time"
)
type Configs map[string]*Config
type Config struct {
MinimumCycleDuration types.Duration
}
type handler struct {
view *view.View
bulkLimit uint64
cycleDuration time.Duration
errorCountUntilSkip uint64
}
type EventstoreRepos struct {
UserEvents *usr_event.UserEventstore
}
func Register(configs Configs, bulkLimit, errorCount uint64, view *view.View, eventstore eventstore.Eventstore, repos EventstoreRepos, systemDefaults sd.SystemDefaults) []spooler.Handler {
aesCrypto, err := crypto.NewAESCrypto(systemDefaults.UserVerificationKey)
if err != nil {
logging.Log("HANDL-s90ew").WithError(err).Debug("error create new aes crypto")
}
return []spooler.Handler{
&NotifyUser{handler: handler{view, bulkLimit, configs.cycleDuration("User"), errorCount}},
&Notification{handler: handler{view, bulkLimit, configs.cycleDuration("Notification"), errorCount}, eventstore: eventstore, userEvents: repos.UserEvents, systemDefaults: systemDefaults, AesCrypto: aesCrypto},
}
}
func (configs Configs) cycleDuration(viewModel string) time.Duration {
c, ok := configs[viewModel]
if !ok {
return 1 * time.Second
}
return c.MinimumCycleDuration.Duration
}

View File

@@ -0,0 +1,168 @@
package handler
import (
"context"
"github.com/caos/zitadel/internal/api/auth"
sd "github.com/caos/zitadel/internal/config/systemdefaults"
"github.com/caos/zitadel/internal/crypto"
"github.com/caos/zitadel/internal/eventstore"
"github.com/caos/zitadel/internal/notification/types"
usr_event "github.com/caos/zitadel/internal/user/repository/eventsourcing"
es_model "github.com/caos/zitadel/internal/user/repository/eventsourcing/model"
"time"
"github.com/caos/logging"
"github.com/caos/zitadel/internal/eventstore/models"
"github.com/caos/zitadel/internal/eventstore/spooler"
"github.com/caos/zitadel/internal/user/repository/eventsourcing"
)
type Notification struct {
handler
eventstore eventstore.Eventstore
userEvents *usr_event.UserEventstore
systemDefaults sd.SystemDefaults
AesCrypto crypto.EncryptionAlgorithm
}
const (
notificationTable = "notification.notifications"
NOTIFY_USER = "NOTIFICATION"
)
func (n *Notification) MinimumCycleDuration() time.Duration { return n.cycleDuration }
func (n *Notification) ViewModel() string {
return notificationTable
}
func (n *Notification) EventQuery() (*models.SearchQuery, error) {
sequence, err := n.view.GetLatestNotificationSequence()
if err != nil {
return nil, err
}
return eventsourcing.UserQuery(sequence), nil
}
func (n *Notification) Process(event *models.Event) (err error) {
switch event.Type {
case es_model.InitializedUserCodeAdded:
err = n.handleInitUserCode(event)
case es_model.UserEmailCodeAdded:
err = n.handleEmailVerificationCode(event)
case es_model.UserPhoneCodeAdded:
err = n.handlePhoneVerificationCode(event)
case es_model.UserPasswordCodeAdded:
err = n.handlePasswordCode(event)
default:
return n.view.ProcessedNotificationSequence(event.Sequence)
}
if err != nil {
return err
}
return n.view.ProcessedNotificationSequence(event.Sequence)
}
func (n *Notification) handleInitUserCode(event *models.Event) (err error) {
alreadyHandled, err := n.checkIfCodeAlreadyHandled(event.AggregateID, event.Sequence, es_model.InitializedUserCodeAdded, es_model.InitializedUserCodeSent)
if err != nil || alreadyHandled {
return err
}
initCode := new(es_model.InitUserCode)
initCode.SetData(event)
user, err := n.view.NotifyUserByID(event.AggregateID)
if err != nil {
return err
}
err = types.SendUserInitCode(user, initCode, n.systemDefaults, n.AesCrypto)
if err != nil {
return err
}
return n.userEvents.InitCodeSent(getSetNotifyContextData(event.ResourceOwner), event.AggregateID)
}
func (n *Notification) handlePasswordCode(event *models.Event) (err error) {
alreadyHandled, err := n.checkIfCodeAlreadyHandled(event.AggregateID, event.Sequence, es_model.UserPasswordCodeAdded, es_model.UserPasswordCodeSent)
if err != nil || alreadyHandled {
return err
}
pwCode := new(es_model.PasswordCode)
pwCode.SetData(event)
user, err := n.view.NotifyUserByID(event.AggregateID)
if err != nil {
return err
}
err = types.SendPasswordCode(user, pwCode, n.systemDefaults, n.AesCrypto)
if err != nil {
return err
}
return n.userEvents.PasswordCodeSent(getSetNotifyContextData(event.ResourceOwner), event.AggregateID)
}
func (n *Notification) handleEmailVerificationCode(event *models.Event) (err error) {
alreadyHandled, err := n.checkIfCodeAlreadyHandled(event.AggregateID, event.Sequence, es_model.UserEmailCodeAdded, es_model.UserEmailCodeSent)
if err != nil || alreadyHandled {
return nil
}
emailCode := new(es_model.EmailCode)
emailCode.SetData(event)
user, err := n.view.NotifyUserByID(event.AggregateID)
if err != nil {
return err
}
err = types.SendEmailVerificationCode(user, emailCode, n.systemDefaults, n.AesCrypto)
if err != nil {
return err
}
return n.userEvents.EmailVerificationCodeSent(getSetNotifyContextData(event.ResourceOwner), event.AggregateID)
}
func (n *Notification) handlePhoneVerificationCode(event *models.Event) (err error) {
alreadyHandled, err := n.checkIfCodeAlreadyHandled(event.AggregateID, event.Sequence, es_model.UserPhoneCodeAdded, es_model.UserPhoneCodeSent)
if err != nil || alreadyHandled {
return nil
}
phoneCode := new(es_model.PhoneCode)
phoneCode.SetData(event)
user, err := n.view.NotifyUserByID(event.AggregateID)
if err != nil {
return err
}
err = types.SendPhoneVerificationCode(user, phoneCode, n.systemDefaults, n.AesCrypto)
if err != nil {
return err
}
return n.userEvents.PhoneVerificationCodeSent(getSetNotifyContextData(event.ResourceOwner), event.AggregateID)
}
func (n *Notification) checkIfCodeAlreadyHandled(userID string, sequence uint64, addedType, sentType models.EventType) (bool, error) {
events, err := n.getUserEvents(userID, sequence)
if err != nil {
return false, err
}
for _, event := range events {
if event.Type == addedType || event.Type == sentType {
return true, nil
}
}
return false, nil
}
func (n *Notification) getUserEvents(userID string, sequence uint64) ([]*models.Event, error) {
query, err := eventsourcing.UserByIDQuery(userID, sequence)
if err != nil {
return nil, err
}
return n.eventstore.FilterEvents(context.Background(), query)
}
func (n *Notification) OnError(event *models.Event, err error) error {
logging.LogWithFields("SPOOL-s9opc", "id", event.AggregateID, "sequence", event.Sequence).WithError(err).Warn("something went wrong in notification handler")
return spooler.HandleError(event, err, n.view.GetLatestNotificationFailedEvent, n.view.ProcessedNotificationFailedEvent, n.view.ProcessedNotificationSequence, n.errorCountUntilSkip)
}
func getSetNotifyContextData(orgID string) context.Context {
return auth.SetCtxData(context.Background(), auth.CtxData{UserID: NOTIFY_USER, OrgID: orgID})
}

View File

@@ -0,0 +1,69 @@
package handler
import (
es_model "github.com/caos/zitadel/internal/user/repository/eventsourcing/model"
"time"
"github.com/caos/logging"
"github.com/caos/zitadel/internal/eventstore"
"github.com/caos/zitadel/internal/eventstore/models"
"github.com/caos/zitadel/internal/eventstore/spooler"
"github.com/caos/zitadel/internal/user/repository/eventsourcing"
view_model "github.com/caos/zitadel/internal/user/repository/view/model"
)
type NotifyUser struct {
handler
eventstore eventstore.Eventstore
}
const (
userTable = "notification.notify_users"
)
func (p *NotifyUser) MinimumCycleDuration() time.Duration { return p.cycleDuration }
func (p *NotifyUser) ViewModel() string {
return userTable
}
func (p *NotifyUser) EventQuery() (*models.SearchQuery, error) {
sequence, err := p.view.GetLatestNotifyUserSequence()
if err != nil {
return nil, err
}
return eventsourcing.UserQuery(sequence), nil
}
func (p *NotifyUser) Process(event *models.Event) (err error) {
user := new(view_model.NotifyUser)
switch event.Type {
case es_model.UserAdded,
es_model.UserRegistered:
user.AppendEvent(event)
case es_model.UserProfileChanged,
es_model.UserEmailChanged,
es_model.UserEmailVerified,
es_model.UserPhoneChanged,
es_model.UserPhoneVerified:
user, err = p.view.NotifyUserByID(event.AggregateID)
if err != nil {
return err
}
err = user.AppendEvent(event)
case es_model.UserDeleted:
err = p.view.DeleteNotifyUser(event.AggregateID, event.Sequence)
default:
return p.view.ProcessedNotifyUserSequence(event.Sequence)
}
if err != nil {
return err
}
return p.view.PutNotifyUser(user)
}
func (p *NotifyUser) OnError(event *models.Event, err error) error {
logging.LogWithFields("SPOOL-9spwf", "id", event.AggregateID).WithError(err).Warn("something went wrong in notify user handler")
return spooler.HandleError(event, err, p.view.GetLatestNotifyUserFailedEvent, p.view.ProcessedNotifyUserFailedEvent, p.view.ProcessedNotifyUserSequence, p.errorCountUntilSkip)
}

View File

@@ -0,0 +1,56 @@
package eventsourcing
import (
sd "github.com/caos/zitadel/internal/config/systemdefaults"
"github.com/caos/zitadel/internal/config/types"
es_int "github.com/caos/zitadel/internal/eventstore"
es_spol "github.com/caos/zitadel/internal/eventstore/spooler"
"github.com/caos/zitadel/internal/notification/repository/eventsourcing/handler"
"github.com/caos/zitadel/internal/notification/repository/eventsourcing/spooler"
noti_view "github.com/caos/zitadel/internal/notification/repository/eventsourcing/view"
es_usr "github.com/caos/zitadel/internal/user/repository/eventsourcing"
)
type Config struct {
Eventstore es_int.Config
View types.SQL
Spooler spooler.SpoolerConfig
}
type EsRepository struct {
spooler *es_spol.Spooler
}
func Start(conf Config, systemDefaults sd.SystemDefaults) (*EsRepository, error) {
es, err := es_int.Start(conf.Eventstore)
if err != nil {
return nil, err
}
sqlClient, err := conf.View.Start()
if err != nil {
return nil, err
}
view, err := noti_view.StartView(sqlClient)
if err != nil {
return nil, err
}
user, err := es_usr.StartUser(es_usr.UserConfig{
Eventstore: es,
Cache: conf.Eventstore.Cache,
}, systemDefaults)
if err != nil {
return nil, err
}
eventstoreRepos := handler.EventstoreRepos{UserEvents: user}
spool := spooler.StartSpooler(conf.Spooler, es, view, sqlClient, eventstoreRepos, systemDefaults)
return &EsRepository{
spool,
}, nil
}
func (repo *EsRepository) Health() error {
return nil
}

View File

@@ -0,0 +1,19 @@
package spooler
import (
"database/sql"
es_locker "github.com/caos/zitadel/internal/eventstore/locker"
"time"
)
const (
lockTable = "notification.locks"
)
type locker struct {
dbClient *sql.DB
}
func (l *locker) Renew(lockerID, viewModel string, waitTime time.Duration) error {
return es_locker.Renew(l.dbClient, lockTable, lockerID, viewModel, waitTime)
}

View File

@@ -0,0 +1,127 @@
package spooler
import (
"database/sql"
"testing"
"time"
"github.com/DATA-DOG/go-sqlmock"
)
type dbMock struct {
db *sql.DB
mock sqlmock.Sqlmock
}
func mockDB(t *testing.T) *dbMock {
mockDB := dbMock{}
var err error
mockDB.db, mockDB.mock, err = sqlmock.New()
if err != nil {
t.Fatalf("error occured while creating stub db %v", err)
}
mockDB.mock.MatchExpectationsInOrder(true)
return &mockDB
}
func (db *dbMock) expectCommit() *dbMock {
db.mock.ExpectCommit()
return db
}
func (db *dbMock) expectRollback() *dbMock {
db.mock.ExpectRollback()
return db
}
func (db *dbMock) expectBegin() *dbMock {
db.mock.ExpectBegin()
return db
}
func (db *dbMock) expectSavepoint() *dbMock {
db.mock.ExpectExec("SAVEPOINT").WillReturnResult(sqlmock.NewResult(1, 1))
return db
}
func (db *dbMock) expectReleaseSavepoint() *dbMock {
db.mock.ExpectExec("RELEASE SAVEPOINT").WillReturnResult(sqlmock.NewResult(1, 1))
return db
}
func (db *dbMock) expectRenew(lockerID, view string, affectedRows int64) *dbMock {
query := db.mock.
ExpectExec(`INSERT INTO notification\.locks \(object_type, locker_id, locked_until\) VALUES \(\$1, \$2, now\(\)\+\$3\) ON CONFLICT \(object_type\) DO UPDATE SET locked_until = now\(\)\+\$4, locker_id = \$5 WHERE \(locks\.locked_until < now\(\) OR locks\.locker_id = \$6\) AND locks\.object_type = \$7`).
WithArgs(view, lockerID, sqlmock.AnyArg(), sqlmock.AnyArg(), lockerID, lockerID, view).
WillReturnResult(sqlmock.NewResult(1, 1))
if affectedRows == 0 {
query.WillReturnResult(sqlmock.NewResult(0, 0))
} else {
query.WillReturnResult(sqlmock.NewResult(1, affectedRows))
}
return db
}
func Test_locker_Renew(t *testing.T) {
type fields struct {
db *dbMock
}
type args struct {
lockerID string
viewModel string
waitTime time.Duration
}
tests := []struct {
name string
fields fields
args args
wantErr bool
}{
{
name: "renew succeeded",
fields: fields{
db: mockDB(t).
expectBegin().
expectSavepoint().
expectRenew("locker", "view", 1).
expectReleaseSavepoint().
expectCommit(),
},
args: args{lockerID: "locker", viewModel: "view", waitTime: 1 * time.Second},
wantErr: false,
},
{
name: "renew now rows updated",
fields: fields{
db: mockDB(t).
expectBegin().
expectSavepoint().
expectRenew("locker", "view", 0).
expectRollback(),
},
args: args{lockerID: "locker", viewModel: "view", waitTime: 1 * time.Second},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
l := &locker{
dbClient: tt.fields.db.db,
}
if err := l.Renew(tt.args.lockerID, tt.args.viewModel, tt.args.waitTime); (err != nil) != tt.wantErr {
t.Errorf("locker.Renew() error = %v, wantErr %v", err, tt.wantErr)
}
if err := tt.fields.db.mock.ExpectationsWereMet(); err != nil {
t.Errorf("not all database expectations met: %v", err)
}
})
}
}

View File

@@ -0,0 +1,34 @@
package spooler
import (
"database/sql"
sd "github.com/caos/zitadel/internal/config/systemdefaults"
"github.com/caos/zitadel/internal/eventstore"
"github.com/caos/zitadel/internal/eventstore/spooler"
"github.com/caos/zitadel/internal/notification/repository/eventsourcing/handler"
"github.com/caos/zitadel/internal/notification/repository/eventsourcing/view"
usr_event "github.com/caos/zitadel/internal/user/repository/eventsourcing"
)
type SpoolerConfig struct {
BulkLimit uint64
FailureCountUntilSkip uint64
ConcurrentTasks int
Handlers handler.Configs
}
type EventstoreRepos struct {
UserEvents *usr_event.UserEventstore
}
func StartSpooler(c SpoolerConfig, es eventstore.Eventstore, view *view.View, sql *sql.DB, eventstoreRepos handler.EventstoreRepos, systemDefaults sd.SystemDefaults) *spooler.Spooler {
spoolerConfig := spooler.Config{
Eventstore: es,
Locker: &locker{dbClient: sql},
ConcurrentTasks: c.ConcurrentTasks,
ViewHandlers: handler.Register(c.Handlers, c.BulkLimit, c.FailureCountUntilSkip, view, es, eventstoreRepos, systemDefaults),
}
spool := spoolerConfig.New()
spool.Start()
return spool
}

View File

@@ -0,0 +1,17 @@
package view
import (
"github.com/caos/zitadel/internal/view"
)
const (
errTable = "notification.failed_event"
)
func (v *View) saveFailedEvent(failedEvent *view.FailedEvent) error {
return view.SaveFailedEvent(v.Db, errTable, failedEvent)
}
func (v *View) latestFailedEvent(viewName string, sequence uint64) (*view.FailedEvent, error) {
return view.LatestFailedEvent(v.Db, errTable, viewName, sequence)
}

View File

@@ -0,0 +1,25 @@
package view
import (
global_view "github.com/caos/zitadel/internal/view"
)
const (
notificationTable = "notification.notifications"
)
func (v *View) GetLatestNotificationSequence() (uint64, error) {
return v.latestSequence(notificationTable)
}
func (v *View) ProcessedNotificationSequence(eventSequence uint64) error {
return v.saveCurrentSequence(notificationTable, eventSequence)
}
func (v *View) GetLatestNotificationFailedEvent(sequence uint64) (*global_view.FailedEvent, error) {
return v.latestFailedEvent(notificationTable, sequence)
}
func (v *View) ProcessedNotificationFailedEvent(failedEvent *global_view.FailedEvent) error {
return v.saveFailedEvent(failedEvent)
}

View File

@@ -0,0 +1,47 @@
package view
import (
"github.com/caos/zitadel/internal/user/repository/view"
"github.com/caos/zitadel/internal/user/repository/view/model"
global_view "github.com/caos/zitadel/internal/view"
)
const (
notifyUserTable = "notification.notify_users"
)
func (v *View) NotifyUserByID(userID string) (*model.NotifyUser, error) {
return view.NotifyUserByID(v.Db, notifyUserTable, userID)
}
func (v *View) PutNotifyUser(user *model.NotifyUser) error {
err := view.PutNotifyUser(v.Db, notifyUserTable, user)
if err != nil {
return err
}
return v.ProcessedNotifyUserSequence(user.Sequence)
}
func (v *View) DeleteNotifyUser(userID string, eventSequence uint64) error {
err := view.DeleteNotifyUser(v.Db, notifyUserTable, userID)
if err != nil {
return nil
}
return v.ProcessedNotifyUserSequence(eventSequence)
}
func (v *View) GetLatestNotifyUserSequence() (uint64, error) {
return v.latestSequence(notifyUserTable)
}
func (v *View) ProcessedNotifyUserSequence(eventSequence uint64) error {
return v.saveCurrentSequence(notifyUserTable, eventSequence)
}
func (v *View) GetLatestNotifyUserFailedEvent(sequence uint64) (*global_view.FailedEvent, error) {
return v.latestFailedEvent(notifyUserTable, sequence)
}
func (v *View) ProcessedNotifyUserFailedEvent(failedEvent *global_view.FailedEvent) error {
return v.saveFailedEvent(failedEvent)
}

View File

@@ -0,0 +1,17 @@
package view
import (
"github.com/caos/zitadel/internal/view"
)
const (
sequencesTable = "notification.current_sequences"
)
func (v *View) saveCurrentSequence(viewName string, sequence uint64) error {
return view.SaveCurrentSequence(v.Db, sequencesTable, viewName, sequence)
}
func (v *View) latestSequence(viewName string) (uint64, error) {
return view.LatestSequence(v.Db, sequencesTable, viewName)
}

View File

@@ -0,0 +1,25 @@
package view
import (
"database/sql"
"github.com/jinzhu/gorm"
)
type View struct {
Db *gorm.DB
}
func StartView(sqlClient *sql.DB) (*View, error) {
gorm, err := gorm.Open("postgres", sqlClient)
if err != nil {
return nil, err
}
return &View{
Db: gorm,
}, nil
}
func (v *View) Health() (err error) {
return v.Db.DB().Ping()
}

View File

@@ -0,0 +1,5 @@
package repository
type Repository interface {
Health() error
}

View File

@@ -0,0 +1,45 @@
package templates
import (
"bytes"
"html/template"
)
const (
templateFileName = "template.html"
)
func GetParsedTemplate(contentData interface{}) (string, error) {
template, err := ParseTemplateFile("", contentData)
if err != nil {
return "", err
}
return ParseTemplateText(template, contentData)
}
func ParseTemplateFile(fileName string, data interface{}) (string, error) {
if fileName == "" {
fileName = templateFileName
}
template, err := template.ParseFiles(fileName)
if err != nil {
return "", err
}
return parseTemplate(template, data)
}
func ParseTemplateText(text string, data interface{}) (string, error) {
template, err := template.New("template").Parse(text)
if err != nil {
return "", err
}
return parseTemplate(template, data)
}
func parseTemplate(template *template.Template, data interface{}) (string, error) {
buf := new(bytes.Buffer)
if err := template.Execute(buf, data); err != nil {
return "", err
}
return buf.String(), nil
}

View File

@@ -0,0 +1,11 @@
package templates
type TemplateData struct {
Title string
PreHeader string
Subject string
Greeting string
Text string
Href string
ButtonText string
}

View File

@@ -0,0 +1,34 @@
package types
import (
"github.com/caos/zitadel/internal/config/systemdefaults"
"github.com/caos/zitadel/internal/crypto"
"github.com/caos/zitadel/internal/notification/templates"
es_model "github.com/caos/zitadel/internal/user/repository/eventsourcing/model"
view_model "github.com/caos/zitadel/internal/user/repository/view/model"
)
type EmailVerificationCodeData struct {
templates.TemplateData
FirstName string
LastName string
URL string
}
func SendEmailVerificationCode(user *view_model.NotifyUser, code *es_model.EmailCode, systemDefaults systemdefaults.SystemDefaults, alg crypto.EncryptionAlgorithm) error {
codeString, err := crypto.DecryptString(code.Code, alg)
if err != nil {
return err
}
url, err := templates.ParseTemplateText(systemDefaults.Notifications.Endpoints.VerifyEmail, &UrlData{UserID: user.ID, Code: codeString})
if err != nil {
return err
}
emailCodeData := &EmailVerificationCodeData{TemplateData: systemDefaults.Notifications.TemplateData.VerifyEmail, FirstName: user.FirstName, LastName: user.LastName, URL: url}
template, err := templates.GetParsedTemplate(emailCodeData)
if err != nil {
return err
}
return generateEmail(user, template, systemDefaults.Notifications, true)
}

View File

@@ -0,0 +1,39 @@
package types
import (
"github.com/caos/zitadel/internal/config/systemdefaults"
"github.com/caos/zitadel/internal/crypto"
"github.com/caos/zitadel/internal/notification/templates"
es_model "github.com/caos/zitadel/internal/user/repository/eventsourcing/model"
view_model "github.com/caos/zitadel/internal/user/repository/view/model"
)
type InitCodeEmailData struct {
templates.TemplateData
FirstName string
LastName string
URL string
}
type UrlData struct {
UserID string
Code string
}
func SendUserInitCode(user *view_model.NotifyUser, code *es_model.InitUserCode, systemDefaults systemdefaults.SystemDefaults, alg crypto.EncryptionAlgorithm) error {
codeString, err := crypto.DecryptString(code.Code, alg)
if err != nil {
return err
}
url, err := templates.ParseTemplateText(systemDefaults.Notifications.Endpoints.InitCode, &UrlData{UserID: user.ID, Code: codeString})
if err != nil {
return err
}
initCodeData := &InitCodeEmailData{TemplateData: systemDefaults.Notifications.TemplateData.InitCode, FirstName: user.FirstName, LastName: user.LastName, URL: url}
template, err := templates.GetParsedTemplate(initCodeData)
if err != nil {
return err
}
return generateEmail(user, template, systemDefaults.Notifications, true)
}

View File

@@ -0,0 +1,34 @@
package types
import (
"github.com/caos/zitadel/internal/config/systemdefaults"
"github.com/caos/zitadel/internal/crypto"
"github.com/caos/zitadel/internal/notification/templates"
es_model "github.com/caos/zitadel/internal/user/repository/eventsourcing/model"
view_model "github.com/caos/zitadel/internal/user/repository/view/model"
)
type PasswordCodeData struct {
templates.TemplateData
FirstName string
LastName string
URL string
}
func SendPasswordCode(user *view_model.NotifyUser, code *es_model.PasswordCode, systemDefaults systemdefaults.SystemDefaults, alg crypto.EncryptionAlgorithm) error {
codeString, err := crypto.DecryptString(code.Code, alg)
if err != nil {
return err
}
url, err := templates.ParseTemplateText(systemDefaults.Notifications.Endpoints.PasswordReset, &UrlData{UserID: user.ID, Code: codeString})
if err != nil {
return err
}
passwordCodeData := &PasswordCodeData{TemplateData: systemDefaults.Notifications.TemplateData.PasswordReset, FirstName: user.FirstName, LastName: user.LastName, URL: url}
template, err := templates.GetParsedTemplate(passwordCodeData)
if err != nil {
return err
}
return generateEmail(user, template, systemDefaults.Notifications, false)
}

View File

@@ -0,0 +1,29 @@
package types
import (
"github.com/caos/zitadel/internal/config/systemdefaults"
"github.com/caos/zitadel/internal/crypto"
"github.com/caos/zitadel/internal/notification/templates"
es_model "github.com/caos/zitadel/internal/user/repository/eventsourcing/model"
view_model "github.com/caos/zitadel/internal/user/repository/view/model"
)
type PhoneVerificationCodeData struct {
FirstName string
LastName string
Code string
UserID string
}
func SendPhoneVerificationCode(user *view_model.NotifyUser, code *es_model.PhoneCode, systemDefaults systemdefaults.SystemDefaults, alg crypto.EncryptionAlgorithm) error {
codeString, err := crypto.DecryptString(code.Code, alg)
if err != nil {
return err
}
codeData := &PhoneVerificationCodeData{FirstName: user.FirstName, LastName: user.LastName, UserID: user.ID, Code: codeString}
template, err := templates.ParseTemplateText(systemDefaults.Notifications.TemplateData.VerifyPhone.Text, codeData)
if err != nil {
return err
}
return generateSms(user, template, systemDefaults.Notifications, true)
}

View File

@@ -0,0 +1,41 @@
package types
import (
"github.com/caos/zitadel/internal/config/systemdefaults"
caos_errs "github.com/caos/zitadel/internal/errors"
"github.com/caos/zitadel/internal/notification/providers"
"github.com/caos/zitadel/internal/notification/providers/chat"
"github.com/caos/zitadel/internal/notification/providers/email"
view_model "github.com/caos/zitadel/internal/user/repository/view/model"
)
func generateEmail(user *view_model.NotifyUser, content string, config systemdefaults.Notifications, lastEmail bool) error {
provider, err := email.InitEmailProvider(config.Providers.Email)
if err != nil {
return err
}
message := &email.EmailMessage{
SenderEmail: config.Providers.Email.From,
Recipients: []string{user.VerifiedEmail},
Subject: config.TemplateData.InitCode.Subject,
Content: content,
}
if lastEmail {
message.Recipients = []string{user.LastEmail}
}
if provider.CanHandleMessage(message) {
if config.DebugMode {
return sendDebugEmail(message, config)
}
return provider.HandleMessage(message)
}
return caos_errs.ThrowInternalf(nil, "NOTIF-s8ipw", "Could not send init message: userid: %v", user.ID)
}
func sendDebugEmail(message providers.Message, config systemdefaults.Notifications) error {
provider, err := chat.InitChatProvider(config.Providers.Chat)
if err != nil {
return err
}
return provider.HandleMessage(message)
}

View File

@@ -0,0 +1,37 @@
package types
import (
"github.com/caos/zitadel/internal/config/systemdefaults"
caos_errs "github.com/caos/zitadel/internal/errors"
"github.com/caos/zitadel/internal/notification/providers"
"github.com/caos/zitadel/internal/notification/providers/chat"
"github.com/caos/zitadel/internal/notification/providers/twilio"
view_model "github.com/caos/zitadel/internal/user/repository/view/model"
)
func generateSms(user *view_model.NotifyUser, content string, config systemdefaults.Notifications, lastPhone bool) error {
provider := twilio.InitTwilioProvider(config.Providers.Twilio)
message := &twilio.TwilioMessage{
SenderPhoneNumber: config.Providers.Twilio.From,
RecipientPhoneNumber: user.VerifiedPhone,
Content: content,
}
if lastPhone {
message.RecipientPhoneNumber = user.LastPhone
}
if provider.CanHandleMessage(message) {
if config.DebugMode {
return sendDebugPhone(message, config)
}
return provider.HandleMessage(message)
}
return caos_errs.ThrowInternalf(nil, "NOTIF-s8ipw", "Could not send init message: userid: %v", user.ID)
}
func sendDebugPhone(message providers.Message, config systemdefaults.Notifications) error {
provider, err := chat.InitChatProvider(config.Providers.Chat)
if err != nil {
return err
}
return provider.HandleMessage(message)
}