feat: set quotas

This commit is contained in:
Elio Bischof
2023-09-20 23:25:03 +02:00
parent a5decda201
commit 243a47e516
12 changed files with 509 additions and 197 deletions

View File

@@ -5,7 +5,6 @@ import (
"github.com/zitadel/zitadel/internal/api/grpc/object"
"github.com/zitadel/zitadel/pkg/grpc/system"
system_pb "github.com/zitadel/zitadel/pkg/grpc/system"
)
func (s *Server) AddQuota(ctx context.Context, req *system.AddQuotaRequest) (*system.AddQuotaResponse, error) {
@@ -16,7 +15,20 @@ func (s *Server) AddQuota(ctx context.Context, req *system.AddQuotaRequest) (*sy
if err != nil {
return nil, err
}
return &system_pb.AddQuotaResponse{
return &system.AddQuotaResponse{
Details: object.AddToDetailsPb(details.Sequence, details.EventDate, details.ResourceOwner),
}, nil
}
func (s *Server) SetQuota(ctx context.Context, req *system.SetQuotaRequest) (*system.SetQuotaResponse, error) {
details, err := s.command.SetQuota(
ctx,
instanceQuotaPbToCommand(req),
)
if err != nil {
return nil, err
}
return &system.SetQuotaResponse{
Details: object.AddToDetailsPb(details.Sequence, details.EventDate, details.ResourceOwner),
}, nil
}
@@ -26,7 +38,7 @@ func (s *Server) RemoveQuota(ctx context.Context, req *system.RemoveQuotaRequest
if err != nil {
return nil, err
}
return &system_pb.RemoveQuotaResponse{
return &system.RemoveQuotaResponse{
Details: object.ChangeToDetailsPb(details.Sequence, details.EventDate, details.ResourceOwner),
}, nil
}

View File

@@ -3,17 +3,27 @@ package system
import (
"github.com/zitadel/zitadel/internal/command"
"github.com/zitadel/zitadel/pkg/grpc/quota"
"github.com/zitadel/zitadel/pkg/grpc/system"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)
func instanceQuotaPbToCommand(req *system.AddQuotaRequest) *command.AddQuota {
return &command.AddQuota{
Unit: instanceQuotaUnitPbToCommand(req.Unit),
From: req.From.AsTime(),
ResetInterval: req.ResetInterval.AsDuration(),
Amount: req.Amount,
Limit: req.Limit,
Notifications: instanceQuotaNotificationsPbToCommand(req.Notifications),
type setQuotaRequest interface {
GetUnit() quota.Unit
GetFrom() *timestamppb.Timestamp
GetResetInterval() *durationpb.Duration
GetAmount() uint64
GetLimit() bool
GetNotifications() []*quota.Notification
}
func instanceQuotaPbToCommand(req setQuotaRequest) *command.SetQuota {
return &command.SetQuota{
Unit: instanceQuotaUnitPbToCommand(req.GetUnit()),
From: req.GetFrom().AsTime(),
ResetInterval: req.GetResetInterval().AsDuration(),
Amount: req.GetAmount(),
Limit: req.GetLimit(),
Notifications: instanceQuotaNotificationsPbToCommand(req.GetNotifications()),
}
}

View File

@@ -110,7 +110,7 @@ type InstanceSetup struct {
RefreshTokenExpiration time.Duration
}
Quotas *struct {
Items []*AddQuota
Items []*SetQuota
}
}
@@ -283,7 +283,7 @@ func (c *Commands) SetUpInstance(ctx context.Context, setup *InstanceSetup) (str
if err != nil {
return "", "", nil, nil, err
}
validations = append(validations, c.AddQuotaCommand(quota.NewAggregate(quotaId, instanceID), q))
validations = append(validations, c.SetQuotaCommand(quota.NewAggregate(quotaId, instanceID), nil, q))
}
}

View File

@@ -2,6 +2,7 @@ package command
import (
"context"
"fmt"
"net/url"
"time"
@@ -10,7 +11,6 @@ import (
"github.com/zitadel/zitadel/internal/domain"
"github.com/zitadel/zitadel/internal/errors"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/id"
"github.com/zitadel/zitadel/internal/repository/quota"
)
@@ -32,17 +32,17 @@ func (q QuotaUnit) Enum() quota.Unit {
}
}
// AddQuota returns and error if the quota already exists.
// AddQuota is deprecated. Use SetQuota instead.
func (c *Commands) AddQuota(
ctx context.Context,
q *AddQuota,
q *SetQuota,
) (*domain.ObjectDetails, error) {
instanceId := authz.GetInstance(ctx).InstanceID()
wm, err := c.getQuotaWriteModel(ctx, instanceId, instanceId, q.Unit.Enum())
if err != nil {
return nil, err
}
if wm.active {
return nil, errors.ThrowAlreadyExists(nil, "COMMAND-WDfFf", "Errors.Quota.AlreadyExists")
}
@@ -50,7 +50,42 @@ func (c *Commands) AddQuota(
if err != nil {
return nil, err
}
cmds, err := preparation.PrepareCommands(ctx, c.eventstore.Filter, c.AddQuotaCommand(quota.NewAggregate(aggregateId, instanceId), q))
cmds, err := preparation.PrepareCommands(ctx, c.eventstore.Filter, c.SetQuotaCommand(quota.NewAggregate(aggregateId, instanceId), wm, q))
if err != nil {
return nil, err
}
events, err := c.eventstore.Push(ctx, cmds...)
if err != nil {
return nil, err
}
err = AppendAndReduce(wm, events...)
if err != nil {
return nil, err
}
return writeModelToObjectDetails(&wm.WriteModel), nil
}
// SetQuota creates a new quota or updates an existing one.
func (c *Commands) SetQuota(
ctx context.Context,
q *SetQuota,
) (*domain.ObjectDetails, error) {
instanceId := authz.GetInstance(ctx).InstanceID()
wm, err := c.getQuotaWriteModel(ctx, instanceId, instanceId, q.Unit.Enum())
if err != nil {
return nil, err
}
if wm.AggregateID == "" && wm.active {
return nil, errors.ThrowInternal(fmt.Errorf("quota is active but has no id"), "COMMAND-3M9ds", "Errors.Internal")
}
aggregateId := wm.AggregateID
if aggregateId == "" {
aggregateId, err = c.idGenerator.Next()
if err != nil {
return nil, err
}
}
cmds, err := preparation.PrepareCommands(ctx, c.eventstore.Filter, c.SetQuotaCommand(quota.NewAggregate(aggregateId, instanceId), wm, q))
if err != nil {
return nil, err
}
@@ -67,21 +102,15 @@ func (c *Commands) AddQuota(
func (c *Commands) RemoveQuota(ctx context.Context, unit QuotaUnit) (*domain.ObjectDetails, error) {
instanceId := authz.GetInstance(ctx).InstanceID()
wm, err := c.getQuotaWriteModel(ctx, instanceId, instanceId, unit.Enum())
if err != nil {
return nil, err
}
if !wm.active {
return nil, errors.ThrowNotFound(nil, "COMMAND-WDfFf", "Errors.Quota.NotFound")
}
aggregate := quota.NewAggregate(wm.AggregateID, instanceId)
events := []eventstore.Command{
quota.NewRemovedEvent(ctx, &aggregate.Aggregate, unit.Enum()),
}
events := []eventstore.Command{quota.NewRemovedEvent(ctx, &aggregate.Aggregate, unit.Enum())}
pushedEvents, err := c.eventstore.Push(ctx, events...)
if err != nil {
return nil, err
@@ -104,6 +133,16 @@ type QuotaNotification struct {
CallURL string
}
// SetQuota configures a quota and activates it if it isn't active already
type SetQuota struct {
Unit QuotaUnit `json:"unit"`
From time.Time `json:"from"`
ResetInterval time.Duration `json:"ResetInterval,omitempty"`
Amount uint64 `json:"Amount,omitempty"`
Limit bool `json:"Limit,omitempty"`
Notifications QuotaNotifications `json:"Notifications,omitempty"`
}
type QuotaNotifications []*QuotaNotification
func (q *QuotaNotification) validate() error {
@@ -111,94 +150,51 @@ func (q *QuotaNotification) validate() error {
if err != nil {
return errors.ThrowInvalidArgument(err, "QUOTA-bZ0Fj", "Errors.Quota.Invalid.CallURL")
}
if !u.IsAbs() || u.Host == "" {
return errors.ThrowInvalidArgument(nil, "QUOTA-HAYmN", "Errors.Quota.Invalid.CallURL")
}
if q.Percent < 1 {
return errors.ThrowInvalidArgument(nil, "QUOTA-pBfjq", "Errors.Quota.Invalid.Percent")
}
return nil
}
func (q *QuotaNotifications) toAddedEventNotifications(idGenerator id.Generator) ([]*quota.AddedEventNotification, error) {
if q == nil {
return nil, nil
}
notifications := make([]*quota.AddedEventNotification, len(*q))
for idx, notification := range *q {
id, err := idGenerator.Next()
if err != nil {
return nil, err
}
notifications[idx] = &quota.AddedEventNotification{
ID: id,
Percent: notification.Percent,
Repeat: notification.Repeat,
CallURL: notification.CallURL,
}
}
return notifications, nil
}
type AddQuota struct {
Unit QuotaUnit
From time.Time
ResetInterval time.Duration
Amount uint64
Limit bool
Notifications QuotaNotifications
}
func (q *AddQuota) validate() error {
func (q *SetQuota) validate() error {
for _, notification := range q.Notifications {
if err := notification.validate(); err != nil {
return err
}
}
if q.Unit.Enum() == quota.Unimplemented {
return errors.ThrowInvalidArgument(nil, "QUOTA-OTeSh", "Errors.Quota.Invalid.Unimplemented")
}
if q.Amount < 1 {
if q.Amount < 0 {
return errors.ThrowInvalidArgument(nil, "QUOTA-hOKSJ", "Errors.Quota.Invalid.Amount")
}
if q.ResetInterval < time.Minute {
return errors.ThrowInvalidArgument(nil, "QUOTA-R5otd", "Errors.Quota.Invalid.ResetInterval")
}
return nil
}
func (c *Commands) AddQuotaCommand(a *quota.Aggregate, q *AddQuota) preparation.Validation {
func (c *Commands) SetQuotaCommand(a *quota.Aggregate, wm *quotaWriteModel, q *SetQuota) preparation.Validation {
return func() (preparation.CreateCommands, error) {
if err := q.validate(); err != nil {
return nil, err
}
return func(ctx context.Context, filter preparation.FilterToQueryReducer) (cmd []eventstore.Command, err error) {
notifications, err := q.Notifications.toAddedEventNotifications(c.idGenerator)
if err != nil {
changes, err := wm.NewChanges(c.idGenerator, q.Amount, q.From, q.ResetInterval, q.Limit, q.Notifications)
if len(changes) == 0 {
return nil, err
}
return []eventstore.Command{quota.NewAddedEvent(
ctx,
&a.Aggregate,
return []eventstore.Command{quota.NewSetEvent(
eventstore.NewBaseEventForPush(
ctx,
&a.Aggregate,
quota.SetEventType,
),
q.Unit.Enum(),
q.From,
q.ResetInterval,
q.Amount,
q.Limit,
notifications,
changes,
)}, err
},
nil

View File

@@ -1,14 +1,27 @@
package command
import (
"slices"
"strings"
"time"
"github.com/zitadel/zitadel/internal/errors"
"github.com/zitadel/zitadel/internal/id"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/repository/quota"
)
type quotaWriteModel struct {
eventstore.WriteModel
unit quota.Unit
active bool
active bool
unit quota.Unit
from time.Time
resetInterval time.Duration
amount uint64
limit bool
notifications []*quota.SetEventNotification
}
// newQuotaWriteModel aggregateId is filled by reducing unit matching events
@@ -30,6 +43,7 @@ func (wm *quotaWriteModel) Query() *eventstore.SearchQueryBuilder {
AggregateTypes(quota.AggregateType).
EventTypes(
quota.AddedEventType,
quota.SetEventType,
quota.RemovedEventType,
).EventData(map[string]interface{}{"unit": wm.unit})
@@ -38,15 +52,137 @@ func (wm *quotaWriteModel) Query() *eventstore.SearchQueryBuilder {
func (wm *quotaWriteModel) Reduce() error {
for _, event := range wm.Events {
wm.ChangeDate = event.CreationDate()
switch e := event.(type) {
case *quota.AddedEvent:
wm.AggregateID = e.Aggregate().ID
wm.ChangeDate = e.CreationDate()
case *quota.SetEvent:
wm.active = true
wm.AggregateID = event.Aggregate().ID
if e.Amount != nil {
wm.amount = *e.Amount
}
if e.From != nil {
wm.from = *e.From
}
if e.Limit != nil {
wm.limit = *e.Limit
}
if e.ResetInterval != nil {
wm.resetInterval = *e.ResetInterval
}
if e.Notifications != nil {
wm.notifications = e.Notifications
}
case *quota.RemovedEvent:
wm.AggregateID = e.Aggregate().ID
wm.active = false
wm.AggregateID = ""
}
}
return wm.WriteModel.Reduce()
}
// NewChanges returns all changes that need to be applied to the aggregate.
// If wm is nil, all properties are set.
func (wm *quotaWriteModel) NewChanges(
idGenerator id.Generator,
amount uint64,
from time.Time,
resetInterval time.Duration,
limit bool,
notifications QuotaNotifications,
) (changes []quota.QuotaChange, err error) {
setEventNotifications, err := notifications.newSetEventNotifications(idGenerator)
if err != nil {
return nil, err
}
if wm == nil || wm.active == false {
return []quota.QuotaChange{
quota.ChangeAmount(amount),
quota.ChangeFrom(from),
quota.ChangeResetInterval(resetInterval),
quota.ChangeLimit(limit),
quota.ChangeNotifications(setEventNotifications),
}, nil
}
changes = make([]quota.QuotaChange, 0, 5)
if wm.amount != amount {
changes = append(changes, quota.ChangeAmount(amount))
}
if wm.from != from {
changes = append(changes, quota.ChangeFrom(from))
}
if wm.resetInterval != resetInterval {
changes = append(changes, quota.ChangeResetInterval(resetInterval))
}
if wm.limit != limit {
changes = append(changes, quota.ChangeLimit(limit))
}
if len(setEventNotifications) != len(wm.notifications) {
changes = append(changes, quota.ChangeNotifications(setEventNotifications))
return changes, nil
}
replaceIDs(wm.notifications, setEventNotifications)
// All IDs are passed and the number of notifications didn't change.
// Now we check if the properties changed.
err = sortSetEventNotifications(setEventNotifications)
if err != nil {
return nil, err
}
// We ignore the sorting error for the existing notifications, because this is system state, not user input.
// If the sorting fails, the notifications will be cleaned up and triggered again.
_ = sortSetEventNotifications(wm.notifications)
for i, notification := range setEventNotifications {
if notification.ID != wm.notifications[i].ID ||
notification.CallURL != wm.notifications[i].CallURL ||
notification.Percent != wm.notifications[i].Percent ||
notification.Repeat != wm.notifications[i].Repeat {
changes = append(changes, quota.ChangeNotifications(setEventNotifications))
return changes, nil
}
}
return changes, err
}
// newSetEventNotifications returns quota.SetEventNotification elements with generated IDs.
func (q *QuotaNotifications) newSetEventNotifications(idGenerator id.Generator) (setNotifications []*quota.SetEventNotification, err error) {
if q == nil {
return nil, nil
}
notifications := make([]*quota.SetEventNotification, len(*q))
for idx, notification := range *q {
notifications[idx] = &quota.SetEventNotification{
Percent: notification.Percent,
Repeat: notification.Repeat,
CallURL: notification.CallURL,
}
notifications[idx].ID, err = idGenerator.Next()
if err != nil {
return nil, err
}
}
return notifications, nil
}
func replaceIDs(srcs, dsts []*quota.SetEventNotification) {
for _, dst := range dsts {
for _, src := range srcs {
if dst.CallURL == src.CallURL &&
dst.Percent == src.Percent &&
dst.Repeat == src.Repeat {
dst.ID = src.ID
break
}
}
}
}
func sortSetEventNotifications(notifications []*quota.SetEventNotification) (err error) {
slices.SortFunc(notifications, func(i, j *quota.SetEventNotification) int {
comp := strings.Compare(i.ID, j.ID)
if comp == 0 {
// TODO: translate
err = errors.ThrowPreconditionFailed(nil, "EVENT-3M9fs", "Errors.Quota.Notifications.Duplicate")
}
return comp
})
return err
}

View File

@@ -44,7 +44,7 @@ func TestQuota_AddQuota(t *testing.T) {
t,
expectFilter(
eventFromEventPusher(
quota.NewAddedEvent(context.Background(),
quota.NewSetEvent(context.Background(),
&quota.NewAggregate("quota1", "INSTANCE").Aggregate,
QuotaRequestsAllAuthenticated.Enum(),
time.Now(),
@@ -108,7 +108,7 @@ func TestQuota_AddQuota(t *testing.T) {
[]*repository.Event{
eventFromEventPusherWithInstanceID(
"INSTANCE",
quota.NewAddedEvent(context.Background(),
quota.NewSetEvent(context.Background(),
&quota.NewAggregate("quota1", "INSTANCE").Aggregate,
QuotaRequestsAllAuthenticated.Enum(),
time.Date(2023, 9, 1, 0, 0, 0, 0, time.UTC),
@@ -149,7 +149,7 @@ func TestQuota_AddQuota(t *testing.T) {
expectFilter(
eventFromEventPusherWithInstanceID(
"INSTANCE",
quota.NewAddedEvent(context.Background(),
quota.NewSetEvent(context.Background(),
&quota.NewAggregate("quota1", "INSTANCE").Aggregate,
QuotaRequestsAllAuthenticated.Enum(),
time.Now(),
@@ -171,7 +171,7 @@ func TestQuota_AddQuota(t *testing.T) {
[]*repository.Event{
eventFromEventPusherWithInstanceID(
"INSTANCE",
quota.NewAddedEvent(context.Background(),
quota.NewSetEvent(context.Background(),
&quota.NewAggregate("quota1", "INSTANCE").Aggregate,
QuotaRequestsAllAuthenticated.Enum(),
time.Date(2023, 9, 1, 0, 0, 0, 0, time.UTC),
@@ -214,7 +214,7 @@ func TestQuota_AddQuota(t *testing.T) {
[]*repository.Event{
eventFromEventPusherWithInstanceID(
"INSTANCE",
quota.NewAddedEvent(context.Background(),
quota.NewSetEvent(context.Background(),
&quota.NewAggregate("quota1", "INSTANCE").Aggregate,
QuotaRequestsAllAuthenticated.Enum(),
time.Date(2023, 9, 1, 0, 0, 0, 0, time.UTC),
@@ -325,7 +325,7 @@ func TestQuota_RemoveQuota(t *testing.T) {
expectFilter(
eventFromEventPusherWithInstanceID(
"INSTANCE",
quota.NewAddedEvent(context.Background(),
quota.NewSetEvent(context.Background(),
&quota.NewAggregate("quota1", "INSTANCE").Aggregate,
QuotaRequestsAllAuthenticated.Enum(),
time.Now(),
@@ -363,7 +363,7 @@ func TestQuota_RemoveQuota(t *testing.T) {
expectFilter(
eventFromEventPusherWithInstanceID(
"INSTANCE",
quota.NewAddedEvent(context.Background(),
quota.NewSetEvent(context.Background(),
&quota.NewAggregate("quota1", "INSTANCE").Aggregate,
QuotaRequestsAllAuthenticated.Enum(),
time.Now(),

View File

@@ -4,6 +4,8 @@ import (
"strconv"
"strings"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/database"
caos_errs "github.com/zitadel/zitadel/internal/errors"
"github.com/zitadel/zitadel/internal/eventstore"
@@ -14,8 +16,9 @@ type execOption func(*execConfig)
type execConfig struct {
tableName string
args []interface{}
err error
args []interface{}
err error
ignoreExecErr func(error) bool
}
func WithTableSuffix(name string) func(*execConfig) {
@@ -24,6 +27,12 @@ func WithTableSuffix(name string) func(*execConfig) {
}
}
func WithIgnoreExecErr(ignore func(error) bool) func(*execConfig) {
return func(o *execConfig) {
o.ignoreExecErr = ignore
}
}
func NewCreateStatement(event eventstore.Event, values []handler.Column, opts ...execOption) *handler.Statement {
cols, params, args := columnsToQuery(values)
columnNames := strings.Join(cols, ", ")
@@ -280,30 +289,47 @@ func NewCopyCol(column, from string) handler.Column {
}
func NewLessThanCond(column string, value interface{}) handler.Condition {
return func(param string) (string, interface{}) {
return column + " < " + param, value
return func() ([]interface{}, func(params []string) string) {
return []interface{}{value}, func(params []string) string {
return column + " < " + params[0]
}
}
}
func NewIsNullCond(column string) handler.Condition {
return func(param string) (string, interface{}) {
return column + " IS NULL", nil
return func() ([]interface{}, func(params []string) string) {
return nil, func([]string) string {
return column + " IS NULL"
}
}
}
// NewTextArrayContainsCond returns a handler.Condition that checks if the column that stores an array of text contains the given value
func NewTextArrayContainsCond(column string, value string) handler.Condition {
return func(param string) (string, interface{}) {
return column + " @> " + param, database.StringArray{value}
return func() ([]interface{}, func(params []string) string) {
return []interface{}{database.StringArray{value}}, func(params []string) string {
return column + " @> " + params[0]
}
}
}
// Not is a function and not a method, so that calling it is well readable
// NewInCond returns an IN condition that matches multiple values
func NewInCond(column string, values []interface{}) handler.Condition {
return func() ([]interface{}, func(params []string) string) {
return values, func(params []string) string {
return column + " IN ( " + strings.Join(params, ", ") + " )"
}
}
}
// Not negates a condition
// For example conditions := []handler.Condition{ Not(NewTextArrayContainsCond())}
func Not(condition handler.Condition) handler.Condition {
return func(param string) (string, interface{}) {
cond, value := condition(param)
return "NOT (" + cond + ")", value
return func() ([]interface{}, func(params []string) string) {
values, condFunc := condition()
return values, func(params []string) string {
return "NOT (" + condFunc(params) + ")"
}
}
}
@@ -407,13 +433,15 @@ func columnsToQuery(cols []handler.Column) (names []string, parameters []string,
func conditionsToWhere(conditions []handler.Condition, paramOffset int) (wheres []string, values []interface{}) {
wheres = make([]string, len(conditions))
values = make([]interface{}, 0, len(conditions))
for i, conditionFunc := range conditions {
condition, value := conditionFunc("$" + strconv.Itoa(i+1+paramOffset))
wheres[i] = "(" + condition + ")"
if value != nil {
values = append(values, value)
for i, condition := range conditions {
conditionValues, conditionFunc := condition()
values = append(values, conditionValues...)
params := make([]string, len(conditionValues))
for j := range conditionValues {
paramOffset++
params[j] = "$" + strconv.Itoa(paramOffset)
}
wheres[i] = "(" + conditionFunc(params) + ")"
}
return wheres, values
}
@@ -436,7 +464,12 @@ func exec(config execConfig, q query, opts []execOption) Exec {
}
if _, err := ex.Exec(q(config), config.args...); err != nil {
return caos_errs.ThrowInternal(err, "CRDB-pKtsr", "exec failed")
execErr := caos_errs.ThrowInternal(err, "CRDB-pKtsr", "exec failed")
if config.ignoreExecErr != nil && config.ignoreExecErr(err) {
logging.Debugf("CRDB-4M9fs", "exec failed, but ignored: %v", err)
return nil
}
return execErr
}
return nil

View File

@@ -4,6 +4,7 @@ import (
"database/sql"
"encoding/json"
"errors"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/eventstore"
@@ -61,13 +62,15 @@ func NewJSONCol(name string, value interface{}) Column {
return NewCol(name, marshalled)
}
type Condition func(param string) (string, interface{})
type Condition func() ([]interface{}, func(params []string) string)
type NamespacedCondition func(namespace string) Condition
func NewCond(name string, value interface{}) Condition {
return func(param string) (string, interface{}) {
return name + " = " + param, value
return func() ([]interface{}, func(params []string) string) {
return []interface{}{value}, func(params []string) string {
return name + " = " + params[0]
}
}
}

View File

@@ -2,10 +2,13 @@ package projection
import (
"context"
"database/sql"
"errors"
"time"
zitadel_errors "github.com/zitadel/zitadel/internal/errors"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/errors"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/eventstore/handler"
"github.com/zitadel/zitadel/internal/eventstore/handler/crdb"
@@ -65,10 +68,10 @@ func newQuotaProjection(ctx context.Context, config crdb.StatementHandlerConfig)
crdb.NewColumn(QuotaColumnID, crdb.ColumnTypeText),
crdb.NewColumn(QuotaColumnInstanceID, crdb.ColumnTypeText),
crdb.NewColumn(QuotaColumnUnit, crdb.ColumnTypeEnum),
crdb.NewColumn(QuotaColumnAmount, crdb.ColumnTypeInt64),
crdb.NewColumn(QuotaColumnFrom, crdb.ColumnTypeTimestamp),
crdb.NewColumn(QuotaColumnInterval, crdb.ColumnTypeInterval),
crdb.NewColumn(QuotaColumnLimit, crdb.ColumnTypeBool),
crdb.NewColumn(QuotaColumnAmount, crdb.ColumnTypeInt64, crdb.Nullable()),
crdb.NewColumn(QuotaColumnFrom, crdb.ColumnTypeTimestamp, crdb.Nullable()),
crdb.NewColumn(QuotaColumnInterval, crdb.ColumnTypeInterval, crdb.Nullable()),
crdb.NewColumn(QuotaColumnLimit, crdb.ColumnTypeBool, crdb.Nullable()),
},
crdb.NewPrimaryKey(QuotaColumnInstanceID, QuotaColumnUnit),
),
@@ -118,31 +121,20 @@ func (q *quotaProjection) reducers() []handler.AggregateReducer {
EventRedusers: []handler.EventReducer{
{
Event: quota.AddedEventType,
Reduce: q.reduceQuotaAdded,
Reduce: q.reduceQuotaSet,
},
{
Event: quota.SetEventType,
Reduce: q.reduceQuotaSet,
},
},
},
{
Aggregate: quota.AggregateType,
EventRedusers: []handler.EventReducer{
{
Event: quota.RemovedEventType,
Reduce: q.reduceQuotaRemoved,
},
},
},
{
Aggregate: quota.AggregateType,
EventRedusers: []handler.EventReducer{
{
Event: quota.NotificationDueEventType,
Reduce: q.reduceQuotaNotificationDue,
},
},
},
{
Aggregate: quota.AggregateType,
EventRedusers: []handler.EventReducer{
{
Event: quota.NotifiedEventType,
Reduce: q.reduceQuotaNotified,
@@ -156,39 +148,74 @@ func (q *quotaProjection) reduceQuotaNotified(event eventstore.Event) (*handler.
return crdb.NewNoOpStatement(event), nil
}
func (q *quotaProjection) reduceQuotaAdded(event eventstore.Event) (*handler.Statement, error) {
e, err := assertEvent[*quota.AddedEvent](event)
func (q *quotaProjection) reduceQuotaSet(event eventstore.Event) (*handler.Statement, error) {
e, err := assertEvent[*quota.SetEvent](event)
if err != nil {
return nil, err
}
createStatements := make([]func(e eventstore.Event) crdb.Exec, len(e.Notifications)+1)
createStatements[0] = crdb.AddCreateStatement(
[]handler.Column{
handler.NewCol(QuotaColumnID, e.Aggregate().ID),
handler.NewCol(QuotaColumnInstanceID, e.Aggregate().InstanceID),
handler.NewCol(QuotaColumnUnit, e.Unit),
handler.NewCol(QuotaColumnAmount, e.Amount),
handler.NewCol(QuotaColumnFrom, e.From),
handler.NewCol(QuotaColumnInterval, e.ResetInterval),
handler.NewCol(QuotaColumnLimit, e.Limit),
})
statements := make([]func(e eventstore.Event) crdb.Exec, 0, len(e.Notifications)+2)
// 1. Insert or update quota if the event has not only notification changes
quotaConflictColumns := []handler.Column{
handler.NewCol(QuotaColumnInstanceID, e.Aggregate().InstanceID),
handler.NewCol(QuotaColumnUnit, e.Unit),
}
quotaUpdateCols := make([]handler.Column, 0, 4+1+len(quotaConflictColumns))
if e.Limit != nil {
quotaUpdateCols = append(quotaUpdateCols, handler.NewCol(QuotaColumnLimit, *e.Limit))
}
if e.Amount != nil {
quotaUpdateCols = append(quotaUpdateCols, handler.NewCol(QuotaColumnAmount, *e.Amount))
}
if e.From != nil {
quotaUpdateCols = append(quotaUpdateCols, handler.NewCol(QuotaColumnFrom, *e.From))
}
if e.ResetInterval != nil {
quotaUpdateCols = append(quotaUpdateCols, handler.NewCol(QuotaColumnInterval, *e.ResetInterval))
}
if len(quotaUpdateCols) > 0 {
// TODO: Add the quota ID to the primary key in a migration?
quotaUpdateCols = append(quotaUpdateCols, handler.NewCol(QuotaColumnID, e.Aggregate().ID))
quotaUpdateCols = append(quotaUpdateCols, quotaConflictColumns...)
statements = append(statements, crdb.AddUpsertStatement(quotaConflictColumns, quotaUpdateCols))
}
// 2. Delete all notifications with unknown IDs
notificationIDs := make([]interface{}, len(e.Notifications))
for i := range e.Notifications {
notificationIDs[i] = e.Notifications[i].ID
}
statements = append(statements, crdb.AddDeleteStatement(
[]handler.Condition{
handler.NewCond(QuotaNotificationColumnInstanceID, e.Aggregate().InstanceID),
handler.NewCond(QuotaNotificationColumnUnit, e.Unit),
crdb.Not(crdb.NewInCond(QuotaNotificationColumnID, notificationIDs)),
},
crdb.WithTableSuffix(quotaNotificationsTableSuffix),
))
// 3. Upsert notifications with the given IDs
for i := range e.Notifications {
notification := e.Notifications[i]
createStatements[i+1] = crdb.AddCreateStatement(
[]handler.Column{
handler.NewCol(QuotaNotificationColumnInstanceID, e.Aggregate().InstanceID),
handler.NewCol(QuotaNotificationColumnUnit, e.Unit),
handler.NewCol(QuotaNotificationColumnID, notification.ID),
notificationConflictColumns := []handler.Column{
handler.NewCol(QuotaNotificationColumnInstanceID, e.Aggregate().InstanceID),
handler.NewCol(QuotaNotificationColumnUnit, e.Unit),
handler.NewCol(QuotaNotificationColumnID, notification.ID),
}
statements = append(statements, crdb.AddUpsertStatement(
notificationConflictColumns,
append(notificationConflictColumns,
handler.NewCol(QuotaNotificationColumnCallURL, notification.CallURL),
handler.NewCol(QuotaNotificationColumnPercent, notification.Percent),
handler.NewCol(QuotaNotificationColumnRepeat, notification.Repeat),
},
),
crdb.WithTableSuffix(quotaNotificationsTableSuffix),
)
))
}
return crdb.NewMultiStatement(e, createStatements...), nil
return crdb.NewMultiStatement(e, statements...), nil
}
func (q *quotaProjection) reduceQuotaNotificationDue(event eventstore.Event) (*handler.Statement, error) {
@@ -207,6 +234,10 @@ func (q *quotaProjection) reduceQuotaNotificationDue(event eventstore.Event) (*h
handler.NewCond(QuotaNotificationColumnID, e.ID),
},
crdb.WithTableSuffix(quotaNotificationsTableSuffix),
// The notification could have been removed in the meantime
crdb.WithIgnoreExecErr(func(err error) bool {
return errors.Is(err, sql.ErrNoRows)
}),
), nil
}
@@ -279,7 +310,7 @@ func (q *quotaProjection) IncrementUsage(ctx context.Context, unit quota.Unit, i
instanceID, unit, periodStart, count,
).Scan(&sum)
if err != nil {
return 0, errors.ThrowInternalf(err, "PROJ-SJL3h", "incrementing usage for unit %d failed for at least one quota period", unit)
return 0, zitadel_errors.ThrowInternalf(err, "PROJ-SJL3h", "incrementing usage for unit %d failed for at least one quota period", unit)
}
return sum, err
}

View File

@@ -17,6 +17,7 @@ const (
UniqueQuotaNameType = "quota_units"
eventTypePrefix = eventstore.EventType("quota.")
AddedEventType = eventTypePrefix + "added"
SetEventType = eventTypePrefix + "set"
NotifiedEventType = eventTypePrefix + "notified"
NotificationDueEventType = eventTypePrefix + "notificationdue"
RemovedEventType = eventTypePrefix + "removed"
@@ -43,65 +44,87 @@ func NewRemoveQuotaNameUniqueConstraint(unit Unit) *eventstore.EventUniqueConstr
)
}
type AddedEvent struct {
// SetEvent describes that a quota is added or modified and contains only changed properties
type SetEvent struct {
eventstore.BaseEvent `json:"-"`
Unit Unit `json:"unit"`
From time.Time `json:"from"`
ResetInterval time.Duration `json:"interval,omitempty"`
Amount uint64 `json:"amount"`
Limit bool `json:"limit"`
Notifications []*AddedEventNotification `json:"notifications,omitempty"`
Unit Unit `json:"unit"`
From *time.Time `json:"from,omitempty"`
ResetInterval *time.Duration `json:"interval,omitempty"`
Amount *uint64 `json:"amount,omitempty"`
Limit *bool `json:"limit,omitempty"`
Notifications []*SetEventNotification `json:"notifications,omitempty"`
}
type AddedEventNotification struct {
type SetEventNotification struct {
ID string `json:"id"`
Percent uint16 `json:"percent"`
Repeat bool `json:"repeat,omitempty"`
Repeat bool `json:"repeat"`
CallURL string `json:"callUrl"`
}
func (e *AddedEvent) Data() interface{} {
func (e *SetEvent) Data() interface{} {
return e
}
func (e *AddedEvent) UniqueConstraints() []*eventstore.EventUniqueConstraint {
return []*eventstore.EventUniqueConstraint{NewAddQuotaUnitUniqueConstraint(e.Unit)}
func (e *SetEvent) UniqueConstraints() []*eventstore.EventUniqueConstraint {
return nil
}
func NewAddedEvent(
ctx context.Context,
aggregate *eventstore.Aggregate,
func NewSetEvent(
base *eventstore.BaseEvent,
unit Unit,
from time.Time,
resetInterval time.Duration,
amount uint64,
limit bool,
notifications []*AddedEventNotification,
) *AddedEvent {
return &AddedEvent{
BaseEvent: *eventstore.NewBaseEventForPush(
ctx,
aggregate,
AddedEventType,
),
Unit: unit,
From: from,
ResetInterval: resetInterval,
Amount: amount,
Limit: limit,
Notifications: notifications,
changes []QuotaChange,
) *SetEvent {
changedEvent := &SetEvent{
BaseEvent: *base,
Unit: unit,
}
for _, change := range changes {
change(changedEvent)
}
return changedEvent
}
type QuotaChange func(*SetEvent)
func ChangeAmount(amount uint64) QuotaChange {
return func(e *SetEvent) {
e.Amount = &amount
}
}
func AddedEventMapper(event *repository.Event) (eventstore.Event, error) {
e := &AddedEvent{
func ChangeLimit(limit bool) QuotaChange {
return func(e *SetEvent) {
e.Limit = &limit
}
}
func ChangeFrom(from time.Time) QuotaChange {
return func(event *SetEvent) {
event.From = &from
}
}
func ChangeResetInterval(interval time.Duration) QuotaChange {
return func(event *SetEvent) {
event.ResetInterval = &interval
}
}
func ChangeNotifications(notifications []*SetEventNotification) QuotaChange {
return func(event *SetEvent) {
event.Notifications = notifications
}
}
func SetEventMapper(event *repository.Event) (eventstore.Event, error) {
e := &SetEvent{
BaseEvent: *eventstore.BaseEventFromRepo(event),
}
err := json.Unmarshal(event.Data, e)
if err != nil {
return nil, errors.ThrowInternal(err, "QUOTA-4n8vs", "unable to unmarshal quota added")
return nil, errors.ThrowInternal(err, "QUOTA-kmIpI", "unable to unmarshal quota set")
}
return e, nil

View File

@@ -5,7 +5,11 @@ import (
)
func RegisterEventMappers(es *eventstore.Eventstore) {
es.RegisterFilterEventMapper(AggregateType, AddedEventType, AddedEventMapper).
// AddedEventType is not emitted anymore.
// For ease of use, old events are directly mapped to SetEvent.
// This works, because the data structures are compatible.
es.RegisterFilterEventMapper(AggregateType, AddedEventType, SetEventMapper).
RegisterFilterEventMapper(AggregateType, SetEventType, SetEventMapper).
RegisterFilterEventMapper(AggregateType, RemovedEventType, RemovedEventMapper).
RegisterFilterEventMapper(AggregateType, NotificationDueEventType, NotificationDueEventMapper).
RegisterFilterEventMapper(AggregateType, NotifiedEventType, NotifiedEventMapper)

View File

@@ -361,6 +361,8 @@ service SystemService {
}
// Creates a new quota
// Returns an error if the quota already exists for the specified unit
// Deprecated: use SetQuota instead
rpc AddQuota(AddQuotaRequest) returns (AddQuotaResponse) {
option (google.api.http) = {
post: "/instances/{instance_id}/quotas"
@@ -372,6 +374,19 @@ service SystemService {
};
}
// Sets quota configuration properties
// Creates a new quota if it doesn't exist for the specified unit
rpc SetQuota(SetQuotaRequest) returns (SetQuotaResponse) {
option (google.api.http) = {
put: "/instances/{instance_id}/quotas"
body: "*"
};
option (zitadel.v1.auth_option) = {
permission: "authenticated";
};
}
// Removes a quota
rpc RemoveQuota(RemoveQuotaRequest) returns (RemoveQuotaResponse) {
option (google.api.http) = {
@@ -598,6 +613,55 @@ message AddQuotaResponse {
zitadel.v1.ObjectDetails details = 1;
}
message SetQuotaRequest {
string instance_id = 1 [(validate.rules).string = {min_len: 1, max_len: 200}];
// the unit a quota should be imposed on
zitadel.quota.v1.Unit unit = 2 [
(validate.rules).enum = {defined_only: true, not_in: [0]},
(grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = {
description: "the unit a quota should be imposed on";
}
];
// the starting time from which the current quota period is calculated from. This is relevant for querying the current usage.
google.protobuf.Timestamp from = 3 [
(validate.rules).timestamp.required = true,
(grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = {
example: "\"2019-04-01T08:45:00.000000Z\"";
description: "the starting time from which the current quota period is calculated from. This is relevant for querying the current usage.";
}
];
// the quota periods duration
google.protobuf.Duration reset_interval = 4 [
(validate.rules).duration.required = true,
(grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = {
description: "the quota periods duration";
}
];
// the quota amount of units
uint64 amount = 5 [
(validate.rules).uint64.gt = 0,
(grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = {
description: "the quota amount of units";
}
];
// whether ZITADEL should block further usage when the configured amount is used
bool limit = 6 [
(grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = {
description: "whether ZITADEL should block further usage when the configured amount is used";
}
];
// the handlers, ZITADEL executes when certain quota percentages are reached
repeated zitadel.quota.v1.Notification notifications = 7 [
(grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = {
description: "the handlers, ZITADEL executes when certain quota percentages are reached";
}
];
}
message SetQuotaResponse {
zitadel.v1.ObjectDetails details = 1;
}
message RemoveQuotaRequest {
string instance_id = 1 [(validate.rules).string = {min_len: 1, max_len: 200}];
zitadel.quota.v1.Unit unit = 2;