fix: set quotas (#6597)

* feat: set quotas

* fix: start new period on younger anchor

* cleanup e2e config

* fix set notifications

* lint

* test: fix quota projection tests

* fix add quota tests

* make quota fields nullable

* enable amount 0

* fix initial setup

* create a prerelease

* avoid success comments

* fix quota projection primary key

* Revert "fix quota projection primary key"

This reverts commit e72f4d7fa1.

* simplify write model

* fix aggregate id

* avoid push without changes

* test set quota lifecycle

* test set quota mutations

* fix quota unit test

* fix: quotas

* test quota.set event projection

* use SetQuota in integration tests

* fix: release quotas 3

* reset releaserc

* fix comment

* test notification order doesn't matter

* test notification order doesn't matter

* test with unmarshalled events

* test with unmarshalled events
This commit is contained in:
Elio Bischof
2023-09-22 11:37:16 +02:00
committed by GitHub
parent e6d273b328
commit ae1af6bc8c
20 changed files with 1385 additions and 318 deletions

View File

@@ -5,7 +5,7 @@ import (
"time"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/errors"
zitadel_errors "github.com/zitadel/zitadel/internal/errors"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/eventstore/handler"
"github.com/zitadel/zitadel/internal/eventstore/handler/crdb"
@@ -65,10 +65,10 @@ func newQuotaProjection(ctx context.Context, config crdb.StatementHandlerConfig)
crdb.NewColumn(QuotaColumnID, crdb.ColumnTypeText),
crdb.NewColumn(QuotaColumnInstanceID, crdb.ColumnTypeText),
crdb.NewColumn(QuotaColumnUnit, crdb.ColumnTypeEnum),
crdb.NewColumn(QuotaColumnAmount, crdb.ColumnTypeInt64),
crdb.NewColumn(QuotaColumnFrom, crdb.ColumnTypeTimestamp),
crdb.NewColumn(QuotaColumnInterval, crdb.ColumnTypeInterval),
crdb.NewColumn(QuotaColumnLimit, crdb.ColumnTypeBool),
crdb.NewColumn(QuotaColumnAmount, crdb.ColumnTypeInt64, crdb.Nullable()),
crdb.NewColumn(QuotaColumnFrom, crdb.ColumnTypeTimestamp, crdb.Nullable()),
crdb.NewColumn(QuotaColumnInterval, crdb.ColumnTypeInterval, crdb.Nullable()),
crdb.NewColumn(QuotaColumnLimit, crdb.ColumnTypeBool, crdb.Nullable()),
},
crdb.NewPrimaryKey(QuotaColumnInstanceID, QuotaColumnUnit),
),
@@ -118,31 +118,20 @@ func (q *quotaProjection) reducers() []handler.AggregateReducer {
EventRedusers: []handler.EventReducer{
{
Event: quota.AddedEventType,
Reduce: q.reduceQuotaAdded,
Reduce: q.reduceQuotaSet,
},
{
Event: quota.SetEventType,
Reduce: q.reduceQuotaSet,
},
},
},
{
Aggregate: quota.AggregateType,
EventRedusers: []handler.EventReducer{
{
Event: quota.RemovedEventType,
Reduce: q.reduceQuotaRemoved,
},
},
},
{
Aggregate: quota.AggregateType,
EventRedusers: []handler.EventReducer{
{
Event: quota.NotificationDueEventType,
Reduce: q.reduceQuotaNotificationDue,
},
},
},
{
Aggregate: quota.AggregateType,
EventRedusers: []handler.EventReducer{
{
Event: quota.NotifiedEventType,
Reduce: q.reduceQuotaNotified,
@@ -156,26 +145,53 @@ func (q *quotaProjection) reduceQuotaNotified(event eventstore.Event) (*handler.
return crdb.NewNoOpStatement(event), nil
}
func (q *quotaProjection) reduceQuotaAdded(event eventstore.Event) (*handler.Statement, error) {
e, err := assertEvent[*quota.AddedEvent](event)
func (q *quotaProjection) reduceQuotaSet(event eventstore.Event) (*handler.Statement, error) {
e, err := assertEvent[*quota.SetEvent](event)
if err != nil {
return nil, err
}
var statements []func(e eventstore.Event) crdb.Exec
createStatements := make([]func(e eventstore.Event) crdb.Exec, len(e.Notifications)+1)
createStatements[0] = crdb.AddCreateStatement(
[]handler.Column{
handler.NewCol(QuotaColumnID, e.Aggregate().ID),
handler.NewCol(QuotaColumnInstanceID, e.Aggregate().InstanceID),
handler.NewCol(QuotaColumnUnit, e.Unit),
handler.NewCol(QuotaColumnAmount, e.Amount),
handler.NewCol(QuotaColumnFrom, e.From),
handler.NewCol(QuotaColumnInterval, e.ResetInterval),
handler.NewCol(QuotaColumnLimit, e.Limit),
})
for i := range e.Notifications {
notification := e.Notifications[i]
createStatements[i+1] = crdb.AddCreateStatement(
// 1. Insert or update quota if the event has not only notification changes
quotaConflictColumns := []handler.Column{
handler.NewCol(QuotaColumnInstanceID, e.Aggregate().InstanceID),
handler.NewCol(QuotaColumnUnit, e.Unit),
}
quotaUpdateCols := make([]handler.Column, 0, 4+1+len(quotaConflictColumns))
if e.Limit != nil {
quotaUpdateCols = append(quotaUpdateCols, handler.NewCol(QuotaColumnLimit, *e.Limit))
}
if e.Amount != nil {
quotaUpdateCols = append(quotaUpdateCols, handler.NewCol(QuotaColumnAmount, *e.Amount))
}
if e.From != nil {
quotaUpdateCols = append(quotaUpdateCols, handler.NewCol(QuotaColumnFrom, *e.From))
}
if e.ResetInterval != nil {
quotaUpdateCols = append(quotaUpdateCols, handler.NewCol(QuotaColumnInterval, *e.ResetInterval))
}
if len(quotaUpdateCols) > 0 {
// TODO: Add the quota ID to the primary key in a migration?
quotaUpdateCols = append(quotaUpdateCols, handler.NewCol(QuotaColumnID, e.Aggregate().ID))
quotaUpdateCols = append(quotaUpdateCols, quotaConflictColumns...)
statements = append(statements, crdb.AddUpsertStatement(quotaConflictColumns, quotaUpdateCols))
}
// 2. Delete existing notifications
if e.Notifications == nil {
return crdb.NewMultiStatement(e, statements...), nil
}
statements = append(statements, crdb.AddDeleteStatement(
[]handler.Condition{
handler.NewCond(QuotaNotificationColumnInstanceID, e.Aggregate().InstanceID),
handler.NewCond(QuotaNotificationColumnUnit, e.Unit),
},
crdb.WithTableSuffix(quotaNotificationsTableSuffix),
))
notifications := *e.Notifications
for i := range notifications {
notification := notifications[i]
statements = append(statements, crdb.AddCreateStatement(
[]handler.Column{
handler.NewCol(QuotaNotificationColumnInstanceID, e.Aggregate().InstanceID),
handler.NewCol(QuotaNotificationColumnUnit, e.Unit),
@@ -185,10 +201,9 @@ func (q *quotaProjection) reduceQuotaAdded(event eventstore.Event) (*handler.Sta
handler.NewCol(QuotaNotificationColumnRepeat, notification.Repeat),
},
crdb.WithTableSuffix(quotaNotificationsTableSuffix),
)
))
}
return crdb.NewMultiStatement(e, createStatements...), nil
return crdb.NewMultiStatement(e, statements...), nil
}
func (q *quotaProjection) reduceQuotaNotificationDue(event eventstore.Event) (*handler.Statement, error) {
@@ -207,6 +222,8 @@ func (q *quotaProjection) reduceQuotaNotificationDue(event eventstore.Event) (*h
handler.NewCond(QuotaNotificationColumnID, e.ID),
},
crdb.WithTableSuffix(quotaNotificationsTableSuffix),
// The notification could have been removed in the meantime
crdb.WithIgnoreNotFound(),
), nil
}
@@ -279,7 +296,7 @@ func (q *quotaProjection) IncrementUsage(ctx context.Context, unit quota.Unit, i
instanceID, unit, periodStart, count,
).Scan(&sum)
if err != nil {
return 0, errors.ThrowInternalf(err, "PROJ-SJL3h", "incrementing usage for unit %d failed for at least one quota period", unit)
return 0, zitadel_errors.ThrowInternalf(err, "PROJ-SJL3h", "incrementing usage for unit %d failed for at least one quota period", unit)
}
return sum, err
}

View File

@@ -29,7 +29,7 @@ func TestQuotasProjection_reduces(t *testing.T) {
want wantReduce
}{
{
name: "reduceQuotaAdded",
name: "reduceQuotaSet with added type",
args: args{
event: getEvent(testEvent(
repository.EventType(quota.AddedEventType),
@@ -41,9 +41,9 @@ func TestQuotasProjection_reduces(t *testing.T) {
"from": "2023-01-01T00:00:00Z",
"interval": 300000000000
}`),
), quota.AddedEventMapper),
), quota.SetEventMapper),
},
reduce: (&quotaProjection{}).reduceQuotaAdded,
reduce: (&quotaProjection{}).reduceQuotaSet,
want: wantReduce{
aggregateType: eventstore.AggregateType("quota"),
sequence: 15,
@@ -51,15 +51,15 @@ func TestQuotasProjection_reduces(t *testing.T) {
executer: &testExecuter{
executions: []execution{
{
expectedStmt: "INSERT INTO projections.quotas (id, instance_id, unit, amount, from_anchor, interval, limit_usage) VALUES ($1, $2, $3, $4, $5, $6, $7)",
expectedStmt: "INSERT INTO projections.quotas (limit_usage, amount, from_anchor, interval, id, instance_id, unit) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (instance_id, unit) DO UPDATE SET (limit_usage, amount, from_anchor, interval, id) = (EXCLUDED.limit_usage, EXCLUDED.amount, EXCLUDED.from_anchor, EXCLUDED.interval, EXCLUDED.id)",
expectedArgs: []interface{}{
"agg-id",
"instance-id",
quota.RequestsAllAuthenticated,
true,
uint64(10),
time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC),
time.Minute * 5,
true,
"agg-id",
"instance-id",
quota.RequestsAllAuthenticated,
},
},
},
@@ -67,7 +67,7 @@ func TestQuotasProjection_reduces(t *testing.T) {
},
},
{
name: "reduceQuotaAdded with notification",
name: "reduceQuotaAdded with added type and notification",
args: args{
event: getEvent(testEvent(
repository.EventType(quota.AddedEventType),
@@ -87,9 +87,9 @@ func TestQuotasProjection_reduces(t *testing.T) {
}
]
}`),
), quota.AddedEventMapper),
), quota.SetEventMapper),
},
reduce: (&quotaProjection{}).reduceQuotaAdded,
reduce: (&quotaProjection{}).reduceQuotaSet,
want: wantReduce{
aggregateType: eventstore.AggregateType("quota"),
sequence: 15,
@@ -97,17 +97,126 @@ func TestQuotasProjection_reduces(t *testing.T) {
executer: &testExecuter{
executions: []execution{
{
expectedStmt: "INSERT INTO projections.quotas (id, instance_id, unit, amount, from_anchor, interval, limit_usage) VALUES ($1, $2, $3, $4, $5, $6, $7)",
expectedStmt: "INSERT INTO projections.quotas (limit_usage, amount, from_anchor, interval, id, instance_id, unit) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (instance_id, unit) DO UPDATE SET (limit_usage, amount, from_anchor, interval, id) = (EXCLUDED.limit_usage, EXCLUDED.amount, EXCLUDED.from_anchor, EXCLUDED.interval, EXCLUDED.id)",
expectedArgs: []interface{}{
"agg-id",
"instance-id",
quota.RequestsAllAuthenticated,
true,
uint64(10),
time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC),
time.Minute * 5,
"agg-id",
"instance-id",
quota.RequestsAllAuthenticated,
},
},
{
expectedStmt: "DELETE FROM projections.quotas_notifications WHERE (instance_id = $1) AND (unit = $2)",
expectedArgs: []interface{}{
"instance-id",
quota.RequestsAllAuthenticated,
},
},
{
expectedStmt: "INSERT INTO projections.quotas_notifications (instance_id, unit, id, call_url, percent, repeat) VALUES ($1, $2, $3, $4, $5, $6)",
expectedArgs: []interface{}{
"instance-id",
quota.RequestsAllAuthenticated,
"id",
"url",
uint16(100),
true,
},
},
},
},
},
},
{
name: "reduceQuotaSet with set type",
args: args{
event: getEvent(testEvent(
repository.EventType(quota.SetEventType),
quota.AggregateType,
[]byte(`{
"unit": 1,
"amount": 10,
"limit": true,
"from": "2023-01-01T00:00:00Z",
"interval": 300000000000
}`),
), quota.SetEventMapper),
},
reduce: (&quotaProjection{}).reduceQuotaSet,
want: wantReduce{
aggregateType: eventstore.AggregateType("quota"),
sequence: 15,
previousSequence: 10,
executer: &testExecuter{
executions: []execution{
{
expectedStmt: "INSERT INTO projections.quotas (limit_usage, amount, from_anchor, interval, id, instance_id, unit) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (instance_id, unit) DO UPDATE SET (limit_usage, amount, from_anchor, interval, id) = (EXCLUDED.limit_usage, EXCLUDED.amount, EXCLUDED.from_anchor, EXCLUDED.interval, EXCLUDED.id)",
expectedArgs: []interface{}{
true,
uint64(10),
time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC),
time.Minute * 5,
"agg-id",
"instance-id",
quota.RequestsAllAuthenticated,
},
},
},
},
},
},
{
name: "reduceQuotaAdded with set type and notification",
args: args{
event: getEvent(testEvent(
repository.EventType(quota.SetEventType),
quota.AggregateType,
[]byte(`{
"unit": 1,
"amount": 10,
"limit": true,
"from": "2023-01-01T00:00:00Z",
"interval": 300000000000,
"notifications": [
{
"id": "id",
"percent": 100,
"repeat": true,
"callURL": "url"
}
]
}`),
), quota.SetEventMapper),
},
reduce: (&quotaProjection{}).reduceQuotaSet,
want: wantReduce{
aggregateType: eventstore.AggregateType("quota"),
sequence: 15,
previousSequence: 10,
executer: &testExecuter{
executions: []execution{
{
expectedStmt: "INSERT INTO projections.quotas (limit_usage, amount, from_anchor, interval, id, instance_id, unit) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (instance_id, unit) DO UPDATE SET (limit_usage, amount, from_anchor, interval, id) = (EXCLUDED.limit_usage, EXCLUDED.amount, EXCLUDED.from_anchor, EXCLUDED.interval, EXCLUDED.id)",
expectedArgs: []interface{}{
true,
uint64(10),
time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC),
time.Minute * 5,
"agg-id",
"instance-id",
quota.RequestsAllAuthenticated,
},
},
{
expectedStmt: "DELETE FROM projections.quotas_notifications WHERE (instance_id = $1) AND (unit = $2)",
expectedArgs: []interface{}{
"instance-id",
quota.RequestsAllAuthenticated,
},
},
{
expectedStmt: "INSERT INTO projections.quotas_notifications (instance_id, unit, id, call_url, percent, repeat) VALUES ($1, $2, $3, $4, $5, $6)",
expectedArgs: []interface{}{

View File

@@ -49,7 +49,8 @@ func (q *Queries) GetRemainingQuotaUsage(ctx context.Context, instanceID string,
QuotaColumnLimit.identifier(): true,
},
sq.Expr("age(" + QuotaPeriodColumnStart.identifier() + ") < " + QuotaColumnInterval.identifier()),
sq.Expr(QuotaPeriodColumnStart.identifier() + " < now()"),
sq.Expr(QuotaPeriodColumnStart.identifier() + " <= now()"),
sq.Expr(QuotaPeriodColumnStart.identifier() + " >= " + QuotaColumnFrom.identifier()),
}).
ToSql()
if err != nil {
@@ -73,14 +74,14 @@ func prepareRemainingQuotaUsageQuery(ctx context.Context, db prepareDatabase) (s
From(quotaPeriodsTable.identifier()).
Join(join(QuotaColumnUnit, QuotaPeriodColumnUnit) + db.Timetravel(call.Took(ctx))).
PlaceholderFormat(sq.Dollar), func(row *sql.Row) (*uint64, error) {
usage := new(uint64)
err := row.Scan(usage)
remaining := new(uint64)
err := row.Scan(remaining)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, zitadel_errors.ThrowNotFound(err, "QUERY-quiowi2", "Errors.Internal")
}
return nil, zitadel_errors.ThrowInternal(err, "QUERY-81j1jn2", "Errors.Internal")
}
return usage, nil
return remaining, nil
}
}