fix: set quotas (#6597)

* feat: set quotas

* fix: start new period on younger anchor

* cleanup e2e config

* fix set notifications

* lint

* test: fix quota projection tests

* fix add quota tests

* make quota fields nullable

* enable amount 0

* fix initial setup

* create a prerelease

* avoid success comments

* fix quota projection primary key

* Revert "fix quota projection primary key"

This reverts commit e72f4d7fa1.

* simplify write model

* fix aggregate id

* avoid push without changes

* test set quota lifecycle

* test set quota mutations

* fix quota unit test

* fix: quotas

* test quota.set event projection

* use SetQuota in integration tests

* fix: release quotas 3

* reset releaserc

* fix comment

* test notification order doesn't matter

* test notification order doesn't matter

* test with unmarshalled events

* test with unmarshalled events

(cherry picked from commit ae1af6bc8c)
This commit is contained in:
Elio Bischof
2023-09-22 11:37:16 +02:00
committed by Livio Spring
parent 41e31aad41
commit 1d4ec6cdba
20 changed files with 1385 additions and 318 deletions

View File

@@ -5,7 +5,7 @@ import (
"time"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/errors"
zitadel_errors "github.com/zitadel/zitadel/internal/errors"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/eventstore/handler"
"github.com/zitadel/zitadel/internal/eventstore/handler/crdb"
@@ -65,10 +65,10 @@ func newQuotaProjection(ctx context.Context, config crdb.StatementHandlerConfig)
crdb.NewColumn(QuotaColumnID, crdb.ColumnTypeText),
crdb.NewColumn(QuotaColumnInstanceID, crdb.ColumnTypeText),
crdb.NewColumn(QuotaColumnUnit, crdb.ColumnTypeEnum),
crdb.NewColumn(QuotaColumnAmount, crdb.ColumnTypeInt64),
crdb.NewColumn(QuotaColumnFrom, crdb.ColumnTypeTimestamp),
crdb.NewColumn(QuotaColumnInterval, crdb.ColumnTypeInterval),
crdb.NewColumn(QuotaColumnLimit, crdb.ColumnTypeBool),
crdb.NewColumn(QuotaColumnAmount, crdb.ColumnTypeInt64, crdb.Nullable()),
crdb.NewColumn(QuotaColumnFrom, crdb.ColumnTypeTimestamp, crdb.Nullable()),
crdb.NewColumn(QuotaColumnInterval, crdb.ColumnTypeInterval, crdb.Nullable()),
crdb.NewColumn(QuotaColumnLimit, crdb.ColumnTypeBool, crdb.Nullable()),
},
crdb.NewPrimaryKey(QuotaColumnInstanceID, QuotaColumnUnit),
),
@@ -118,31 +118,20 @@ func (q *quotaProjection) reducers() []handler.AggregateReducer {
EventRedusers: []handler.EventReducer{
{
Event: quota.AddedEventType,
Reduce: q.reduceQuotaAdded,
Reduce: q.reduceQuotaSet,
},
{
Event: quota.SetEventType,
Reduce: q.reduceQuotaSet,
},
},
},
{
Aggregate: quota.AggregateType,
EventRedusers: []handler.EventReducer{
{
Event: quota.RemovedEventType,
Reduce: q.reduceQuotaRemoved,
},
},
},
{
Aggregate: quota.AggregateType,
EventRedusers: []handler.EventReducer{
{
Event: quota.NotificationDueEventType,
Reduce: q.reduceQuotaNotificationDue,
},
},
},
{
Aggregate: quota.AggregateType,
EventRedusers: []handler.EventReducer{
{
Event: quota.NotifiedEventType,
Reduce: q.reduceQuotaNotified,
@@ -156,26 +145,53 @@ func (q *quotaProjection) reduceQuotaNotified(event eventstore.Event) (*handler.
return crdb.NewNoOpStatement(event), nil
}
func (q *quotaProjection) reduceQuotaAdded(event eventstore.Event) (*handler.Statement, error) {
e, err := assertEvent[*quota.AddedEvent](event)
func (q *quotaProjection) reduceQuotaSet(event eventstore.Event) (*handler.Statement, error) {
e, err := assertEvent[*quota.SetEvent](event)
if err != nil {
return nil, err
}
var statements []func(e eventstore.Event) crdb.Exec
createStatements := make([]func(e eventstore.Event) crdb.Exec, len(e.Notifications)+1)
createStatements[0] = crdb.AddCreateStatement(
[]handler.Column{
handler.NewCol(QuotaColumnID, e.Aggregate().ID),
handler.NewCol(QuotaColumnInstanceID, e.Aggregate().InstanceID),
handler.NewCol(QuotaColumnUnit, e.Unit),
handler.NewCol(QuotaColumnAmount, e.Amount),
handler.NewCol(QuotaColumnFrom, e.From),
handler.NewCol(QuotaColumnInterval, e.ResetInterval),
handler.NewCol(QuotaColumnLimit, e.Limit),
})
for i := range e.Notifications {
notification := e.Notifications[i]
createStatements[i+1] = crdb.AddCreateStatement(
// 1. Insert or update quota if the event has not only notification changes
quotaConflictColumns := []handler.Column{
handler.NewCol(QuotaColumnInstanceID, e.Aggregate().InstanceID),
handler.NewCol(QuotaColumnUnit, e.Unit),
}
quotaUpdateCols := make([]handler.Column, 0, 4+1+len(quotaConflictColumns))
if e.Limit != nil {
quotaUpdateCols = append(quotaUpdateCols, handler.NewCol(QuotaColumnLimit, *e.Limit))
}
if e.Amount != nil {
quotaUpdateCols = append(quotaUpdateCols, handler.NewCol(QuotaColumnAmount, *e.Amount))
}
if e.From != nil {
quotaUpdateCols = append(quotaUpdateCols, handler.NewCol(QuotaColumnFrom, *e.From))
}
if e.ResetInterval != nil {
quotaUpdateCols = append(quotaUpdateCols, handler.NewCol(QuotaColumnInterval, *e.ResetInterval))
}
if len(quotaUpdateCols) > 0 {
// TODO: Add the quota ID to the primary key in a migration?
quotaUpdateCols = append(quotaUpdateCols, handler.NewCol(QuotaColumnID, e.Aggregate().ID))
quotaUpdateCols = append(quotaUpdateCols, quotaConflictColumns...)
statements = append(statements, crdb.AddUpsertStatement(quotaConflictColumns, quotaUpdateCols))
}
// 2. Delete existing notifications
if e.Notifications == nil {
return crdb.NewMultiStatement(e, statements...), nil
}
statements = append(statements, crdb.AddDeleteStatement(
[]handler.Condition{
handler.NewCond(QuotaNotificationColumnInstanceID, e.Aggregate().InstanceID),
handler.NewCond(QuotaNotificationColumnUnit, e.Unit),
},
crdb.WithTableSuffix(quotaNotificationsTableSuffix),
))
notifications := *e.Notifications
for i := range notifications {
notification := notifications[i]
statements = append(statements, crdb.AddCreateStatement(
[]handler.Column{
handler.NewCol(QuotaNotificationColumnInstanceID, e.Aggregate().InstanceID),
handler.NewCol(QuotaNotificationColumnUnit, e.Unit),
@@ -185,10 +201,9 @@ func (q *quotaProjection) reduceQuotaAdded(event eventstore.Event) (*handler.Sta
handler.NewCol(QuotaNotificationColumnRepeat, notification.Repeat),
},
crdb.WithTableSuffix(quotaNotificationsTableSuffix),
)
))
}
return crdb.NewMultiStatement(e, createStatements...), nil
return crdb.NewMultiStatement(e, statements...), nil
}
func (q *quotaProjection) reduceQuotaNotificationDue(event eventstore.Event) (*handler.Statement, error) {
@@ -207,6 +222,8 @@ func (q *quotaProjection) reduceQuotaNotificationDue(event eventstore.Event) (*h
handler.NewCond(QuotaNotificationColumnID, e.ID),
},
crdb.WithTableSuffix(quotaNotificationsTableSuffix),
// The notification could have been removed in the meantime
crdb.WithIgnoreNotFound(),
), nil
}
@@ -279,7 +296,7 @@ func (q *quotaProjection) IncrementUsage(ctx context.Context, unit quota.Unit, i
instanceID, unit, periodStart, count,
).Scan(&sum)
if err != nil {
return 0, errors.ThrowInternalf(err, "PROJ-SJL3h", "incrementing usage for unit %d failed for at least one quota period", unit)
return 0, zitadel_errors.ThrowInternalf(err, "PROJ-SJL3h", "incrementing usage for unit %d failed for at least one quota period", unit)
}
return sum, err
}