fix set notifications

This commit is contained in:
Elio Bischof
2023-09-21 03:26:45 +02:00
parent 79ee4b93b5
commit 89a564dc29
5 changed files with 84 additions and 121 deletions

View File

@@ -1,11 +1,12 @@
package command
import (
"errors"
"fmt"
"slices"
"strings"
"time"
"github.com/zitadel/zitadel/internal/errors"
zitadel_errors "github.com/zitadel/zitadel/internal/errors"
"github.com/zitadel/zitadel/internal/id"
@@ -70,7 +71,7 @@ func (wm *quotaWriteModel) Reduce() error {
wm.resetInterval = *e.ResetInterval
}
if e.Notifications != nil {
wm.notifications = e.Notifications
wm.notifications = *e.Notifications
}
case *quota.RemovedEvent:
wm.active = false
@@ -81,7 +82,7 @@ func (wm *quotaWriteModel) Reduce() error {
}
// NewChanges returns all changes that need to be applied to the aggregate.
// If wm is nil, all properties are set.
// If wm is nil or inactive, all quota properties are set.
func (wm *quotaWriteModel) NewChanges(
idGenerator id.Generator,
amount uint64,
@@ -94,6 +95,11 @@ func (wm *quotaWriteModel) NewChanges(
if err != nil {
return nil, err
}
// we sort the input notifications already, so we can return early if they have duplicates
err = sortSetEventNotifications(setEventNotifications)
if err != nil {
return nil, err
}
if wm == nil || wm.active == false {
return []quota.QuotaChange{
quota.ChangeAmount(amount),
@@ -116,23 +122,17 @@ func (wm *quotaWriteModel) NewChanges(
if wm.limit != limit {
changes = append(changes, quota.ChangeLimit(limit))
}
// If the number of notifications differs, we renew the notifications and we can return early
if len(setEventNotifications) != len(wm.notifications) {
changes = append(changes, quota.ChangeNotifications(setEventNotifications))
return changes, nil
}
replaceIDs(wm.notifications, setEventNotifications)
// All IDs are passed and the number of notifications didn't change.
// Now we check if the properties changed.
err = sortSetEventNotifications(setEventNotifications)
if err != nil {
return nil, err
}
// Now we sort the existing notifications too, so comparing the input properties with the existing ones is easier.
// We ignore the sorting error for the existing notifications, because this is system state, not user input.
// If the sorting fails, the notifications will be cleaned up and triggered again.
// If sorting fails this time, the notifications are listed in the event payload and the projection cleans them up anyway.
_ = sortSetEventNotifications(wm.notifications)
for i, notification := range setEventNotifications {
if notification.ID != wm.notifications[i].ID ||
notification.CallURL != wm.notifications[i].CallURL ||
if notification.CallURL != wm.notifications[i].CallURL ||
notification.Percent != wm.notifications[i].Percent ||
notification.Repeat != wm.notifications[i].Repeat {
changes = append(changes, quota.ChangeNotifications(setEventNotifications))
@@ -162,27 +162,24 @@ func (q *QuotaNotifications) newSetEventNotifications(idGenerator id.Generator)
return notifications, nil
}
func replaceIDs(srcs, dsts []*quota.SetEventNotification) {
for _, dst := range dsts {
for _, src := range srcs {
if dst.CallURL == src.CallURL &&
dst.Percent == src.Percent &&
dst.Repeat == src.Repeat {
dst.ID = src.ID
break
}
}
}
}
// sortSetEventNotifications reports an error if there are duplicate notifications or if a pointer is nil
func sortSetEventNotifications(notifications []*quota.SetEventNotification) (err error) {
slices.SortFunc(notifications, func(i, j *quota.SetEventNotification) int {
comp := strings.Compare(i.ID, j.ID)
if comp == 0 {
// TODO: translate
err = errors.ThrowPreconditionFailed(nil, "EVENT-3M9fs", "Errors.Quota.Notifications.Duplicate")
if i == nil || j == nil {
err = zitadel_errors.ThrowInternal(errors.New("sorting slices of *quota.SetEventNotification with nil pointers is not supported"), "QUOTA-8YXPk", "Errors.Internal")
return 0
}
return comp
if i.Percent == j.Percent && i.CallURL == j.CallURL && i.Repeat == j.Repeat {
// TODO: translate
err = zitadel_errors.ThrowInternal(fmt.Errorf("%+v", i), "QUOTA-Pty2n", "Errors.Quota.Notifications.Duplicate")
return 0
}
if i.Percent < j.Percent ||
i.Percent == j.Percent && i.CallURL < j.CallURL ||
i.Percent == j.Percent && i.CallURL == j.CallURL && i.Repeat == false && j.Repeat == true {
return -1
}
return +1
})
return err
}

View File

@@ -1,13 +1,15 @@
package crdb
import (
"database/sql"
"errors"
"strconv"
"strings"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/database"
caos_errs "github.com/zitadel/zitadel/internal/errors"
zitadel_errors "github.com/zitadel/zitadel/internal/errors"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/eventstore/handler"
)
@@ -16,9 +18,9 @@ type execOption func(*execConfig)
type execConfig struct {
tableName string
args []interface{}
err error
ignoreExecErr func(error) bool
args []interface{}
err error
ignoreNotFound bool
}
func WithTableSuffix(name string) func(*execConfig) {
@@ -27,9 +29,9 @@ func WithTableSuffix(name string) func(*execConfig) {
}
}
func WithIgnoreExecErr(ignore func(error) bool) func(*execConfig) {
func WithIgnoreNotFound() func(*execConfig) {
return func(o *execConfig) {
o.ignoreExecErr = ignore
o.ignoreNotFound = true
}
}
@@ -289,47 +291,30 @@ func NewCopyCol(column, from string) handler.Column {
}
func NewLessThanCond(column string, value interface{}) handler.Condition {
return func() ([]interface{}, func(params []string) string) {
return []interface{}{value}, func(params []string) string {
return column + " < " + params[0]
}
return func(param string) (string, interface{}) {
return column + " < " + param, value
}
}
func NewIsNullCond(column string) handler.Condition {
return func() ([]interface{}, func(params []string) string) {
return nil, func([]string) string {
return column + " IS NULL"
}
return func(param string) (string, interface{}) {
return column + " IS NULL", nil
}
}
// NewTextArrayContainsCond returns a handler.Condition that checks if the column that stores an array of text contains the given value
func NewTextArrayContainsCond(column string, value string) handler.Condition {
return func() ([]interface{}, func(params []string) string) {
return []interface{}{database.StringArray{value}}, func(params []string) string {
return column + " @> " + params[0]
}
return func(param string) (string, interface{}) {
return column + " @> " + param, database.StringArray{value}
}
}
// NewInCond returns an IN condition that matches multiple values
func NewInCond(column string, values []interface{}) handler.Condition {
return func() ([]interface{}, func(params []string) string) {
return values, func(params []string) string {
return column + " IN ( " + strings.Join(params, ", ") + " )"
}
}
}
// Not negates a condition
// Not is a function and not a method, so that calling it is well readable
// For example conditions := []handler.Condition{ Not(NewTextArrayContainsCond())}
func Not(condition handler.Condition) handler.Condition {
return func() ([]interface{}, func(params []string) string) {
values, condFunc := condition()
return values, func(params []string) string {
return "NOT (" + condFunc(params) + ")"
}
return func(param string) (string, interface{}) {
cond, value := condition(param)
return "NOT (" + cond + ")", value
}
}
@@ -433,15 +418,13 @@ func columnsToQuery(cols []handler.Column) (names []string, parameters []string,
func conditionsToWhere(conditions []handler.Condition, paramOffset int) (wheres []string, values []interface{}) {
wheres = make([]string, len(conditions))
for i, condition := range conditions {
conditionValues, conditionFunc := condition()
values = append(values, conditionValues...)
params := make([]string, len(conditionValues))
for j := range conditionValues {
paramOffset++
params[j] = "$" + strconv.Itoa(paramOffset)
values = make([]interface{}, 0, len(conditions))
for i, conditionFunc := range conditions {
condition, value := conditionFunc("$" + strconv.Itoa(i+1+paramOffset))
wheres[i] = "(" + condition + ")"
if value != nil {
values = append(values, value)
}
wheres[i] = "(" + conditionFunc(params) + ")"
}
return wheres, values
}
@@ -464,12 +447,11 @@ func exec(config execConfig, q query, opts []execOption) Exec {
}
if _, err := ex.Exec(q(config), config.args...); err != nil {
execErr := caos_errs.ThrowInternal(err, "CRDB-pKtsr", "exec failed")
if config.ignoreExecErr != nil && config.ignoreExecErr(err) {
logging.Debugf("CRDB-4M9fs", "exec failed, but ignored: %v", err)
if config.ignoreNotFound && errors.Is(err, sql.ErrNoRows) {
logging.WithError(err).Debugf("CRDB-4M9fs", "ignored not found: %v", err)
return nil
}
return execErr
return zitadel_errors.ThrowInternal(err, "CRDB-pKtsr", "exec failed")
}
return nil

View File

@@ -4,7 +4,6 @@ import (
"database/sql"
"encoding/json"
"errors"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/eventstore"
@@ -62,15 +61,13 @@ func NewJSONCol(name string, value interface{}) Column {
return NewCol(name, marshalled)
}
type Condition func() ([]interface{}, func(params []string) string)
type Condition func(param string) (string, interface{})
type NamespacedCondition func(namespace string) Condition
func NewCond(name string, value interface{}) Condition {
return func() ([]interface{}, func(params []string) string) {
return []interface{}{value}, func(params []string) string {
return name + " = " + params[0]
}
return func(param string) (string, interface{}) {
return name + " = " + param, value
}
}

View File

@@ -2,8 +2,6 @@ package projection
import (
"context"
"database/sql"
"errors"
"time"
zitadel_errors "github.com/zitadel/zitadel/internal/errors"
@@ -68,10 +66,10 @@ func newQuotaProjection(ctx context.Context, config crdb.StatementHandlerConfig)
crdb.NewColumn(QuotaColumnID, crdb.ColumnTypeText),
crdb.NewColumn(QuotaColumnInstanceID, crdb.ColumnTypeText),
crdb.NewColumn(QuotaColumnUnit, crdb.ColumnTypeEnum),
crdb.NewColumn(QuotaColumnAmount, crdb.ColumnTypeInt64, crdb.Nullable()),
crdb.NewColumn(QuotaColumnFrom, crdb.ColumnTypeTimestamp, crdb.Nullable()),
crdb.NewColumn(QuotaColumnInterval, crdb.ColumnTypeInterval, crdb.Nullable()),
crdb.NewColumn(QuotaColumnLimit, crdb.ColumnTypeBool, crdb.Nullable()),
crdb.NewColumn(QuotaColumnAmount, crdb.ColumnTypeInt64),
crdb.NewColumn(QuotaColumnFrom, crdb.ColumnTypeTimestamp),
crdb.NewColumn(QuotaColumnInterval, crdb.ColumnTypeInterval),
crdb.NewColumn(QuotaColumnLimit, crdb.ColumnTypeBool),
},
crdb.NewPrimaryKey(QuotaColumnInstanceID, QuotaColumnUnit),
),
@@ -153,8 +151,7 @@ func (q *quotaProjection) reduceQuotaSet(event eventstore.Event) (*handler.State
if err != nil {
return nil, err
}
statements := make([]func(e eventstore.Event) crdb.Exec, 0, len(e.Notifications)+2)
var statements []func(e eventstore.Event) crdb.Exec
// 1. Insert or update quota if the event has not only notification changes
quotaConflictColumns := []handler.Column{
@@ -181,40 +178,32 @@ func (q *quotaProjection) reduceQuotaSet(event eventstore.Event) (*handler.State
statements = append(statements, crdb.AddUpsertStatement(quotaConflictColumns, quotaUpdateCols))
}
// 2. Delete all notifications with unknown IDs
notificationIDs := make([]interface{}, len(e.Notifications))
for i := range e.Notifications {
notificationIDs[i] = e.Notifications[i].ID
// 2. Delete existing notifications
if e.Notifications == nil {
return crdb.NewMultiStatement(e, statements...), nil
}
statements = append(statements, crdb.AddDeleteStatement(
[]handler.Condition{
handler.NewCond(QuotaNotificationColumnInstanceID, e.Aggregate().InstanceID),
handler.NewCond(QuotaNotificationColumnUnit, e.Unit),
crdb.Not(crdb.NewInCond(QuotaNotificationColumnID, notificationIDs)),
},
crdb.WithTableSuffix(quotaNotificationsTableSuffix),
))
// 3. Upsert notifications with the given IDs
for i := range e.Notifications {
notification := e.Notifications[i]
notificationConflictColumns := []handler.Column{
handler.NewCol(QuotaNotificationColumnInstanceID, e.Aggregate().InstanceID),
handler.NewCol(QuotaNotificationColumnUnit, e.Unit),
handler.NewCol(QuotaNotificationColumnID, notification.ID),
}
statements = append(statements, crdb.AddUpsertStatement(
notificationConflictColumns,
append(notificationConflictColumns,
notifications := *e.Notifications
for i := range notifications {
notification := notifications[i]
statements = append(statements, crdb.AddCreateStatement(
[]handler.Column{
handler.NewCol(QuotaNotificationColumnInstanceID, e.Aggregate().InstanceID),
handler.NewCol(QuotaNotificationColumnUnit, e.Unit),
handler.NewCol(QuotaNotificationColumnID, notification.ID),
handler.NewCol(QuotaNotificationColumnCallURL, notification.CallURL),
handler.NewCol(QuotaNotificationColumnPercent, notification.Percent),
handler.NewCol(QuotaNotificationColumnRepeat, notification.Repeat),
),
},
crdb.WithTableSuffix(quotaNotificationsTableSuffix),
))
}
return crdb.NewMultiStatement(e, statements...), nil
}
@@ -235,9 +224,7 @@ func (q *quotaProjection) reduceQuotaNotificationDue(event eventstore.Event) (*h
},
crdb.WithTableSuffix(quotaNotificationsTableSuffix),
// The notification could have been removed in the meantime
crdb.WithIgnoreExecErr(func(err error) bool {
return errors.Is(err, sql.ErrNoRows)
}),
crdb.WithIgnoreNotFound(),
), nil
}

View File

@@ -47,12 +47,12 @@ func NewRemoveQuotaNameUniqueConstraint(unit Unit) *eventstore.EventUniqueConstr
// SetEvent describes that a quota is added or modified and contains only changed properties
type SetEvent struct {
eventstore.BaseEvent `json:"-"`
Unit Unit `json:"unit"`
From *time.Time `json:"from,omitempty"`
ResetInterval *time.Duration `json:"interval,omitempty"`
Amount *uint64 `json:"amount,omitempty"`
Limit *bool `json:"limit,omitempty"`
Notifications []*SetEventNotification `json:"notifications,omitempty"`
Unit Unit `json:"unit"`
From *time.Time `json:"from,omitempty"`
ResetInterval *time.Duration `json:"interval,omitempty"`
Amount *uint64 `json:"amount,omitempty"`
Limit *bool `json:"limit,omitempty"`
Notifications *[]*SetEventNotification `json:"notifications,omitempty"`
}
type SetEventNotification struct {
@@ -113,7 +113,7 @@ func ChangeResetInterval(interval time.Duration) QuotaChange {
func ChangeNotifications(notifications []*SetEventNotification) QuotaChange {
return func(event *SetEvent) {
event.Notifications = notifications
event.Notifications = &notifications
}
}