From 385a55bd21a50fbf663a60e2b5472091a8ca324a Mon Sep 17 00:00:00 2001 From: Elio Bischof Date: Wed, 25 Oct 2023 13:42:00 +0200 Subject: [PATCH] feat: limit audit trail (#6744) * feat: enable limiting audit trail * support AddExclusiveQuery * fix invalid condition * register event mappers * fix NullDuration validity * test query side for limits * lint * acceptance test audit trail limit * fix acceptance test * translate limits not found * update tests * fix linting * add audit log retention to default instance * fix tests * update docs * remove todo * improve test name --- cmd/defaults.yaml | 11 +- cmd/start/start.go | 5 +- docs/docs/self-hosting/manage/production.md | 4 +- docs/docs/self-hosting/manage/quotas.md | 61 ---- .../docs/self-hosting/manage/usage_control.md | 117 +++++++ docs/sidebars.js | 2 +- e2e/config/localhost/zitadel.yaml | 11 - internal/api/grpc/admin/event.go | 2 +- internal/api/grpc/auth/server.go | 32 +- internal/api/grpc/auth/user.go | 2 +- internal/api/grpc/management/org.go | 2 +- internal/api/grpc/management/project.go | 4 +- .../grpc/management/project_application.go | 2 +- internal/api/grpc/management/server.go | 32 +- internal/api/grpc/management/user.go | 2 +- internal/api/grpc/system/limits.go | 32 ++ internal/api/grpc/system/limits_converter.go | 16 + .../grpc/system/limits_integration_test.go | 213 ++++++++++++ internal/command/command.go | 2 + internal/command/instance.go | 17 +- internal/command/limits.go | 105 ++++++ internal/command/limits_model.go | 73 ++++ internal/command/limits_test.go | 313 ++++++++++++++++++ internal/command/main_test.go | 2 + internal/database/type.go | 22 ++ internal/database/type_test.go | 57 ++++ internal/query/event.go | 56 ++-- internal/query/limits.go | 119 +++++++ internal/query/limits_test.go | 116 +++++++ internal/query/prepare_test.go | 12 + internal/query/projection/limits.go | 114 +++++++ internal/query/projection/limits_test.go | 96 ++++++ internal/query/projection/main_test.go | 2 + internal/query/projection/projection.go | 3 + internal/query/query.go | 5 + internal/query/quota_test.go | 23 +- internal/repository/limits/aggregate.go | 26 ++ internal/repository/limits/events.go | 86 +++++ internal/repository/limits/eventstore.go | 10 + internal/repository/quota/events.go | 8 - internal/static/i18n/bg.yaml | 3 + internal/static/i18n/de.yaml | 3 + internal/static/i18n/en.yaml | 3 + internal/static/i18n/es.yaml | 3 + internal/static/i18n/fr.yaml | 3 + internal/static/i18n/it.yaml | 3 + internal/static/i18n/ja.yaml | 3 + internal/static/i18n/mk.yaml | 3 + internal/static/i18n/pl.yaml | 3 + internal/static/i18n/pt.yaml | 3 + internal/static/i18n/zh.yaml | 3 + proto/zitadel/system.proto | 100 +++++- 52 files changed, 1778 insertions(+), 172 deletions(-) delete mode 100644 docs/docs/self-hosting/manage/quotas.md create mode 100644 docs/docs/self-hosting/manage/usage_control.md create mode 100644 internal/api/grpc/system/limits.go create mode 100644 internal/api/grpc/system/limits_converter.go create mode 100644 internal/api/grpc/system/limits_integration_test.go create mode 100644 internal/command/limits.go create mode 100644 internal/command/limits_model.go create mode 100644 internal/command/limits_test.go create mode 100644 internal/query/limits.go create mode 100644 internal/query/limits_test.go create mode 100644 internal/query/projection/limits.go create mode 100644 internal/query/projection/limits_test.go create mode 100644 internal/repository/limits/aggregate.go create mode 100644 internal/repository/limits/events.go create mode 100644 internal/repository/limits/eventstore.go diff --git a/cmd/defaults.yaml b/cmd/defaults.yaml index 54eaaeeb6a..4baf021b9e 100644 --- a/cmd/defaults.yaml +++ b/cmd/defaults.yaml @@ -795,7 +795,11 @@ DefaultInstance: ButtonText: Login Features: - FeatureLoginDefaultOrg: true - + Limits: + # AuditLogRetention limits the number of events that can be queried via the events API by their age. + # A value of "0s" means that all events are available. + # If this value is set, it overwrites the system default unless it is not reset via the admin API. + AuditLogRetention: # ZITADEL_DEFAULTINSTANCE_LIMITS_AUDITLOGRETENTION Quotas: # Items take a slice of quota configurations, whereas, for each unit type and instance, one or zero quotas may exist. # The following unit types are supported @@ -830,7 +834,10 @@ DefaultInstance: # # CallURL is called when a relative amount of the quota is used. # CallURL: "https://httpbin.org/post" -AuditLogRetention: 0s +# AuditLogRetention limits the number of events that can be queried via the events API by their age. +# A value of "0s" means that all events are available. +# If an audit log retention is set using an instance limit, it will overwrite the system default. +AuditLogRetention: 0s # ZITADEL_AUDITLOGRETENTION InternalAuthZ: RolePermissionMappings: diff --git a/cmd/start/start.go b/cmd/start/start.go index 744e1f80ac..078075a17b 100644 --- a/cmd/start/start.go +++ b/cmd/start/start.go @@ -164,6 +164,7 @@ func startZitadel(config *Config, masterKey string, server chan<- *Server) error return internal_authz.CheckPermission(ctx, &authz_es.UserMembershipRepo{Queries: q}, config.InternalAuthZ.RolePermissionMappings, permission, orgID, resourceID) } }, + config.AuditLogRetention, config.SystemAPIUsers, ) if err != nil { @@ -364,10 +365,10 @@ func startAPIs( if err := apis.RegisterServer(ctx, admin.CreateServer(config.Database.DatabaseName(), commands, queries, config.SystemDefaults, config.ExternalSecure, keys.User, config.AuditLogRetention)); err != nil { return err } - if err := apis.RegisterServer(ctx, management.CreateServer(commands, queries, config.SystemDefaults, keys.User, config.ExternalSecure, config.AuditLogRetention)); err != nil { + if err := apis.RegisterServer(ctx, management.CreateServer(commands, queries, config.SystemDefaults, keys.User, config.ExternalSecure)); err != nil { return err } - if err := apis.RegisterServer(ctx, auth.CreateServer(commands, queries, authRepo, config.SystemDefaults, keys.User, config.ExternalSecure, config.AuditLogRetention)); err != nil { + if err := apis.RegisterServer(ctx, auth.CreateServer(commands, queries, authRepo, config.SystemDefaults, keys.User, config.ExternalSecure)); err != nil { return err } if err := apis.RegisterService(ctx, user_v2.CreateServer(commands, queries, keys.User, keys.IDPConfig, idp.CallbackURL(config.ExternalSecure), idp.SAMLRootURL(config.ExternalSecure))); err != nil { diff --git a/docs/docs/self-hosting/manage/production.md b/docs/docs/self-hosting/manage/production.md index 14117ebcce..b4ea7f3405 100644 --- a/docs/docs/self-hosting/manage/production.md +++ b/docs/docs/self-hosting/manage/production.md @@ -209,10 +209,10 @@ DefaultInstance: - Probably, you also want to [apply your custom branding](/guides/manage/customize/branding), [hook into certain events](/guides/manage/customize/behavior), [customize texts](/guides/manage/customize/texts) or [add metadata to your users](/guides/manage/customize/user-metadata). - If you want to automatically create ZITADEL resources, you can use the [ZITADEL Terraform Provider](/guides/manage/terraform/basics). -## Quotas +## Limits and Quotas If you host ZITADEL as a service, -you might want to [limit usage and/or execute tasks on certain usage units and levels](/self-hosting/manage/quotas). +you might want to [limit usage and/or execute tasks on certain usage units and levels](/self-hosting/manage/usage_control). ## Minimum system requirements diff --git a/docs/docs/self-hosting/manage/quotas.md b/docs/docs/self-hosting/manage/quotas.md deleted file mode 100644 index 23659242b4..0000000000 --- a/docs/docs/self-hosting/manage/quotas.md +++ /dev/null @@ -1,61 +0,0 @@ ---- -title: Usage Quotas in ZITADEL -sidebar_label: Usage Quotas ---- - -Quotas is an enterprise feature that is relevant if you want to host ZITADEL as a service. -It enables you to limit usage and/or register webhooks that trigger on configurable usage levels for certain units. -For example, you might want to report usage to an external billing tool and notify users when 80 percent of a quota is exhausted. -Quotas are currently supported [for the instance level only](/concepts/structure/instance). -Please refer to the [system API docs](/apis/resources/system) for detailed explanations about how to use the quotas feature. - -ZITADEL supports limiting authenticated requests and action run seconds - -## Authenticated Requests - -For using the quotas feature for authenticated requests you have to enable the database logstore for access logs in your ZITADEL configurations LogStore section: - -```yaml -LogStore: - Access: - Database: - # If enabled, all access logs are stored in the database table logstore.access - Enabled: false - # Logs that are older than the keep duration are cleaned up continuously - Keep: 2160h # 90 days - # CleanupInterval defines the time between cleanup iterations - CleanupInterval: 4h - # Debouncing enables to asynchronously emit log entries, so the normal execution performance is not impaired - # Log entries are held in-memory until one of the conditions MinFrequency or MaxBulkSize meets. - Debounce: - MinFrequency: 2m - MaxBulkSize: 100 -``` - -If a quota is configured to limit requests and the quotas amount is exhausted, all further requests are blocked except requests to the System API. -Also, a cookie is set, to make it easier to block further traffic before it reaches your ZITADEL runtime. - -## Action Run Seconds - -For using the quotas feature for action run seconds you have to enable the database logstore for execution logs in your ZITADEL configurations LogStore section: - -```yaml -LogStore: - Execution: - Database: - # If enabled, all action execution logs are stored in the database table logstore.execution - Enabled: false - # Logs that are older than the keep duration are cleaned up continuously - Keep: 2160h # 90 days - # CleanupInterval defines the time between cleanup iterations - CleanupInterval: 4h - # Debouncing enables to asynchronously emit log entries, so the normal execution performance is not impaired - # Log entries are held in-memory until one of the conditions MinFrequency or MaxBulkSize meets. - Debounce: - MinFrequency: 0s - MaxBulkSize: 0 -``` - -If a quota is configured to limit action run seconds and the quotas amount is exhausted, all further actions will fail immediately with a context timeout exceeded error. -The action that runs into the limit also fails with the context timeout exceeded error. - diff --git a/docs/docs/self-hosting/manage/usage_control.md b/docs/docs/self-hosting/manage/usage_control.md new file mode 100644 index 0000000000..af646e46e3 --- /dev/null +++ b/docs/docs/self-hosting/manage/usage_control.md @@ -0,0 +1,117 @@ +--- +title: Usage Control +sidebar_label: Usage Control +--- + +If you have a self-hosted ZITADEL environment, you can limit the usage of your [instances](/concepts/structure/instance). +For example, if you provide your customers [their own virtual instances](/concepts/structure/instance#multiple-virtual-instances) with access on their own domains, you can design a pricing model based on the usage of their instances. +The usage control features are currently limited to the instance level only. + +## Limit Audit Trails + +You can restrict the maximum age of events returned by the following APIs: + +- [Events Search](/apis/resources/admin/admin-service-list-events), See also the [Event API guide](guides/integrate/event-api) +- [My User History](/apis/resources/auth/auth-service-list-my-user-changes) +- [A Users History](/apis/resources/mgmt/management-service-list-user-changes) +- [An Applications History](/apis/resources/mgmt/management-service-list-app-changes) +- [An Organizations History](/apis/resources/mgmt/management-service-list-org-changes) +- [A Projects History](/apis/resources/mgmt/management-service-list-project-changes) +- [A Project Grants History](/apis/resources/mgmt/management-service-list-project-grant-changes) + +You can set a global default limit as well as a default limit [for new virtual instances](/concepts/structure/instance#multiple-virtual-instances) in the ZITADEL configuration. +The following snippets shows the defaults: + +```yaml +# AuditLogRetention limits the number of events that can be queried via the events API by their age. +# A value of "0s" means that all events are available. +# If an audit log retention is set using an instance limit, it will overwrite the system default. +AuditLogRetention: 0s # ZITADEL_AUDITLOGRETENTION +DefaultInstance: + Limits: + # AuditLogRetention limits the number of events that can be queried via the events API by their age. + # A value of "0s" means that all events are available. + # If this value is set, it overwrites the system default unless it is not reset via the admin API. + AuditLogRetention: # ZITADEL_DEFAULTINSTANCE_LIMITS_AUDITLOGRETENTION +``` + +You can also set a limit for [a specific virtual instance](/concepts/structure/instance#multiple-virtual-instances) using the [system API](/category/apis/resources/system/limits). + +## Quotas + +Quotas enables you to limit usage and/or register webhooks that trigger on configurable usage levels for certain units. +For example, you might want to report usage to an external billing tool and notify users when 80 percent of a quota is exhausted. + +ZITADEL supports limiting authenticated requests and action run seconds with quotas. + +For using the quotas feature you have to activate it in your ZITADEL configurations *Quotas* section. +The following snippets shows the defaults: + +```yaml +Quotas: + Access: + # If enabled, authenticated requests are counted and potentially limited depending on the configured quota of the instance + Enabled: false # ZITADEL_QUOTAS_ACCESS_ENABLED + Debounce: + MinFrequency: 0s # ZITADEL_QUOTAS_ACCESS_DEBOUNCE_MINFREQUENCY + MaxBulkSize: 0 # ZITADEL_QUOTAS_ACCESS_DEBOUNCE_MAXBULKSIZE + ExhaustedCookieKey: "zitadel.quota.exhausted" # ZITADEL_QUOTAS_ACCESS_EXHAUSTEDCOOKIEKEY + ExhaustedCookieMaxAge: "300s" # ZITADEL_QUOTAS_ACCESS_EXHAUSTEDCOOKIEMAXAGE + Execution: + # If enabled, all action executions are counted and potentially limited depending on the configured quota of the instance + Enabled: false # ZITADEL_QUOTAS_EXECUTION_DATABASE_ENABLED + Debounce: + MinFrequency: 0s # ZITADEL_QUOTAS_EXECUTION_DEBOUNCE_MINFREQUENCY + MaxBulkSize: 0 # ZITADEL_QUOTAS_EXECUTION_DEBOUNCE_MAXBULKSIZE +``` + +Once you have activated the quotas feature, you can configure quotas [for your virtual instances](/concepts/structure/instance#multiple-virtual-instances) using the [system API](/category/apis/resources/system/quotas) or the *DefaultInstances.Quotas* section. +The following snippets shows the defaults: + +```yaml +DefaultInstance: + Quotas: + # Items take a slice of quota configurations, whereas, for each unit type and instance, one or zero quotas may exist. + # The following unit types are supported + + # "requests.all.authenticated" + # The sum of all requests to the ZITADEL API with an authorization header, + # excluding the following exceptions + # - Calls to the System API + # - Calls that cause internal server errors + # - Failed authorizations + # - Requests after the quota already exceeded + + # "actions.all.runs.seconds" + # The sum of all actions run durations in seconds + Items: +# - Unit: "requests.all.authenticated" +# # From defines the starting time from which the current quota period is calculated. +# # This is relevant for querying the current usage. +# From: "2023-01-01T00:00:00Z" +# # ResetInterval defines the quota periods duration +# ResetInterval: 720h # 30 days +# # Amount defines the number of units for this quota +# Amount: 25000 +# # Limit defines whether ZITADEL should block further usage when the configured amount is used +# Limit: false +# # Notifications are emitted by ZITADEL when certain quota percentages are reached +# Notifications: +# # Percent defines the relative amount of used units, after which a notification should be emitted. +# - Percent: 100 +# # Repeat defines, whether a notification should be emitted each time when a multitude of the configured Percent is used. +# Repeat: true +# # CallURL is called when a relative amount of the quota is used. +# CallURL: "https://httpbin.org/post" +``` + +### Exhausted Authenticated Requests + +If a quota is configured to limit requests and the quotas amount is exhausted, all further requests are blocked except requests to the System API. +Also, a cookie is set, to make it easier to block further traffic before it reaches your ZITADEL runtime. + +### Exhausted Action Run Seconds + +If a quota is configured to limit action run seconds and the quotas amount is exhausted, all further actions will fail immediately with a context timeout exceeded error. +The action that runs into the limit also fails with the context timeout exceeded error. + diff --git a/docs/sidebars.js b/docs/sidebars.js index a2ba179c80..10c5e87961 100644 --- a/docs/sidebars.js +++ b/docs/sidebars.js @@ -664,7 +664,7 @@ module.exports = { "self-hosting/manage/tls_modes", "self-hosting/manage/database/database", "self-hosting/manage/updating_scaling", - "self-hosting/manage/quotas" + "self-hosting/manage/usage_control" ], }, ], diff --git a/e2e/config/localhost/zitadel.yaml b/e2e/config/localhost/zitadel.yaml index cfafaa637f..aa051ac112 100644 --- a/e2e/config/localhost/zitadel.yaml +++ b/e2e/config/localhost/zitadel.yaml @@ -40,17 +40,6 @@ Quotas: DefaultInstance: LoginPolicy: MfaInitSkipLifetime: "0" - Quotas: - Items: - - Unit: "actions.all.runs.seconds" - From: "2023-01-01T00:00:00Z" - ResetInterval: 5m - Amount: 20 - Limit: false - Notifications: - - Percent: 100 - Repeat: true - CallURL: "https://httpbin.org/post" SystemAPIUsers: - cypress: diff --git a/internal/api/grpc/admin/event.go b/internal/api/grpc/admin/event.go index 2c7a2fe63f..434290d8ea 100644 --- a/internal/api/grpc/admin/event.go +++ b/internal/api/grpc/admin/event.go @@ -17,7 +17,7 @@ func (s *Server) ListEvents(ctx context.Context, in *admin_pb.ListEventsRequest) if err != nil { return nil, err } - events, err := s.query.SearchEvents(ctx, filter, s.auditLogRetention) + events, err := s.query.SearchEvents(ctx, filter) if err != nil { return nil, err } diff --git a/internal/api/grpc/auth/server.go b/internal/api/grpc/auth/server.go index 16fb66022f..015c8ce83f 100644 --- a/internal/api/grpc/auth/server.go +++ b/internal/api/grpc/auth/server.go @@ -2,7 +2,6 @@ package auth import ( "context" - "time" "google.golang.org/grpc" @@ -26,14 +25,13 @@ const ( type Server struct { auth.UnimplementedAuthServiceServer - command *command.Commands - query *query.Queries - repo repository.Repository - defaults systemdefaults.SystemDefaults - assetsAPIDomain func(context.Context) string - userCodeAlg crypto.EncryptionAlgorithm - externalSecure bool - auditLogRetention time.Duration + command *command.Commands + query *query.Queries + repo repository.Repository + defaults systemdefaults.SystemDefaults + assetsAPIDomain func(context.Context) string + userCodeAlg crypto.EncryptionAlgorithm + externalSecure bool } type Config struct { @@ -46,17 +44,15 @@ func CreateServer(command *command.Commands, defaults systemdefaults.SystemDefaults, userCodeAlg crypto.EncryptionAlgorithm, externalSecure bool, - auditLogRetention time.Duration, ) *Server { return &Server{ - command: command, - query: query, - repo: authRepo, - defaults: defaults, - assetsAPIDomain: assets.AssetAPI(externalSecure), - userCodeAlg: userCodeAlg, - externalSecure: externalSecure, - auditLogRetention: auditLogRetention, + command: command, + query: query, + repo: authRepo, + defaults: defaults, + assetsAPIDomain: assets.AssetAPI(externalSecure), + userCodeAlg: userCodeAlg, + externalSecure: externalSecure, } } diff --git a/internal/api/grpc/auth/user.go b/internal/api/grpc/auth/user.go index 11ecb95cf8..87a146efe6 100644 --- a/internal/api/grpc/auth/user.go +++ b/internal/api/grpc/auth/user.go @@ -84,7 +84,7 @@ func (s *Server) ListMyUserChanges(ctx context.Context, req *auth_pb.ListMyUserC query.OrderAsc() } - changes, err := s.query.SearchEvents(ctx, query, s.auditLogRetention) + changes, err := s.query.SearchEvents(ctx, query) if err != nil { return nil, err } diff --git a/internal/api/grpc/management/org.go b/internal/api/grpc/management/org.go index 5879cd9718..157537260a 100644 --- a/internal/api/grpc/management/org.go +++ b/internal/api/grpc/management/org.go @@ -63,7 +63,7 @@ func (s *Server) ListOrgChanges(ctx context.Context, req *mgmt_pb.ListOrgChanges query.OrderAsc() } - response, err := s.query.SearchEvents(ctx, query, s.auditLogRetention) + response, err := s.query.SearchEvents(ctx, query) if err != nil { return nil, err } diff --git a/internal/api/grpc/management/project.go b/internal/api/grpc/management/project.go index 9d02dc7a58..fc43227c95 100644 --- a/internal/api/grpc/management/project.go +++ b/internal/api/grpc/management/project.go @@ -87,7 +87,7 @@ func (s *Server) ListProjectGrantChanges(ctx context.Context, req *mgmt_pb.ListP query.OrderAsc() } - changes, err := s.query.SearchEvents(ctx, query, s.auditLogRetention) + changes, err := s.query.SearchEvents(ctx, query) if err != nil { return nil, err } @@ -166,7 +166,7 @@ func (s *Server) ListProjectChanges(ctx context.Context, req *mgmt_pb.ListProjec query.OrderAsc() } - changes, err := s.query.SearchEvents(ctx, query, s.auditLogRetention) + changes, err := s.query.SearchEvents(ctx, query) if err != nil { return nil, err } diff --git a/internal/api/grpc/management/project_application.go b/internal/api/grpc/management/project_application.go index ff053089ed..ef18563c1e 100644 --- a/internal/api/grpc/management/project_application.go +++ b/internal/api/grpc/management/project_application.go @@ -70,7 +70,7 @@ func (s *Server) ListAppChanges(ctx context.Context, req *mgmt_pb.ListAppChanges query.OrderAsc() } - changes, err := s.query.SearchEvents(ctx, query, s.auditLogRetention) + changes, err := s.query.SearchEvents(ctx, query) if err != nil { return nil, err } diff --git a/internal/api/grpc/management/server.go b/internal/api/grpc/management/server.go index 12413e1bc8..b0f9879278 100644 --- a/internal/api/grpc/management/server.go +++ b/internal/api/grpc/management/server.go @@ -2,7 +2,6 @@ package management import ( "context" - "time" "google.golang.org/grpc" @@ -24,14 +23,13 @@ var _ management.ManagementServiceServer = (*Server)(nil) type Server struct { management.UnimplementedManagementServiceServer - command *command.Commands - query *query.Queries - systemDefaults systemdefaults.SystemDefaults - assetAPIPrefix func(context.Context) string - passwordHashAlg crypto.HashAlgorithm - userCodeAlg crypto.EncryptionAlgorithm - externalSecure bool - auditLogRetention time.Duration + command *command.Commands + query *query.Queries + systemDefaults systemdefaults.SystemDefaults + assetAPIPrefix func(context.Context) string + passwordHashAlg crypto.HashAlgorithm + userCodeAlg crypto.EncryptionAlgorithm + externalSecure bool } func CreateServer( @@ -40,17 +38,15 @@ func CreateServer( sd systemdefaults.SystemDefaults, userCodeAlg crypto.EncryptionAlgorithm, externalSecure bool, - auditLogRetention time.Duration, ) *Server { return &Server{ - command: command, - query: query, - systemDefaults: sd, - assetAPIPrefix: assets.AssetAPI(externalSecure), - passwordHashAlg: crypto.NewBCrypt(sd.SecretGenerators.PasswordSaltCost), - userCodeAlg: userCodeAlg, - externalSecure: externalSecure, - auditLogRetention: auditLogRetention, + command: command, + query: query, + systemDefaults: sd, + assetAPIPrefix: assets.AssetAPI(externalSecure), + passwordHashAlg: crypto.NewBCrypt(sd.SecretGenerators.PasswordSaltCost), + userCodeAlg: userCodeAlg, + externalSecure: externalSecure, } } diff --git a/internal/api/grpc/management/user.go b/internal/api/grpc/management/user.go index c61832169a..33b5606141 100644 --- a/internal/api/grpc/management/user.go +++ b/internal/api/grpc/management/user.go @@ -109,7 +109,7 @@ func (s *Server) ListUserChanges(ctx context.Context, req *mgmt_pb.ListUserChang query.OrderAsc() } - changes, err := s.query.SearchEvents(ctx, query, s.auditLogRetention) + changes, err := s.query.SearchEvents(ctx, query) if err != nil { return nil, err } diff --git a/internal/api/grpc/system/limits.go b/internal/api/grpc/system/limits.go new file mode 100644 index 0000000000..f41ddb2231 --- /dev/null +++ b/internal/api/grpc/system/limits.go @@ -0,0 +1,32 @@ +package system + +import ( + "context" + + "github.com/zitadel/zitadel/internal/api/grpc/object" + "github.com/zitadel/zitadel/pkg/grpc/system" +) + +func (s *Server) SetLimits(ctx context.Context, req *system.SetLimitsRequest) (*system.SetLimitsResponse, error) { + details, err := s.command.SetLimits( + ctx, + req.GetInstanceId(), + instanceLimitsPbToCommand(req), + ) + if err != nil { + return nil, err + } + return &system.SetLimitsResponse{ + Details: object.AddToDetailsPb(details.Sequence, details.EventDate, details.ResourceOwner), + }, nil +} + +func (s *Server) ResetLimits(ctx context.Context, req *system.ResetLimitsRequest) (*system.ResetLimitsResponse, error) { + details, err := s.command.ResetLimits(ctx, req.GetInstanceId()) + if err != nil { + return nil, err + } + return &system.ResetLimitsResponse{ + Details: object.ChangeToDetailsPb(details.Sequence, details.EventDate, details.ResourceOwner), + }, nil +} diff --git a/internal/api/grpc/system/limits_converter.go b/internal/api/grpc/system/limits_converter.go new file mode 100644 index 0000000000..de7f330475 --- /dev/null +++ b/internal/api/grpc/system/limits_converter.go @@ -0,0 +1,16 @@ +package system + +import ( + "github.com/muhlemmer/gu" + + "github.com/zitadel/zitadel/internal/command" + "github.com/zitadel/zitadel/pkg/grpc/system" +) + +func instanceLimitsPbToCommand(req *system.SetLimitsRequest) *command.SetLimits { + var setLimits = new(command.SetLimits) + if req.AuditLogRetention != nil { + setLimits.AuditLogRetention = gu.Ptr(req.AuditLogRetention.AsDuration()) + } + return setLimits +} diff --git a/internal/api/grpc/system/limits_integration_test.go b/internal/api/grpc/system/limits_integration_test.go new file mode 100644 index 0000000000..e2480d0c0c --- /dev/null +++ b/internal/api/grpc/system/limits_integration_test.go @@ -0,0 +1,213 @@ +//go:build integration + +package system_test + +import ( + "context" + "math/rand" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/known/durationpb" + + "github.com/zitadel/zitadel/pkg/grpc/admin" + "github.com/zitadel/zitadel/pkg/grpc/auth" + "github.com/zitadel/zitadel/pkg/grpc/management" + "github.com/zitadel/zitadel/pkg/grpc/system" +) + +func TestServer_Limits_AuditLogRetention(t *testing.T) { + _, instanceID, iamOwnerCtx := Tester.UseIsolatedInstance(CTX, SystemCTX) + userID, projectID, appID, projectGrantID := seedObjects(iamOwnerCtx, t) + beforeTime := time.Now() + zeroCounts := &eventCounts{} + seededCount := requireEventually(t, iamOwnerCtx, userID, projectID, appID, projectGrantID, func(c assert.TestingT, counts *eventCounts) { + counts.assertAll(t, c, "seeded events are > 0", assert.Greater, zeroCounts) + }, "wait for seeded event assertions to pass") + produceEvents(iamOwnerCtx, t, userID, appID, projectID, projectGrantID) + addedCount := requireEventually(t, iamOwnerCtx, userID, projectID, appID, projectGrantID, func(c assert.TestingT, counts *eventCounts) { + counts.assertAll(t, c, "added events are > seeded events", assert.Greater, seededCount) + }, "wait for added event assertions to pass") + _, err := Tester.Client.System.SetLimits(SystemCTX, &system.SetLimitsRequest{ + InstanceId: instanceID, + AuditLogRetention: durationpb.New(time.Now().Sub(beforeTime)), + }) + require.NoError(t, err) + requireEventually(t, iamOwnerCtx, userID, projectID, appID, projectGrantID, func(c assert.TestingT, counts *eventCounts) { + counts.assertAll(t, c, "limited events < added events", assert.Less, addedCount) + counts.assertAll(t, c, "limited events > 0", assert.Greater, zeroCounts) + }, "wait for limited event assertions to pass") + _, err = Tester.Client.System.ResetLimits(SystemCTX, &system.ResetLimitsRequest{ + InstanceId: instanceID, + }) + require.NoError(t, err) + requireEventually(t, iamOwnerCtx, userID, projectID, appID, projectGrantID, func(c assert.TestingT, counts *eventCounts) { + counts.assertAll(t, c, "with reset limit, added events are > seeded events", assert.Greater, seededCount) + }, "wait for reset event assertions to pass") +} + +func requireEventually( + t *testing.T, + ctx context.Context, + userID, projectID, appID, projectGrantID string, + assertCounts func(assert.TestingT, *eventCounts), + msg string, +) (counts *eventCounts) { + countTimeout := 30 * time.Second + assertTimeout := countTimeout + time.Second + countCtx, cancel := context.WithTimeout(ctx, countTimeout) + defer cancel() + require.EventuallyWithT(t, func(c *assert.CollectT) { + counts = countEvents(countCtx, t, userID, projectID, appID, projectGrantID) + assertCounts(c, counts) + }, assertTimeout, time.Second, msg) + return counts +} + +var runes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") + +func randomString(resourceType string, n int) string { + b := make([]rune, n) + for i := range b { + b[i] = runes[rand.Intn(len(runes))] + } + return "test" + resourceType + "-" + string(b) +} + +func seedObjects(ctx context.Context, t *testing.T) (string, string, string, string) { + t.Helper() + project, err := Tester.Client.Mgmt.AddProject(ctx, &management.AddProjectRequest{ + Name: randomString("project", 5), + }) + require.NoError(t, err) + app, err := Tester.Client.Mgmt.AddOIDCApp(ctx, &management.AddOIDCAppRequest{ + Name: randomString("app", 5), + ProjectId: project.GetId(), + }) + org, err := Tester.Client.Mgmt.AddOrg(ctx, &management.AddOrgRequest{ + Name: randomString("org", 5), + }) + require.NoError(t, err) + role := randomString("role", 5) + require.NoError(t, err) + _, err = Tester.Client.Mgmt.AddProjectRole(ctx, &management.AddProjectRoleRequest{ + ProjectId: project.GetId(), + RoleKey: role, + DisplayName: role, + }) + require.NoError(t, err) + projectGrant, err := Tester.Client.Mgmt.AddProjectGrant(ctx, &management.AddProjectGrantRequest{ + ProjectId: project.GetId(), + GrantedOrgId: org.GetId(), + RoleKeys: []string{role}, + }) + require.NoError(t, err) + user, err := Tester.Client.Auth.GetMyUser(ctx, &auth.GetMyUserRequest{}) + require.NoError(t, err) + userID := user.GetUser().GetId() + requireUserEvent(ctx, t, userID) + return userID, project.GetId(), app.GetAppId(), projectGrant.GetGrantId() +} + +func produceEvents(ctx context.Context, t *testing.T, machineID, appID, projectID, grantID string) { + t.Helper() + _, err := Tester.Client.Mgmt.UpdateOrg(ctx, &management.UpdateOrgRequest{ + Name: randomString("org", 5), + }) + require.NoError(t, err) + _, err = Tester.Client.Mgmt.UpdateProject(ctx, &management.UpdateProjectRequest{ + Id: projectID, + Name: randomString("project", 5), + }) + require.NoError(t, err) + _, err = Tester.Client.Mgmt.UpdateApp(ctx, &management.UpdateAppRequest{ + AppId: appID, + ProjectId: projectID, + Name: randomString("app", 5), + }) + require.NoError(t, err) + requireUserEvent(ctx, t, machineID) + _, err = Tester.Client.Mgmt.UpdateProjectGrant(ctx, &management.UpdateProjectGrantRequest{ + ProjectId: projectID, + GrantId: grantID, + }) + require.NoError(t, err) +} + +func requireUserEvent(ctx context.Context, t *testing.T, machineID string) { + _, err := Tester.Client.Mgmt.UpdateMachine(ctx, &management.UpdateMachineRequest{ + UserId: machineID, + Name: randomString("machine", 5), + }) + require.NoError(t, err) +} + +type eventCounts struct { + all, myUser, aUser, grant, project, app, org int +} + +func (e *eventCounts) assertAll(t *testing.T, c assert.TestingT, name string, compare assert.ComparisonAssertionFunc, than *eventCounts) { + t.Run(name, func(t *testing.T) { + compare(c, e.all, than.all, "ListEvents") + compare(c, e.myUser, than.myUser, "ListMyUserChanges") + compare(c, e.aUser, than.aUser, "ListUserChanges") + compare(c, e.grant, than.grant, "ListProjectGrantChanges") + compare(c, e.project, than.project, "ListProjectChanges") + compare(c, e.app, than.app, "ListAppChanges") + compare(c, e.org, than.org, "ListOrgChanges") + }) +} + +func countEvents(ctx context.Context, t *testing.T, userID, projectID, appID, grantID string) *eventCounts { + t.Helper() + counts := new(eventCounts) + var wg sync.WaitGroup + wg.Add(7) + go func() { + defer wg.Done() + result, err := Tester.Client.Admin.ListEvents(ctx, &admin.ListEventsRequest{}) + require.NoError(t, err) + counts.all = len(result.GetEvents()) + }() + go func() { + defer wg.Done() + result, err := Tester.Client.Auth.ListMyUserChanges(ctx, &auth.ListMyUserChangesRequest{}) + require.NoError(t, err) + counts.myUser = len(result.GetResult()) + }() + go func() { + defer wg.Done() + result, err := Tester.Client.Mgmt.ListUserChanges(ctx, &management.ListUserChangesRequest{UserId: userID}) + require.NoError(t, err) + counts.aUser = len(result.GetResult()) + }() + go func() { + defer wg.Done() + result, err := Tester.Client.Mgmt.ListAppChanges(ctx, &management.ListAppChangesRequest{ProjectId: projectID, AppId: appID}) + require.NoError(t, err) + counts.app = len(result.GetResult()) + }() + go func() { + defer wg.Done() + result, err := Tester.Client.Mgmt.ListOrgChanges(ctx, &management.ListOrgChangesRequest{}) + require.NoError(t, err) + counts.org = len(result.GetResult()) + }() + go func() { + defer wg.Done() + result, err := Tester.Client.Mgmt.ListProjectChanges(ctx, &management.ListProjectChangesRequest{ProjectId: projectID}) + require.NoError(t, err) + counts.project = len(result.GetResult()) + }() + go func() { + defer wg.Done() + result, err := Tester.Client.Mgmt.ListProjectGrantChanges(ctx, &management.ListProjectGrantChangesRequest{ProjectId: projectID, GrantId: grantID}) + require.NoError(t, err) + counts.grant = len(result.GetResult()) + }() + wg.Wait() + return counts +} diff --git a/internal/command/command.go b/internal/command/command.go index 6288bbd234..5589965a39 100644 --- a/internal/command/command.go +++ b/internal/command/command.go @@ -26,6 +26,7 @@ import ( "github.com/zitadel/zitadel/internal/repository/idpintent" instance_repo "github.com/zitadel/zitadel/internal/repository/instance" "github.com/zitadel/zitadel/internal/repository/keypair" + "github.com/zitadel/zitadel/internal/repository/limits" "github.com/zitadel/zitadel/internal/repository/milestone" "github.com/zitadel/zitadel/internal/repository/oidcsession" "github.com/zitadel/zitadel/internal/repository/org" @@ -150,6 +151,7 @@ func StartCommands( keypair.RegisterEventMappers(repo.eventstore) action.RegisterEventMappers(repo.eventstore) quota.RegisterEventMappers(repo.eventstore) + limits.RegisterEventMappers(repo.eventstore) session.RegisterEventMappers(repo.eventstore) idpintent.RegisterEventMappers(repo.eventstore) authrequest.RegisterEventMappers(repo.eventstore) diff --git a/internal/command/instance.go b/internal/command/instance.go index f82252f5a3..4c5bb2faa5 100644 --- a/internal/command/instance.go +++ b/internal/command/instance.go @@ -17,6 +17,7 @@ import ( "github.com/zitadel/zitadel/internal/notification/channels/smtp" "github.com/zitadel/zitadel/internal/repository/feature" "github.com/zitadel/zitadel/internal/repository/instance" + "github.com/zitadel/zitadel/internal/repository/limits" "github.com/zitadel/zitadel/internal/repository/org" "github.com/zitadel/zitadel/internal/repository/project" "github.com/zitadel/zitadel/internal/repository/quota" @@ -114,6 +115,9 @@ type InstanceSetup struct { Items []*SetQuota } Features map[domain.Feature]any + Limits *struct { + AuditLogRetention *time.Duration + } } type SecretGenerators struct { @@ -135,6 +139,7 @@ type ZitadelConfig struct { adminAppID string authAppID string consoleAppID string + limitsID string } func (s *InstanceSetup) generateIDs(idGenerator id.Generator) (err error) { @@ -159,7 +164,10 @@ func (s *InstanceSetup) generateIDs(idGenerator id.Generator) (err error) { } s.zitadel.consoleAppID, err = idGenerator.Next() - + if err != nil { + return err + } + s.zitadel.limitsID, err = idGenerator.Next() return err } @@ -190,6 +198,7 @@ func (c *Commands) SetUpInstance(ctx context.Context, setup *InstanceSetup) (str orgAgg := org.NewAggregate(orgID) userAgg := user.NewAggregate(userID, orgID) projectAgg := project.NewAggregate(setup.zitadel.projectID, orgID) + limitsAgg := limits.NewAggregate(setup.zitadel.limitsID, instanceID, instanceID) validations := []preparation.Validation{ prepareAddInstance(instanceAgg, setup.InstanceName, setup.DefaultLanguage), @@ -441,6 +450,12 @@ func (c *Commands) SetUpInstance(ctx context.Context, setup *InstanceSetup) (str } } + if setup.Limits != nil { + validations = append(validations, c.SetLimitsCommand(limitsAgg, &limitsWriteModel{}, &SetLimits{ + AuditLogRetention: setup.Limits.AuditLogRetention, + })) + } + cmds, err := preparation.PrepareCommands(ctx, c.eventstore.Filter, validations...) if err != nil { return "", "", nil, nil, err diff --git a/internal/command/limits.go b/internal/command/limits.go new file mode 100644 index 0000000000..5c7cdd8ee5 --- /dev/null +++ b/internal/command/limits.go @@ -0,0 +1,105 @@ +package command + +import ( + "context" + "time" + + "github.com/zitadel/zitadel/internal/api/authz" + "github.com/zitadel/zitadel/internal/command/preparation" + "github.com/zitadel/zitadel/internal/domain" + "github.com/zitadel/zitadel/internal/errors" + "github.com/zitadel/zitadel/internal/eventstore" + "github.com/zitadel/zitadel/internal/repository/limits" +) + +type SetLimits struct { + AuditLogRetention *time.Duration `json:"AuditLogRetention,omitempty"` +} + +// SetLimits creates new limits or updates existing limits. +func (c *Commands) SetLimits( + ctx context.Context, + resourceOwner string, + setLimits *SetLimits, +) (*domain.ObjectDetails, error) { + instanceId := authz.GetInstance(ctx).InstanceID() + wm, err := c.getLimitsWriteModel(ctx, instanceId, resourceOwner) + if err != nil { + return nil, err + } + aggregateId := wm.AggregateID + if aggregateId == "" { + aggregateId, err = c.idGenerator.Next() + if err != nil { + return nil, err + } + } + if err != nil { + return nil, err + } + createCmds, err := c.SetLimitsCommand(limits.NewAggregate(aggregateId, instanceId, resourceOwner), wm, setLimits)() + if err != nil { + return nil, err + } + cmds, err := createCmds(ctx, nil) + if len(cmds) > 0 { + events, err := c.eventstore.Push(ctx, cmds...) + if err != nil { + return nil, err + } + err = AppendAndReduce(wm, events...) + if err != nil { + return nil, err + } + } + return writeModelToObjectDetails(&wm.WriteModel), nil +} + +func (c *Commands) ResetLimits(ctx context.Context, resourceOwner string) (*domain.ObjectDetails, error) { + instanceId := authz.GetInstance(ctx).InstanceID() + wm, err := c.getLimitsWriteModel(ctx, instanceId, resourceOwner) + if err != nil { + return nil, err + } + if wm.AggregateID == "" { + return nil, errors.ThrowNotFound(nil, "COMMAND-9JToT", "Errors.Limits.NotFound") + } + aggregate := limits.NewAggregate(wm.AggregateID, instanceId, resourceOwner) + events := []eventstore.Command{limits.NewResetEvent(ctx, &aggregate.Aggregate)} + pushedEvents, err := c.eventstore.Push(ctx, events...) + if err != nil { + return nil, err + } + err = AppendAndReduce(wm, pushedEvents...) + if err != nil { + return nil, err + } + return writeModelToObjectDetails(&wm.WriteModel), nil +} + +func (c *Commands) getLimitsWriteModel(ctx context.Context, instanceId, resourceOwner string) (*limitsWriteModel, error) { + wm := newLimitsWriteModel(instanceId, resourceOwner) + return wm, c.eventstore.FilterToQueryReducer(ctx, wm) +} + +func (c *Commands) SetLimitsCommand(a *limits.Aggregate, wm *limitsWriteModel, setLimits *SetLimits) preparation.Validation { + return func() (preparation.CreateCommands, error) { + if setLimits == nil || setLimits.AuditLogRetention == nil { + return nil, errors.ThrowInvalidArgument(nil, "COMMAND-4M9vs", "Errors.Limits.NoneSpecified") + } + return func(ctx context.Context, _ preparation.FilterToQueryReducer) ([]eventstore.Command, error) { + changes := wm.NewChanges(setLimits) + if len(changes) == 0 { + return nil, nil + } + return []eventstore.Command{limits.NewSetEvent( + eventstore.NewBaseEventForPush( + ctx, + &a.Aggregate, + limits.SetEventType, + ), + changes..., + )}, nil + }, nil + } +} diff --git a/internal/command/limits_model.go b/internal/command/limits_model.go new file mode 100644 index 0000000000..528c0873fa --- /dev/null +++ b/internal/command/limits_model.go @@ -0,0 +1,73 @@ +package command + +import ( + "time" + + "github.com/zitadel/zitadel/internal/eventstore" + "github.com/zitadel/zitadel/internal/repository/limits" +) + +type limitsWriteModel struct { + eventstore.WriteModel + rollingAggregateID string + auditLogRetention *time.Duration +} + +// newLimitsWriteModel aggregateId is filled by reducing unit matching events +func newLimitsWriteModel(instanceId, resourceOwner string) *limitsWriteModel { + return &limitsWriteModel{ + WriteModel: eventstore.WriteModel{ + InstanceID: instanceId, + ResourceOwner: resourceOwner, + }, + } +} + +func (wm *limitsWriteModel) Query() *eventstore.SearchQueryBuilder { + query := eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent). + ResourceOwner(wm.ResourceOwner). + InstanceID(wm.InstanceID). + AddQuery(). + AggregateTypes(limits.AggregateType). + EventTypes( + limits.SetEventType, + limits.ResetEventType, + ) + + return query.Builder() +} + +func (wm *limitsWriteModel) Reduce() error { + for _, event := range wm.Events { + wm.ChangeDate = event.CreatedAt() + switch e := event.(type) { + case *limits.SetEvent: + wm.rollingAggregateID = e.Aggregate().ID + if e.AuditLogRetention != nil { + wm.auditLogRetention = e.AuditLogRetention + } + case *limits.ResetEvent: + wm.rollingAggregateID = "" + wm.auditLogRetention = nil + } + } + if err := wm.WriteModel.Reduce(); err != nil { + return err + } + // wm.WriteModel.Reduce() sets the aggregateID to the first event's aggregateID, but we need the last one + wm.AggregateID = wm.rollingAggregateID + return nil +} + +// NewChanges returns all changes that need to be applied to the aggregate. +// nil properties in setLimits are ignored +func (wm *limitsWriteModel) NewChanges(setLimits *SetLimits) (changes []limits.LimitsChange) { + if setLimits == nil { + return nil + } + changes = make([]limits.LimitsChange, 0, 1) + if setLimits.AuditLogRetention != nil && (wm.auditLogRetention == nil || *wm.auditLogRetention != *setLimits.AuditLogRetention) { + changes = append(changes, limits.ChangeAuditLogRetention(setLimits.AuditLogRetention)) + } + return changes +} diff --git a/internal/command/limits_test.go b/internal/command/limits_test.go new file mode 100644 index 0000000000..1b315ca130 --- /dev/null +++ b/internal/command/limits_test.go @@ -0,0 +1,313 @@ +package command + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/muhlemmer/gu" + "github.com/stretchr/testify/assert" + + "github.com/zitadel/zitadel/internal/api/authz" + "github.com/zitadel/zitadel/internal/domain" + caos_errors "github.com/zitadel/zitadel/internal/errors" + "github.com/zitadel/zitadel/internal/eventstore" + "github.com/zitadel/zitadel/internal/id" + id_mock "github.com/zitadel/zitadel/internal/id/mock" + "github.com/zitadel/zitadel/internal/repository/limits" +) + +func TestLimits_SetLimits(t *testing.T) { + type fields func(*testing.T) (*eventstore.Eventstore, id.Generator) + type args struct { + ctx context.Context + resourceOwner string + setLimits *SetLimits + } + type res struct { + want *domain.ObjectDetails + err func(error) bool + } + tests := []struct { + name string + fields fields + args args + res res + }{ + { + name: "create limits, ok", + fields: func(*testing.T) (*eventstore.Eventstore, id.Generator) { + return eventstoreExpect( + t, + expectFilter(), + expectPush( + eventFromEventPusherWithInstanceID( + "instance1", + limits.NewSetEvent( + eventstore.NewBaseEventForPush( + context.Background(), + &limits.NewAggregate("limits1", "instance1", "instance1").Aggregate, + limits.SetEventType, + ), + limits.ChangeAuditLogRetention(gu.Ptr(time.Hour)), + ), + ), + ), + ), + id_mock.NewIDGeneratorExpectIDs(t, "limits1") + }, + args: args{ + ctx: authz.WithInstanceID(context.Background(), "instance1"), + resourceOwner: "instance1", + setLimits: &SetLimits{ + AuditLogRetention: gu.Ptr(time.Hour), + }, + }, + res: res{ + want: &domain.ObjectDetails{ + ResourceOwner: "instance1", + }, + }, + }, + { + name: "update limits, ok", + fields: func(*testing.T) (*eventstore.Eventstore, id.Generator) { + return eventstoreExpect( + t, + expectFilter( + eventFromEventPusher( + limits.NewSetEvent( + eventstore.NewBaseEventForPush( + context.Background(), + &limits.NewAggregate("limits1", "instance1", "instance1").Aggregate, + limits.SetEventType, + ), + limits.ChangeAuditLogRetention(gu.Ptr(time.Minute)), + ), + ), + ), + expectPush( + eventFromEventPusherWithInstanceID( + "instance1", + limits.NewSetEvent( + eventstore.NewBaseEventForPush( + context.Background(), + &limits.NewAggregate("limits1", "instance1", "instance1").Aggregate, + limits.SetEventType, + ), + limits.ChangeAuditLogRetention(gu.Ptr(time.Hour)), + ), + ), + ), + ), + nil + }, + args: args{ + ctx: authz.WithInstanceID(context.Background(), "instance1"), + resourceOwner: "instance1", + setLimits: &SetLimits{ + AuditLogRetention: gu.Ptr(time.Hour), + }, + }, + res: res{ + want: &domain.ObjectDetails{ + ResourceOwner: "instance1", + }, + }, + }, + { + name: "set limits after resetting limits, ok", + fields: func(*testing.T) (*eventstore.Eventstore, id.Generator) { + return eventstoreExpect( + t, + expectFilter( + eventFromEventPusher( + limits.NewSetEvent( + eventstore.NewBaseEventForPush( + context.Background(), + &limits.NewAggregate("limits1", "instance1", "instance1").Aggregate, + limits.SetEventType, + ), + limits.ChangeAuditLogRetention(gu.Ptr(time.Hour)), + ), + ), + eventFromEventPusher( + limits.NewResetEvent( + context.Background(), + &limits.NewAggregate("limits1", "instance1", "instance1").Aggregate, + ), + ), + ), + expectPush( + eventFromEventPusherWithInstanceID( + "instance1", + limits.NewSetEvent( + eventstore.NewBaseEventForPush( + context.Background(), + &limits.NewAggregate("limits2", "instance1", "instance1").Aggregate, + limits.SetEventType, + ), + limits.ChangeAuditLogRetention(gu.Ptr(time.Hour)), + ), + ), + ), + ), + id_mock.NewIDGeneratorExpectIDs(t, "limits2") + }, + args: args{ + ctx: authz.WithInstanceID(context.Background(), "instance1"), + resourceOwner: "instance1", + setLimits: &SetLimits{ + AuditLogRetention: gu.Ptr(time.Hour), + }, + }, + res: res{ + want: &domain.ObjectDetails{ + ResourceOwner: "instance1", + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := new(Commands) + r.eventstore, r.idGenerator = tt.fields(t) + got, err := r.SetLimits(tt.args.ctx, tt.args.resourceOwner, tt.args.setLimits) + if tt.res.err == nil { + assert.NoError(t, err) + } + if tt.res.err != nil && !tt.res.err(err) { + t.Errorf("got wrong err: %v ", err) + } + if tt.res.err == nil { + assert.Equal(t, tt.res.want, got) + } + }) + } +} + +func TestLimits_ResetLimits(t *testing.T) { + type fields func(*testing.T) *eventstore.Eventstore + type args struct { + ctx context.Context + resourceOwner string + } + type res struct { + want *domain.ObjectDetails + err func(error) bool + } + tests := []struct { + name string + fields fields + args args + res res + }{ + { + name: "not found", + fields: func(tt *testing.T) *eventstore.Eventstore { + return eventstoreExpect( + tt, + expectFilter(), + ) + }, + args: args{ + ctx: authz.WithInstanceID(context.Background(), "instance1"), + resourceOwner: "instance1", + }, + res: res{ + err: func(err error) bool { + return errors.Is(err, caos_errors.ThrowNotFound(nil, "COMMAND-9JToT", "Errors.Limits.NotFound")) + }, + }, + }, + { + name: "already removed", + fields: func(tt *testing.T) *eventstore.Eventstore { + return eventstoreExpect( + t, + expectFilter( + eventFromEventPusher( + limits.NewSetEvent( + eventstore.NewBaseEventForPush( + context.Background(), + &limits.NewAggregate("limits1", "instance1", "instance1").Aggregate, + limits.SetEventType, + ), + limits.ChangeAuditLogRetention(gu.Ptr(time.Hour)), + ), + ), + eventFromEventPusher( + limits.NewResetEvent(context.Background(), + &limits.NewAggregate("limits1", "instance1", "instance1").Aggregate, + ), + ), + ), + ) + }, + args: args{ + ctx: authz.WithInstanceID(context.Background(), "instance1"), + resourceOwner: "instance1", + }, + res: res{ + err: func(err error) bool { + return errors.Is(err, caos_errors.ThrowNotFound(nil, "COMMAND-9JToT", "Errors.Limits.NotFound")) + }, + }, + }, + { + name: "reset limits, ok", + fields: func(tt *testing.T) *eventstore.Eventstore { + return eventstoreExpect( + t, + expectFilter( + eventFromEventPusher( + limits.NewSetEvent( + eventstore.NewBaseEventForPush( + context.Background(), + &limits.NewAggregate("limits1", "instance1", "instance1").Aggregate, + limits.SetEventType, + ), + limits.ChangeAuditLogRetention(gu.Ptr(time.Hour)), + ), + ), + ), + expectPush( + eventFromEventPusherWithInstanceID( + "instance1", + limits.NewResetEvent(context.Background(), + &limits.NewAggregate("limits1", "instance1", "instance1").Aggregate, + ), + ), + ), + ) + }, + args: args{ + ctx: authz.WithInstanceID(context.Background(), "instance1"), + resourceOwner: "instance1", + }, + res: res{ + want: &domain.ObjectDetails{ + ResourceOwner: "instance1", + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := &Commands{ + eventstore: tt.fields(t), + } + got, err := r.ResetLimits(tt.args.ctx, tt.args.resourceOwner) + if tt.res.err == nil { + assert.NoError(t, err) + } + if tt.res.err != nil && !tt.res.err(err) { + t.Errorf("got wrong err: %v ", err) + } + if tt.res.err == nil { + assert.Equal(t, tt.res.want, got) + } + }) + } +} diff --git a/internal/command/main_test.go b/internal/command/main_test.go index c830409489..5458811eb7 100644 --- a/internal/command/main_test.go +++ b/internal/command/main_test.go @@ -24,6 +24,7 @@ import ( "github.com/zitadel/zitadel/internal/repository/idpintent" iam_repo "github.com/zitadel/zitadel/internal/repository/instance" key_repo "github.com/zitadel/zitadel/internal/repository/keypair" + "github.com/zitadel/zitadel/internal/repository/limits" "github.com/zitadel/zitadel/internal/repository/oidcsession" "github.com/zitadel/zitadel/internal/repository/org" proj_repo "github.com/zitadel/zitadel/internal/repository/project" @@ -58,6 +59,7 @@ func eventstoreExpect(t *testing.T, expects ...expect) *eventstore.Eventstore { authrequest.RegisterEventMappers(es) oidcsession.RegisterEventMappers(es) quota_repo.RegisterEventMappers(es) + limits.RegisterEventMappers(es) feature.RegisterEventMappers(es) return es } diff --git a/internal/database/type.go b/internal/database/type.go index 2608dc4f40..1bd5436419 100644 --- a/internal/database/type.go +++ b/internal/database/type.go @@ -103,3 +103,25 @@ func (d *Duration) Scan(src any) error { *d = Duration(time.Duration(interval.Microseconds*1000) + time.Duration(interval.Days)*24*time.Hour + time.Duration(interval.Months)*30*24*time.Hour) return nil } + +// NullDuration can be used for NULL intervals. +// If Valid is false, the scanned value was NULL +// This behavior is similar to [database/sql.NullString] +type NullDuration struct { + Valid bool + Duration time.Duration +} + +// Scan implements the [database/sql.Scanner] interface. +func (d *NullDuration) Scan(src any) error { + if src == nil { + d.Duration, d.Valid = 0, false + return nil + } + duration := new(Duration) + if err := duration.Scan(src); err != nil { + return err + } + d.Duration, d.Valid = time.Duration(*duration), true + return nil +} diff --git a/internal/database/type_test.go b/internal/database/type_test.go index 9ac777dab6..f0c308a184 100644 --- a/internal/database/type_test.go +++ b/internal/database/type_test.go @@ -3,6 +3,7 @@ package database import ( "database/sql/driver" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -118,6 +119,62 @@ func TestMap_Value(t *testing.T) { } } +func TestNullDuration_Scan(t *testing.T) { + type args struct { + src any + } + type res struct { + want NullDuration + err bool + } + type testCase struct { + name string + args args + res res + } + tests := []testCase{ + { + "invalid", + args{src: "invalid"}, + res{ + want: NullDuration{ + Valid: false, + }, + err: true, + }, + }, + { + "null", + args{src: nil}, + res{ + want: NullDuration{ + Valid: false, + }, + err: false, + }, + }, + { + "valid", + args{src: "1:0:0"}, + res{ + want: NullDuration{ + Valid: true, + Duration: time.Hour, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + d := new(NullDuration) + if err := d.Scan(tt.args.src); (err != nil) != tt.res.err { + t.Errorf("Scan() error = %v, wantErr %v", err, tt.res.err) + } + assert.Equal(t, tt.res.want, *d) + }) + } +} + func TestArray_ScanInt32(t *testing.T) { type args struct { src any diff --git a/internal/query/event.go b/internal/query/event.go index 63ffa3fe4a..c387c045a8 100644 --- a/internal/query/event.go +++ b/internal/query/event.go @@ -4,7 +4,9 @@ import ( "context" "time" + "github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/api/call" + "github.com/zitadel/zitadel/internal/errors" "github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/telemetry/tracing" ) @@ -26,33 +28,45 @@ type EventEditor struct { AvatarKey string } -func (q *Queries) SearchEvents(ctx context.Context, query *eventstore.SearchQueryBuilder, auditLogRetention time.Duration) (_ []*Event, err error) { - ctx, span := tracing.NewSpan(ctx) - defer func() { span.EndWithError(err) }() - events, err := q.eventstore.Filter(ctx, query.AllowTimeTravel()) - if err != nil { - return nil, err - } - - if auditLogRetention != 0 { - events = filterAuditLogRetention(ctx, events, auditLogRetention) - } - - return q.convertEvents(ctx, events), nil +type eventsReducer struct { + ctx context.Context + q *Queries + events []*Event } -func filterAuditLogRetention(ctx context.Context, events []eventstore.Event, auditLogRetention time.Duration) []eventstore.Event { +func (r *eventsReducer) AppendEvents(events ...eventstore.Event) { + r.events = append(r.events, r.q.convertEvents(r.ctx, events)...) +} + +func (r *eventsReducer) Reduce() error { return nil } + +func (q *Queries) SearchEvents(ctx context.Context, query *eventstore.SearchQueryBuilder) (_ []*Event, err error) { + ctx, span := tracing.NewSpan(ctx) + defer func() { span.EndWithError(err) }() + auditLogRetention := q.defaultAuditLogRetention + instanceLimits, err := q.Limits(ctx, authz.GetInstance(ctx).InstanceID()) + if err != nil && !errors.IsNotFound(err) { + return nil, err + } + if instanceLimits != nil && instanceLimits.AuditLogRetention != nil { + auditLogRetention = *instanceLimits.AuditLogRetention + } + if auditLogRetention != 0 { + query = filterAuditLogRetention(ctx, auditLogRetention, query) + } + reducer := &eventsReducer{ctx: ctx, q: q} + if err = q.eventstore.FilterToReducer(ctx, query, reducer); err != nil { + return nil, err + } + return reducer.events, nil +} + +func filterAuditLogRetention(ctx context.Context, auditLogRetention time.Duration, builder *eventstore.SearchQueryBuilder) *eventstore.SearchQueryBuilder { callTime := call.FromContext(ctx) if callTime.IsZero() { callTime = time.Now() } - filteredEvents := make([]eventstore.Event, 0, len(events)) - for _, event := range events { - if event.CreatedAt().After(callTime.Add(-auditLogRetention)) { - filteredEvents = append(filteredEvents, event) - } - } - return filteredEvents + return builder.CreationDateAfter(callTime.Add(-auditLogRetention)) } func (q *Queries) SearchEventTypes(ctx context.Context) []string { diff --git a/internal/query/limits.go b/internal/query/limits.go new file mode 100644 index 0000000000..cf9635026d --- /dev/null +++ b/internal/query/limits.go @@ -0,0 +1,119 @@ +package query + +import ( + "context" + "database/sql" + errs "errors" + "time" + + sq "github.com/Masterminds/squirrel" + + "github.com/zitadel/zitadel/internal/api/authz" + "github.com/zitadel/zitadel/internal/api/call" + "github.com/zitadel/zitadel/internal/database" + "github.com/zitadel/zitadel/internal/errors" + "github.com/zitadel/zitadel/internal/query/projection" + "github.com/zitadel/zitadel/internal/telemetry/tracing" +) + +var ( + limitSettingsTable = table{ + name: projection.LimitsProjectionTable, + instanceIDCol: projection.LimitsColumnInstanceID, + } + LimitsColumnAggregateID = Column{ + name: projection.LimitsColumnAggregateID, + table: limitSettingsTable, + } + LimitsColumnCreationDate = Column{ + name: projection.LimitsColumnCreationDate, + table: limitSettingsTable, + } + LimitsColumnChangeDate = Column{ + name: projection.LimitsColumnChangeDate, + table: limitSettingsTable, + } + LimitsColumnResourceOwner = Column{ + name: projection.LimitsColumnResourceOwner, + table: limitSettingsTable, + } + LimitsColumnInstanceID = Column{ + name: projection.LimitsColumnInstanceID, + table: limitSettingsTable, + } + LimitsColumnSequence = Column{ + name: projection.LimitsColumnSequence, + table: limitSettingsTable, + } + LimitsColumnAuditLogRetention = Column{ + name: projection.LimitsColumnAuditLogRetention, + table: limitSettingsTable, + } +) + +type Limits struct { + AggregateID string + CreationDate time.Time + ChangeDate time.Time + ResourceOwner string + Sequence uint64 + + AuditLogRetention *time.Duration +} + +func (q *Queries) Limits(ctx context.Context, resourceOwner string) (limits *Limits, err error) { + ctx, span := tracing.NewSpan(ctx) + defer func() { span.EndWithError(err) }() + + stmt, scan := prepareLimitsQuery(ctx, q.client) + query, args, err := stmt.Where(sq.Eq{ + LimitsColumnInstanceID.identifier(): authz.GetInstance(ctx).InstanceID(), + LimitsColumnResourceOwner.identifier(): resourceOwner, + }).ToSql() + if err != nil { + return nil, errors.ThrowInternal(err, "QUERY-jJe80", "Errors.Query.SQLStatment") + } + + err = q.client.QueryRowContext(ctx, func(row *sql.Row) error { + limits, err = scan(row) + return err + }, query, args...) + return limits, err +} + +func prepareLimitsQuery(ctx context.Context, db prepareDatabase) (sq.SelectBuilder, func(*sql.Row) (*Limits, error)) { + return sq.Select( + LimitsColumnAggregateID.identifier(), + LimitsColumnCreationDate.identifier(), + LimitsColumnChangeDate.identifier(), + LimitsColumnResourceOwner.identifier(), + LimitsColumnSequence.identifier(), + LimitsColumnAuditLogRetention.identifier(), + ). + From(limitSettingsTable.identifier() + db.Timetravel(call.Took(ctx))). + PlaceholderFormat(sq.Dollar), + func(row *sql.Row) (*Limits, error) { + var ( + limits = new(Limits) + auditLogRetention database.NullDuration + ) + err := row.Scan( + &limits.AggregateID, + &limits.CreationDate, + &limits.ChangeDate, + &limits.ResourceOwner, + &limits.Sequence, + &auditLogRetention, + ) + if err != nil { + if errs.Is(err, sql.ErrNoRows) { + return nil, errors.ThrowNotFound(err, "QUERY-GU1em", "Errors.Limits.NotFound") + } + return nil, errors.ThrowInternal(err, "QUERY-00jgy", "Errors.Internal") + } + if auditLogRetention.Valid { + limits.AuditLogRetention = &auditLogRetention.Duration + } + return limits, nil + } +} diff --git a/internal/query/limits_test.go b/internal/query/limits_test.go new file mode 100644 index 0000000000..84e6e70e52 --- /dev/null +++ b/internal/query/limits_test.go @@ -0,0 +1,116 @@ +package query + +import ( + "database/sql" + "database/sql/driver" + "errors" + "fmt" + "regexp" + "testing" + "time" + + "github.com/muhlemmer/gu" + + errs "github.com/zitadel/zitadel/internal/errors" +) + +var ( + expectedLimitsQuery = regexp.QuoteMeta("SELECT projections.limits.aggregate_id," + + " projections.limits.creation_date," + + " projections.limits.change_date," + + " projections.limits.resource_owner," + + " projections.limits.sequence," + + " projections.limits.audit_log_retention" + + " FROM projections.limits" + + " AS OF SYSTEM TIME '-1 ms'", + ) + + limitsCols = []string{ + "aggregate_id", + "creation_date", + "change_date", + "resource_owner", + "sequence", + "audit_log_retention", + } +) + +func Test_LimitsPrepare(t *testing.T) { + type want struct { + sqlExpectations sqlExpectation + err checkErr + } + tests := []struct { + name string + prepare interface{} + want want + object interface{} + }{ + { + name: "prepareLimitsQuery no result", + prepare: prepareLimitsQuery, + want: want{ + sqlExpectations: mockQueriesScanErr( + expectedLimitsQuery, + nil, + nil, + ), + err: func(err error) (error, bool) { + if !errs.IsNotFound(err) { + return fmt.Errorf("err should be zitadel.NotFoundError got: %w", err), false + } + return nil, true + }, + }, + object: (*Limits)(nil), + }, + { + name: "prepareLimitsQuery", + prepare: prepareLimitsQuery, + want: want{ + sqlExpectations: mockQuery( + expectedLimitsQuery, + limitsCols, + []driver.Value{ + "limits1", + testNow, + testNow, + "instance1", + 0, + intervalDriverValue(t, time.Hour), + }, + ), + }, + object: &Limits{ + AggregateID: "limits1", + CreationDate: testNow, + ChangeDate: testNow, + ResourceOwner: "instance1", + Sequence: 0, + AuditLogRetention: gu.Ptr(time.Hour), + }, + }, + { + name: "prepareLimitsQuery sql err", + prepare: prepareLimitsQuery, + want: want{ + sqlExpectations: mockQueryErr( + expectedLimitsQuery, + sql.ErrConnDone, + ), + err: func(err error) (error, bool) { + if !errors.Is(err, sql.ErrConnDone) { + return fmt.Errorf("err should be sql.ErrConnDone got: %w", err), false + } + return nil, true + }, + }, + object: (*Limits)(nil), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assertPrepare(t, tt.prepare, tt.object, tt.want.sqlExpectations, tt.want.err, defaultPrepareArgs...) + }) + } +} diff --git a/internal/query/prepare_test.go b/internal/query/prepare_test.go index c9770c0dd1..dadf95f000 100644 --- a/internal/query/prepare_test.go +++ b/internal/query/prepare_test.go @@ -13,13 +13,16 @@ import ( "github.com/DATA-DOG/go-sqlmock" sq "github.com/Masterminds/squirrel" + "github.com/jackc/pgtype" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/zitadel/zitadel/internal/database" ) var ( testNow = time.Now() + dayNow = testNow.Truncate(24 * time.Hour) ) // assertPrepare checks if the prepare func executes the correct sql query and returns the correct object @@ -385,6 +388,15 @@ func TestValidatePrepare(t *testing.T) { } } +func intervalDriverValue(t *testing.T, src time.Duration) pgtype.Interval { + interval := pgtype.Interval{} + err := interval.Set(src) + if err != nil { + t.Fatal(err) + } + return interval +} + type prepareDB struct{} const asOfSystemTime = " AS OF SYSTEM TIME '-1 ms' " diff --git a/internal/query/projection/limits.go b/internal/query/projection/limits.go new file mode 100644 index 0000000000..65487a9429 --- /dev/null +++ b/internal/query/projection/limits.go @@ -0,0 +1,114 @@ +package projection + +import ( + "context" + + "github.com/zitadel/zitadel/internal/eventstore" + old_handler "github.com/zitadel/zitadel/internal/eventstore/handler" + "github.com/zitadel/zitadel/internal/eventstore/handler/v2" + "github.com/zitadel/zitadel/internal/repository/instance" + "github.com/zitadel/zitadel/internal/repository/limits" +) + +const ( + LimitsProjectionTable = "projections.limits" + + LimitsColumnAggregateID = "aggregate_id" + LimitsColumnCreationDate = "creation_date" + LimitsColumnChangeDate = "change_date" + LimitsColumnResourceOwner = "resource_owner" + LimitsColumnInstanceID = "instance_id" + LimitsColumnSequence = "sequence" + + LimitsColumnAuditLogRetention = "audit_log_retention" +) + +type limitsProjection struct{} + +func newLimitsProjection(ctx context.Context, config handler.Config) *handler.Handler { + return handler.NewHandler(ctx, &config, &limitsProjection{}) +} + +func (*limitsProjection) Name() string { + return LimitsProjectionTable +} + +func (*limitsProjection) Init() *old_handler.Check { + return handler.NewTableCheck( + handler.NewTable([]*handler.InitColumn{ + handler.NewColumn(LimitsColumnAggregateID, handler.ColumnTypeText), + handler.NewColumn(LimitsColumnCreationDate, handler.ColumnTypeTimestamp), + handler.NewColumn(LimitsColumnChangeDate, handler.ColumnTypeTimestamp), + handler.NewColumn(LimitsColumnResourceOwner, handler.ColumnTypeText), + handler.NewColumn(LimitsColumnInstanceID, handler.ColumnTypeText), + handler.NewColumn(LimitsColumnSequence, handler.ColumnTypeInt64), + handler.NewColumn(LimitsColumnAuditLogRetention, handler.ColumnTypeInterval, handler.Nullable()), + }, + handler.NewPrimaryKey(LimitsColumnInstanceID, LimitsColumnResourceOwner), + ), + ) +} + +func (p *limitsProjection) Reducers() []handler.AggregateReducer { + return []handler.AggregateReducer{ + { + Aggregate: limits.AggregateType, + EventReducers: []handler.EventReducer{ + { + Event: limits.SetEventType, + Reduce: p.reduceLimitsSet, + }, + { + Event: limits.ResetEventType, + Reduce: p.reduceLimitsReset, + }, + }, + }, + { + Aggregate: instance.AggregateType, + EventReducers: []handler.EventReducer{ + { + Event: instance.InstanceRemovedEventType, + Reduce: reduceInstanceRemovedHelper(LimitsColumnInstanceID), + }, + }, + }, + } +} + +func (p *limitsProjection) reduceLimitsSet(event eventstore.Event) (*handler.Statement, error) { + e, err := assertEvent[*limits.SetEvent](event) + if err != nil { + return nil, err + } + conflictCols := []handler.Column{ + handler.NewCol(LimitsColumnInstanceID, e.Aggregate().InstanceID), + handler.NewCol(LimitsColumnResourceOwner, e.Aggregate().ResourceOwner), + } + updateCols := []handler.Column{ + handler.NewCol(LimitsColumnInstanceID, e.Aggregate().InstanceID), + handler.NewCol(LimitsColumnResourceOwner, e.Aggregate().ResourceOwner), + handler.NewCol(LimitsColumnCreationDate, e.CreationDate()), + handler.NewCol(LimitsColumnChangeDate, e.CreationDate()), + handler.NewCol(LimitsColumnSequence, e.Sequence()), + handler.NewCol(LimitsColumnAggregateID, e.Aggregate().ID), + } + if e.AuditLogRetention != nil { + updateCols = append(updateCols, handler.NewCol(LimitsColumnAuditLogRetention, *e.AuditLogRetention)) + } + return handler.NewUpsertStatement(e, conflictCols, updateCols), nil +} + +func (p *limitsProjection) reduceLimitsReset(event eventstore.Event) (*handler.Statement, error) { + e, err := assertEvent[*limits.ResetEvent](event) + if err != nil { + return nil, err + } + return handler.NewDeleteStatement( + e, + []handler.Condition{ + handler.NewCond(LimitsColumnInstanceID, e.Aggregate().InstanceID), + handler.NewCond(LimitsColumnResourceOwner, e.Aggregate().ResourceOwner), + }, + ), nil +} diff --git a/internal/query/projection/limits_test.go b/internal/query/projection/limits_test.go new file mode 100644 index 0000000000..0277e29243 --- /dev/null +++ b/internal/query/projection/limits_test.go @@ -0,0 +1,96 @@ +package projection + +import ( + "testing" + "time" + + "github.com/zitadel/zitadel/internal/errors" + "github.com/zitadel/zitadel/internal/eventstore" + "github.com/zitadel/zitadel/internal/eventstore/handler/v2" + "github.com/zitadel/zitadel/internal/repository/limits" +) + +func TestLimitsProjection_reduces(t *testing.T) { + type args struct { + event func(t *testing.T) eventstore.Event + } + tests := []struct { + name string + args args + reduce func(event eventstore.Event) (*handler.Statement, error) + want wantReduce + }{ + { + name: "reduceLimitsSet", + args: args{ + event: getEvent(testEvent( + limits.SetEventType, + limits.AggregateType, + []byte(`{ + "auditLogRetention": 300000000000 + }`), + ), limits.SetEventMapper), + }, + reduce: (&limitsProjection{}).reduceLimitsSet, + want: wantReduce{ + aggregateType: eventstore.AggregateType("limits"), + sequence: 15, + executer: &testExecuter{ + executions: []execution{ + { + expectedStmt: "INSERT INTO projections.limits (instance_id, resource_owner, creation_date, change_date, sequence, aggregate_id, audit_log_retention) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (instance_id, resource_owner) DO UPDATE SET (creation_date, change_date, sequence, aggregate_id, audit_log_retention) = (EXCLUDED.creation_date, EXCLUDED.change_date, EXCLUDED.sequence, EXCLUDED.aggregate_id, EXCLUDED.audit_log_retention)", + expectedArgs: []interface{}{ + "instance-id", + "ro-id", + anyArg{}, + anyArg{}, + uint64(15), + "agg-id", + time.Minute * 5, + }, + }, + }, + }, + }, + }, + + { + name: "reduceLimitsReset", + args: args{ + event: getEvent(testEvent( + limits.ResetEventType, + limits.AggregateType, + []byte(`{}`), + ), limits.ResetEventMapper), + }, + reduce: (&limitsProjection{}).reduceLimitsReset, + want: wantReduce{ + aggregateType: eventstore.AggregateType("limits"), + sequence: 15, + executer: &testExecuter{ + executions: []execution{ + { + expectedStmt: "DELETE FROM projections.limits WHERE (instance_id = $1) AND (resource_owner = $2)", + expectedArgs: []interface{}{ + "instance-id", + "ro-id", + }, + }, + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + event := baseEvent(t) + got, err := tt.reduce(event) + if !errors.IsErrorInvalidArgument(err) { + t.Errorf("no wrong event mapping: %v, got: %v", err, got) + } + event = tt.args.event(t) + got, err = tt.reduce(event) + assertReduce(t, got, err, LimitsProjectionTable, tt.want) + }) + } +} diff --git a/internal/query/projection/main_test.go b/internal/query/projection/main_test.go index 09754b140e..460e8f07d5 100644 --- a/internal/query/projection/main_test.go +++ b/internal/query/projection/main_test.go @@ -11,6 +11,7 @@ import ( action_repo "github.com/zitadel/zitadel/internal/repository/action" iam_repo "github.com/zitadel/zitadel/internal/repository/instance" key_repo "github.com/zitadel/zitadel/internal/repository/keypair" + "github.com/zitadel/zitadel/internal/repository/limits" "github.com/zitadel/zitadel/internal/repository/org" proj_repo "github.com/zitadel/zitadel/internal/repository/project" quota_repo "github.com/zitadel/zitadel/internal/repository/quota" @@ -36,6 +37,7 @@ func eventstoreExpect(t *testing.T, expects ...expect) *eventstore.Eventstore { usr_repo.RegisterEventMappers(es) proj_repo.RegisterEventMappers(es) quota_repo.RegisterEventMappers(es) + limits.RegisterEventMappers(es) usergrant.RegisterEventMappers(es) key_repo.RegisterEventMappers(es) action_repo.RegisterEventMappers(es) diff --git a/internal/query/projection/projection.go b/internal/query/projection/projection.go index 845395ec27..c95c60c6d5 100644 --- a/internal/query/projection/projection.go +++ b/internal/query/projection/projection.go @@ -69,6 +69,7 @@ var ( AuthRequestProjection *handler.Handler MilestoneProjection *handler.Handler QuotaProjection *quotaProjection + LimitsProjection *handler.Handler ) type projection interface { @@ -141,6 +142,7 @@ func Create(ctx context.Context, sqlClient *database.DB, es handler.EventStore, AuthRequestProjection = newAuthRequestProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["auth_requests"])) MilestoneProjection = newMilestoneProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["milestones"]), systemUsers) QuotaProjection = newQuotaProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["quotas"])) + LimitsProjection = newLimitsProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["limits"])) newProjectionsList() return nil } @@ -244,5 +246,6 @@ func newProjectionsList() { AuthRequestProjection, MilestoneProjection, QuotaProjection.handler, + LimitsProjection, } } diff --git a/internal/query/query.go b/internal/query/query.go index 2265183e1a..cb50ce1fcd 100644 --- a/internal/query/query.go +++ b/internal/query/query.go @@ -24,6 +24,7 @@ import ( "github.com/zitadel/zitadel/internal/repository/idpintent" iam_repo "github.com/zitadel/zitadel/internal/repository/instance" "github.com/zitadel/zitadel/internal/repository/keypair" + "github.com/zitadel/zitadel/internal/repository/limits" "github.com/zitadel/zitadel/internal/repository/oidcsession" "github.com/zitadel/zitadel/internal/repository/org" "github.com/zitadel/zitadel/internal/repository/project" @@ -50,6 +51,7 @@ type Queries struct { supportedLangs []language.Tag zitadelRoles []authz.RoleMapping multifactors domain.MultifactorConfigs + defaultAuditLogRetention time.Duration } func StartQueries( @@ -62,6 +64,7 @@ func StartQueries( zitadelRoles []authz.RoleMapping, sessionTokenVerifier func(ctx context.Context, sessionToken string, sessionID string, tokenID string) (err error), permissionCheck func(q *Queries) domain.PermissionCheck, + defaultAuditLogRetention time.Duration, systemAPIUsers map[string]*internal_authz.SystemAPIUser, ) (repo *Queries, err error) { statikLoginFS, err := fs.NewWithNamespace("login") @@ -84,6 +87,7 @@ func StartQueries( NotificationTranslationFileContents: make(map[string][]byte), zitadelRoles: zitadelRoles, sessionTokenVerifier: sessionTokenVerifier, + defaultAuditLogRetention: defaultAuditLogRetention, } iam_repo.RegisterEventMappers(repo.eventstore) usr_repo.RegisterEventMappers(repo.eventstore) @@ -97,6 +101,7 @@ func StartQueries( authrequest.RegisterEventMappers(repo.eventstore) oidcsession.RegisterEventMappers(repo.eventstore) quota.RegisterEventMappers(repo.eventstore) + limits.RegisterEventMappers(repo.eventstore) repo.idpConfigEncryption = idpConfigEncryption repo.multifactors = domain.MultifactorConfigs{ diff --git a/internal/query/quota_test.go b/internal/query/quota_test.go index 05bddc8031..c96af861a0 100644 --- a/internal/query/quota_test.go +++ b/internal/query/quota_test.go @@ -9,8 +9,6 @@ import ( "testing" "time" - "github.com/jackc/pgtype" - errs "github.com/zitadel/zitadel/internal/errors" ) @@ -33,19 +31,6 @@ var ( } ) -func dayNow() time.Time { - return time.Now().Truncate(24 * time.Hour) -} - -func interval(t *testing.T, src time.Duration) pgtype.Interval { - interval := pgtype.Interval{} - err := interval.Set(src) - if err != nil { - t.Fatal(err) - } - return interval -} - func Test_QuotaPrepare(t *testing.T) { type want struct { sqlExpectations sqlExpectation @@ -84,8 +69,8 @@ func Test_QuotaPrepare(t *testing.T) { quotaCols, []driver.Value{ "quota-id", - dayNow(), - interval(t, time.Hour*24), + dayNow, + intervalDriverValue(t, time.Hour*24), uint64(1000), true, testNow, @@ -94,9 +79,9 @@ func Test_QuotaPrepare(t *testing.T) { }, object: &Quota{ ID: "quota-id", - From: dayNow(), + From: dayNow, ResetInterval: time.Hour * 24, - CurrentPeriodStart: dayNow(), + CurrentPeriodStart: dayNow, Amount: 1000, Limit: true, }, diff --git a/internal/repository/limits/aggregate.go b/internal/repository/limits/aggregate.go new file mode 100644 index 0000000000..37aa38618f --- /dev/null +++ b/internal/repository/limits/aggregate.go @@ -0,0 +1,26 @@ +package limits + +import ( + "github.com/zitadel/zitadel/internal/eventstore" +) + +const ( + AggregateType = "limits" + AggregateVersion = "v1" +) + +type Aggregate struct { + eventstore.Aggregate +} + +func NewAggregate(id, instanceId, resourceOwner string) *Aggregate { + return &Aggregate{ + Aggregate: eventstore.Aggregate{ + Type: AggregateType, + Version: AggregateVersion, + ID: id, + InstanceID: instanceId, + ResourceOwner: resourceOwner, + }, + } +} diff --git a/internal/repository/limits/events.go b/internal/repository/limits/events.go new file mode 100644 index 0000000000..47bde7cffc --- /dev/null +++ b/internal/repository/limits/events.go @@ -0,0 +1,86 @@ +package limits + +import ( + "context" + "time" + + "github.com/zitadel/zitadel/internal/eventstore" +) + +const ( + eventTypePrefix = eventstore.EventType("limits.") + SetEventType = eventTypePrefix + "set" + ResetEventType = eventTypePrefix + "reset" +) + +// SetEvent describes that limits are added or modified and contains only changed properties +type SetEvent struct { + *eventstore.BaseEvent `json:"-"` + AuditLogRetention *time.Duration `json:"auditLogRetention,omitempty"` +} + +func (e *SetEvent) Payload() any { + return e +} + +func (e *SetEvent) UniqueConstraints() []*eventstore.UniqueConstraint { + return nil +} + +func (e *SetEvent) SetBaseEvent(b *eventstore.BaseEvent) { + e.BaseEvent = b +} + +func NewSetEvent( + base *eventstore.BaseEvent, + changes ...LimitsChange, +) *SetEvent { + changedEvent := &SetEvent{ + BaseEvent: base, + } + for _, change := range changes { + change(changedEvent) + } + return changedEvent +} + +type LimitsChange func(*SetEvent) + +func ChangeAuditLogRetention(auditLogRetention *time.Duration) LimitsChange { + return func(e *SetEvent) { + e.AuditLogRetention = auditLogRetention + } +} + +var SetEventMapper = eventstore.GenericEventMapper[SetEvent] + +type ResetEvent struct { + *eventstore.BaseEvent `json:"-"` +} + +func (e *ResetEvent) Payload() any { + return e +} + +func (e *ResetEvent) UniqueConstraints() []*eventstore.UniqueConstraint { + return nil +} + +func (e *ResetEvent) SetBaseEvent(b *eventstore.BaseEvent) { + e.BaseEvent = b +} + +func NewResetEvent( + ctx context.Context, + aggregate *eventstore.Aggregate, +) *ResetEvent { + return &ResetEvent{ + BaseEvent: eventstore.NewBaseEventForPush( + ctx, + aggregate, + ResetEventType, + ), + } +} + +var ResetEventMapper = eventstore.GenericEventMapper[ResetEvent] diff --git a/internal/repository/limits/eventstore.go b/internal/repository/limits/eventstore.go new file mode 100644 index 0000000000..5c2ab16c4f --- /dev/null +++ b/internal/repository/limits/eventstore.go @@ -0,0 +1,10 @@ +package limits + +import ( + "github.com/zitadel/zitadel/internal/eventstore" +) + +func RegisterEventMappers(es *eventstore.Eventstore) { + es.RegisterFilterEventMapper(AggregateType, SetEventType, SetEventMapper). + RegisterFilterEventMapper(AggregateType, ResetEventType, ResetEventMapper) +} diff --git a/internal/repository/quota/events.go b/internal/repository/quota/events.go index b52226689a..616f61faed 100644 --- a/internal/repository/quota/events.go +++ b/internal/repository/quota/events.go @@ -27,14 +27,6 @@ const ( ActionsAllRunsSeconds ) -func NewAddQuotaUnitUniqueConstraint(unit Unit) *eventstore.UniqueConstraint { - return eventstore.NewAddEventUniqueConstraint( - UniqueQuotaNameType, - strconv.FormatUint(uint64(unit), 10), - "Errors.Quota.AlreadyExists", - ) -} - func NewRemoveQuotaNameUniqueConstraint(unit Unit) *eventstore.UniqueConstraint { return eventstore.NewRemoveUniqueConstraint( UniqueQuotaNameType, diff --git a/internal/static/i18n/bg.yaml b/internal/static/i18n/bg.yaml index b516fcadb6..9c5784e3bd 100644 --- a/internal/static/i18n/bg.yaml +++ b/internal/static/i18n/bg.yaml @@ -28,6 +28,9 @@ Errors: RemoveFailed: Обектът не можа да бъде премахнат Limit: ExceedsDefault: Лимитът надвишава лимита по подразбиране + Limits: + NotFound: Лимитът не е намерен + NoneSpecified: Не са посочени лимити Language: NotParsed: Езикът не можа да бъде анализиран синтактично OIDCSettings: diff --git a/internal/static/i18n/de.yaml b/internal/static/i18n/de.yaml index 7610d69401..3fe394ce08 100644 --- a/internal/static/i18n/de.yaml +++ b/internal/static/i18n/de.yaml @@ -28,6 +28,9 @@ Errors: RemoveFailed: Objekt konnte nicht gelöscht werden Limit: ExceedsDefault: Limit überschreitet default Limit + Limits: + NotFound: Limits konnten nicht gefunden werden + NoneSpecified: Keine Limits angegeben Language: NotParsed: Sprache konnte nicht gemapped werden OIDCSettings: diff --git a/internal/static/i18n/en.yaml b/internal/static/i18n/en.yaml index 0c694f463e..17a7dea05e 100644 --- a/internal/static/i18n/en.yaml +++ b/internal/static/i18n/en.yaml @@ -28,6 +28,9 @@ Errors: RemoveFailed: Object could not be removed Limit: ExceedsDefault: Limit exceeds default limit + Limits: + NotFound: Limits not found + NoneSpecified: No limits specified Language: NotParsed: Could not parse language OIDCSettings: diff --git a/internal/static/i18n/es.yaml b/internal/static/i18n/es.yaml index 9776a61f23..e4e62c72af 100644 --- a/internal/static/i18n/es.yaml +++ b/internal/static/i18n/es.yaml @@ -28,6 +28,9 @@ Errors: RemoveFailed: El objeto no pudo eliminarse Limit: ExceedsDefault: El límite excede el límite por defecto + Limits: + NotFound: Límite no encontrado + NoneSpecified: No se especificaron límites Language: NotParsed: No pude analizar el idioma OIDCSettings: diff --git a/internal/static/i18n/fr.yaml b/internal/static/i18n/fr.yaml index b72a5ba1f3..4defb4e449 100644 --- a/internal/static/i18n/fr.yaml +++ b/internal/static/i18n/fr.yaml @@ -28,6 +28,9 @@ Errors: RemoveFailed: L'objet n'a pas pu être retiré Limit: ExceedsDefault: La limite dépasse la limite par défaut + Limits: + NotFound: Limites non trouvée + NoneSpecified: Aucune limite spécifiée Language: NotParsed: Impossible d'analyser la langue OIDCSettings: diff --git a/internal/static/i18n/it.yaml b/internal/static/i18n/it.yaml index 43891039ea..b8be789620 100644 --- a/internal/static/i18n/it.yaml +++ b/internal/static/i18n/it.yaml @@ -28,6 +28,9 @@ Errors: RemoveFailed: L'oggetto non può essere rimosso Limit: ExceedsDefault: Il limite supera quello predefinito + Limits: + NotFound: Limite non trovato + NoneSpecified: Nessun limite specificato Language: NotParsed: Impossibile analizzare la lingua OIDCSettings: diff --git a/internal/static/i18n/ja.yaml b/internal/static/i18n/ja.yaml index 72a7aa9d32..d615c1c9b5 100644 --- a/internal/static/i18n/ja.yaml +++ b/internal/static/i18n/ja.yaml @@ -28,6 +28,9 @@ Errors: RemoveFailed: オブジェクトの削除に失敗しました Limit: ExceedsDefault: デフォルトの制限を超えています + Limits: + NotFound: 制限が見つかりません + NoneSpecified: 制限が指定されていません Language: NotParsed: 言語のパースに失敗しました OIDCSettings: diff --git a/internal/static/i18n/mk.yaml b/internal/static/i18n/mk.yaml index ae27d818b2..e6ee3052d7 100644 --- a/internal/static/i18n/mk.yaml +++ b/internal/static/i18n/mk.yaml @@ -28,6 +28,9 @@ Errors: RemoveFailed: Објектот не може да се отстрани Limit: ExceedsDefault: Лимитот го надминува стандардниот лимит + Limits: + NotFound: Лимитот не е пронајден + NoneSpecified: Не се наведени лимити Language: NotParsed: Јазикот не може да се парсира OIDCSettings: diff --git a/internal/static/i18n/pl.yaml b/internal/static/i18n/pl.yaml index 9e7a55ef0f..600c3feac4 100644 --- a/internal/static/i18n/pl.yaml +++ b/internal/static/i18n/pl.yaml @@ -28,6 +28,9 @@ Errors: RemoveFailed: Obiekt nie mógł zostać usunięty Limit: ExceedsDefault: Limit przekracza domyślny limit + Limits: + NotFound: Limit nie znaleziony + NoneSpecified: Nie określono limitów Language: NotParsed: Nie można przeanalizować języka OIDCSettings: diff --git a/internal/static/i18n/pt.yaml b/internal/static/i18n/pt.yaml index 71448e1a89..ae911dcc6a 100644 --- a/internal/static/i18n/pt.yaml +++ b/internal/static/i18n/pt.yaml @@ -28,6 +28,9 @@ Errors: RemoveFailed: Não foi possível remover o objeto Limit: ExceedsDefault: Limite excede o limite padrão + Limits: + NotFound: Limite não encontrado + NoneSpecified: Nenhum limite especificado Language: NotParsed: Não foi possível analisar o idioma OIDCSettings: diff --git a/internal/static/i18n/zh.yaml b/internal/static/i18n/zh.yaml index 526e67f055..28631eba4e 100644 --- a/internal/static/i18n/zh.yaml +++ b/internal/static/i18n/zh.yaml @@ -28,6 +28,9 @@ Errors: RemoveFailed: 无法移除对象 Limit: ExceedsDefault: 超出默认限制 + Limits: + NotFound: 未找到限制 + NoneSpecified: 未指定限制 Language: NotParsed: 无法解析语言 OIDCSettings: diff --git a/proto/zitadel/system.proto b/proto/zitadel/system.proto index ed2e73d29d..80fc89076d 100644 --- a/proto/zitadel/system.proto +++ b/proto/zitadel/system.proto @@ -365,6 +365,10 @@ service SystemService { // Returns an error if the quota already exists for the specified unit // Deprecated: use SetQuota instead rpc AddQuota(AddQuotaRequest) returns (AddQuotaResponse) { + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + tags: ["Usage Control", "Quotas"]; + }; + option (google.api.http) = { post: "/instances/{instance_id}/quotas" body: "*" @@ -378,6 +382,10 @@ service SystemService { // Sets quota configuration properties // Creates a new quota if it doesn't exist for the specified unit rpc SetQuota(SetQuotaRequest) returns (SetQuotaResponse) { + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + tags: ["Usage Control", "Quotas"]; + }; + option (google.api.http) = { put: "/instances/{instance_id}/quotas" body: "*" @@ -390,6 +398,10 @@ service SystemService { // Removes a quota rpc RemoveQuota(RemoveQuotaRequest) returns (RemoveQuotaResponse) { + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + tags: ["Usage Control", "Quotas"]; + }; + option (google.api.http) = { delete: "/instances/{instance_id}/quotas/{unit}" }; @@ -410,6 +422,71 @@ service SystemService { permission: "authenticated"; }; } + + // Sets instance level limits + rpc SetLimits(SetLimitsRequest) returns (SetLimitsResponse) { + option (google.api.http) = { + put: "/instances/{instance_id}/limits" + body: "*" + }; + + option (zitadel.v1.auth_option) = { + permission: "authenticated"; + }; + + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + tags: ["Usage Control", "Limits"]; + responses: { + key: "200"; + value: { + description: "Instance limits set"; + }; + }; + responses: { + key: "400"; + value: { + description: "At least one limit must be specified"; + schema: { + json_schema: { + ref: "#/definitions/rpcStatus"; + }; + }; + }; + }; + }; + } + + // Resets instance level limits + rpc ResetLimits(ResetLimitsRequest) returns (ResetLimitsResponse) { + option (google.api.http) = { + delete: "/instances/{instance_id}/limits" + }; + + option (zitadel.v1.auth_option) = { + permission: "authenticated"; + }; + + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + tags: ["Usage Control", "Limits"]; + responses: { + key: "200"; + value: { + description: "Limits are reset to the system defaults"; + }; + }; + responses: { + key: "404"; + value: { + description: "Limits are already set to the system defaults"; + schema: { + json_schema: { + ref: "#/definitions/rpcStatus"; + }; + }; + }; + }; + }; + } } @@ -683,6 +760,27 @@ message RemoveQuotaResponse { zitadel.v1.ObjectDetails details = 1; } +message SetLimitsRequest { + string instance_id = 1 [(validate.rules).string = {min_len: 1, max_len: 200}]; + google.protobuf.Duration audit_log_retention = 2 [ + (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = { + description: "AuditLogRetention limits the number of events that can be queried via the events API by their age. A value of '0s' means that all events are available. If this value is set, it overwrites the system default."; + } + ]; +} + +message SetLimitsResponse { + zitadel.v1.ObjectDetails details = 1; +} + +message ResetLimitsRequest { + string instance_id = 1 [(validate.rules).string = {min_len: 1, max_len: 200}]; +} + +message ResetLimitsResponse { + zitadel.v1.ObjectDetails details = 1; +} + message ExistsDomainRequest { string domain = 1 [(validate.rules).string = {min_len: 1, max_len: 200}]; } @@ -906,4 +1004,4 @@ message SetInstanceFeatureRequest { message SetInstanceFeatureResponse { zitadel.v1.ObjectDetails details = 1; -} \ No newline at end of file +}