From ae1af6bc8cd2294f47f6d6412c4b46192105ade5 Mon Sep 17 00:00:00 2001 From: Elio Bischof Date: Fri, 22 Sep 2023 11:37:16 +0200 Subject: [PATCH] fix: set quotas (#6597) * feat: set quotas * fix: start new period on younger anchor * cleanup e2e config * fix set notifications * lint * test: fix quota projection tests * fix add quota tests * make quota fields nullable * enable amount 0 * fix initial setup * create a prerelease * avoid success comments * fix quota projection primary key * Revert "fix quota projection primary key" This reverts commit e72f4d7fa17d03d36493912168490350a320e04f. * simplify write model * fix aggregate id * avoid push without changes * test set quota lifecycle * test set quota mutations * fix quota unit test * fix: quotas * test quota.set event projection * use SetQuota in integration tests * fix: release quotas 3 * reset releaserc * fix comment * test notification order doesn't matter * test notification order doesn't matter * test with unmarshalled events * test with unmarshalled events --- cmd/setup/13.go | 26 + cmd/setup/13/13_fix_quota_constraints.sql | 4 + cmd/setup/config.go | 25 +- cmd/setup/setup.go | 3 + e2e/config/localhost/zitadel.yaml | 17 +- internal/api/grpc/system/quota.go | 18 +- internal/api/grpc/system/quota_converter.go | 28 +- .../api/grpc/system/quota_integration_test.go | 12 +- internal/command/instance.go | 4 +- internal/command/quota.go | 133 +++-- internal/command/quota_model.go | 153 +++++- internal/command/quota_model_test.go | 373 ++++++++++++++ internal/command/quota_test.go | 471 ++++++++++++++---- internal/eventstore/handler/crdb/statement.go | 23 +- internal/query/projection/quota.go | 99 ++-- internal/query/projection/quota_test.go | 139 +++++- internal/query/quota_periods.go | 9 +- internal/repository/quota/events.go | 97 ++-- internal/repository/quota/eventstore.go | 6 +- proto/zitadel/system.proto | 63 +++ 20 files changed, 1385 insertions(+), 318 deletions(-) create mode 100644 cmd/setup/13.go create mode 100644 cmd/setup/13/13_fix_quota_constraints.sql create mode 100644 internal/command/quota_model_test.go diff --git a/cmd/setup/13.go b/cmd/setup/13.go new file mode 100644 index 0000000000..3b5df925b9 --- /dev/null +++ b/cmd/setup/13.go @@ -0,0 +1,26 @@ +package setup + +import ( + "context" + _ "embed" + + "github.com/zitadel/zitadel/internal/database" +) + +var ( + //go:embed 13/13_fix_quota_constraints.sql + fixQuotaConstraints string +) + +type FixQuotaConstraints struct { + dbClient *database.DB +} + +func (mig *FixQuotaConstraints) Execute(ctx context.Context) error { + _, err := mig.dbClient.ExecContext(ctx, fixQuotaConstraints) + return err +} + +func (mig *FixQuotaConstraints) String() string { + return "13_fix_quota_constraints" +} diff --git a/cmd/setup/13/13_fix_quota_constraints.sql b/cmd/setup/13/13_fix_quota_constraints.sql new file mode 100644 index 0000000000..d1f95d74ab --- /dev/null +++ b/cmd/setup/13/13_fix_quota_constraints.sql @@ -0,0 +1,4 @@ +ALTER TABLE IF EXISTS projections.quotas ALTER COLUMN from_anchor DROP NOT NULL; +ALTER TABLE IF EXISTS projections.quotas ALTER COLUMN amount DROP NOT NULL; +ALTER TABLE IF EXISTS projections.quotas ALTER COLUMN interval DROP NOT NULL; +ALTER TABLE IF EXISTS projections.quotas ALTER COLUMN limit_usage DROP NOT NULL; diff --git a/cmd/setup/config.go b/cmd/setup/config.go index 939fc9adf9..3290b59e6a 100644 --- a/cmd/setup/config.go +++ b/cmd/setup/config.go @@ -56,18 +56,19 @@ func MustNewConfig(v *viper.Viper) *Config { } type Steps struct { - s1ProjectionTable *ProjectionTable - s2AssetsTable *AssetTable - FirstInstance *FirstInstance - s4EventstoreIndexes *EventstoreIndexesNew - s5LastFailed *LastFailed - s6OwnerRemoveColumns *OwnerRemoveColumns - s7LogstoreTables *LogstoreTables - s8AuthTokens *AuthTokenIndexes - s9EventstoreIndexes2 *EventstoreIndexesNew - CorrectCreationDate *CorrectCreationDate - AddEventCreatedAt *AddEventCreatedAt - s12AddOTPColumns *AddOTPColumns + s1ProjectionTable *ProjectionTable + s2AssetsTable *AssetTable + FirstInstance *FirstInstance + s4EventstoreIndexes *EventstoreIndexesNew + s5LastFailed *LastFailed + s6OwnerRemoveColumns *OwnerRemoveColumns + s7LogstoreTables *LogstoreTables + s8AuthTokens *AuthTokenIndexes + s9EventstoreIndexes2 *EventstoreIndexesNew + CorrectCreationDate *CorrectCreationDate + AddEventCreatedAt *AddEventCreatedAt + s12AddOTPColumns *AddOTPColumns + s13FixQuotaProjection *FixQuotaConstraints } type encryptionKeyConfig struct { diff --git a/cmd/setup/setup.go b/cmd/setup/setup.go index 8097b61643..1efe703986 100644 --- a/cmd/setup/setup.go +++ b/cmd/setup/setup.go @@ -95,6 +95,7 @@ func Setup(config *Config, steps *Steps, masterKey string) { steps.AddEventCreatedAt.dbClient = dbClient steps.AddEventCreatedAt.step10 = steps.CorrectCreationDate steps.s12AddOTPColumns = &AddOTPColumns{dbClient: dbClient} + steps.s13FixQuotaProjection = &FixQuotaConstraints{dbClient: dbClient} err = projection.Create(ctx, dbClient, eventstoreClient, config.Projections, nil, nil) logging.OnError(err).Fatal("unable to start projections") @@ -137,6 +138,8 @@ func Setup(config *Config, steps *Steps, masterKey string) { logging.OnError(err).Fatal("unable to migrate step 11") err = migration.Migrate(ctx, eventstoreClient, steps.s12AddOTPColumns) logging.OnError(err).Fatal("unable to migrate step 12") + err = migration.Migrate(ctx, eventstoreClient, steps.s13FixQuotaProjection) + logging.OnError(err).Fatal("unable to migrate step 13") for _, repeatableStep := range repeatableSteps { err = migration.Migrate(ctx, eventstoreClient, repeatableStep) diff --git a/e2e/config/localhost/zitadel.yaml b/e2e/config/localhost/zitadel.yaml index de758680fa..1c560ddfde 100644 --- a/e2e/config/localhost/zitadel.yaml +++ b/e2e/config/localhost/zitadel.yaml @@ -17,19 +17,6 @@ FirstInstance: Human: PasswordChangeRequired: false -LogStore: - Access: - Database: - Enabled: true - Debounce: - MinFrequency: 0s - MaxBulkSize: 0 - Execution: - Database: - Enabled: true - Stdout: - Enabled: true - Console: InstanceManagementURL: "https://example.com/instances/{{.InstanceID}}" @@ -40,6 +27,10 @@ Projections: Quotas: Access: + Enabled: true + Debounce: + MinFrequency: 0s + MaxBulkSize: 0 ExhaustedCookieKey: "zitadel.quota.limiting" ExhaustedCookieMaxAge: "600s" diff --git a/internal/api/grpc/system/quota.go b/internal/api/grpc/system/quota.go index 745c449b01..2cb57fc710 100644 --- a/internal/api/grpc/system/quota.go +++ b/internal/api/grpc/system/quota.go @@ -5,7 +5,6 @@ import ( "github.com/zitadel/zitadel/internal/api/grpc/object" "github.com/zitadel/zitadel/pkg/grpc/system" - system_pb "github.com/zitadel/zitadel/pkg/grpc/system" ) func (s *Server) AddQuota(ctx context.Context, req *system.AddQuotaRequest) (*system.AddQuotaResponse, error) { @@ -16,7 +15,20 @@ func (s *Server) AddQuota(ctx context.Context, req *system.AddQuotaRequest) (*sy if err != nil { return nil, err } - return &system_pb.AddQuotaResponse{ + return &system.AddQuotaResponse{ + Details: object.AddToDetailsPb(details.Sequence, details.EventDate, details.ResourceOwner), + }, nil +} + +func (s *Server) SetQuota(ctx context.Context, req *system.SetQuotaRequest) (*system.SetQuotaResponse, error) { + details, err := s.command.SetQuota( + ctx, + instanceQuotaPbToCommand(req), + ) + if err != nil { + return nil, err + } + return &system.SetQuotaResponse{ Details: object.AddToDetailsPb(details.Sequence, details.EventDate, details.ResourceOwner), }, nil } @@ -26,7 +38,7 @@ func (s *Server) RemoveQuota(ctx context.Context, req *system.RemoveQuotaRequest if err != nil { return nil, err } - return &system_pb.RemoveQuotaResponse{ + return &system.RemoveQuotaResponse{ Details: object.ChangeToDetailsPb(details.Sequence, details.EventDate, details.ResourceOwner), }, nil } diff --git a/internal/api/grpc/system/quota_converter.go b/internal/api/grpc/system/quota_converter.go index 5be57b43d5..1945ff0951 100644 --- a/internal/api/grpc/system/quota_converter.go +++ b/internal/api/grpc/system/quota_converter.go @@ -3,17 +3,27 @@ package system import ( "github.com/zitadel/zitadel/internal/command" "github.com/zitadel/zitadel/pkg/grpc/quota" - "github.com/zitadel/zitadel/pkg/grpc/system" + "google.golang.org/protobuf/types/known/durationpb" + "google.golang.org/protobuf/types/known/timestamppb" ) -func instanceQuotaPbToCommand(req *system.AddQuotaRequest) *command.AddQuota { - return &command.AddQuota{ - Unit: instanceQuotaUnitPbToCommand(req.Unit), - From: req.From.AsTime(), - ResetInterval: req.ResetInterval.AsDuration(), - Amount: req.Amount, - Limit: req.Limit, - Notifications: instanceQuotaNotificationsPbToCommand(req.Notifications), +type setQuotaRequest interface { + GetUnit() quota.Unit + GetFrom() *timestamppb.Timestamp + GetResetInterval() *durationpb.Duration + GetAmount() uint64 + GetLimit() bool + GetNotifications() []*quota.Notification +} + +func instanceQuotaPbToCommand(req setQuotaRequest) *command.SetQuota { + return &command.SetQuota{ + Unit: instanceQuotaUnitPbToCommand(req.GetUnit()), + From: req.GetFrom().AsTime(), + ResetInterval: req.GetResetInterval().AsDuration(), + Amount: req.GetAmount(), + Limit: req.GetLimit(), + Notifications: instanceQuotaNotificationsPbToCommand(req.GetNotifications()), } } diff --git a/internal/api/grpc/system/quota_integration_test.go b/internal/api/grpc/system/quota_integration_test.go index a8c6840c87..ba0e2cb4d7 100644 --- a/internal/api/grpc/system/quota_integration_test.go +++ b/internal/api/grpc/system/quota_integration_test.go @@ -28,7 +28,7 @@ func TestServer_QuotaNotification_Limit(t *testing.T) { percent := 50 percentAmount := amount * percent / 100 - _, err := Tester.Client.System.AddQuota(SystemCTX, &system.AddQuotaRequest{ + _, err := Tester.Client.System.SetQuota(SystemCTX, &system.SetQuotaRequest{ InstanceId: instanceID, Unit: quota_pb.Unit_UNIT_REQUESTS_ALL_AUTHENTICATED, From: timestamppb.Now(), @@ -72,7 +72,7 @@ func TestServer_QuotaNotification_NoLimit(t *testing.T) { percent := 50 percentAmount := amount * percent / 100 - _, err := Tester.Client.System.AddQuota(SystemCTX, &system.AddQuotaRequest{ + _, err := Tester.Client.System.SetQuota(SystemCTX, &system.SetQuotaRequest{ InstanceId: instanceID, Unit: quota_pb.Unit_UNIT_REQUESTS_ALL_AUTHENTICATED, From: timestamppb.Now(), @@ -151,7 +151,7 @@ func awaitNotification(t *testing.T, bodies chan []byte, unit quota.Unit, percen func TestServer_AddAndRemoveQuota(t *testing.T) { _, instanceID, _ := Tester.UseIsolatedInstance(CTX, SystemCTX) - got, err := Tester.Client.System.AddQuota(SystemCTX, &system.AddQuotaRequest{ + got, err := Tester.Client.System.SetQuota(SystemCTX, &system.SetQuotaRequest{ InstanceId: instanceID, Unit: quota_pb.Unit_UNIT_REQUESTS_ALL_AUTHENTICATED, From: timestamppb.Now(), @@ -169,7 +169,7 @@ func TestServer_AddAndRemoveQuota(t *testing.T) { require.NoError(t, err) require.Equal(t, got.Details.ResourceOwner, instanceID) - gotAlreadyExisting, errAlreadyExisting := Tester.Client.System.AddQuota(SystemCTX, &system.AddQuotaRequest{ + gotAlreadyExisting, errAlreadyExisting := Tester.Client.System.SetQuota(SystemCTX, &system.SetQuotaRequest{ InstanceId: instanceID, Unit: quota_pb.Unit_UNIT_REQUESTS_ALL_AUTHENTICATED, From: timestamppb.Now(), @@ -184,8 +184,8 @@ func TestServer_AddAndRemoveQuota(t *testing.T) { }, }, }) - require.Error(t, errAlreadyExisting) - require.Nil(t, gotAlreadyExisting) + require.Nil(t, errAlreadyExisting) + require.Equal(t, gotAlreadyExisting.Details.ResourceOwner, instanceID) gotRemove, errRemove := Tester.Client.System.RemoveQuota(SystemCTX, &system.RemoveQuotaRequest{ InstanceId: instanceID, diff --git a/internal/command/instance.go b/internal/command/instance.go index 18c13b6d40..1677d930fe 100644 --- a/internal/command/instance.go +++ b/internal/command/instance.go @@ -110,7 +110,7 @@ type InstanceSetup struct { RefreshTokenExpiration time.Duration } Quotas *struct { - Items []*AddQuota + Items []*SetQuota } } @@ -283,7 +283,7 @@ func (c *Commands) SetUpInstance(ctx context.Context, setup *InstanceSetup) (str if err != nil { return "", "", nil, nil, err } - validations = append(validations, c.AddQuotaCommand(quota.NewAggregate(quotaId, instanceID), q)) + validations = append(validations, c.SetQuotaCommand(quota.NewAggregate(quotaId, instanceID), nil, true, q)) } } diff --git a/internal/command/quota.go b/internal/command/quota.go index b787929104..442772ae57 100644 --- a/internal/command/quota.go +++ b/internal/command/quota.go @@ -10,7 +10,6 @@ import ( "github.com/zitadel/zitadel/internal/domain" "github.com/zitadel/zitadel/internal/errors" "github.com/zitadel/zitadel/internal/eventstore" - "github.com/zitadel/zitadel/internal/id" "github.com/zitadel/zitadel/internal/repository/quota" ) @@ -32,25 +31,25 @@ func (q QuotaUnit) Enum() quota.Unit { } } +// AddQuota returns and error if the quota already exists. +// AddQuota is deprecated. Use SetQuota instead. func (c *Commands) AddQuota( ctx context.Context, - q *AddQuota, + q *SetQuota, ) (*domain.ObjectDetails, error) { instanceId := authz.GetInstance(ctx).InstanceID() - wm, err := c.getQuotaWriteModel(ctx, instanceId, instanceId, q.Unit.Enum()) if err != nil { return nil, err } - - if wm.active { + if wm.AggregateID != "" { return nil, errors.ThrowAlreadyExists(nil, "COMMAND-WDfFf", "Errors.Quota.AlreadyExists") } aggregateId, err := c.idGenerator.Next() if err != nil { return nil, err } - cmds, err := preparation.PrepareCommands(ctx, c.eventstore.Filter, c.AddQuotaCommand(quota.NewAggregate(aggregateId, instanceId), q)) + cmds, err := preparation.PrepareCommands(ctx, c.eventstore.Filter, c.SetQuotaCommand(quota.NewAggregate(aggregateId, instanceId), wm, true, q)) if err != nil { return nil, err } @@ -65,23 +64,52 @@ func (c *Commands) AddQuota( return writeModelToObjectDetails(&wm.WriteModel), nil } +// SetQuota creates a new quota or updates an existing one. +func (c *Commands) SetQuota( + ctx context.Context, + q *SetQuota, +) (*domain.ObjectDetails, error) { + instanceId := authz.GetInstance(ctx).InstanceID() + wm, err := c.getQuotaWriteModel(ctx, instanceId, instanceId, q.Unit.Enum()) + if err != nil { + return nil, err + } + aggregateId := wm.AggregateID + createNewQuota := aggregateId == "" + if aggregateId == "" { + aggregateId, err = c.idGenerator.Next() + if err != nil { + return nil, err + } + } + cmds, err := preparation.PrepareCommands(ctx, c.eventstore.Filter, c.SetQuotaCommand(quota.NewAggregate(aggregateId, instanceId), wm, createNewQuota, q)) + if err != nil { + return nil, err + } + if len(cmds) > 0 { + events, err := c.eventstore.Push(ctx, cmds...) + if err != nil { + return nil, err + } + err = AppendAndReduce(wm, events...) + if err != nil { + return nil, err + } + } + return writeModelToObjectDetails(&wm.WriteModel), nil +} + func (c *Commands) RemoveQuota(ctx context.Context, unit QuotaUnit) (*domain.ObjectDetails, error) { instanceId := authz.GetInstance(ctx).InstanceID() - wm, err := c.getQuotaWriteModel(ctx, instanceId, instanceId, unit.Enum()) if err != nil { return nil, err } - - if !wm.active { + if wm.AggregateID == "" { return nil, errors.ThrowNotFound(nil, "COMMAND-WDfFf", "Errors.Quota.NotFound") } - aggregate := quota.NewAggregate(wm.AggregateID, instanceId) - - events := []eventstore.Command{ - quota.NewRemovedEvent(ctx, &aggregate.Aggregate, unit.Enum()), - } + events := []eventstore.Command{quota.NewRemovedEvent(ctx, &aggregate.Aggregate, unit.Enum())} pushedEvents, err := c.eventstore.Push(ctx, events...) if err != nil { return nil, err @@ -104,6 +132,16 @@ type QuotaNotification struct { CallURL string } +// SetQuota configures a quota and activates it if it isn't active already +type SetQuota struct { + Unit QuotaUnit `json:"unit"` + From time.Time `json:"from"` + ResetInterval time.Duration `json:"ResetInterval,omitempty"` + Amount uint64 `json:"Amount,omitempty"` + Limit bool `json:"Limit,omitempty"` + Notifications QuotaNotifications `json:"Notifications,omitempty"` +} + type QuotaNotifications []*QuotaNotification func (q *QuotaNotification) validate() error { @@ -111,94 +149,51 @@ func (q *QuotaNotification) validate() error { if err != nil { return errors.ThrowInvalidArgument(err, "QUOTA-bZ0Fj", "Errors.Quota.Invalid.CallURL") } - if !u.IsAbs() || u.Host == "" { return errors.ThrowInvalidArgument(nil, "QUOTA-HAYmN", "Errors.Quota.Invalid.CallURL") } - if q.Percent < 1 { return errors.ThrowInvalidArgument(nil, "QUOTA-pBfjq", "Errors.Quota.Invalid.Percent") } return nil } -func (q *QuotaNotifications) toAddedEventNotifications(idGenerator id.Generator) ([]*quota.AddedEventNotification, error) { - if q == nil { - return nil, nil - } - - notifications := make([]*quota.AddedEventNotification, len(*q)) - for idx, notification := range *q { - - id, err := idGenerator.Next() - if err != nil { - return nil, err - } - - notifications[idx] = "a.AddedEventNotification{ - ID: id, - Percent: notification.Percent, - Repeat: notification.Repeat, - CallURL: notification.CallURL, - } - } - - return notifications, nil -} - -type AddQuota struct { - Unit QuotaUnit - From time.Time - ResetInterval time.Duration - Amount uint64 - Limit bool - Notifications QuotaNotifications -} - -func (q *AddQuota) validate() error { +func (q *SetQuota) validate() error { for _, notification := range q.Notifications { if err := notification.validate(); err != nil { return err } } - if q.Unit.Enum() == quota.Unimplemented { return errors.ThrowInvalidArgument(nil, "QUOTA-OTeSh", "Errors.Quota.Invalid.Unimplemented") } - - if q.Amount < 1 { + if q.Amount < 0 { return errors.ThrowInvalidArgument(nil, "QUOTA-hOKSJ", "Errors.Quota.Invalid.Amount") } - if q.ResetInterval < time.Minute { return errors.ThrowInvalidArgument(nil, "QUOTA-R5otd", "Errors.Quota.Invalid.ResetInterval") } return nil } -func (c *Commands) AddQuotaCommand(a *quota.Aggregate, q *AddQuota) preparation.Validation { +func (c *Commands) SetQuotaCommand(a *quota.Aggregate, wm *quotaWriteModel, createNew bool, q *SetQuota) preparation.Validation { return func() (preparation.CreateCommands, error) { - if err := q.validate(); err != nil { return nil, err } - return func(ctx context.Context, filter preparation.FilterToQueryReducer) (cmd []eventstore.Command, err error) { - - notifications, err := q.Notifications.toAddedEventNotifications(c.idGenerator) - if err != nil { + changes, err := wm.NewChanges(c.idGenerator, createNew, q.Amount, q.From, q.ResetInterval, q.Limit, q.Notifications...) + if len(changes) == 0 { return nil, err } - - return []eventstore.Command{quota.NewAddedEvent( - ctx, - &a.Aggregate, + return []eventstore.Command{quota.NewSetEvent( + eventstore.NewBaseEventForPush( + ctx, + &a.Aggregate, + quota.SetEventType, + ), q.Unit.Enum(), - q.From, - q.ResetInterval, - q.Amount, - q.Limit, - notifications, + changes..., )}, err }, nil diff --git a/internal/command/quota_model.go b/internal/command/quota_model.go index 6e05b4df0b..7336be41a6 100644 --- a/internal/command/quota_model.go +++ b/internal/command/quota_model.go @@ -1,14 +1,26 @@ package command import ( + "errors" + "fmt" + "slices" + "time" + + zitadel_errors "github.com/zitadel/zitadel/internal/errors" "github.com/zitadel/zitadel/internal/eventstore" + "github.com/zitadel/zitadel/internal/id" "github.com/zitadel/zitadel/internal/repository/quota" ) type quotaWriteModel struct { eventstore.WriteModel - unit quota.Unit - active bool + rollingAggregateID string + unit quota.Unit + from time.Time + resetInterval time.Duration + amount uint64 + limit bool + notifications []*quota.SetEventNotification } // newQuotaWriteModel aggregateId is filled by reducing unit matching events @@ -30,6 +42,7 @@ func (wm *quotaWriteModel) Query() *eventstore.SearchQueryBuilder { AggregateTypes(quota.AggregateType). EventTypes( quota.AddedEventType, + quota.SetEventType, quota.RemovedEventType, ).EventData(map[string]interface{}{"unit": wm.unit}) @@ -38,15 +51,137 @@ func (wm *quotaWriteModel) Query() *eventstore.SearchQueryBuilder { func (wm *quotaWriteModel) Reduce() error { for _, event := range wm.Events { + wm.ChangeDate = event.CreationDate() switch e := event.(type) { - case *quota.AddedEvent: - wm.AggregateID = e.Aggregate().ID - wm.ChangeDate = e.CreationDate() - wm.active = true + case *quota.SetEvent: + wm.rollingAggregateID = e.Aggregate().ID + if e.Amount != nil { + wm.amount = *e.Amount + } + if e.From != nil { + wm.from = *e.From + } + if e.Limit != nil { + wm.limit = *e.Limit + } + if e.ResetInterval != nil { + wm.resetInterval = *e.ResetInterval + } + if e.Notifications != nil { + wm.notifications = *e.Notifications + } case *quota.RemovedEvent: - wm.AggregateID = e.Aggregate().ID - wm.active = false + wm.rollingAggregateID = "" } } - return wm.WriteModel.Reduce() + if err := wm.WriteModel.Reduce(); err != nil { + return err + } + // wm.WriteModel.Reduce() sets the aggregateID to the first event's aggregateID, but we need the last one + wm.AggregateID = wm.rollingAggregateID + return nil +} + +// NewChanges returns all changes that need to be applied to the aggregate. +// If createNew is true, all quota properties are set. +func (wm *quotaWriteModel) NewChanges( + idGenerator id.Generator, + createNew bool, + amount uint64, + from time.Time, + resetInterval time.Duration, + limit bool, + notifications ...*QuotaNotification, +) (changes []quota.QuotaChange, err error) { + setEventNotifications, err := QuotaNotifications(notifications).newSetEventNotifications(idGenerator) + if err != nil { + return nil, err + } + // we sort the input notifications already, so we can return early if they have duplicates + err = sortSetEventNotifications(setEventNotifications) + if err != nil { + return nil, err + } + if createNew { + return []quota.QuotaChange{ + quota.ChangeAmount(amount), + quota.ChangeFrom(from), + quota.ChangeResetInterval(resetInterval), + quota.ChangeLimit(limit), + quota.ChangeNotifications(setEventNotifications), + }, nil + } + changes = make([]quota.QuotaChange, 0, 5) + if wm.amount != amount { + changes = append(changes, quota.ChangeAmount(amount)) + } + if wm.from != from { + changes = append(changes, quota.ChangeFrom(from)) + } + if wm.resetInterval != resetInterval { + changes = append(changes, quota.ChangeResetInterval(resetInterval)) + } + if wm.limit != limit { + changes = append(changes, quota.ChangeLimit(limit)) + } + // If the number of notifications differs, we renew the notifications and we can return early + if len(setEventNotifications) != len(wm.notifications) { + changes = append(changes, quota.ChangeNotifications(setEventNotifications)) + return changes, nil + } + // Now we sort the existing notifications too, so comparing the input properties with the existing ones is easier. + // We ignore the sorting error for the existing notifications, because this is system state, not user input. + // If sorting fails this time, the notifications are listed in the event payload and the projection cleans them up anyway. + _ = sortSetEventNotifications(wm.notifications) + for i, notification := range setEventNotifications { + if notification.CallURL != wm.notifications[i].CallURL || + notification.Percent != wm.notifications[i].Percent || + notification.Repeat != wm.notifications[i].Repeat { + changes = append(changes, quota.ChangeNotifications(setEventNotifications)) + return changes, nil + } + } + return changes, err +} + +// newSetEventNotifications returns quota.SetEventNotification elements with generated IDs. +func (q QuotaNotifications) newSetEventNotifications(idGenerator id.Generator) (setNotifications []*quota.SetEventNotification, err error) { + if q == nil { + return make([]*quota.SetEventNotification, 0), nil + } + notifications := make([]*quota.SetEventNotification, len(q)) + for idx, notification := range q { + notifications[idx] = "a.SetEventNotification{ + Percent: notification.Percent, + Repeat: notification.Repeat, + CallURL: notification.CallURL, + } + notifications[idx].ID, err = idGenerator.Next() + if err != nil { + return nil, err + } + } + return notifications, nil +} + +// sortSetEventNotifications reports an error if there are duplicate notifications or if a pointer is nil +func sortSetEventNotifications(notifications []*quota.SetEventNotification) (err error) { + slices.SortFunc(notifications, func(i, j *quota.SetEventNotification) int { + if i == nil || j == nil { + err = zitadel_errors.ThrowInternal(errors.New("sorting slices of *quota.SetEventNotification with nil pointers is not supported"), "QUOTA-8YXPk", "Errors.Internal") + return 0 + } + if i.Percent == j.Percent && i.CallURL == j.CallURL && i.Repeat == j.Repeat { + // TODO: translate + err = zitadel_errors.ThrowInternal(fmt.Errorf("%+v", i), "QUOTA-Pty2n", "Errors.Quota.Notifications.Duplicate") + return 0 + } + if i.Percent < j.Percent || + i.Percent == j.Percent && i.CallURL < j.CallURL || + i.Percent == j.Percent && i.CallURL == j.CallURL && i.Repeat == false && j.Repeat == true { + return -1 + } + return +1 + }) + return err } diff --git a/internal/command/quota_model_test.go b/internal/command/quota_model_test.go new file mode 100644 index 0000000000..799c156fa1 --- /dev/null +++ b/internal/command/quota_model_test.go @@ -0,0 +1,373 @@ +package command + +import ( + "context" + "encoding/json" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/zitadel/zitadel/internal/eventstore" + "github.com/zitadel/zitadel/internal/id" + id_mock "github.com/zitadel/zitadel/internal/id/mock" + "github.com/zitadel/zitadel/internal/repository/quota" +) + +func TestQuotaWriteModel_NewChanges(t *testing.T) { + type fields struct { + from time.Time + resetInterval time.Duration + amount uint64 + limit bool + notifications []*quota.SetEventNotification + } + type args struct { + idGenerator id.Generator + createNew bool + amount uint64 + from time.Time + resetInterval time.Duration + limit bool + notifications []*QuotaNotification + } + tests := []struct { + name string + fields fields + args args + wantEvent quota.SetEvent + wantErr assert.ErrorAssertionFunc + }{{ + name: "change reset interval", + fields: fields{ + amount: 5, + from: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC), + resetInterval: time.Hour, + limit: true, + notifications: make([]*quota.SetEventNotification, 0), + }, + args: args{ + amount: 5, + from: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC), + resetInterval: time.Minute, + limit: true, + notifications: make([]*QuotaNotification, 0), + }, + wantEvent: quota.SetEvent{ + ResetInterval: durationPtr(time.Minute), + }, + wantErr: assert.NoError, + }, { + name: "change reset interval and amount", + fields: fields{ + amount: 5, + from: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC), + resetInterval: time.Hour, + limit: true, + notifications: make([]*quota.SetEventNotification, 0), + }, + args: args{ + amount: 10, + from: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC), + resetInterval: time.Minute, + limit: true, + notifications: make([]*QuotaNotification, 0), + }, + wantEvent: quota.SetEvent{ + ResetInterval: durationPtr(time.Minute), + Amount: uint64Ptr(10), + }, + wantErr: assert.NoError, + }, { + name: "change nothing", + fields: fields{ + amount: 5, + from: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC), + resetInterval: time.Hour, + limit: true, + notifications: []*quota.SetEventNotification{}, + }, + args: args{ + amount: 5, + from: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC), + resetInterval: time.Hour, + limit: true, + notifications: []*QuotaNotification{}, + }, + wantEvent: quota.SetEvent{}, + wantErr: assert.NoError, + }, { + name: "change limit to zero value", + fields: fields{ + amount: 5, + from: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC), + resetInterval: time.Hour, + limit: true, + notifications: make([]*quota.SetEventNotification, 0), + }, + args: args{ + amount: 5, + from: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC), + resetInterval: time.Hour, + limit: false, + notifications: make([]*QuotaNotification, 0), + }, + wantEvent: quota.SetEvent{Limit: boolPtr(false)}, + wantErr: assert.NoError, + }, { + name: "change amount to zero value", + fields: fields{ + amount: 5, + from: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC), + resetInterval: time.Hour, + limit: true, + notifications: make([]*quota.SetEventNotification, 0), + }, + args: args{ + amount: 0, + from: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC), + resetInterval: time.Hour, + limit: true, + notifications: make([]*QuotaNotification, 0), + }, + wantEvent: quota.SetEvent{Amount: uint64Ptr(0)}, + wantErr: assert.NoError, + }, { + name: "change from to zero value", + fields: fields{ + amount: 5, + from: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC), + resetInterval: time.Hour, + limit: true, + notifications: make([]*quota.SetEventNotification, 0), + }, + args: args{ + amount: 5, + from: time.Time{}, + resetInterval: time.Hour, + limit: true, + notifications: make([]*QuotaNotification, 0), + }, + wantEvent: quota.SetEvent{From: &time.Time{}}, + wantErr: assert.NoError, + }, { + name: "add notification", + fields: fields{ + amount: 5, + from: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC), + resetInterval: time.Hour, + limit: true, + notifications: []*quota.SetEventNotification{{ + ID: "notification1", + Percent: 10, + Repeat: true, + CallURL: "https://call.url", + }}, + }, + args: args{ + amount: 5, + from: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC), + resetInterval: time.Hour, + limit: true, + notifications: []*QuotaNotification{{ + Percent: 20, + Repeat: true, + CallURL: "https://call.url", + }}, + idGenerator: id_mock.NewIDGeneratorExpectIDs(t, "notification1"), + }, + wantEvent: quota.SetEvent{Notifications: &[]*quota.SetEventNotification{{ + ID: "notification1", + Percent: 20, + Repeat: true, + CallURL: "https://call.url", + }}}, + wantErr: assert.NoError, + }, { + name: "change nothing with notification", + fields: fields{ + amount: 5, + from: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC), + resetInterval: time.Hour, + limit: true, + notifications: []*quota.SetEventNotification{{ + ID: "notification1", + Percent: 10, + Repeat: true, + CallURL: "https://call.url", + }}, + }, + args: args{ + amount: 5, + from: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC), + resetInterval: time.Hour, + limit: true, + notifications: []*QuotaNotification{{ + Percent: 10, + Repeat: true, + CallURL: "https://call.url", + }}, + idGenerator: id_mock.NewIDGenerator(t), + }, + wantEvent: quota.SetEvent{}, + wantErr: assert.NoError, + }, { + name: "change nothing but notification order", + fields: fields{ + amount: 5, + from: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC), + resetInterval: time.Hour, + limit: true, + notifications: []*quota.SetEventNotification{{ + ID: "notification1", + Percent: 10, + Repeat: true, + CallURL: "https://call.url", + }, { + ID: "notification2", + Percent: 10, + Repeat: false, + CallURL: "https://call.url", + }}, + }, + args: args{ + amount: 5, + from: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC), + resetInterval: time.Hour, + limit: true, + notifications: []*QuotaNotification{{ + Percent: 10, + Repeat: false, + CallURL: "https://call.url", + }, { + Percent: 10, + Repeat: true, + CallURL: "https://call.url", + }}, + idGenerator: id_mock.NewIDGeneratorExpectIDs(t, "newnotification1", "newnotification2"), + }, + wantEvent: quota.SetEvent{}, + wantErr: assert.NoError, + }, { + name: "change notification to zero value", + fields: fields{ + amount: 5, + from: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC), + resetInterval: time.Hour, + limit: true, + notifications: []*quota.SetEventNotification{{ + ID: "notification1", + Percent: 10, + Repeat: true, + CallURL: "https://call.url", + }}, + }, + args: args{ + amount: 5, + from: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC), + resetInterval: time.Hour, + limit: true, + notifications: []*QuotaNotification{}, + }, + wantEvent: quota.SetEvent{Notifications: &[]*quota.SetEventNotification{}}, + wantErr: assert.NoError, + }, { + name: "create new without notification", + fields: fields{ + amount: 5, + from: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC), + resetInterval: time.Hour, + limit: true, + notifications: []*quota.SetEventNotification{{ + ID: "notification1", + Percent: 10, + Repeat: true, + CallURL: "https://call.url", + }}, + }, + args: args{ + amount: 5, + from: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC), + resetInterval: time.Hour, + limit: true, + notifications: []*QuotaNotification{}, + }, + wantEvent: quota.SetEvent{Notifications: &[]*quota.SetEventNotification{}}, + wantErr: assert.NoError, + }, { + name: "create new with all values values", + args: args{ + amount: 5, + from: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC), + resetInterval: time.Hour, + limit: true, + notifications: []*QuotaNotification{{ + Percent: 10, + Repeat: true, + CallURL: "https://call.url", + }}, + idGenerator: id_mock.NewIDGeneratorExpectIDs(t, "notification1"), + createNew: true, + }, + wantEvent: quota.SetEvent{ + From: timePtr(time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)), + ResetInterval: durationPtr(time.Hour), + Amount: uint64Ptr(5), + Limit: boolPtr(true), + Notifications: &[]*quota.SetEventNotification{{ + ID: "notification1", + Percent: 10, + Repeat: true, + CallURL: "https://call.url", + }}, + }, + wantErr: assert.NoError, + }, { + name: "create new with zero values", + args: args{createNew: true}, + wantEvent: quota.SetEvent{ + From: &time.Time{}, + ResetInterval: durationPtr(0), + Amount: uint64Ptr(0), + Limit: boolPtr(false), + Notifications: &[]*quota.SetEventNotification{}, + }, + wantErr: assert.NoError, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + wm := "aWriteModel{ + from: tt.fields.from, + resetInterval: tt.fields.resetInterval, + amount: tt.fields.amount, + limit: tt.fields.limit, + notifications: tt.fields.notifications, + } + gotChanges, err := wm.NewChanges(tt.args.idGenerator, tt.args.createNew, tt.args.amount, tt.args.from, tt.args.resetInterval, tt.args.limit, tt.args.notifications...) + if !tt.wantErr(t, err, fmt.Sprintf("NewChanges(%v, %v, %v, %v, %v, %v)", tt.args.createNew, tt.args.amount, tt.args.from, tt.args.resetInterval, tt.args.limit, tt.args.notifications)) { + return + } + marshalled, err := json.Marshal(quota.NewSetEvent( + eventstore.NewBaseEventForPush( + context.Background(), + "a.NewAggregate("quota1", "instance1").Aggregate, + quota.SetEventType, + ), + quota.Unimplemented, + gotChanges..., + )) + assert.NoError(t, err) + unmarshalled := new(quota.SetEvent) + assert.NoError(t, json.Unmarshal(marshalled, unmarshalled)) + assert.Equalf(t, tt.wantEvent, *unmarshalled, "NewChanges(%v, %v, %v, %v, %v, %v)", tt.args.createNew, tt.args.amount, tt.args.from, tt.args.resetInterval, tt.args.limit, tt.args.notifications) + }) + } +} + +func uint64Ptr(i uint64) *uint64 { return &i } +func boolPtr(b bool) *bool { return &b } +func durationPtr(d time.Duration) *time.Duration { return &d } +func timePtr(t time.Time) *time.Time { return &t } diff --git a/internal/command/quota_test.go b/internal/command/quota_test.go index a3f42b5917..75c8e6bdbd 100644 --- a/internal/command/quota_test.go +++ b/internal/command/quota_test.go @@ -25,7 +25,7 @@ func TestQuota_AddQuota(t *testing.T) { } type args struct { ctx context.Context - addQuota *AddQuota + setQuota *SetQuota } type res struct { want *domain.ObjectDetails @@ -44,14 +44,18 @@ func TestQuota_AddQuota(t *testing.T) { t, expectFilter( eventFromEventPusher( - quota.NewAddedEvent(context.Background(), - "a.NewAggregate("quota1", "INSTANCE").Aggregate, + quota.NewSetEvent( + eventstore.NewBaseEventForPush( + context.Background(), + "a.NewAggregate("quota1", "INSTANCE").Aggregate, + quota.SetEventType, + ), QuotaRequestsAllAuthenticated.Enum(), - time.Now(), - 30*24*time.Hour, - 1000, - false, - nil, + quota.ChangeFrom(time.Date(2023, 9, 1, 0, 0, 0, 0, time.UTC)), + quota.ChangeResetInterval(30*24*time.Hour), + quota.ChangeAmount(1000), + quota.ChangeLimit(false), + quota.ChangeNotifications(make([]*quota.SetEventNotification, 0)), ), ), ), @@ -59,13 +63,12 @@ func TestQuota_AddQuota(t *testing.T) { }, args: args{ ctx: authz.WithInstanceID(context.Background(), "INSTANCE"), - addQuota: &AddQuota{ + setQuota: &SetQuota{ Unit: QuotaRequestsAllAuthenticated, - From: time.Time{}, - ResetInterval: 0, - Amount: 0, - Limit: false, - Notifications: nil, + From: time.Date(2023, 9, 1, 0, 0, 0, 0, time.UTC), + ResetInterval: 30 * 24 * time.Hour, + Amount: 1000, + Limit: true, }, }, res: res{ @@ -83,7 +86,7 @@ func TestQuota_AddQuota(t *testing.T) { }, args: args{ ctx: authz.WithInstanceID(context.Background(), "INSTANCE"), - addQuota: &AddQuota{ + setQuota: &SetQuota{ Unit: "unimplemented", From: time.Time{}, ResetInterval: 0, @@ -108,25 +111,28 @@ func TestQuota_AddQuota(t *testing.T) { []*repository.Event{ eventFromEventPusherWithInstanceID( "INSTANCE", - quota.NewAddedEvent(context.Background(), - "a.NewAggregate("quota1", "INSTANCE").Aggregate, + quota.NewSetEvent( + eventstore.NewBaseEventForPush( + context.Background(), + "a.NewAggregate("quota1", "INSTANCE").Aggregate, + quota.SetEventType, + ), QuotaRequestsAllAuthenticated.Enum(), - time.Date(2023, 9, 1, 0, 0, 0, 0, time.UTC), - 30*24*time.Hour, - 1000, - true, - nil, + quota.ChangeFrom(time.Date(2023, 9, 1, 0, 0, 0, 0, time.UTC)), + quota.ChangeResetInterval(30*24*time.Hour), + quota.ChangeAmount(1000), + quota.ChangeLimit(true), + quota.ChangeNotifications(make([]*quota.SetEventNotification, 0)), ), ), }, - uniqueConstraintsFromEventConstraintWithInstanceID("INSTANCE", quota.NewAddQuotaUnitUniqueConstraint(quota.RequestsAllAuthenticated)), ), ), idGenerator: id_mock.NewIDGeneratorExpectIDs(t, "quota1"), }, args: args{ ctx: authz.WithInstanceID(context.Background(), "INSTANCE"), - addQuota: &AddQuota{ + setQuota: &SetQuota{ Unit: QuotaRequestsAllAuthenticated, From: time.Date(2023, 9, 1, 0, 0, 0, 0, time.UTC), ResetInterval: 30 * 24 * time.Hour, @@ -142,21 +148,25 @@ func TestQuota_AddQuota(t *testing.T) { }, }, { - name: "removed, ok", + name: "recreate quota, ok", fields: fields{ eventstore: eventstoreExpect( t, expectFilter( eventFromEventPusherWithInstanceID( "INSTANCE", - quota.NewAddedEvent(context.Background(), - "a.NewAggregate("quota1", "INSTANCE").Aggregate, + quota.NewSetEvent( + eventstore.NewBaseEventForPush( + context.Background(), + "a.NewAggregate("quota1", "INSTANCE").Aggregate, + quota.SetEventType, + ), QuotaRequestsAllAuthenticated.Enum(), - time.Now(), - 30*24*time.Hour, - 1000, - true, - nil, + quota.ChangeFrom(time.Now()), + quota.ChangeResetInterval(30*24*time.Hour), + quota.ChangeAmount(1000), + quota.ChangeLimit(true), + quota.ChangeNotifications(make([]*quota.SetEventNotification, 0)), ), ), eventFromEventPusherWithInstanceID( @@ -171,25 +181,28 @@ func TestQuota_AddQuota(t *testing.T) { []*repository.Event{ eventFromEventPusherWithInstanceID( "INSTANCE", - quota.NewAddedEvent(context.Background(), - "a.NewAggregate("quota1", "INSTANCE").Aggregate, + quota.NewSetEvent( + eventstore.NewBaseEventForPush( + context.Background(), + "a.NewAggregate("quota2", "INSTANCE").Aggregate, + quota.SetEventType, + ), QuotaRequestsAllAuthenticated.Enum(), - time.Date(2023, 9, 1, 0, 0, 0, 0, time.UTC), - 30*24*time.Hour, - 1000, - true, - nil, + quota.ChangeFrom(time.Date(2023, 9, 1, 0, 0, 0, 0, time.UTC)), + quota.ChangeResetInterval(30*24*time.Hour), + quota.ChangeAmount(1000), + quota.ChangeLimit(true), + quota.ChangeNotifications(make([]*quota.SetEventNotification, 0)), ), ), }, - uniqueConstraintsFromEventConstraintWithInstanceID("INSTANCE", quota.NewAddQuotaUnitUniqueConstraint(quota.RequestsAllAuthenticated)), ), ), - idGenerator: id_mock.NewIDGeneratorExpectIDs(t, "quota1"), + idGenerator: id_mock.NewIDGeneratorExpectIDs(t, "quota2"), }, args: args{ ctx: authz.WithInstanceID(context.Background(), "INSTANCE"), - addQuota: &AddQuota{ + setQuota: &SetQuota{ Unit: QuotaRequestsAllAuthenticated, From: time.Date(2023, 9, 1, 0, 0, 0, 0, time.UTC), ResetInterval: 30 * 24 * time.Hour, @@ -214,32 +227,35 @@ func TestQuota_AddQuota(t *testing.T) { []*repository.Event{ eventFromEventPusherWithInstanceID( "INSTANCE", - quota.NewAddedEvent(context.Background(), - "a.NewAggregate("quota1", "INSTANCE").Aggregate, + quota.NewSetEvent( + eventstore.NewBaseEventForPush( + context.Background(), + "a.NewAggregate("quota1", "INSTANCE").Aggregate, + quota.SetEventType, + ), QuotaRequestsAllAuthenticated.Enum(), - time.Date(2023, 9, 1, 0, 0, 0, 0, time.UTC), - 30*24*time.Hour, - 1000, - true, - []*quota.AddedEventNotification{ - { + quota.ChangeFrom(time.Date(2023, 9, 1, 0, 0, 0, 0, time.UTC)), + quota.ChangeResetInterval(30*24*time.Hour), + quota.ChangeAmount(1000), + quota.ChangeLimit(true), + quota.ChangeNotifications( + []*quota.SetEventNotification{{ ID: "notification1", Percent: 20, Repeat: false, CallURL: "https://url.com", - }, - }, + }}, + ), ), ), }, - uniqueConstraintsFromEventConstraintWithInstanceID("INSTANCE", quota.NewAddQuotaUnitUniqueConstraint(quota.RequestsAllAuthenticated)), ), ), idGenerator: id_mock.NewIDGeneratorExpectIDs(t, "quota1", "notification1"), }, args: args{ ctx: authz.WithInstanceID(context.Background(), "INSTANCE"), - addQuota: &AddQuota{ + setQuota: &SetQuota{ Unit: QuotaRequestsAllAuthenticated, From: time.Date(2023, 9, 1, 0, 0, 0, 0, time.UTC), ResetInterval: 30 * 24 * time.Hour, @@ -267,7 +283,288 @@ func TestQuota_AddQuota(t *testing.T) { eventstore: tt.fields.eventstore, idGenerator: tt.fields.idGenerator, } - got, err := r.AddQuota(tt.args.ctx, tt.args.addQuota) + got, err := r.AddQuota(tt.args.ctx, tt.args.setQuota) + if tt.res.err == nil { + assert.NoError(t, err) + } + if tt.res.err != nil && !tt.res.err(err) { + t.Errorf("got wrong err: %v ", err) + } + if tt.res.err == nil { + assert.Equal(t, tt.res.want, got) + } + }) + } +} + +func TestQuota_SetQuota(t *testing.T) { + type fields struct { + eventstore *eventstore.Eventstore + idGenerator id.Generator + } + type args struct { + ctx context.Context + setQuota *SetQuota + } + type res struct { + want *domain.ObjectDetails + err func(error) bool + } + tests := []struct { + name string + fields fields + args args + res res + }{ + { + name: "already existing", + fields: fields{ + eventstore: eventstoreExpect( + t, + expectFilter( + eventFromEventPusher( + quota.NewSetEvent( + eventstore.NewBaseEventForPush( + context.Background(), + "a.NewAggregate("quota1", "INSTANCE").Aggregate, + quota.SetEventType, + ), + QuotaRequestsAllAuthenticated.Enum(), + quota.ChangeFrom(time.Date(2023, 9, 1, 0, 0, 0, 0, time.UTC)), + quota.ChangeResetInterval(30*24*time.Hour), + quota.ChangeAmount(1000), + quota.ChangeLimit(true), + quota.ChangeNotifications(make([]*quota.SetEventNotification, 0)), + ), + ), + ), + ), + }, + args: args{ + ctx: authz.WithInstanceID(context.Background(), "INSTANCE"), + setQuota: &SetQuota{ + Unit: QuotaRequestsAllAuthenticated, + From: time.Date(2023, 9, 1, 0, 0, 0, 0, time.UTC), + ResetInterval: 30 * 24 * time.Hour, + Amount: 1000, + Limit: true, + }, + }, + res: res{ + want: &domain.ObjectDetails{ + ResourceOwner: "INSTANCE", + }, + }, + }, + { + name: "create quota, validation fail", + fields: fields{ + eventstore: eventstoreExpect( + t, + expectFilter(), + ), + idGenerator: id_mock.NewIDGeneratorExpectIDs(t, "quota1"), + }, + args: args{ + ctx: authz.WithInstanceID(context.Background(), "INSTANCE"), + setQuota: &SetQuota{ + Unit: "unimplemented", + From: time.Time{}, + ResetInterval: 0, + Amount: 0, + Limit: false, + Notifications: nil, + }, + }, + res: res{ + err: func(err error) bool { + return errors.Is(err, caos_errors.ThrowInvalidArgument(nil, "QUOTA-OTeSh", "")) + }, + }, + }, + { + name: "create quota, ok", + fields: fields{ + eventstore: eventstoreExpect( + t, + expectFilter(), + expectPush( + []*repository.Event{ + eventFromEventPusherWithInstanceID( + "INSTANCE", + quota.NewSetEvent( + eventstore.NewBaseEventForPush( + context.Background(), + "a.NewAggregate("quota1", "INSTANCE").Aggregate, + quota.SetEventType, + ), + QuotaRequestsAllAuthenticated.Enum(), + quota.ChangeFrom(time.Date(2023, 9, 1, 0, 0, 0, 0, time.UTC)), + quota.ChangeResetInterval(30*24*time.Hour), + quota.ChangeAmount(1000), + quota.ChangeLimit(true), + quota.ChangeNotifications(make([]*quota.SetEventNotification, 0)), + ), + ), + }, + ), + ), + idGenerator: id_mock.NewIDGeneratorExpectIDs(t, "quota1"), + }, + args: args{ + ctx: authz.WithInstanceID(context.Background(), "INSTANCE"), + setQuota: &SetQuota{ + Unit: QuotaRequestsAllAuthenticated, + From: time.Date(2023, 9, 1, 0, 0, 0, 0, time.UTC), + ResetInterval: 30 * 24 * time.Hour, + Amount: 1000, + Limit: true, + Notifications: nil, + }, + }, + res: res{ + want: &domain.ObjectDetails{ + ResourceOwner: "INSTANCE", + }, + }, + }, + { + name: "recreate quota, ok", + fields: fields{ + eventstore: eventstoreExpect( + t, + expectFilter( + eventFromEventPusherWithInstanceID( + "INSTANCE", + quota.NewSetEvent( + eventstore.NewBaseEventForPush( + context.Background(), + "a.NewAggregate("quota1", "INSTANCE").Aggregate, + quota.SetEventType, + ), + QuotaRequestsAllAuthenticated.Enum(), + quota.ChangeFrom(time.Now()), + quota.ChangeResetInterval(30*24*time.Hour), + quota.ChangeAmount(1000), + quota.ChangeLimit(true), + quota.ChangeNotifications(make([]*quota.SetEventNotification, 0)), + ), + ), + eventFromEventPusherWithInstanceID( + "INSTANCE", + quota.NewRemovedEvent(context.Background(), + "a.NewAggregate("quota1", "INSTANCE").Aggregate, + QuotaRequestsAllAuthenticated.Enum(), + ), + ), + ), + expectPush( + []*repository.Event{ + eventFromEventPusherWithInstanceID( + "INSTANCE", + quota.NewSetEvent( + eventstore.NewBaseEventForPush( + context.Background(), + "a.NewAggregate("quota2", "INSTANCE").Aggregate, + quota.SetEventType, + ), + QuotaRequestsAllAuthenticated.Enum(), + quota.ChangeFrom(time.Date(2023, 9, 1, 0, 0, 0, 0, time.UTC)), + quota.ChangeResetInterval(30*24*time.Hour), + quota.ChangeAmount(1000), + quota.ChangeLimit(true), + quota.ChangeNotifications(make([]*quota.SetEventNotification, 0)), + ), + ), + }, + ), + ), + idGenerator: id_mock.NewIDGeneratorExpectIDs(t, "quota2"), + }, + args: args{ + ctx: authz.WithInstanceID(context.Background(), "INSTANCE"), + setQuota: &SetQuota{ + Unit: QuotaRequestsAllAuthenticated, + From: time.Date(2023, 9, 1, 0, 0, 0, 0, time.UTC), + ResetInterval: 30 * 24 * time.Hour, + Amount: 1000, + Limit: true, + Notifications: nil, + }, + }, + res: res{ + want: &domain.ObjectDetails{ + ResourceOwner: "INSTANCE", + }, + }, + }, + { + name: "create quota with notifications, ok", + fields: fields{ + eventstore: eventstoreExpect( + t, + expectFilter(), + expectPush( + []*repository.Event{ + eventFromEventPusherWithInstanceID( + "INSTANCE", + quota.NewSetEvent( + eventstore.NewBaseEventForPush( + context.Background(), + "a.NewAggregate("quota1", "INSTANCE").Aggregate, + quota.SetEventType, + ), + QuotaRequestsAllAuthenticated.Enum(), + quota.ChangeFrom(time.Date(2023, 9, 1, 0, 0, 0, 0, time.UTC)), + quota.ChangeResetInterval(30*24*time.Hour), + quota.ChangeAmount(1000), + quota.ChangeLimit(true), + quota.ChangeNotifications( + []*quota.SetEventNotification{{ + ID: "notification1", + Percent: 20, + Repeat: false, + CallURL: "https://url.com", + }}, + ), + ), + ), + }, + ), + ), + idGenerator: id_mock.NewIDGeneratorExpectIDs(t, "quota1", "notification1"), + }, + args: args{ + ctx: authz.WithInstanceID(context.Background(), "INSTANCE"), + setQuota: &SetQuota{ + Unit: QuotaRequestsAllAuthenticated, + From: time.Date(2023, 9, 1, 0, 0, 0, 0, time.UTC), + ResetInterval: 30 * 24 * time.Hour, + Amount: 1000, + Limit: true, + Notifications: QuotaNotifications{ + { + Percent: 20, + Repeat: false, + CallURL: "https://url.com", + }, + }, + }, + }, + res: res{ + want: &domain.ObjectDetails{ + ResourceOwner: "INSTANCE", + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := &Commands{ + eventstore: tt.fields.eventstore, + idGenerator: tt.fields.idGenerator, + } + got, err := r.SetQuota(tt.args.ctx, tt.args.setQuota) if tt.res.err == nil { assert.NoError(t, err) } @@ -325,14 +622,17 @@ func TestQuota_RemoveQuota(t *testing.T) { expectFilter( eventFromEventPusherWithInstanceID( "INSTANCE", - quota.NewAddedEvent(context.Background(), - "a.NewAggregate("quota1", "INSTANCE").Aggregate, + quota.NewSetEvent( + eventstore.NewBaseEventForPush( + context.Background(), + "a.NewAggregate("quota1", "INSTANCE").Aggregate, + quota.SetEventType, + ), QuotaRequestsAllAuthenticated.Enum(), - time.Now(), - 30*24*time.Hour, - 1000, - true, - nil, + quota.ChangeFrom(time.Now()), + quota.ChangeResetInterval(30*24*time.Hour), + quota.ChangeAmount(1000), + quota.ChangeLimit(true), ), ), eventFromEventPusherWithInstanceID( @@ -363,14 +663,17 @@ func TestQuota_RemoveQuota(t *testing.T) { expectFilter( eventFromEventPusherWithInstanceID( "INSTANCE", - quota.NewAddedEvent(context.Background(), - "a.NewAggregate("quota1", "INSTANCE").Aggregate, + quota.NewSetEvent( + eventstore.NewBaseEventForPush( + context.Background(), + "a.NewAggregate("quota1", "INSTANCE").Aggregate, + quota.SetEventType, + ), QuotaRequestsAllAuthenticated.Enum(), - time.Now(), - 30*24*time.Hour, - 1000, - false, - nil, + quota.ChangeFrom(time.Now()), + quota.ChangeResetInterval(30*24*time.Hour), + quota.ChangeAmount(1000), + quota.ChangeLimit(false), ), ), ), @@ -517,9 +820,9 @@ func TestQuota_QuotaNotification_validate(t *testing.T) { } } -func TestQuota_AddQuota_validate(t *testing.T) { +func TestQuota_SetQuota_validate(t *testing.T) { type args struct { - addQuota *AddQuota + addQuota *SetQuota } type res struct { err func(error) bool @@ -532,7 +835,7 @@ func TestQuota_AddQuota_validate(t *testing.T) { { name: "notification url parse failed", args: args{ - addQuota: &AddQuota{ + addQuota: &SetQuota{ Unit: QuotaRequestsAllAuthenticated, From: time.Now(), ResetInterval: time.Minute * 10, @@ -556,7 +859,7 @@ func TestQuota_AddQuota_validate(t *testing.T) { { name: "unit unimplemented", args: args{ - addQuota: &AddQuota{ + addQuota: &SetQuota{ Unit: "unimplemented", From: time.Now(), ResetInterval: time.Minute * 10, @@ -571,28 +874,10 @@ func TestQuota_AddQuota_validate(t *testing.T) { }, }, }, - { - name: "amount 0", - args: args{ - addQuota: &AddQuota{ - Unit: QuotaRequestsAllAuthenticated, - From: time.Now(), - ResetInterval: time.Minute * 10, - Amount: 0, - Limit: true, - Notifications: nil, - }, - }, - res: res{ - err: func(err error) bool { - return errors.Is(err, caos_errors.ThrowInvalidArgument(nil, "QUOTA-hOKSJ", "")) - }, - }, - }, { name: "reset interval under 1 min", args: args{ - addQuota: &AddQuota{ + addQuota: &SetQuota{ Unit: QuotaRequestsAllAuthenticated, From: time.Now(), ResetInterval: time.Second * 10, @@ -610,7 +895,7 @@ func TestQuota_AddQuota_validate(t *testing.T) { { name: "validate, ok", args: args{ - addQuota: &AddQuota{ + addQuota: &SetQuota{ Unit: QuotaRequestsAllAuthenticated, From: time.Now(), ResetInterval: time.Minute * 10, diff --git a/internal/eventstore/handler/crdb/statement.go b/internal/eventstore/handler/crdb/statement.go index efcc4952e0..bc1d50d0fc 100644 --- a/internal/eventstore/handler/crdb/statement.go +++ b/internal/eventstore/handler/crdb/statement.go @@ -1,11 +1,15 @@ package crdb import ( + "database/sql" + "errors" "strconv" "strings" + "github.com/zitadel/logging" + "github.com/zitadel/zitadel/internal/database" - caos_errs "github.com/zitadel/zitadel/internal/errors" + zitadel_errors "github.com/zitadel/zitadel/internal/errors" "github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/eventstore/handler" ) @@ -14,8 +18,9 @@ type execOption func(*execConfig) type execConfig struct { tableName string - args []interface{} - err error + args []interface{} + err error + ignoreNotFound bool } func WithTableSuffix(name string) func(*execConfig) { @@ -24,6 +29,12 @@ func WithTableSuffix(name string) func(*execConfig) { } } +func WithIgnoreNotFound() func(*execConfig) { + return func(o *execConfig) { + o.ignoreNotFound = true + } +} + func NewCreateStatement(event eventstore.Event, values []handler.Column, opts ...execOption) *handler.Statement { cols, params, args := columnsToQuery(values) columnNames := strings.Join(cols, ", ") @@ -436,7 +447,11 @@ func exec(config execConfig, q query, opts []execOption) Exec { } if _, err := ex.Exec(q(config), config.args...); err != nil { - return caos_errs.ThrowInternal(err, "CRDB-pKtsr", "exec failed") + if config.ignoreNotFound && errors.Is(err, sql.ErrNoRows) { + logging.WithError(err).Debugf("ignored not found: %v", err) + return nil + } + return zitadel_errors.ThrowInternal(err, "CRDB-pKtsr", "exec failed") } return nil diff --git a/internal/query/projection/quota.go b/internal/query/projection/quota.go index fd6e12266f..e6de8663ac 100644 --- a/internal/query/projection/quota.go +++ b/internal/query/projection/quota.go @@ -5,7 +5,7 @@ import ( "time" "github.com/zitadel/zitadel/internal/database" - "github.com/zitadel/zitadel/internal/errors" + zitadel_errors "github.com/zitadel/zitadel/internal/errors" "github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/eventstore/handler" "github.com/zitadel/zitadel/internal/eventstore/handler/crdb" @@ -65,10 +65,10 @@ func newQuotaProjection(ctx context.Context, config crdb.StatementHandlerConfig) crdb.NewColumn(QuotaColumnID, crdb.ColumnTypeText), crdb.NewColumn(QuotaColumnInstanceID, crdb.ColumnTypeText), crdb.NewColumn(QuotaColumnUnit, crdb.ColumnTypeEnum), - crdb.NewColumn(QuotaColumnAmount, crdb.ColumnTypeInt64), - crdb.NewColumn(QuotaColumnFrom, crdb.ColumnTypeTimestamp), - crdb.NewColumn(QuotaColumnInterval, crdb.ColumnTypeInterval), - crdb.NewColumn(QuotaColumnLimit, crdb.ColumnTypeBool), + crdb.NewColumn(QuotaColumnAmount, crdb.ColumnTypeInt64, crdb.Nullable()), + crdb.NewColumn(QuotaColumnFrom, crdb.ColumnTypeTimestamp, crdb.Nullable()), + crdb.NewColumn(QuotaColumnInterval, crdb.ColumnTypeInterval, crdb.Nullable()), + crdb.NewColumn(QuotaColumnLimit, crdb.ColumnTypeBool, crdb.Nullable()), }, crdb.NewPrimaryKey(QuotaColumnInstanceID, QuotaColumnUnit), ), @@ -118,31 +118,20 @@ func (q *quotaProjection) reducers() []handler.AggregateReducer { EventRedusers: []handler.EventReducer{ { Event: quota.AddedEventType, - Reduce: q.reduceQuotaAdded, + Reduce: q.reduceQuotaSet, + }, + { + Event: quota.SetEventType, + Reduce: q.reduceQuotaSet, }, - }, - }, - { - Aggregate: quota.AggregateType, - EventRedusers: []handler.EventReducer{ { Event: quota.RemovedEventType, Reduce: q.reduceQuotaRemoved, }, - }, - }, - { - Aggregate: quota.AggregateType, - EventRedusers: []handler.EventReducer{ { Event: quota.NotificationDueEventType, Reduce: q.reduceQuotaNotificationDue, }, - }, - }, - { - Aggregate: quota.AggregateType, - EventRedusers: []handler.EventReducer{ { Event: quota.NotifiedEventType, Reduce: q.reduceQuotaNotified, @@ -156,26 +145,53 @@ func (q *quotaProjection) reduceQuotaNotified(event eventstore.Event) (*handler. return crdb.NewNoOpStatement(event), nil } -func (q *quotaProjection) reduceQuotaAdded(event eventstore.Event) (*handler.Statement, error) { - e, err := assertEvent[*quota.AddedEvent](event) +func (q *quotaProjection) reduceQuotaSet(event eventstore.Event) (*handler.Statement, error) { + e, err := assertEvent[*quota.SetEvent](event) if err != nil { return nil, err } + var statements []func(e eventstore.Event) crdb.Exec - createStatements := make([]func(e eventstore.Event) crdb.Exec, len(e.Notifications)+1) - createStatements[0] = crdb.AddCreateStatement( - []handler.Column{ - handler.NewCol(QuotaColumnID, e.Aggregate().ID), - handler.NewCol(QuotaColumnInstanceID, e.Aggregate().InstanceID), - handler.NewCol(QuotaColumnUnit, e.Unit), - handler.NewCol(QuotaColumnAmount, e.Amount), - handler.NewCol(QuotaColumnFrom, e.From), - handler.NewCol(QuotaColumnInterval, e.ResetInterval), - handler.NewCol(QuotaColumnLimit, e.Limit), - }) - for i := range e.Notifications { - notification := e.Notifications[i] - createStatements[i+1] = crdb.AddCreateStatement( + // 1. Insert or update quota if the event has not only notification changes + quotaConflictColumns := []handler.Column{ + handler.NewCol(QuotaColumnInstanceID, e.Aggregate().InstanceID), + handler.NewCol(QuotaColumnUnit, e.Unit), + } + quotaUpdateCols := make([]handler.Column, 0, 4+1+len(quotaConflictColumns)) + if e.Limit != nil { + quotaUpdateCols = append(quotaUpdateCols, handler.NewCol(QuotaColumnLimit, *e.Limit)) + } + if e.Amount != nil { + quotaUpdateCols = append(quotaUpdateCols, handler.NewCol(QuotaColumnAmount, *e.Amount)) + } + if e.From != nil { + quotaUpdateCols = append(quotaUpdateCols, handler.NewCol(QuotaColumnFrom, *e.From)) + } + if e.ResetInterval != nil { + quotaUpdateCols = append(quotaUpdateCols, handler.NewCol(QuotaColumnInterval, *e.ResetInterval)) + } + if len(quotaUpdateCols) > 0 { + // TODO: Add the quota ID to the primary key in a migration? + quotaUpdateCols = append(quotaUpdateCols, handler.NewCol(QuotaColumnID, e.Aggregate().ID)) + quotaUpdateCols = append(quotaUpdateCols, quotaConflictColumns...) + statements = append(statements, crdb.AddUpsertStatement(quotaConflictColumns, quotaUpdateCols)) + } + + // 2. Delete existing notifications + if e.Notifications == nil { + return crdb.NewMultiStatement(e, statements...), nil + } + statements = append(statements, crdb.AddDeleteStatement( + []handler.Condition{ + handler.NewCond(QuotaNotificationColumnInstanceID, e.Aggregate().InstanceID), + handler.NewCond(QuotaNotificationColumnUnit, e.Unit), + }, + crdb.WithTableSuffix(quotaNotificationsTableSuffix), + )) + notifications := *e.Notifications + for i := range notifications { + notification := notifications[i] + statements = append(statements, crdb.AddCreateStatement( []handler.Column{ handler.NewCol(QuotaNotificationColumnInstanceID, e.Aggregate().InstanceID), handler.NewCol(QuotaNotificationColumnUnit, e.Unit), @@ -185,10 +201,9 @@ func (q *quotaProjection) reduceQuotaAdded(event eventstore.Event) (*handler.Sta handler.NewCol(QuotaNotificationColumnRepeat, notification.Repeat), }, crdb.WithTableSuffix(quotaNotificationsTableSuffix), - ) + )) } - - return crdb.NewMultiStatement(e, createStatements...), nil + return crdb.NewMultiStatement(e, statements...), nil } func (q *quotaProjection) reduceQuotaNotificationDue(event eventstore.Event) (*handler.Statement, error) { @@ -207,6 +222,8 @@ func (q *quotaProjection) reduceQuotaNotificationDue(event eventstore.Event) (*h handler.NewCond(QuotaNotificationColumnID, e.ID), }, crdb.WithTableSuffix(quotaNotificationsTableSuffix), + // The notification could have been removed in the meantime + crdb.WithIgnoreNotFound(), ), nil } @@ -279,7 +296,7 @@ func (q *quotaProjection) IncrementUsage(ctx context.Context, unit quota.Unit, i instanceID, unit, periodStart, count, ).Scan(&sum) if err != nil { - return 0, errors.ThrowInternalf(err, "PROJ-SJL3h", "incrementing usage for unit %d failed for at least one quota period", unit) + return 0, zitadel_errors.ThrowInternalf(err, "PROJ-SJL3h", "incrementing usage for unit %d failed for at least one quota period", unit) } return sum, err } diff --git a/internal/query/projection/quota_test.go b/internal/query/projection/quota_test.go index dc9184da07..8bccec08a9 100644 --- a/internal/query/projection/quota_test.go +++ b/internal/query/projection/quota_test.go @@ -29,7 +29,7 @@ func TestQuotasProjection_reduces(t *testing.T) { want wantReduce }{ { - name: "reduceQuotaAdded", + name: "reduceQuotaSet with added type", args: args{ event: getEvent(testEvent( repository.EventType(quota.AddedEventType), @@ -41,9 +41,9 @@ func TestQuotasProjection_reduces(t *testing.T) { "from": "2023-01-01T00:00:00Z", "interval": 300000000000 }`), - ), quota.AddedEventMapper), + ), quota.SetEventMapper), }, - reduce: ("aProjection{}).reduceQuotaAdded, + reduce: ("aProjection{}).reduceQuotaSet, want: wantReduce{ aggregateType: eventstore.AggregateType("quota"), sequence: 15, @@ -51,15 +51,15 @@ func TestQuotasProjection_reduces(t *testing.T) { executer: &testExecuter{ executions: []execution{ { - expectedStmt: "INSERT INTO projections.quotas (id, instance_id, unit, amount, from_anchor, interval, limit_usage) VALUES ($1, $2, $3, $4, $5, $6, $7)", + expectedStmt: "INSERT INTO projections.quotas (limit_usage, amount, from_anchor, interval, id, instance_id, unit) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (instance_id, unit) DO UPDATE SET (limit_usage, amount, from_anchor, interval, id) = (EXCLUDED.limit_usage, EXCLUDED.amount, EXCLUDED.from_anchor, EXCLUDED.interval, EXCLUDED.id)", expectedArgs: []interface{}{ - "agg-id", - "instance-id", - quota.RequestsAllAuthenticated, + true, uint64(10), time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC), time.Minute * 5, - true, + "agg-id", + "instance-id", + quota.RequestsAllAuthenticated, }, }, }, @@ -67,7 +67,7 @@ func TestQuotasProjection_reduces(t *testing.T) { }, }, { - name: "reduceQuotaAdded with notification", + name: "reduceQuotaAdded with added type and notification", args: args{ event: getEvent(testEvent( repository.EventType(quota.AddedEventType), @@ -87,9 +87,9 @@ func TestQuotasProjection_reduces(t *testing.T) { } ] }`), - ), quota.AddedEventMapper), + ), quota.SetEventMapper), }, - reduce: ("aProjection{}).reduceQuotaAdded, + reduce: ("aProjection{}).reduceQuotaSet, want: wantReduce{ aggregateType: eventstore.AggregateType("quota"), sequence: 15, @@ -97,17 +97,126 @@ func TestQuotasProjection_reduces(t *testing.T) { executer: &testExecuter{ executions: []execution{ { - expectedStmt: "INSERT INTO projections.quotas (id, instance_id, unit, amount, from_anchor, interval, limit_usage) VALUES ($1, $2, $3, $4, $5, $6, $7)", + expectedStmt: "INSERT INTO projections.quotas (limit_usage, amount, from_anchor, interval, id, instance_id, unit) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (instance_id, unit) DO UPDATE SET (limit_usage, amount, from_anchor, interval, id) = (EXCLUDED.limit_usage, EXCLUDED.amount, EXCLUDED.from_anchor, EXCLUDED.interval, EXCLUDED.id)", expectedArgs: []interface{}{ - "agg-id", - "instance-id", - quota.RequestsAllAuthenticated, + true, uint64(10), time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC), time.Minute * 5, + "agg-id", + "instance-id", + quota.RequestsAllAuthenticated, + }, + }, + { + expectedStmt: "DELETE FROM projections.quotas_notifications WHERE (instance_id = $1) AND (unit = $2)", + expectedArgs: []interface{}{ + "instance-id", + quota.RequestsAllAuthenticated, + }, + }, + { + expectedStmt: "INSERT INTO projections.quotas_notifications (instance_id, unit, id, call_url, percent, repeat) VALUES ($1, $2, $3, $4, $5, $6)", + expectedArgs: []interface{}{ + "instance-id", + quota.RequestsAllAuthenticated, + "id", + "url", + uint16(100), true, }, }, + }, + }, + }, + }, + { + name: "reduceQuotaSet with set type", + args: args{ + event: getEvent(testEvent( + repository.EventType(quota.SetEventType), + quota.AggregateType, + []byte(`{ + "unit": 1, + "amount": 10, + "limit": true, + "from": "2023-01-01T00:00:00Z", + "interval": 300000000000 + }`), + ), quota.SetEventMapper), + }, + reduce: ("aProjection{}).reduceQuotaSet, + want: wantReduce{ + aggregateType: eventstore.AggregateType("quota"), + sequence: 15, + previousSequence: 10, + executer: &testExecuter{ + executions: []execution{ + { + expectedStmt: "INSERT INTO projections.quotas (limit_usage, amount, from_anchor, interval, id, instance_id, unit) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (instance_id, unit) DO UPDATE SET (limit_usage, amount, from_anchor, interval, id) = (EXCLUDED.limit_usage, EXCLUDED.amount, EXCLUDED.from_anchor, EXCLUDED.interval, EXCLUDED.id)", + expectedArgs: []interface{}{ + true, + uint64(10), + time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC), + time.Minute * 5, + "agg-id", + "instance-id", + quota.RequestsAllAuthenticated, + }, + }, + }, + }, + }, + }, + { + name: "reduceQuotaAdded with set type and notification", + args: args{ + event: getEvent(testEvent( + repository.EventType(quota.SetEventType), + quota.AggregateType, + []byte(`{ + "unit": 1, + "amount": 10, + "limit": true, + "from": "2023-01-01T00:00:00Z", + "interval": 300000000000, + "notifications": [ + { + "id": "id", + "percent": 100, + "repeat": true, + "callURL": "url" + } + ] + }`), + ), quota.SetEventMapper), + }, + reduce: ("aProjection{}).reduceQuotaSet, + want: wantReduce{ + aggregateType: eventstore.AggregateType("quota"), + sequence: 15, + previousSequence: 10, + executer: &testExecuter{ + executions: []execution{ + { + expectedStmt: "INSERT INTO projections.quotas (limit_usage, amount, from_anchor, interval, id, instance_id, unit) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (instance_id, unit) DO UPDATE SET (limit_usage, amount, from_anchor, interval, id) = (EXCLUDED.limit_usage, EXCLUDED.amount, EXCLUDED.from_anchor, EXCLUDED.interval, EXCLUDED.id)", + expectedArgs: []interface{}{ + true, + uint64(10), + time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC), + time.Minute * 5, + "agg-id", + "instance-id", + quota.RequestsAllAuthenticated, + }, + }, + { + expectedStmt: "DELETE FROM projections.quotas_notifications WHERE (instance_id = $1) AND (unit = $2)", + expectedArgs: []interface{}{ + "instance-id", + quota.RequestsAllAuthenticated, + }, + }, { expectedStmt: "INSERT INTO projections.quotas_notifications (instance_id, unit, id, call_url, percent, repeat) VALUES ($1, $2, $3, $4, $5, $6)", expectedArgs: []interface{}{ diff --git a/internal/query/quota_periods.go b/internal/query/quota_periods.go index 5f540e9fcc..ce1b76cd08 100644 --- a/internal/query/quota_periods.go +++ b/internal/query/quota_periods.go @@ -49,7 +49,8 @@ func (q *Queries) GetRemainingQuotaUsage(ctx context.Context, instanceID string, QuotaColumnLimit.identifier(): true, }, sq.Expr("age(" + QuotaPeriodColumnStart.identifier() + ") < " + QuotaColumnInterval.identifier()), - sq.Expr(QuotaPeriodColumnStart.identifier() + " < now()"), + sq.Expr(QuotaPeriodColumnStart.identifier() + " <= now()"), + sq.Expr(QuotaPeriodColumnStart.identifier() + " >= " + QuotaColumnFrom.identifier()), }). ToSql() if err != nil { @@ -73,14 +74,14 @@ func prepareRemainingQuotaUsageQuery(ctx context.Context, db prepareDatabase) (s From(quotaPeriodsTable.identifier()). Join(join(QuotaColumnUnit, QuotaPeriodColumnUnit) + db.Timetravel(call.Took(ctx))). PlaceholderFormat(sq.Dollar), func(row *sql.Row) (*uint64, error) { - usage := new(uint64) - err := row.Scan(usage) + remaining := new(uint64) + err := row.Scan(remaining) if err != nil { if errors.Is(err, sql.ErrNoRows) { return nil, zitadel_errors.ThrowNotFound(err, "QUERY-quiowi2", "Errors.Internal") } return nil, zitadel_errors.ThrowInternal(err, "QUERY-81j1jn2", "Errors.Internal") } - return usage, nil + return remaining, nil } } diff --git a/internal/repository/quota/events.go b/internal/repository/quota/events.go index 5bdb8f0c44..06f41cdd0c 100644 --- a/internal/repository/quota/events.go +++ b/internal/repository/quota/events.go @@ -17,6 +17,7 @@ const ( UniqueQuotaNameType = "quota_units" eventTypePrefix = eventstore.EventType("quota.") AddedEventType = eventTypePrefix + "added" + SetEventType = eventTypePrefix + "set" NotifiedEventType = eventTypePrefix + "notified" NotificationDueEventType = eventTypePrefix + "notificationdue" RemovedEventType = eventTypePrefix + "removed" @@ -43,65 +44,87 @@ func NewRemoveQuotaNameUniqueConstraint(unit Unit) *eventstore.EventUniqueConstr ) } -type AddedEvent struct { +// SetEvent describes that a quota is added or modified and contains only changed properties +type SetEvent struct { eventstore.BaseEvent `json:"-"` - - Unit Unit `json:"unit"` - From time.Time `json:"from"` - ResetInterval time.Duration `json:"interval,omitempty"` - Amount uint64 `json:"amount"` - Limit bool `json:"limit"` - Notifications []*AddedEventNotification `json:"notifications,omitempty"` + Unit Unit `json:"unit"` + From *time.Time `json:"from,omitempty"` + ResetInterval *time.Duration `json:"interval,omitempty"` + Amount *uint64 `json:"amount,omitempty"` + Limit *bool `json:"limit,omitempty"` + Notifications *[]*SetEventNotification `json:"notifications,omitempty"` } -type AddedEventNotification struct { +type SetEventNotification struct { ID string `json:"id"` Percent uint16 `json:"percent"` - Repeat bool `json:"repeat,omitempty"` + Repeat bool `json:"repeat"` CallURL string `json:"callUrl"` } -func (e *AddedEvent) Data() interface{} { +func (e *SetEvent) Data() interface{} { return e } -func (e *AddedEvent) UniqueConstraints() []*eventstore.EventUniqueConstraint { - return []*eventstore.EventUniqueConstraint{NewAddQuotaUnitUniqueConstraint(e.Unit)} +func (e *SetEvent) UniqueConstraints() []*eventstore.EventUniqueConstraint { + return nil } -func NewAddedEvent( - ctx context.Context, - aggregate *eventstore.Aggregate, +func NewSetEvent( + base *eventstore.BaseEvent, unit Unit, - from time.Time, - resetInterval time.Duration, - amount uint64, - limit bool, - notifications []*AddedEventNotification, -) *AddedEvent { - return &AddedEvent{ - BaseEvent: *eventstore.NewBaseEventForPush( - ctx, - aggregate, - AddedEventType, - ), - Unit: unit, - From: from, - ResetInterval: resetInterval, - Amount: amount, - Limit: limit, - Notifications: notifications, + changes ...QuotaChange, +) *SetEvent { + changedEvent := &SetEvent{ + BaseEvent: *base, + Unit: unit, + } + for _, change := range changes { + change(changedEvent) + } + return changedEvent +} + +type QuotaChange func(*SetEvent) + +func ChangeAmount(amount uint64) QuotaChange { + return func(e *SetEvent) { + e.Amount = &amount } } -func AddedEventMapper(event *repository.Event) (eventstore.Event, error) { - e := &AddedEvent{ +func ChangeLimit(limit bool) QuotaChange { + return func(e *SetEvent) { + e.Limit = &limit + } +} + +func ChangeFrom(from time.Time) QuotaChange { + return func(event *SetEvent) { + event.From = &from + } +} + +func ChangeResetInterval(interval time.Duration) QuotaChange { + return func(event *SetEvent) { + event.ResetInterval = &interval + } +} + +func ChangeNotifications(notifications []*SetEventNotification) QuotaChange { + return func(event *SetEvent) { + event.Notifications = ¬ifications + } +} + +func SetEventMapper(event *repository.Event) (eventstore.Event, error) { + e := &SetEvent{ BaseEvent: *eventstore.BaseEventFromRepo(event), } err := json.Unmarshal(event.Data, e) if err != nil { - return nil, errors.ThrowInternal(err, "QUOTA-4n8vs", "unable to unmarshal quota added") + return nil, errors.ThrowInternal(err, "QUOTA-kmIpI", "unable to unmarshal quota set") } return e, nil diff --git a/internal/repository/quota/eventstore.go b/internal/repository/quota/eventstore.go index 2c87bb0658..f651e58e2b 100644 --- a/internal/repository/quota/eventstore.go +++ b/internal/repository/quota/eventstore.go @@ -5,7 +5,11 @@ import ( ) func RegisterEventMappers(es *eventstore.Eventstore) { - es.RegisterFilterEventMapper(AggregateType, AddedEventType, AddedEventMapper). + // AddedEventType is not emitted anymore. + // For ease of use, old events are directly mapped to SetEvent. + // This works, because the data structures are compatible. + es.RegisterFilterEventMapper(AggregateType, AddedEventType, SetEventMapper). + RegisterFilterEventMapper(AggregateType, SetEventType, SetEventMapper). RegisterFilterEventMapper(AggregateType, RemovedEventType, RemovedEventMapper). RegisterFilterEventMapper(AggregateType, NotificationDueEventType, NotificationDueEventMapper). RegisterFilterEventMapper(AggregateType, NotifiedEventType, NotifiedEventMapper) diff --git a/proto/zitadel/system.proto b/proto/zitadel/system.proto index 0a9070475c..0194dbb232 100644 --- a/proto/zitadel/system.proto +++ b/proto/zitadel/system.proto @@ -361,6 +361,8 @@ service SystemService { } // Creates a new quota + // Returns an error if the quota already exists for the specified unit + // Deprecated: use SetQuota instead rpc AddQuota(AddQuotaRequest) returns (AddQuotaResponse) { option (google.api.http) = { post: "/instances/{instance_id}/quotas" @@ -372,6 +374,19 @@ service SystemService { }; } + // Sets quota configuration properties + // Creates a new quota if it doesn't exist for the specified unit + rpc SetQuota(SetQuotaRequest) returns (SetQuotaResponse) { + option (google.api.http) = { + put: "/instances/{instance_id}/quotas" + body: "*" + }; + + option (zitadel.v1.auth_option) = { + permission: "authenticated"; + }; + } + // Removes a quota rpc RemoveQuota(RemoveQuotaRequest) returns (RemoveQuotaResponse) { option (google.api.http) = { @@ -598,6 +613,54 @@ message AddQuotaResponse { zitadel.v1.ObjectDetails details = 1; } +message SetQuotaRequest { + string instance_id = 1 [(validate.rules).string = {min_len: 1, max_len: 200}]; + // the unit a quota should be imposed on + zitadel.quota.v1.Unit unit = 2 [ + (validate.rules).enum = {defined_only: true, not_in: [0]}, + (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = { + description: "the unit a quota should be imposed on"; + } + ]; + // the starting time from which the current quota period is calculated from. This is relevant for querying the current usage. + google.protobuf.Timestamp from = 3 [ + (validate.rules).timestamp.required = true, + (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = { + example: "\"2019-04-01T08:45:00.000000Z\""; + description: "the starting time from which the current quota period is calculated from. This is relevant for querying the current usage."; + } + ]; + // the quota periods duration + google.protobuf.Duration reset_interval = 4 [ + (validate.rules).duration.required = true, + (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = { + description: "the quota periods duration"; + } + ]; + // the quota amount of units + uint64 amount = 5 [ + (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = { + description: "the quota amount of units"; + } + ]; + // whether ZITADEL should block further usage when the configured amount is used + bool limit = 6 [ + (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = { + description: "whether ZITADEL should block further usage when the configured amount is used"; + } + ]; + // the handlers, ZITADEL executes when certain quota percentages are reached + repeated zitadel.quota.v1.Notification notifications = 7 [ + (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = { + description: "the handlers, ZITADEL executes when certain quota percentages are reached"; + } + ]; +} + +message SetQuotaResponse { + zitadel.v1.ObjectDetails details = 1; +} + message RemoveQuotaRequest { string instance_id = 1 [(validate.rules).string = {min_len: 1, max_len: 200}]; zitadel.quota.v1.Unit unit = 2;