diff --git a/internal/api/grpc/system/quota.go b/internal/api/grpc/system/quota.go index 745c449b011..2cb57fc7106 100644 --- a/internal/api/grpc/system/quota.go +++ b/internal/api/grpc/system/quota.go @@ -5,7 +5,6 @@ import ( "github.com/zitadel/zitadel/internal/api/grpc/object" "github.com/zitadel/zitadel/pkg/grpc/system" - system_pb "github.com/zitadel/zitadel/pkg/grpc/system" ) func (s *Server) AddQuota(ctx context.Context, req *system.AddQuotaRequest) (*system.AddQuotaResponse, error) { @@ -16,7 +15,20 @@ func (s *Server) AddQuota(ctx context.Context, req *system.AddQuotaRequest) (*sy if err != nil { return nil, err } - return &system_pb.AddQuotaResponse{ + return &system.AddQuotaResponse{ + Details: object.AddToDetailsPb(details.Sequence, details.EventDate, details.ResourceOwner), + }, nil +} + +func (s *Server) SetQuota(ctx context.Context, req *system.SetQuotaRequest) (*system.SetQuotaResponse, error) { + details, err := s.command.SetQuota( + ctx, + instanceQuotaPbToCommand(req), + ) + if err != nil { + return nil, err + } + return &system.SetQuotaResponse{ Details: object.AddToDetailsPb(details.Sequence, details.EventDate, details.ResourceOwner), }, nil } @@ -26,7 +38,7 @@ func (s *Server) RemoveQuota(ctx context.Context, req *system.RemoveQuotaRequest if err != nil { return nil, err } - return &system_pb.RemoveQuotaResponse{ + return &system.RemoveQuotaResponse{ Details: object.ChangeToDetailsPb(details.Sequence, details.EventDate, details.ResourceOwner), }, nil } diff --git a/internal/api/grpc/system/quota_converter.go b/internal/api/grpc/system/quota_converter.go index 5be57b43d54..1945ff0951b 100644 --- a/internal/api/grpc/system/quota_converter.go +++ b/internal/api/grpc/system/quota_converter.go @@ -3,17 +3,27 @@ package system import ( "github.com/zitadel/zitadel/internal/command" "github.com/zitadel/zitadel/pkg/grpc/quota" - "github.com/zitadel/zitadel/pkg/grpc/system" + "google.golang.org/protobuf/types/known/durationpb" + "google.golang.org/protobuf/types/known/timestamppb" ) -func instanceQuotaPbToCommand(req *system.AddQuotaRequest) *command.AddQuota { - return &command.AddQuota{ - Unit: instanceQuotaUnitPbToCommand(req.Unit), - From: req.From.AsTime(), - ResetInterval: req.ResetInterval.AsDuration(), - Amount: req.Amount, - Limit: req.Limit, - Notifications: instanceQuotaNotificationsPbToCommand(req.Notifications), +type setQuotaRequest interface { + GetUnit() quota.Unit + GetFrom() *timestamppb.Timestamp + GetResetInterval() *durationpb.Duration + GetAmount() uint64 + GetLimit() bool + GetNotifications() []*quota.Notification +} + +func instanceQuotaPbToCommand(req setQuotaRequest) *command.SetQuota { + return &command.SetQuota{ + Unit: instanceQuotaUnitPbToCommand(req.GetUnit()), + From: req.GetFrom().AsTime(), + ResetInterval: req.GetResetInterval().AsDuration(), + Amount: req.GetAmount(), + Limit: req.GetLimit(), + Notifications: instanceQuotaNotificationsPbToCommand(req.GetNotifications()), } } diff --git a/internal/command/instance.go b/internal/command/instance.go index 18c13b6d408..1cee71b8d7e 100644 --- a/internal/command/instance.go +++ b/internal/command/instance.go @@ -110,7 +110,7 @@ type InstanceSetup struct { RefreshTokenExpiration time.Duration } Quotas *struct { - Items []*AddQuota + Items []*SetQuota } } @@ -283,7 +283,7 @@ func (c *Commands) SetUpInstance(ctx context.Context, setup *InstanceSetup) (str if err != nil { return "", "", nil, nil, err } - validations = append(validations, c.AddQuotaCommand(quota.NewAggregate(quotaId, instanceID), q)) + validations = append(validations, c.SetQuotaCommand(quota.NewAggregate(quotaId, instanceID), nil, q)) } } diff --git a/internal/command/quota.go b/internal/command/quota.go index b787929104b..360df1ec226 100644 --- a/internal/command/quota.go +++ b/internal/command/quota.go @@ -2,6 +2,7 @@ package command import ( "context" + "fmt" "net/url" "time" @@ -10,7 +11,6 @@ import ( "github.com/zitadel/zitadel/internal/domain" "github.com/zitadel/zitadel/internal/errors" "github.com/zitadel/zitadel/internal/eventstore" - "github.com/zitadel/zitadel/internal/id" "github.com/zitadel/zitadel/internal/repository/quota" ) @@ -32,17 +32,17 @@ func (q QuotaUnit) Enum() quota.Unit { } } +// AddQuota returns and error if the quota already exists. +// AddQuota is deprecated. Use SetQuota instead. func (c *Commands) AddQuota( ctx context.Context, - q *AddQuota, + q *SetQuota, ) (*domain.ObjectDetails, error) { instanceId := authz.GetInstance(ctx).InstanceID() - wm, err := c.getQuotaWriteModel(ctx, instanceId, instanceId, q.Unit.Enum()) if err != nil { return nil, err } - if wm.active { return nil, errors.ThrowAlreadyExists(nil, "COMMAND-WDfFf", "Errors.Quota.AlreadyExists") } @@ -50,7 +50,42 @@ func (c *Commands) AddQuota( if err != nil { return nil, err } - cmds, err := preparation.PrepareCommands(ctx, c.eventstore.Filter, c.AddQuotaCommand(quota.NewAggregate(aggregateId, instanceId), q)) + cmds, err := preparation.PrepareCommands(ctx, c.eventstore.Filter, c.SetQuotaCommand(quota.NewAggregate(aggregateId, instanceId), wm, q)) + if err != nil { + return nil, err + } + events, err := c.eventstore.Push(ctx, cmds...) + if err != nil { + return nil, err + } + err = AppendAndReduce(wm, events...) + if err != nil { + return nil, err + } + return writeModelToObjectDetails(&wm.WriteModel), nil +} + +// SetQuota creates a new quota or updates an existing one. +func (c *Commands) SetQuota( + ctx context.Context, + q *SetQuota, +) (*domain.ObjectDetails, error) { + instanceId := authz.GetInstance(ctx).InstanceID() + wm, err := c.getQuotaWriteModel(ctx, instanceId, instanceId, q.Unit.Enum()) + if err != nil { + return nil, err + } + if wm.AggregateID == "" && wm.active { + return nil, errors.ThrowInternal(fmt.Errorf("quota is active but has no id"), "COMMAND-3M9ds", "Errors.Internal") + } + aggregateId := wm.AggregateID + if aggregateId == "" { + aggregateId, err = c.idGenerator.Next() + if err != nil { + return nil, err + } + } + cmds, err := preparation.PrepareCommands(ctx, c.eventstore.Filter, c.SetQuotaCommand(quota.NewAggregate(aggregateId, instanceId), wm, q)) if err != nil { return nil, err } @@ -67,21 +102,15 @@ func (c *Commands) AddQuota( func (c *Commands) RemoveQuota(ctx context.Context, unit QuotaUnit) (*domain.ObjectDetails, error) { instanceId := authz.GetInstance(ctx).InstanceID() - wm, err := c.getQuotaWriteModel(ctx, instanceId, instanceId, unit.Enum()) if err != nil { return nil, err } - if !wm.active { return nil, errors.ThrowNotFound(nil, "COMMAND-WDfFf", "Errors.Quota.NotFound") } - aggregate := quota.NewAggregate(wm.AggregateID, instanceId) - - events := []eventstore.Command{ - quota.NewRemovedEvent(ctx, &aggregate.Aggregate, unit.Enum()), - } + events := []eventstore.Command{quota.NewRemovedEvent(ctx, &aggregate.Aggregate, unit.Enum())} pushedEvents, err := c.eventstore.Push(ctx, events...) if err != nil { return nil, err @@ -104,6 +133,16 @@ type QuotaNotification struct { CallURL string } +// SetQuota configures a quota and activates it if it isn't active already +type SetQuota struct { + Unit QuotaUnit `json:"unit"` + From time.Time `json:"from"` + ResetInterval time.Duration `json:"ResetInterval,omitempty"` + Amount uint64 `json:"Amount,omitempty"` + Limit bool `json:"Limit,omitempty"` + Notifications QuotaNotifications `json:"Notifications,omitempty"` +} + type QuotaNotifications []*QuotaNotification func (q *QuotaNotification) validate() error { @@ -111,94 +150,51 @@ func (q *QuotaNotification) validate() error { if err != nil { return errors.ThrowInvalidArgument(err, "QUOTA-bZ0Fj", "Errors.Quota.Invalid.CallURL") } - if !u.IsAbs() || u.Host == "" { return errors.ThrowInvalidArgument(nil, "QUOTA-HAYmN", "Errors.Quota.Invalid.CallURL") } - if q.Percent < 1 { return errors.ThrowInvalidArgument(nil, "QUOTA-pBfjq", "Errors.Quota.Invalid.Percent") } return nil } -func (q *QuotaNotifications) toAddedEventNotifications(idGenerator id.Generator) ([]*quota.AddedEventNotification, error) { - if q == nil { - return nil, nil - } - - notifications := make([]*quota.AddedEventNotification, len(*q)) - for idx, notification := range *q { - - id, err := idGenerator.Next() - if err != nil { - return nil, err - } - - notifications[idx] = "a.AddedEventNotification{ - ID: id, - Percent: notification.Percent, - Repeat: notification.Repeat, - CallURL: notification.CallURL, - } - } - - return notifications, nil -} - -type AddQuota struct { - Unit QuotaUnit - From time.Time - ResetInterval time.Duration - Amount uint64 - Limit bool - Notifications QuotaNotifications -} - -func (q *AddQuota) validate() error { +func (q *SetQuota) validate() error { for _, notification := range q.Notifications { if err := notification.validate(); err != nil { return err } } - if q.Unit.Enum() == quota.Unimplemented { return errors.ThrowInvalidArgument(nil, "QUOTA-OTeSh", "Errors.Quota.Invalid.Unimplemented") } - - if q.Amount < 1 { + if q.Amount < 0 { return errors.ThrowInvalidArgument(nil, "QUOTA-hOKSJ", "Errors.Quota.Invalid.Amount") } - if q.ResetInterval < time.Minute { return errors.ThrowInvalidArgument(nil, "QUOTA-R5otd", "Errors.Quota.Invalid.ResetInterval") } return nil } -func (c *Commands) AddQuotaCommand(a *quota.Aggregate, q *AddQuota) preparation.Validation { +func (c *Commands) SetQuotaCommand(a *quota.Aggregate, wm *quotaWriteModel, q *SetQuota) preparation.Validation { return func() (preparation.CreateCommands, error) { - if err := q.validate(); err != nil { return nil, err } - return func(ctx context.Context, filter preparation.FilterToQueryReducer) (cmd []eventstore.Command, err error) { - - notifications, err := q.Notifications.toAddedEventNotifications(c.idGenerator) - if err != nil { + changes, err := wm.NewChanges(c.idGenerator, q.Amount, q.From, q.ResetInterval, q.Limit, q.Notifications) + if len(changes) == 0 { return nil, err } - - return []eventstore.Command{quota.NewAddedEvent( - ctx, - &a.Aggregate, + return []eventstore.Command{quota.NewSetEvent( + eventstore.NewBaseEventForPush( + ctx, + &a.Aggregate, + quota.SetEventType, + ), q.Unit.Enum(), - q.From, - q.ResetInterval, - q.Amount, - q.Limit, - notifications, + changes, )}, err }, nil diff --git a/internal/command/quota_model.go b/internal/command/quota_model.go index 6e05b4df0b4..18c840e164a 100644 --- a/internal/command/quota_model.go +++ b/internal/command/quota_model.go @@ -1,14 +1,27 @@ package command import ( + "slices" + "strings" + "time" + + "github.com/zitadel/zitadel/internal/errors" + + "github.com/zitadel/zitadel/internal/id" + "github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/repository/quota" ) type quotaWriteModel struct { eventstore.WriteModel - unit quota.Unit - active bool + active bool + unit quota.Unit + from time.Time + resetInterval time.Duration + amount uint64 + limit bool + notifications []*quota.SetEventNotification } // newQuotaWriteModel aggregateId is filled by reducing unit matching events @@ -30,6 +43,7 @@ func (wm *quotaWriteModel) Query() *eventstore.SearchQueryBuilder { AggregateTypes(quota.AggregateType). EventTypes( quota.AddedEventType, + quota.SetEventType, quota.RemovedEventType, ).EventData(map[string]interface{}{"unit": wm.unit}) @@ -38,15 +52,137 @@ func (wm *quotaWriteModel) Query() *eventstore.SearchQueryBuilder { func (wm *quotaWriteModel) Reduce() error { for _, event := range wm.Events { + wm.ChangeDate = event.CreationDate() switch e := event.(type) { - case *quota.AddedEvent: - wm.AggregateID = e.Aggregate().ID - wm.ChangeDate = e.CreationDate() + case *quota.SetEvent: wm.active = true + wm.AggregateID = event.Aggregate().ID + if e.Amount != nil { + wm.amount = *e.Amount + } + if e.From != nil { + wm.from = *e.From + } + if e.Limit != nil { + wm.limit = *e.Limit + } + if e.ResetInterval != nil { + wm.resetInterval = *e.ResetInterval + } + if e.Notifications != nil { + wm.notifications = e.Notifications + } case *quota.RemovedEvent: - wm.AggregateID = e.Aggregate().ID wm.active = false + wm.AggregateID = "" } } return wm.WriteModel.Reduce() } + +// NewChanges returns all changes that need to be applied to the aggregate. +// If wm is nil, all properties are set. +func (wm *quotaWriteModel) NewChanges( + idGenerator id.Generator, + amount uint64, + from time.Time, + resetInterval time.Duration, + limit bool, + notifications QuotaNotifications, +) (changes []quota.QuotaChange, err error) { + setEventNotifications, err := notifications.newSetEventNotifications(idGenerator) + if err != nil { + return nil, err + } + if wm == nil || wm.active == false { + return []quota.QuotaChange{ + quota.ChangeAmount(amount), + quota.ChangeFrom(from), + quota.ChangeResetInterval(resetInterval), + quota.ChangeLimit(limit), + quota.ChangeNotifications(setEventNotifications), + }, nil + } + changes = make([]quota.QuotaChange, 0, 5) + if wm.amount != amount { + changes = append(changes, quota.ChangeAmount(amount)) + } + if wm.from != from { + changes = append(changes, quota.ChangeFrom(from)) + } + if wm.resetInterval != resetInterval { + changes = append(changes, quota.ChangeResetInterval(resetInterval)) + } + if wm.limit != limit { + changes = append(changes, quota.ChangeLimit(limit)) + } + if len(setEventNotifications) != len(wm.notifications) { + changes = append(changes, quota.ChangeNotifications(setEventNotifications)) + return changes, nil + } + replaceIDs(wm.notifications, setEventNotifications) + // All IDs are passed and the number of notifications didn't change. + // Now we check if the properties changed. + err = sortSetEventNotifications(setEventNotifications) + if err != nil { + return nil, err + } + // We ignore the sorting error for the existing notifications, because this is system state, not user input. + // If the sorting fails, the notifications will be cleaned up and triggered again. + _ = sortSetEventNotifications(wm.notifications) + for i, notification := range setEventNotifications { + if notification.ID != wm.notifications[i].ID || + notification.CallURL != wm.notifications[i].CallURL || + notification.Percent != wm.notifications[i].Percent || + notification.Repeat != wm.notifications[i].Repeat { + changes = append(changes, quota.ChangeNotifications(setEventNotifications)) + return changes, nil + } + } + return changes, err +} + +// newSetEventNotifications returns quota.SetEventNotification elements with generated IDs. +func (q *QuotaNotifications) newSetEventNotifications(idGenerator id.Generator) (setNotifications []*quota.SetEventNotification, err error) { + if q == nil { + return nil, nil + } + notifications := make([]*quota.SetEventNotification, len(*q)) + for idx, notification := range *q { + notifications[idx] = "a.SetEventNotification{ + Percent: notification.Percent, + Repeat: notification.Repeat, + CallURL: notification.CallURL, + } + notifications[idx].ID, err = idGenerator.Next() + if err != nil { + return nil, err + } + } + return notifications, nil +} + +func replaceIDs(srcs, dsts []*quota.SetEventNotification) { + for _, dst := range dsts { + for _, src := range srcs { + if dst.CallURL == src.CallURL && + dst.Percent == src.Percent && + dst.Repeat == src.Repeat { + dst.ID = src.ID + break + } + } + } +} + +func sortSetEventNotifications(notifications []*quota.SetEventNotification) (err error) { + slices.SortFunc(notifications, func(i, j *quota.SetEventNotification) int { + comp := strings.Compare(i.ID, j.ID) + if comp == 0 { + // TODO: translate + err = errors.ThrowPreconditionFailed(nil, "EVENT-3M9fs", "Errors.Quota.Notifications.Duplicate") + } + return comp + }) + return err +} diff --git a/internal/command/quota_test.go b/internal/command/quota_test.go index a3f42b5917c..5c323f53ea4 100644 --- a/internal/command/quota_test.go +++ b/internal/command/quota_test.go @@ -44,7 +44,7 @@ func TestQuota_AddQuota(t *testing.T) { t, expectFilter( eventFromEventPusher( - quota.NewAddedEvent(context.Background(), + quota.NewSetEvent(context.Background(), "a.NewAggregate("quota1", "INSTANCE").Aggregate, QuotaRequestsAllAuthenticated.Enum(), time.Now(), @@ -108,7 +108,7 @@ func TestQuota_AddQuota(t *testing.T) { []*repository.Event{ eventFromEventPusherWithInstanceID( "INSTANCE", - quota.NewAddedEvent(context.Background(), + quota.NewSetEvent(context.Background(), "a.NewAggregate("quota1", "INSTANCE").Aggregate, QuotaRequestsAllAuthenticated.Enum(), time.Date(2023, 9, 1, 0, 0, 0, 0, time.UTC), @@ -149,7 +149,7 @@ func TestQuota_AddQuota(t *testing.T) { expectFilter( eventFromEventPusherWithInstanceID( "INSTANCE", - quota.NewAddedEvent(context.Background(), + quota.NewSetEvent(context.Background(), "a.NewAggregate("quota1", "INSTANCE").Aggregate, QuotaRequestsAllAuthenticated.Enum(), time.Now(), @@ -171,7 +171,7 @@ func TestQuota_AddQuota(t *testing.T) { []*repository.Event{ eventFromEventPusherWithInstanceID( "INSTANCE", - quota.NewAddedEvent(context.Background(), + quota.NewSetEvent(context.Background(), "a.NewAggregate("quota1", "INSTANCE").Aggregate, QuotaRequestsAllAuthenticated.Enum(), time.Date(2023, 9, 1, 0, 0, 0, 0, time.UTC), @@ -214,7 +214,7 @@ func TestQuota_AddQuota(t *testing.T) { []*repository.Event{ eventFromEventPusherWithInstanceID( "INSTANCE", - quota.NewAddedEvent(context.Background(), + quota.NewSetEvent(context.Background(), "a.NewAggregate("quota1", "INSTANCE").Aggregate, QuotaRequestsAllAuthenticated.Enum(), time.Date(2023, 9, 1, 0, 0, 0, 0, time.UTC), @@ -325,7 +325,7 @@ func TestQuota_RemoveQuota(t *testing.T) { expectFilter( eventFromEventPusherWithInstanceID( "INSTANCE", - quota.NewAddedEvent(context.Background(), + quota.NewSetEvent(context.Background(), "a.NewAggregate("quota1", "INSTANCE").Aggregate, QuotaRequestsAllAuthenticated.Enum(), time.Now(), @@ -363,7 +363,7 @@ func TestQuota_RemoveQuota(t *testing.T) { expectFilter( eventFromEventPusherWithInstanceID( "INSTANCE", - quota.NewAddedEvent(context.Background(), + quota.NewSetEvent(context.Background(), "a.NewAggregate("quota1", "INSTANCE").Aggregate, QuotaRequestsAllAuthenticated.Enum(), time.Now(), diff --git a/internal/eventstore/handler/crdb/statement.go b/internal/eventstore/handler/crdb/statement.go index efcc4952e0e..5c2ca7ad4eb 100644 --- a/internal/eventstore/handler/crdb/statement.go +++ b/internal/eventstore/handler/crdb/statement.go @@ -4,6 +4,8 @@ import ( "strconv" "strings" + "github.com/zitadel/logging" + "github.com/zitadel/zitadel/internal/database" caos_errs "github.com/zitadel/zitadel/internal/errors" "github.com/zitadel/zitadel/internal/eventstore" @@ -14,8 +16,9 @@ type execOption func(*execConfig) type execConfig struct { tableName string - args []interface{} - err error + args []interface{} + err error + ignoreExecErr func(error) bool } func WithTableSuffix(name string) func(*execConfig) { @@ -24,6 +27,12 @@ func WithTableSuffix(name string) func(*execConfig) { } } +func WithIgnoreExecErr(ignore func(error) bool) func(*execConfig) { + return func(o *execConfig) { + o.ignoreExecErr = ignore + } +} + func NewCreateStatement(event eventstore.Event, values []handler.Column, opts ...execOption) *handler.Statement { cols, params, args := columnsToQuery(values) columnNames := strings.Join(cols, ", ") @@ -280,30 +289,47 @@ func NewCopyCol(column, from string) handler.Column { } func NewLessThanCond(column string, value interface{}) handler.Condition { - return func(param string) (string, interface{}) { - return column + " < " + param, value + return func() ([]interface{}, func(params []string) string) { + return []interface{}{value}, func(params []string) string { + return column + " < " + params[0] + } } } func NewIsNullCond(column string) handler.Condition { - return func(param string) (string, interface{}) { - return column + " IS NULL", nil + return func() ([]interface{}, func(params []string) string) { + return nil, func([]string) string { + return column + " IS NULL" + } } } // NewTextArrayContainsCond returns a handler.Condition that checks if the column that stores an array of text contains the given value func NewTextArrayContainsCond(column string, value string) handler.Condition { - return func(param string) (string, interface{}) { - return column + " @> " + param, database.StringArray{value} + return func() ([]interface{}, func(params []string) string) { + return []interface{}{database.StringArray{value}}, func(params []string) string { + return column + " @> " + params[0] + } } } -// Not is a function and not a method, so that calling it is well readable +// NewInCond returns an IN condition that matches multiple values +func NewInCond(column string, values []interface{}) handler.Condition { + return func() ([]interface{}, func(params []string) string) { + return values, func(params []string) string { + return column + " IN ( " + strings.Join(params, ", ") + " )" + } + } +} + +// Not negates a condition // For example conditions := []handler.Condition{ Not(NewTextArrayContainsCond())} func Not(condition handler.Condition) handler.Condition { - return func(param string) (string, interface{}) { - cond, value := condition(param) - return "NOT (" + cond + ")", value + return func() ([]interface{}, func(params []string) string) { + values, condFunc := condition() + return values, func(params []string) string { + return "NOT (" + condFunc(params) + ")" + } } } @@ -407,13 +433,15 @@ func columnsToQuery(cols []handler.Column) (names []string, parameters []string, func conditionsToWhere(conditions []handler.Condition, paramOffset int) (wheres []string, values []interface{}) { wheres = make([]string, len(conditions)) - values = make([]interface{}, 0, len(conditions)) - for i, conditionFunc := range conditions { - condition, value := conditionFunc("$" + strconv.Itoa(i+1+paramOffset)) - wheres[i] = "(" + condition + ")" - if value != nil { - values = append(values, value) + for i, condition := range conditions { + conditionValues, conditionFunc := condition() + values = append(values, conditionValues...) + params := make([]string, len(conditionValues)) + for j := range conditionValues { + paramOffset++ + params[j] = "$" + strconv.Itoa(paramOffset) } + wheres[i] = "(" + conditionFunc(params) + ")" } return wheres, values } @@ -436,7 +464,12 @@ func exec(config execConfig, q query, opts []execOption) Exec { } if _, err := ex.Exec(q(config), config.args...); err != nil { - return caos_errs.ThrowInternal(err, "CRDB-pKtsr", "exec failed") + execErr := caos_errs.ThrowInternal(err, "CRDB-pKtsr", "exec failed") + if config.ignoreExecErr != nil && config.ignoreExecErr(err) { + logging.Debugf("CRDB-4M9fs", "exec failed, but ignored: %v", err) + return nil + } + return execErr } return nil diff --git a/internal/eventstore/handler/statement.go b/internal/eventstore/handler/statement.go index 1eecd78f23d..d1ab478351b 100644 --- a/internal/eventstore/handler/statement.go +++ b/internal/eventstore/handler/statement.go @@ -4,6 +4,7 @@ import ( "database/sql" "encoding/json" "errors" + "github.com/zitadel/logging" "github.com/zitadel/zitadel/internal/eventstore" @@ -61,13 +62,15 @@ func NewJSONCol(name string, value interface{}) Column { return NewCol(name, marshalled) } -type Condition func(param string) (string, interface{}) +type Condition func() ([]interface{}, func(params []string) string) type NamespacedCondition func(namespace string) Condition func NewCond(name string, value interface{}) Condition { - return func(param string) (string, interface{}) { - return name + " = " + param, value + return func() ([]interface{}, func(params []string) string) { + return []interface{}{value}, func(params []string) string { + return name + " = " + params[0] + } } } diff --git a/internal/query/projection/quota.go b/internal/query/projection/quota.go index fd6e12266f2..191f5cf43af 100644 --- a/internal/query/projection/quota.go +++ b/internal/query/projection/quota.go @@ -2,10 +2,13 @@ package projection import ( "context" + "database/sql" + "errors" "time" + zitadel_errors "github.com/zitadel/zitadel/internal/errors" + "github.com/zitadel/zitadel/internal/database" - "github.com/zitadel/zitadel/internal/errors" "github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/eventstore/handler" "github.com/zitadel/zitadel/internal/eventstore/handler/crdb" @@ -65,10 +68,10 @@ func newQuotaProjection(ctx context.Context, config crdb.StatementHandlerConfig) crdb.NewColumn(QuotaColumnID, crdb.ColumnTypeText), crdb.NewColumn(QuotaColumnInstanceID, crdb.ColumnTypeText), crdb.NewColumn(QuotaColumnUnit, crdb.ColumnTypeEnum), - crdb.NewColumn(QuotaColumnAmount, crdb.ColumnTypeInt64), - crdb.NewColumn(QuotaColumnFrom, crdb.ColumnTypeTimestamp), - crdb.NewColumn(QuotaColumnInterval, crdb.ColumnTypeInterval), - crdb.NewColumn(QuotaColumnLimit, crdb.ColumnTypeBool), + crdb.NewColumn(QuotaColumnAmount, crdb.ColumnTypeInt64, crdb.Nullable()), + crdb.NewColumn(QuotaColumnFrom, crdb.ColumnTypeTimestamp, crdb.Nullable()), + crdb.NewColumn(QuotaColumnInterval, crdb.ColumnTypeInterval, crdb.Nullable()), + crdb.NewColumn(QuotaColumnLimit, crdb.ColumnTypeBool, crdb.Nullable()), }, crdb.NewPrimaryKey(QuotaColumnInstanceID, QuotaColumnUnit), ), @@ -118,31 +121,20 @@ func (q *quotaProjection) reducers() []handler.AggregateReducer { EventRedusers: []handler.EventReducer{ { Event: quota.AddedEventType, - Reduce: q.reduceQuotaAdded, + Reduce: q.reduceQuotaSet, + }, + { + Event: quota.SetEventType, + Reduce: q.reduceQuotaSet, }, - }, - }, - { - Aggregate: quota.AggregateType, - EventRedusers: []handler.EventReducer{ { Event: quota.RemovedEventType, Reduce: q.reduceQuotaRemoved, }, - }, - }, - { - Aggregate: quota.AggregateType, - EventRedusers: []handler.EventReducer{ { Event: quota.NotificationDueEventType, Reduce: q.reduceQuotaNotificationDue, }, - }, - }, - { - Aggregate: quota.AggregateType, - EventRedusers: []handler.EventReducer{ { Event: quota.NotifiedEventType, Reduce: q.reduceQuotaNotified, @@ -156,39 +148,74 @@ func (q *quotaProjection) reduceQuotaNotified(event eventstore.Event) (*handler. return crdb.NewNoOpStatement(event), nil } -func (q *quotaProjection) reduceQuotaAdded(event eventstore.Event) (*handler.Statement, error) { - e, err := assertEvent[*quota.AddedEvent](event) +func (q *quotaProjection) reduceQuotaSet(event eventstore.Event) (*handler.Statement, error) { + e, err := assertEvent[*quota.SetEvent](event) if err != nil { return nil, err } - createStatements := make([]func(e eventstore.Event) crdb.Exec, len(e.Notifications)+1) - createStatements[0] = crdb.AddCreateStatement( - []handler.Column{ - handler.NewCol(QuotaColumnID, e.Aggregate().ID), - handler.NewCol(QuotaColumnInstanceID, e.Aggregate().InstanceID), - handler.NewCol(QuotaColumnUnit, e.Unit), - handler.NewCol(QuotaColumnAmount, e.Amount), - handler.NewCol(QuotaColumnFrom, e.From), - handler.NewCol(QuotaColumnInterval, e.ResetInterval), - handler.NewCol(QuotaColumnLimit, e.Limit), - }) + statements := make([]func(e eventstore.Event) crdb.Exec, 0, len(e.Notifications)+2) + + // 1. Insert or update quota if the event has not only notification changes + quotaConflictColumns := []handler.Column{ + handler.NewCol(QuotaColumnInstanceID, e.Aggregate().InstanceID), + handler.NewCol(QuotaColumnUnit, e.Unit), + } + quotaUpdateCols := make([]handler.Column, 0, 4+1+len(quotaConflictColumns)) + if e.Limit != nil { + quotaUpdateCols = append(quotaUpdateCols, handler.NewCol(QuotaColumnLimit, *e.Limit)) + } + if e.Amount != nil { + quotaUpdateCols = append(quotaUpdateCols, handler.NewCol(QuotaColumnAmount, *e.Amount)) + } + if e.From != nil { + quotaUpdateCols = append(quotaUpdateCols, handler.NewCol(QuotaColumnFrom, *e.From)) + } + if e.ResetInterval != nil { + quotaUpdateCols = append(quotaUpdateCols, handler.NewCol(QuotaColumnInterval, *e.ResetInterval)) + } + if len(quotaUpdateCols) > 0 { + // TODO: Add the quota ID to the primary key in a migration? + quotaUpdateCols = append(quotaUpdateCols, handler.NewCol(QuotaColumnID, e.Aggregate().ID)) + quotaUpdateCols = append(quotaUpdateCols, quotaConflictColumns...) + statements = append(statements, crdb.AddUpsertStatement(quotaConflictColumns, quotaUpdateCols)) + } + + // 2. Delete all notifications with unknown IDs + notificationIDs := make([]interface{}, len(e.Notifications)) + for i := range e.Notifications { + notificationIDs[i] = e.Notifications[i].ID + } + statements = append(statements, crdb.AddDeleteStatement( + []handler.Condition{ + handler.NewCond(QuotaNotificationColumnInstanceID, e.Aggregate().InstanceID), + handler.NewCond(QuotaNotificationColumnUnit, e.Unit), + + crdb.Not(crdb.NewInCond(QuotaNotificationColumnID, notificationIDs)), + }, + crdb.WithTableSuffix(quotaNotificationsTableSuffix), + )) + + // 3. Upsert notifications with the given IDs for i := range e.Notifications { notification := e.Notifications[i] - createStatements[i+1] = crdb.AddCreateStatement( - []handler.Column{ - handler.NewCol(QuotaNotificationColumnInstanceID, e.Aggregate().InstanceID), - handler.NewCol(QuotaNotificationColumnUnit, e.Unit), - handler.NewCol(QuotaNotificationColumnID, notification.ID), + notificationConflictColumns := []handler.Column{ + handler.NewCol(QuotaNotificationColumnInstanceID, e.Aggregate().InstanceID), + handler.NewCol(QuotaNotificationColumnUnit, e.Unit), + handler.NewCol(QuotaNotificationColumnID, notification.ID), + } + statements = append(statements, crdb.AddUpsertStatement( + notificationConflictColumns, + append(notificationConflictColumns, handler.NewCol(QuotaNotificationColumnCallURL, notification.CallURL), handler.NewCol(QuotaNotificationColumnPercent, notification.Percent), handler.NewCol(QuotaNotificationColumnRepeat, notification.Repeat), - }, + ), crdb.WithTableSuffix(quotaNotificationsTableSuffix), - ) + )) } - return crdb.NewMultiStatement(e, createStatements...), nil + return crdb.NewMultiStatement(e, statements...), nil } func (q *quotaProjection) reduceQuotaNotificationDue(event eventstore.Event) (*handler.Statement, error) { @@ -207,6 +234,10 @@ func (q *quotaProjection) reduceQuotaNotificationDue(event eventstore.Event) (*h handler.NewCond(QuotaNotificationColumnID, e.ID), }, crdb.WithTableSuffix(quotaNotificationsTableSuffix), + // The notification could have been removed in the meantime + crdb.WithIgnoreExecErr(func(err error) bool { + return errors.Is(err, sql.ErrNoRows) + }), ), nil } @@ -279,7 +310,7 @@ func (q *quotaProjection) IncrementUsage(ctx context.Context, unit quota.Unit, i instanceID, unit, periodStart, count, ).Scan(&sum) if err != nil { - return 0, errors.ThrowInternalf(err, "PROJ-SJL3h", "incrementing usage for unit %d failed for at least one quota period", unit) + return 0, zitadel_errors.ThrowInternalf(err, "PROJ-SJL3h", "incrementing usage for unit %d failed for at least one quota period", unit) } return sum, err } diff --git a/internal/repository/quota/events.go b/internal/repository/quota/events.go index 5bdb8f0c44f..8b14688386f 100644 --- a/internal/repository/quota/events.go +++ b/internal/repository/quota/events.go @@ -17,6 +17,7 @@ const ( UniqueQuotaNameType = "quota_units" eventTypePrefix = eventstore.EventType("quota.") AddedEventType = eventTypePrefix + "added" + SetEventType = eventTypePrefix + "set" NotifiedEventType = eventTypePrefix + "notified" NotificationDueEventType = eventTypePrefix + "notificationdue" RemovedEventType = eventTypePrefix + "removed" @@ -43,65 +44,87 @@ func NewRemoveQuotaNameUniqueConstraint(unit Unit) *eventstore.EventUniqueConstr ) } -type AddedEvent struct { +// SetEvent describes that a quota is added or modified and contains only changed properties +type SetEvent struct { eventstore.BaseEvent `json:"-"` - - Unit Unit `json:"unit"` - From time.Time `json:"from"` - ResetInterval time.Duration `json:"interval,omitempty"` - Amount uint64 `json:"amount"` - Limit bool `json:"limit"` - Notifications []*AddedEventNotification `json:"notifications,omitempty"` + Unit Unit `json:"unit"` + From *time.Time `json:"from,omitempty"` + ResetInterval *time.Duration `json:"interval,omitempty"` + Amount *uint64 `json:"amount,omitempty"` + Limit *bool `json:"limit,omitempty"` + Notifications []*SetEventNotification `json:"notifications,omitempty"` } -type AddedEventNotification struct { +type SetEventNotification struct { ID string `json:"id"` Percent uint16 `json:"percent"` - Repeat bool `json:"repeat,omitempty"` + Repeat bool `json:"repeat"` CallURL string `json:"callUrl"` } -func (e *AddedEvent) Data() interface{} { +func (e *SetEvent) Data() interface{} { return e } -func (e *AddedEvent) UniqueConstraints() []*eventstore.EventUniqueConstraint { - return []*eventstore.EventUniqueConstraint{NewAddQuotaUnitUniqueConstraint(e.Unit)} +func (e *SetEvent) UniqueConstraints() []*eventstore.EventUniqueConstraint { + return nil } -func NewAddedEvent( - ctx context.Context, - aggregate *eventstore.Aggregate, +func NewSetEvent( + base *eventstore.BaseEvent, unit Unit, - from time.Time, - resetInterval time.Duration, - amount uint64, - limit bool, - notifications []*AddedEventNotification, -) *AddedEvent { - return &AddedEvent{ - BaseEvent: *eventstore.NewBaseEventForPush( - ctx, - aggregate, - AddedEventType, - ), - Unit: unit, - From: from, - ResetInterval: resetInterval, - Amount: amount, - Limit: limit, - Notifications: notifications, + changes []QuotaChange, +) *SetEvent { + changedEvent := &SetEvent{ + BaseEvent: *base, + Unit: unit, + } + for _, change := range changes { + change(changedEvent) + } + return changedEvent +} + +type QuotaChange func(*SetEvent) + +func ChangeAmount(amount uint64) QuotaChange { + return func(e *SetEvent) { + e.Amount = &amount } } -func AddedEventMapper(event *repository.Event) (eventstore.Event, error) { - e := &AddedEvent{ +func ChangeLimit(limit bool) QuotaChange { + return func(e *SetEvent) { + e.Limit = &limit + } +} + +func ChangeFrom(from time.Time) QuotaChange { + return func(event *SetEvent) { + event.From = &from + } +} + +func ChangeResetInterval(interval time.Duration) QuotaChange { + return func(event *SetEvent) { + event.ResetInterval = &interval + } +} + +func ChangeNotifications(notifications []*SetEventNotification) QuotaChange { + return func(event *SetEvent) { + event.Notifications = notifications + } +} + +func SetEventMapper(event *repository.Event) (eventstore.Event, error) { + e := &SetEvent{ BaseEvent: *eventstore.BaseEventFromRepo(event), } err := json.Unmarshal(event.Data, e) if err != nil { - return nil, errors.ThrowInternal(err, "QUOTA-4n8vs", "unable to unmarshal quota added") + return nil, errors.ThrowInternal(err, "QUOTA-kmIpI", "unable to unmarshal quota set") } return e, nil diff --git a/internal/repository/quota/eventstore.go b/internal/repository/quota/eventstore.go index 2c87bb0658b..f651e58e2b9 100644 --- a/internal/repository/quota/eventstore.go +++ b/internal/repository/quota/eventstore.go @@ -5,7 +5,11 @@ import ( ) func RegisterEventMappers(es *eventstore.Eventstore) { - es.RegisterFilterEventMapper(AggregateType, AddedEventType, AddedEventMapper). + // AddedEventType is not emitted anymore. + // For ease of use, old events are directly mapped to SetEvent. + // This works, because the data structures are compatible. + es.RegisterFilterEventMapper(AggregateType, AddedEventType, SetEventMapper). + RegisterFilterEventMapper(AggregateType, SetEventType, SetEventMapper). RegisterFilterEventMapper(AggregateType, RemovedEventType, RemovedEventMapper). RegisterFilterEventMapper(AggregateType, NotificationDueEventType, NotificationDueEventMapper). RegisterFilterEventMapper(AggregateType, NotifiedEventType, NotifiedEventMapper) diff --git a/proto/zitadel/system.proto b/proto/zitadel/system.proto index e2e95dc9c59..b279c570d56 100644 --- a/proto/zitadel/system.proto +++ b/proto/zitadel/system.proto @@ -361,6 +361,8 @@ service SystemService { } // Creates a new quota + // Returns an error if the quota already exists for the specified unit + // Deprecated: use SetQuota instead rpc AddQuota(AddQuotaRequest) returns (AddQuotaResponse) { option (google.api.http) = { post: "/instances/{instance_id}/quotas" @@ -372,6 +374,19 @@ service SystemService { }; } + // Sets quota configuration properties + // Creates a new quota if it doesn't exist for the specified unit + rpc SetQuota(SetQuotaRequest) returns (SetQuotaResponse) { + option (google.api.http) = { + put: "/instances/{instance_id}/quotas" + body: "*" + }; + + option (zitadel.v1.auth_option) = { + permission: "authenticated"; + }; + } + // Removes a quota rpc RemoveQuota(RemoveQuotaRequest) returns (RemoveQuotaResponse) { option (google.api.http) = { @@ -598,6 +613,55 @@ message AddQuotaResponse { zitadel.v1.ObjectDetails details = 1; } +message SetQuotaRequest { + string instance_id = 1 [(validate.rules).string = {min_len: 1, max_len: 200}]; + // the unit a quota should be imposed on + zitadel.quota.v1.Unit unit = 2 [ + (validate.rules).enum = {defined_only: true, not_in: [0]}, + (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = { + description: "the unit a quota should be imposed on"; + } + ]; + // the starting time from which the current quota period is calculated from. This is relevant for querying the current usage. + google.protobuf.Timestamp from = 3 [ + (validate.rules).timestamp.required = true, + (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = { + example: "\"2019-04-01T08:45:00.000000Z\""; + description: "the starting time from which the current quota period is calculated from. This is relevant for querying the current usage."; + } + ]; + // the quota periods duration + google.protobuf.Duration reset_interval = 4 [ + (validate.rules).duration.required = true, + (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = { + description: "the quota periods duration"; + } + ]; + // the quota amount of units + uint64 amount = 5 [ + (validate.rules).uint64.gt = 0, + (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = { + description: "the quota amount of units"; + } + ]; + // whether ZITADEL should block further usage when the configured amount is used + bool limit = 6 [ + (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = { + description: "whether ZITADEL should block further usage when the configured amount is used"; + } + ]; + // the handlers, ZITADEL executes when certain quota percentages are reached + repeated zitadel.quota.v1.Notification notifications = 7 [ + (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = { + description: "the handlers, ZITADEL executes when certain quota percentages are reached"; + } + ]; +} + +message SetQuotaResponse { + zitadel.v1.ObjectDetails details = 1; +} + message RemoveQuotaRequest { string instance_id = 1 [(validate.rules).string = {min_len: 1, max_len: 200}]; zitadel.quota.v1.Unit unit = 2;