feat: block instances (#7129)

* docs: fix init description typos

* feat: block instances using limits

* translate

* unit tests

* fix translations

* redirect /ui/login

* fix http interceptor

* cleanup

* fix http interceptor

* fix: delete cookies on gateway 200

* add integration tests

* add command test

* docs

* fix integration tests

* add bulk api and integration test

* optimize bulk set limits

* unit test bulk limits

* fix broken link

* fix assets middleware

* fix broken link

* validate instance id format

* Update internal/eventstore/search_query.go

Co-authored-by: Livio Spring <livio.a@gmail.com>

* remove support for owner bulk limit commands

* project limits to instances

* migrate instances projection

* Revert "migrate instances projection"

This reverts commit 214218732a.

* join limits, remove owner

* remove todo

* use optional bool

* normally validate instance ids

* use 302

* cleanup

* cleanup

* Update internal/api/grpc/system/limits_converter.go

Co-authored-by: Livio Spring <livio.a@gmail.com>

* remove owner

* remove owner from reset

---------

Co-authored-by: Livio Spring <livio.a@gmail.com>
This commit is contained in:
Elio Bischof 2024-01-17 11:16:48 +01:00 committed by GitHub
parent d9d376a275
commit ed0bc39ea4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
80 changed files with 1609 additions and 438 deletions

View File

@ -830,6 +830,9 @@ DefaultInstance:
# A value of "0s" means that all events are available.
# If this value is set, it overwrites the system default unless it is not reset via the admin API.
AuditLogRetention: # ZITADEL_DEFAULTINSTANCE_LIMITS_AUDITLOGRETENTION
# If Block is true, all requests except to /ui/console or the system API are blocked and /ui/login is redirected to /ui/console.
# /ui/console shows a message that the instance is blocked with a link to Console.InstanceManagementURL
Block: # ZITADEL_DEFAULTINSTANCE_LIMITS_BLOCK
Restrictions:
# DisallowPublicOrgRegistration defines if ZITADEL should expose the endpoint /ui/login/register/org
# If it is true, the endpoint returns the HTTP status 404 on GET requests, and 409 on POST requests.
@ -862,7 +865,8 @@ DefaultInstance:
# ResetInterval: 720h # 30 days
# # Amount defines the number of units for this quota
# Amount: 25000
# # Limit defines whether ZITADEL should block further usage when the configured amount is used
# # Limit defines whether ZITADEL should block further authenticated requests when the configured amount is used.
# # If you not only want to block authenticated requests but also authentication itself, consider using the system APIs SetLimits method.
# Limit: false
# # Notifications are emitted by ZITADEL when certain quota percentages are reached
# Notifications:

View File

@ -39,7 +39,7 @@ func New() *cobra.Command {
Long: `Sets up the minimum requirements to start ZITADEL.
Prerequisites:
- cockroachdb
- cockroachDB
The user provided by flags needs privileges to
- create the database if it does not exist

View File

@ -17,10 +17,10 @@ func newDatabase() *cobra.Command {
Short: "initialize only the database",
Long: `Sets up the ZITADEL database.
Prereqesits:
Prerequisites:
- cockroachDB or postgreSQL
The user provided by flags needs priviledge to
The user provided by flags needs privileges to
- create the database if it does not exist
- see other users and create a new one if the user does not exist
- grant all rights of the ZITADEL database to the user created if not yet set

View File

@ -17,7 +17,7 @@ func newGrant() *cobra.Command {
Short: "set ALL grant to user",
Long: `Sets ALL grant to the database user.
Prereqesits:
Prerequisites:
- cockroachDB or postgreSQL
`,
Run: func(cmd *cobra.Command, args []string) {

View File

@ -17,10 +17,10 @@ func newUser() *cobra.Command {
Short: "initialize only the database user",
Long: `Sets up the ZITADEL database user.
Prereqesits:
- cockroachDB or postreSQL
Prerequisites:
- cockroachDB or postgreSQL
The user provided by flags needs priviledge to
The user provided by flags needs privileges to
- create the database if it does not exist
- see other users and create a new one if the user does not exist
- grant all rights of the ZITADEL database to the user created if not yet set

View File

@ -19,7 +19,7 @@ func newZitadel() *cobra.Command {
Short: "initialize ZITADEL internals",
Long: `initialize ZITADEL internals.
Prereqesits:
Prerequisites:
- cockroachDB or postgreSQL with user and database
`,
Run: func(cmd *cobra.Command, args []string) {

26
cmd/setup/21.go Normal file
View File

@ -0,0 +1,26 @@
package setup
import (
"context"
_ "embed"
"github.com/zitadel/zitadel/internal/database"
)
var (
//go:embed 21.sql
addBlockFieldToLimits string
)
type AddBlockFieldToLimits struct {
dbClient *database.DB
}
func (mig *AddBlockFieldToLimits) Execute(ctx context.Context) error {
_, err := mig.dbClient.ExecContext(ctx, addBlockFieldToLimits)
return err
}
func (mig *AddBlockFieldToLimits) String() string {
return "21_add_block_field_to_limits"
}

1
cmd/setup/21.sql Normal file
View File

@ -0,0 +1 @@
ALTER TABLE IF EXISTS projections.limits ADD COLUMN IF NOT EXISTS block BOOLEAN;

View File

@ -78,6 +78,7 @@ type Steps struct {
s18AddLowerFieldsToLoginNames *AddLowerFieldsToLoginNames
s19AddCurrentStatesIndex *AddCurrentSequencesIndex
s20AddByUserSessionIndex *AddByUserIndexToSession
s21AddBlockFieldToLimits *AddBlockFieldToLimits
}
type encryptionKeyConfig struct {

View File

@ -111,6 +111,7 @@ func Setup(config *Config, steps *Steps, masterKey string) {
steps.s18AddLowerFieldsToLoginNames = &AddLowerFieldsToLoginNames{dbClient: queryDBClient}
steps.s19AddCurrentStatesIndex = &AddCurrentSequencesIndex{dbClient: queryDBClient}
steps.s20AddByUserSessionIndex = &AddByUserIndexToSession{dbClient: queryDBClient}
steps.s21AddBlockFieldToLimits = &AddBlockFieldToLimits{dbClient: queryDBClient}
err = projection.Create(ctx, projectionDBClient, eventstoreClient, config.Projections, nil, nil, nil)
logging.OnError(err).Fatal("unable to start projections")
@ -165,9 +166,11 @@ func Setup(config *Config, steps *Steps, masterKey string) {
logging.OnError(err).Fatalf("unable to migrate repeatable step: %s", repeatableStep.String())
}
// This step is executed after the repeatable steps because it adds fields to the login_names3 projection
// These steps are executed after the repeatable steps because they add fields projections
err = migration.Migrate(ctx, eventstoreClient, steps.s18AddLowerFieldsToLoginNames)
logging.WithFields("name", steps.s18AddLowerFieldsToLoginNames.String()).OnError(err).Fatal("migration failed")
err = migration.Migrate(ctx, eventstoreClient, steps.s21AddBlockFieldToLimits)
logging.WithFields("name", steps.s21AddBlockFieldToLimits.String()).OnError(err).Fatal("migration failed")
}
func readStmt(fs embed.FS, folder, typ, filename string) (string, error) {

View File

@ -439,14 +439,14 @@ func startAPIs(
return fmt.Errorf("unable to start console: %w", err)
}
apis.RegisterHandlerOnPrefix(console.HandlerPrefix, c)
consolePath := console.HandlerPrefix + "/"
l, err := login.CreateLogin(
config.Login,
commands,
queries,
authRepo,
store,
console.HandlerPrefix+"/",
consolePath,
oidcServer.AuthCallbackURL(),
provider.AuthCallbackURL(samlProvider),
config.ExternalSecure,
@ -455,7 +455,7 @@ func startAPIs(
provider.NewIssuerInterceptor(samlProvider.IssuerFromRequest).Handler,
instanceInterceptor.Handler,
assetsCache.Handler,
limitingAccessInterceptor.WithoutLimiting().Handle,
limitingAccessInterceptor.WithRedirect(consolePath).Handle,
keys.User,
keys.IDPConfig,
keys.CSRFCookieKey,

View File

@ -261,8 +261,8 @@
"DESCRIPTION": "Щракнете върху бутона по-долу, за да влезете отново."
},
"EXHAUSTED": {
"TITLE": "Вашата квота за удостоверени заявки е изчерпана.",
"DESCRIPTION": ремахнете или увеличете ограничението на квотата за този екземпляр на ZITADEL."
"TITLE": "Вашият екземпляр е блокиран.",
"DESCRIPTION": опитайте администратора на вашия екземпляр ZITADEL да актуализира абонамента."
},
"INVALID_FORMAT": "Форматирането е невалидно.",
"NOTANEMAIL": "Дадената стойност не е имейл адрес.",

View File

@ -268,8 +268,8 @@
"DESCRIPTION": "Klikněte na tlačítko níže pro opětovné přihlášení."
},
"EXHAUSTED": {
"TITLE": "Vyčerpali jste kvótu pro autentizované požadavky.",
"DESCRIPTION": "Odstraňte nebo zvyšte limit kvóty pro tuto instanci ZITADEL."
"TITLE": "Vaše instance je blokována.",
"DESCRIPTION": "Požádejte svého správce instance ZITADEL, aby aktualizoval předplatné."
},
"INVALID_FORMAT": "Formát je neplatný.",
"NOTANEMAIL": "Zadaná hodnota není e-mailová adresa.",

View File

@ -267,8 +267,8 @@
"DESCRIPTION": "Klicke auf \"Einloggen\", um Dich erneut anzumelden."
},
"EXHAUSTED": {
"TITLE": "Dein Kontingent an authentifizierten Anfragen is erschöpft.",
"DESCRIPTION": "Lösche oder erhöhe die Grenze für diese ZITADEL Instanz."
"TITLE": "Deine Instanz ist blockiert.",
"DESCRIPTION": "Bitte kontaktiere den Administrator deiner ZITADEL Instanz."
},
"INVALID_FORMAT": "Das Format is ungültig.",
"NOTANEMAIL": "Der eingegebene Wert ist keine E-Mail Adresse.",

View File

@ -268,8 +268,8 @@
"DESCRIPTION": "Click the button below to log in again."
},
"EXHAUSTED": {
"TITLE": "Your quota for authenticated requests is exhausted.",
"DESCRIPTION": "Remove or increase the quota limit for this ZITADEL instance."
"TITLE": "Your instance is blocked.",
"DESCRIPTION": "Ask your ZITADEL instance administrator to update the subscription."
},
"INVALID_FORMAT": "The formatting is invalid.",
"NOTANEMAIL": "The given value is not an e-mail address.",

View File

@ -268,8 +268,8 @@
"DESCRIPTION": "Haz clic en el botón más abajo para iniciar sesión otra vez."
},
"EXHAUSTED": {
"TITLE": "Su cuota de solicitudes autenticadas se ha agotado.",
"DESCRIPTION": "Borrar o aumentar el límite de esta instancia de ZITADEL."
"TITLE": "Tu instancia está bloqueada.",
"DESCRIPTION": "Pide a tu administrador de instancia de ZITADEL que actualice la suscripción."
},
"INVALID_FORMAT": "El formato no es valido.",
"NOTANEMAIL": "El valor proporcionado no es una dirección de email.",

View File

@ -267,8 +267,8 @@
"DESCRIPTION": "Cliquez sur le bouton ci-dessous pour vous reconnecter."
},
"EXHAUSTED": {
"TITLE": "Ton quota de demandes authentifiées est épuisé.",
"DESCRIPTION": "Supprimez ou augmentez la limite de cette instance ZITADEL."
"TITLE": "Votre instance est bloquée.",
"DESCRIPTION": "Demandez à votre administrateur d'instance ZITADEL de mettre à jour l'abonnement."
},
"INVALID_FORMAT": "Le format n'est pas valide",
"NOTANEMAIL": "La valeur donnée n'est pas une adresse e-mail",

View File

@ -266,8 +266,8 @@
"DESCRIPTION": "Clicca il pulsante per richiedere una nuova sessione."
},
"EXHAUSTED": {
"TITLE": "La quota di richieste autenticate è esaurita.",
"DESCRIPTION": "Cancellare o aumentare il limite per questa istanza ZITADEL."
"TITLE": "La tua istanza è bloccata.",
"DESCRIPTION": "Chiedi all'amministratore dell'istanza ZITADEL di aggiornare l'abbonamento."
},
"INVALID_FORMAT": "Il formato non è valido.",
"NOTANEMAIL": "Il valore dato non \u00e8 un indirizzo e-mail.",

View File

@ -268,8 +268,8 @@
"DESCRIPTION": "下のボタンをクリックして、もう一度ログインする。"
},
"EXHAUSTED": {
"TITLE": "認証されたリクエストのクォータを使い果たしました",
"DESCRIPTION": "このZITADELインスタンスの制限を削除または増加させる"
"TITLE": "あなたのインスタンスはブロックされています。",
"DESCRIPTION": "ZITADELインスタンス管理者にサブスクリプションの更新を依頼してください。"
},
"INVALID_FORMAT": "不正なフォーマットです",
"NOTANEMAIL": "入力された値がメールアドレスではありません。",

View File

@ -268,8 +268,8 @@
"DESCRIPTION": "Кликнете на копчето подолу за повторна најава."
},
"EXHAUSTED": {
"TITLE": "Вашиот квота за автентицирани барања е надмината.",
"DESCRIPTION": "Отстранете или зголемете ја квотата за оваа ZITADEL инстанца."
"TITLE": "Вашиот авторизациски токен е истечен.",
"DESCRIPTION": "Кликнете на копчето подолу за повторна најава."
},
"INVALID_FORMAT": "Невалиден формат.",
"NOTANEMAIL": "Внесената вредност не е е-пошта.",

View File

@ -268,8 +268,8 @@
"DESCRIPTION": "Klik op de knop hieronder om opnieuw in te loggen."
},
"EXHAUSTED": {
"TITLE": "Uw quotum voor geauthenticeerde aanvragen is opgebruikt.",
"DESCRIPTION": "Verwijder of verhoog het quotumlimiet voor deze ZITADEL-instantie."
"TITLE": "Uw instantie is geblokkeerd.",
"DESCRIPTION": "Vraag uw ZITADEL instantiebeheerder om het abonnement bij te werken."
},
"INVALID_FORMAT": "De opmaak is ongeldig.",
"NOTANEMAIL": "De opgegeven waarde is geen e-mailadres.",

View File

@ -267,8 +267,8 @@
"DESCRIPTION": "Kliknij przycisk poniżej, aby ponownie się zalogować."
},
"EXHAUSTED": {
"TITLE": "Twój limit uwierzytelnionych wniosków został wyczerpany.",
"DESCRIPTION": "Usuń lub zwiększ limit dla tej instancji ZITADEL."
"TITLE": "Twoja instancja jest zablokowana.",
"DESCRIPTION": "Poproś administratora swojej instancji ZITADEL o aktualizację subskrypcji."
},
"INVALID_FORMAT": "Format jest nieprawidłowy.",
"NOTANEMAIL": "Podana wartość nie jest adresem e-mail.",

View File

@ -268,8 +268,8 @@
"DESCRIPTION": "Clique no botão abaixo para fazer login novamente."
},
"EXHAUSTED": {
"TITLE": "Sua cota para solicitações autenticadas está esgotada.",
"DESCRIPTION": "Remova ou aumente o limite de cota para esta instância ZITADEL."
"TITLE": "Sua instância está bloqueada.",
"DESCRIPTION": "Peça ao administrador da sua instância ZITADEL para atualizar a assinatura."
},
"INVALID_FORMAT": "O formato é inválido.",
"NOTANEMAIL": "O valor fornecido não é um endereço de e-mail.",

View File

@ -264,8 +264,8 @@
"DESCRIPTION": "Нажмите кнопку ниже, чтобы войти снова."
},
"EXHAUSTED": {
"TITLE": "Ваша квота на аутентифицированные запросы исчерпана.",
"DESCRIPTION": "Удалите или увеличьте лимит квоты для этого экземпляра ZITADEL."
"TITLE": "Ваш экземпляр заблокирован.",
"DESCRIPTION": "Попросите администратора вашего экземпляра ZITADEL обновить подписку."
},
"INVALID_FORMAT": "Форматирование неверно.",
"NOTANEMAIL": "Данное значение не является адресом электронной почты.",

View File

@ -267,8 +267,8 @@
"DESCRIPTION": "点击下方按钮再次登录。"
},
"EXHAUSTED": {
"TITLE": "你的认证请求配额已用完.",
"DESCRIPTION": "删除或增加这个ZITADEL实例的限制。"
"TITLE": "您的实例已被阻止。",
"DESCRIPTION": "请联系您的 ZITADEL 实例管理员以更新订阅。"
},
"INVALID_FORMAT": "格式是无效的。",
"NOTANEMAIL": "给定的值不是合法电子邮件地址。",

View File

@ -7,6 +7,26 @@ If you have a self-hosted ZITADEL environment, you can limit the usage of your [
For example, if you provide your customers [their own virtual instances](/concepts/structure/instance#multiple-virtual-instances) with access on their own domains, you can design a pricing model based on the usage of their instances.
The usage control features are currently limited to the instance level only.
## Block Instances
You can block an instance using the [system API](/category/apis/resources/system/limits).
Most requests to a blocked instance are rejected with the HTTP status *429 Too Many Requests* or the gRPC status *8 Resource Exhausted*.
However, requests to the [system API](/apis/introduction#system) are still allowed.
Requests to paths with the prefix */ui/login* return a redirect with HTTP status *302 Found* to */ui/console*, where the user is guided to *InstanceManagementURL*.
Blocked HTTP requests additionally set a cookie to make it easy to block traffic before it reaches your ZITADEL runtime, for example with a WAF rule.
You can block new instances by default using the *DefaultInstance.Limits.Block* runtime configuration.
The following snippets shows the default YAML:
```yaml
DefaultInstance:
Limits:
# If Block is true, all requests except to /ui/console or the system API are blocked and /ui/login is redirected to /ui/console.
# /ui/console shows a message that the instance is blocked with a link to Console.InstanceManagementURL
Block: # ZITADEL_DEFAULTINSTANCE_LIMITS_BLOCK
```
## Limit Audit Trails
You can restrict the maximum age of events returned by the following APIs:
@ -107,8 +127,9 @@ DefaultInstance:
### Exhausted Authenticated Requests
If a quota is configured to limit requests and the quotas amount is exhausted, all further requests are blocked except requests to the System API.
Also, a cookie is set, to make it easier to block further traffic before it reaches your ZITADEL runtime.
If a quota is configured to limit requests and the quotas amount is exhausted, all further authenticated requests are blocked except requests to the [system API](/apis/introduction#system).
Also, a cookie is set, to make it easier to block further traffic before it reaches your ZITADEL runtime, for example with a WAF rule.
The console is still served, but it only shows a dialog that says that the instance is blocked with a link to *InstanceManagementURL*.
### Exhausted Action Run Seconds

View File

@ -94,7 +94,6 @@ func NewHandler(commands *command.Commands, verifier authz.APITokenVerifier, aut
router := mux.NewRouter()
csp := http_mw.SecurityHeaders(&http_mw.DefaultSCP, nil)
router.Use(callDurationInterceptor, instanceInterceptor, assetCacheInterceptor, accessInterceptor, csp)
router.Use(callDurationInterceptor, instanceInterceptor, assetCacheInterceptor, accessInterceptor)
RegisterRoutes(router, h)
router.PathPrefix("/{owner}").Methods("GET").HandlerFunc(DownloadHandleFunc(h, h.GetFile()))
return http_util.CopyHeadersToContext(http_mw.CORSInterceptor(router))

View File

@ -2,6 +2,7 @@ package authz
import (
"context"
"time"
"golang.org/x/text/language"
)
@ -20,6 +21,8 @@ type Instance interface {
DefaultLanguage() language.Tag
DefaultOrganisationID() string
SecurityPolicyAllowedOrigins() []string
Block() *bool
AuditLogRetention() *time.Duration
}
type InstanceVerifier interface {
@ -36,6 +39,14 @@ type instance struct {
orgID string
}
func (i *instance) Block() *bool {
return nil
}
func (i *instance) AuditLogRetention() *time.Duration {
return nil
}
func (i *instance) InstanceID() string {
return i.id
}

View File

@ -3,6 +3,7 @@ package authz
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
"golang.org/x/text/language"
@ -68,6 +69,14 @@ func Test_Instance(t *testing.T) {
type mockInstance struct{}
func (m *mockInstance) Block() *bool {
panic("shouldn't be called here")
}
func (m *mockInstance) AuditLogRetention() *time.Duration {
panic("shouldn't be called here")
}
func (m *mockInstance) InstanceID() string {
return "instanceID"
}

View File

@ -185,7 +185,7 @@ func addInterceptors(
handler = http_mw.ActivityHandler(handler)
// For some non-obvious reason, the exhaustedCookieInterceptor sends the SetCookie header
// only if it follows the http_mw.DefaultTelemetryHandler
handler = exhaustedCookieInterceptor(handler, accessInterceptor, queries)
handler = exhaustedCookieInterceptor(handler, accessInterceptor)
handler = http_mw.DefaultMetricsHandler(handler)
return handler
}
@ -205,14 +205,12 @@ func http1Host(next http.Handler, http1HostName string) http.Handler {
func exhaustedCookieInterceptor(
next http.Handler,
accessInterceptor *http_mw.AccessInterceptor,
queries *query.Queries,
) http.Handler {
return http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
next.ServeHTTP(&cookieResponseWriter{
ResponseWriter: writer,
accessInterceptor: accessInterceptor,
request: request,
queries: queries,
}, request)
})
}
@ -221,7 +219,7 @@ type cookieResponseWriter struct {
http.ResponseWriter
accessInterceptor *http_mw.AccessInterceptor
request *http.Request
queries *query.Queries
headerWritten bool
}
func (r *cookieResponseWriter) WriteHeader(status int) {
@ -231,9 +229,18 @@ func (r *cookieResponseWriter) WriteHeader(status int) {
if status == http.StatusTooManyRequests {
r.accessInterceptor.SetExhaustedCookie(r.ResponseWriter, r.request)
}
r.headerWritten = true
r.ResponseWriter.WriteHeader(status)
}
func (r *cookieResponseWriter) Write(bytes []byte) (int, error) {
if !r.headerWritten {
// If no header was written before the data, the status code is 200 and we can delete the cookie
r.accessInterceptor.DeleteExhaustedCookie(r.ResponseWriter)
}
return r.ResponseWriter.Write(bytes)
}
func grpcCredentials(tlsConfig *tls.Config) credentials.TransportCredentials {
creds := insecure.NewCredentials()
if tlsConfig != nil {

View File

@ -5,6 +5,7 @@ import (
"fmt"
"reflect"
"testing"
"time"
"golang.org/x/text/language"
"google.golang.org/grpc"
@ -164,6 +165,14 @@ func (m *mockInstanceVerifier) InstanceByID(context.Context) (authz.Instance, er
type mockInstance struct{}
func (m *mockInstance) Block() *bool {
panic("shouldn't be called here")
}
func (m *mockInstance) AuditLogRetention() *time.Duration {
panic("shouldn't be called here")
}
func (m *mockInstance) InstanceID() string {
return "instanceID"
}

View File

@ -0,0 +1,31 @@
package middleware
import (
"context"
"strings"
"google.golang.org/grpc"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/zerrors"
)
func LimitsInterceptor(ignoreService ...string) grpc.UnaryServerInterceptor {
for idx, service := range ignoreService {
if !strings.HasPrefix(service, "/") {
ignoreService[idx] = "/" + service
}
}
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (_ interface{}, err error) {
for _, service := range ignoreService {
if strings.HasPrefix(info.FullMethod, service) {
return handler(ctx, req)
}
}
instance := authz.GetInstance(ctx)
if block := instance.Block(); block != nil && *block {
return nil, zerrors.ThrowResourceExhausted(nil, "LIMITS-molsj", "Errors.Limits.Instance.Blocked")
}
return handler(ctx, req)
}
}

View File

@ -53,9 +53,10 @@ func CreateServer(
middleware.InstanceInterceptor(queries, hostHeaderName, system_pb.SystemService_ServiceDesc.ServiceName, healthpb.Health_ServiceDesc.ServiceName),
middleware.AccessStorageInterceptor(accessSvc),
middleware.ErrorHandler(),
middleware.LimitsInterceptor(system_pb.SystemService_ServiceDesc.ServiceName),
middleware.AuthorizationInterceptor(verifier, authConfig),
middleware.QuotaExhaustedInterceptor(accessSvc, system_pb.SystemService_ServiceDesc.ServiceName),
middleware.TranslationHandler(),
middleware.QuotaExhaustedInterceptor(accessSvc, system_pb.SystemService_ServiceDesc.ServiceName),
middleware.ValidationHandler(),
middleware.ServiceHandler(),
middleware.ActivityInterceptor(),

View File

@ -4,15 +4,12 @@ import (
"context"
"github.com/zitadel/zitadel/internal/api/grpc/object"
objectpb "github.com/zitadel/zitadel/pkg/grpc/object"
"github.com/zitadel/zitadel/pkg/grpc/system"
)
func (s *Server) SetLimits(ctx context.Context, req *system.SetLimitsRequest) (*system.SetLimitsResponse, error) {
details, err := s.command.SetLimits(
ctx,
req.GetInstanceId(),
instanceLimitsPbToCommand(req),
)
details, err := s.command.SetLimits(ctx, setInstanceLimitsPbToCommand(req))
if err != nil {
return nil, err
}
@ -21,8 +18,23 @@ func (s *Server) SetLimits(ctx context.Context, req *system.SetLimitsRequest) (*
}, nil
}
func (s *Server) ResetLimits(ctx context.Context, req *system.ResetLimitsRequest) (*system.ResetLimitsResponse, error) {
details, err := s.command.ResetLimits(ctx, req.GetInstanceId())
func (s *Server) BulkSetLimits(ctx context.Context, req *system.BulkSetLimitsRequest) (*system.BulkSetLimitsResponse, error) {
details, targetDetails, err := s.command.SetInstanceLimitsBulk(ctx, bulkSetInstanceLimitsPbToCommand(req))
if err != nil {
return nil, err
}
resp := &system.BulkSetLimitsResponse{
Details: object.AddToDetailsPb(details.Sequence, details.EventDate, details.ResourceOwner),
TargetDetails: make([]*objectpb.ObjectDetails, len(targetDetails)),
}
for i := range targetDetails {
resp.TargetDetails[i] = object.AddToDetailsPb(targetDetails[i].Sequence, targetDetails[i].EventDate, targetDetails[i].ResourceOwner)
}
return resp, nil
}
func (s *Server) ResetLimits(ctx context.Context, _ *system.ResetLimitsRequest) (*system.ResetLimitsResponse, error) {
details, err := s.command.ResetLimits(ctx)
if err != nil {
return nil, err
}

View File

@ -7,10 +7,23 @@ import (
"github.com/zitadel/zitadel/pkg/grpc/system"
)
func instanceLimitsPbToCommand(req *system.SetLimitsRequest) *command.SetLimits {
func setInstanceLimitsPbToCommand(req *system.SetLimitsRequest) *command.SetLimits {
var setLimits = new(command.SetLimits)
if req.AuditLogRetention != nil {
setLimits.AuditLogRetention = gu.Ptr(req.AuditLogRetention.AsDuration())
}
setLimits.Block = req.Block
return setLimits
}
func bulkSetInstanceLimitsPbToCommand(req *system.BulkSetLimitsRequest) []*command.SetInstanceLimitsBulk {
cmds := make([]*command.SetInstanceLimitsBulk, len(req.Limits))
for i := range req.Limits {
setLimitsReq := req.Limits[i]
cmds[i] = &command.SetInstanceLimitsBulk{
InstanceID: setLimitsReq.GetInstanceId(),
SetLimits: *setInstanceLimitsPbToCommand(req.Limits[i]),
}
}
return cmds
}

View File

@ -0,0 +1,287 @@
//go:build integration
package system_test
import (
"fmt"
"io"
"net"
"net/http"
"strings"
"testing"
"time"
"github.com/muhlemmer/gu"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/query"
"github.com/zitadel/zitadel/pkg/grpc/admin"
"github.com/zitadel/zitadel/pkg/grpc/system"
)
func TestServer_Limits_Block(t *testing.T) {
domain, instanceID, iamOwnerCtx := Tester.UseIsolatedInstance(t, CTX, SystemCTX)
tests := []*test{
publicAPIBlockingTest(domain),
{
name: "mutating API",
testGrpc: func(tt assert.TestingT, expectBlocked bool) {
randomGrpcIdpName := randomString("idp-grpc", 5)
_, err := Tester.Client.Admin.AddGitHubProvider(iamOwnerCtx, &admin.AddGitHubProviderRequest{
Name: randomGrpcIdpName,
ClientId: "client-id",
ClientSecret: "client-secret",
})
assertGrpcError(tt, err, expectBlocked)
//nolint:contextcheck
idpExists := idpExistsCondition(tt, instanceID, randomGrpcIdpName)
if expectBlocked {
// We ensure that the idp really is not created
assert.Neverf(tt, idpExists, 5*time.Second, 1*time.Second, "idp should never be created")
} else {
assert.Eventuallyf(tt, idpExists, 5*time.Second, 1*time.Second, "idp should be created")
}
},
testHttp: func(tt assert.TestingT) (*http.Request, error, func(assert.TestingT, *http.Response, bool)) {
randomHttpIdpName := randomString("idp-http", 5)
req, err := http.NewRequestWithContext(
CTX,
"POST",
fmt.Sprintf("http://%s/admin/v1/idps/github", net.JoinHostPort(domain, "8080")),
strings.NewReader(`{
"name": "`+randomHttpIdpName+`",
"clientId": "client-id",
"clientSecret": "client-secret"
}`),
)
if err != nil {
return nil, err, nil
}
req.Header.Set("Authorization", Tester.BearerToken(iamOwnerCtx))
return req, nil, func(ttt assert.TestingT, response *http.Response, expectBlocked bool) {
assertLimitResponse(ttt, response, expectBlocked)
assertSetLimitingCookie(ttt, response, expectBlocked)
}
},
}, {
name: "discovery",
testHttp: func(tt assert.TestingT) (*http.Request, error, func(assert.TestingT, *http.Response, bool)) {
req, err := http.NewRequestWithContext(
CTX,
"GET",
fmt.Sprintf("http://%s/.well-known/openid-configuration", net.JoinHostPort(domain, "8080")),
nil,
)
return req, err, func(ttt assert.TestingT, response *http.Response, expectBlocked bool) {
assertLimitResponse(ttt, response, expectBlocked)
assertSetLimitingCookie(ttt, response, expectBlocked)
}
},
}, {
name: "login",
testHttp: func(tt assert.TestingT) (*http.Request, error, func(assert.TestingT, *http.Response, bool)) {
req, err := http.NewRequestWithContext(
CTX,
"GET",
fmt.Sprintf("http://%s/ui/login/login/externalidp/callback", net.JoinHostPort(domain, "8080")),
nil,
)
return req, err, func(ttt assert.TestingT, response *http.Response, expectBlocked bool) {
// the login paths should return a redirect if the instance is blocked
if expectBlocked {
assert.Equal(ttt, http.StatusFound, response.StatusCode)
} else {
assertLimitResponse(ttt, response, false)
}
assertSetLimitingCookie(ttt, response, expectBlocked)
}
},
}, {
name: "console",
testHttp: func(tt assert.TestingT) (*http.Request, error, func(assert.TestingT, *http.Response, bool)) {
req, err := http.NewRequestWithContext(
CTX,
"GET",
fmt.Sprintf("http://%s/ui/console/", net.JoinHostPort(domain, "8080")),
nil,
)
return req, err, func(ttt assert.TestingT, response *http.Response, expectBlocked bool) {
// the console is not blocked so we can render a link to an instance management portal.
// A CDN can cache these assets easily
// We also don't care about a cookie because the environment.json already takes care of that.
assertLimitResponse(ttt, response, false)
}
},
}, {
name: "environment.json",
testHttp: func(tt assert.TestingT) (*http.Request, error, func(assert.TestingT, *http.Response, bool)) {
req, err := http.NewRequestWithContext(
CTX,
"GET",
fmt.Sprintf("http://%s/ui/console/assets/environment.json", net.JoinHostPort(domain, "8080")),
nil,
)
return req, err, func(ttt assert.TestingT, response *http.Response, expectBlocked bool) {
// the environment.json should always return successfully
assertLimitResponse(ttt, response, false)
assertSetLimitingCookie(ttt, response, expectBlocked)
body, err := io.ReadAll(response.Body)
assert.NoError(ttt, err)
var compFunc assert.ComparisonAssertionFunc = assert.NotContains
if expectBlocked {
compFunc = assert.Contains
}
compFunc(ttt, string(body), `"exhausted":true`)
}
},
}}
_, err := Tester.Client.System.SetLimits(SystemCTX, &system.SetLimitsRequest{
InstanceId: instanceID,
Block: gu.Ptr(true),
})
require.NoError(t, err)
// The following call ensures that an undefined bool is not deserialized to false
_, err = Tester.Client.System.SetLimits(SystemCTX, &system.SetLimitsRequest{
InstanceId: instanceID,
AuditLogRetention: durationpb.New(time.Hour),
})
require.NoError(t, err)
for _, tt := range tests {
var isFirst bool
t.Run(tt.name+" with blocking", func(t *testing.T) {
isFirst = isFirst || !t.Skipped()
testBlockingAPI(t, tt, true, isFirst)
})
}
_, err = Tester.Client.System.SetLimits(SystemCTX, &system.SetLimitsRequest{
InstanceId: instanceID,
Block: gu.Ptr(false),
})
require.NoError(t, err)
for _, tt := range tests {
var isFirst bool
t.Run(tt.name+" without blocking", func(t *testing.T) {
isFirst = isFirst || !t.Skipped()
testBlockingAPI(t, tt, false, isFirst)
})
}
}
type test struct {
name string
testHttp func(t assert.TestingT) (req *http.Request, err error, assertResponse func(t assert.TestingT, response *http.Response, expectBlocked bool))
testGrpc func(t assert.TestingT, expectBlocked bool)
}
func testBlockingAPI(t *testing.T, tt *test, expectBlocked bool, isFirst bool) {
req, err, assertResponse := tt.testHttp(t)
require.NoError(t, err)
testHTTP := func(tt assert.TestingT) {
resp, err := (&http.Client{
// Don't follow redirects
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
}).Do(req)
defer func() {
require.NoError(t, resp.Body.Close())
}()
require.NoError(t, err)
assertResponse(t, resp, expectBlocked)
}
if isFirst {
// limits are eventually consistent, so we need to wait for the blocking to be set on the first test
assert.EventuallyWithT(t, func(c *assert.CollectT) {
testHTTP(c)
}, 15*time.Second, time.Second, "wait for blocking to be set")
} else {
testHTTP(t)
}
if tt.testGrpc != nil {
tt.testGrpc(t, expectBlocked)
}
}
func publicAPIBlockingTest(domain string) *test {
return &test{
name: "public API",
testGrpc: func(tt assert.TestingT, expectBlocked bool) {
conn, err := grpc.DialContext(CTX, net.JoinHostPort(domain, "8080"),
grpc.WithBlock(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
assert.NoError(tt, err)
_, err = admin.NewAdminServiceClient(conn).Healthz(CTX, &admin.HealthzRequest{})
assertGrpcError(tt, err, expectBlocked)
},
testHttp: func(tt assert.TestingT) (*http.Request, error, func(assert.TestingT, *http.Response, bool)) {
req, err := http.NewRequestWithContext(
CTX,
"GET",
fmt.Sprintf("http://%s/admin/v1/healthz", net.JoinHostPort(domain, "8080")),
nil,
)
return req, err, func(ttt assert.TestingT, response *http.Response, expectBlocked bool) {
assertLimitResponse(ttt, response, expectBlocked)
assertSetLimitingCookie(ttt, response, expectBlocked)
}
},
}
}
// If expectSet is true, we expect the cookie to be set
// If expectSet is false, we expect the cookie to be deleted
func assertSetLimitingCookie(t assert.TestingT, response *http.Response, expectSet bool) {
for _, cookie := range response.Cookies() {
if cookie.Name == "zitadel.quota.exhausted" {
if expectSet {
assert.Greater(t, cookie.MaxAge, 0)
} else {
assert.LessOrEqual(t, cookie.MaxAge, 0)
}
return
}
}
assert.FailNow(t, "cookie not found")
}
func assertGrpcError(t assert.TestingT, err error, expectBlocked bool) {
if expectBlocked {
assert.Equal(t, codes.ResourceExhausted, status.Convert(err).Code())
return
}
assert.NoError(t, err)
}
func assertLimitResponse(t assert.TestingT, response *http.Response, expectBlocked bool) {
if expectBlocked {
assert.Equal(t, http.StatusTooManyRequests, response.StatusCode)
return
}
assert.GreaterOrEqual(t, response.StatusCode, 200)
assert.Less(t, response.StatusCode, 300)
}
func idpExistsCondition(t assert.TestingT, instanceID, idpName string) func() bool {
return func() bool {
nameQuery, err := query.NewIDPTemplateNameSearchQuery(query.TextEquals, idpName)
assert.NoError(t, err)
instanceQuery, err := query.NewIDPTemplateResourceOwnerSearchQuery(instanceID)
assert.NoError(t, err)
idps, err := Tester.Queries.IDPTemplates(authz.WithInstanceID(CTX, instanceID), &query.IDPTemplateSearchQueries{
Queries: []query.SearchQuery{
instanceQuery,
nameQuery,
},
}, false)
assert.NoError(t, err)
return len(idps.Templates) > 0
}
}

View File

@ -0,0 +1,75 @@
//go:build integration
package system_test
import (
"testing"
"github.com/muhlemmer/gu"
"github.com/stretchr/testify/require"
"github.com/zitadel/zitadel/internal/integration"
"github.com/zitadel/zitadel/pkg/grpc/system"
)
func TestServer_Limits_Bulk(t *testing.T) {
const len = 5
type instance struct{ domain, id string }
instances := make([]*instance, len)
for i := 0; i < len; i++ {
domain := integration.RandString(5) + ".integration.localhost"
resp, err := Tester.Client.System.CreateInstance(SystemCTX, &system.CreateInstanceRequest{
InstanceName: "testinstance",
CustomDomain: domain,
Owner: &system.CreateInstanceRequest_Machine_{
Machine: &system.CreateInstanceRequest_Machine{
UserName: "owner",
Name: "owner",
},
},
})
require.NoError(t, err)
instances[i] = &instance{domain, resp.GetInstanceId()}
}
resp, err := Tester.Client.System.BulkSetLimits(SystemCTX, &system.BulkSetLimitsRequest{
Limits: []*system.SetLimitsRequest{{
InstanceId: instances[0].id,
Block: gu.Ptr(true),
}, {
InstanceId: instances[1].id,
Block: gu.Ptr(false),
}, {
InstanceId: instances[2].id,
Block: gu.Ptr(true),
}, {
InstanceId: instances[3].id,
Block: gu.Ptr(false),
}, {
InstanceId: instances[4].id,
Block: gu.Ptr(true),
}},
})
require.NoError(t, err)
details := resp.GetTargetDetails()
require.Len(t, details, len)
t.Run("the first instance is blocked", func(t *testing.T) {
require.Equal(t, instances[0].id, details[0].GetResourceOwner(), "resource owner must be instance id")
testBlockingAPI(t, publicAPIBlockingTest(instances[0].domain), true, true)
})
t.Run("the second instance isn't blocked", func(t *testing.T) {
require.Equal(t, instances[1].id, details[1].GetResourceOwner(), "resource owner must be instance id")
testBlockingAPI(t, publicAPIBlockingTest(instances[1].domain), false, true)
})
t.Run("the third instance is blocked", func(t *testing.T) {
require.Equal(t, instances[2].id, details[2].GetResourceOwner(), "resource owner must be instance id")
testBlockingAPI(t, publicAPIBlockingTest(instances[2].domain), true, true)
})
t.Run("the fourth instance isn't blocked", func(t *testing.T) {
require.Equal(t, instances[3].id, details[3].GetResourceOwner(), "resource owner must be instance id")
testBlockingAPI(t, publicAPIBlockingTest(instances[3].domain), false, true)
})
t.Run("the fifth instance is blocked", func(t *testing.T) {
require.Equal(t, instances[4].id, details[4].GetResourceOwner(), "resource owner must be instance id")
testBlockingAPI(t, publicAPIBlockingTest(instances[4].domain), true, true)
})
}

View File

@ -1,6 +1,6 @@
//go:build integration
package system_test
package quotas_enabled_test
import (
"bytes"
@ -23,13 +23,12 @@ import (
var callURL = "http://localhost:" + integration.PortQuotaServer
func TestServer_QuotaNotification_Limit(t *testing.T) {
_, instanceID, iamOwnerCtx := Tester.UseIsolatedInstance(t, CTX, SystemCTX)
amount := 10
percent := 50
percentAmount := amount * percent / 100
_, err := Tester.Client.System.SetQuota(SystemCTX, &system.SetQuotaRequest{
InstanceId: instanceID,
InstanceId: Tester.Instance.InstanceID(),
Unit: quota_pb.Unit_UNIT_REQUESTS_ALL_AUTHENTICATED,
From: timestamppb.Now(),
ResetInterval: durationpb.New(time.Minute * 5),
@ -51,23 +50,23 @@ func TestServer_QuotaNotification_Limit(t *testing.T) {
require.NoError(t, err)
for i := 0; i < percentAmount; i++ {
_, err := Tester.Client.Admin.GetDefaultOrg(iamOwnerCtx, &admin.GetDefaultOrgRequest{})
_, err := Tester.Client.Admin.GetDefaultOrg(IAMOwnerCTX, &admin.GetDefaultOrgRequest{})
require.NoErrorf(t, err, "error in %d call of %d", i, percentAmount)
}
awaitNotification(t, Tester.QuotaNotificationChan, quota.RequestsAllAuthenticated, percent)
for i := 0; i < (amount - percentAmount); i++ {
_, err := Tester.Client.Admin.GetDefaultOrg(iamOwnerCtx, &admin.GetDefaultOrgRequest{})
_, err := Tester.Client.Admin.GetDefaultOrg(IAMOwnerCTX, &admin.GetDefaultOrgRequest{})
require.NoErrorf(t, err, "error in %d call of %d", i, percentAmount)
}
awaitNotification(t, Tester.QuotaNotificationChan, quota.RequestsAllAuthenticated, 100)
_, limitErr := Tester.Client.Admin.GetDefaultOrg(iamOwnerCtx, &admin.GetDefaultOrgRequest{})
_, limitErr := Tester.Client.Admin.GetDefaultOrg(IAMOwnerCTX, &admin.GetDefaultOrgRequest{})
require.Error(t, limitErr)
}
func TestServer_QuotaNotification_NoLimit(t *testing.T) {
_, instanceID, iamOwnerCtx := Tester.UseIsolatedInstance(t, CTX, SystemCTX)
_, instanceID, IAMOwnerCTX := Tester.UseIsolatedInstance(t, CTX, SystemCTX)
amount := 10
percent := 50
percentAmount := amount * percent / 100
@ -95,24 +94,24 @@ func TestServer_QuotaNotification_NoLimit(t *testing.T) {
require.NoError(t, err)
for i := 0; i < percentAmount; i++ {
_, err := Tester.Client.Admin.GetDefaultOrg(iamOwnerCtx, &admin.GetDefaultOrgRequest{})
_, err := Tester.Client.Admin.GetDefaultOrg(IAMOwnerCTX, &admin.GetDefaultOrgRequest{})
require.NoErrorf(t, err, "error in %d call of %d", i, percentAmount)
}
awaitNotification(t, Tester.QuotaNotificationChan, quota.RequestsAllAuthenticated, percent)
for i := 0; i < (amount - percentAmount); i++ {
_, err := Tester.Client.Admin.GetDefaultOrg(iamOwnerCtx, &admin.GetDefaultOrgRequest{})
_, err := Tester.Client.Admin.GetDefaultOrg(IAMOwnerCTX, &admin.GetDefaultOrgRequest{})
require.NoErrorf(t, err, "error in %d call of %d", i, percentAmount)
}
awaitNotification(t, Tester.QuotaNotificationChan, quota.RequestsAllAuthenticated, 100)
for i := 0; i < amount; i++ {
_, err := Tester.Client.Admin.GetDefaultOrg(iamOwnerCtx, &admin.GetDefaultOrgRequest{})
_, err := Tester.Client.Admin.GetDefaultOrg(IAMOwnerCTX, &admin.GetDefaultOrgRequest{})
require.NoErrorf(t, err, "error in %d call of %d", i, percentAmount)
}
awaitNotification(t, Tester.QuotaNotificationChan, quota.RequestsAllAuthenticated, 200)
_, limitErr := Tester.Client.Admin.GetDefaultOrg(iamOwnerCtx, &admin.GetDefaultOrgRequest{})
_, limitErr := Tester.Client.Admin.GetDefaultOrg(IAMOwnerCTX, &admin.GetDefaultOrgRequest{})
require.NoError(t, limitErr)
}

View File

@ -0,0 +1,37 @@
//go:build integration
package quotas_enabled_test
import (
"context"
"os"
"testing"
"time"
"github.com/zitadel/zitadel/internal/integration"
)
var (
CTX context.Context
SystemCTX context.Context
IAMOwnerCTX context.Context
Tester *integration.Tester
)
func TestMain(m *testing.M) {
os.Exit(func() int {
ctx, _, cancel := integration.Contexts(5 * time.Minute)
defer cancel()
CTX = ctx
Tester = integration.NewTester(ctx, `
Quotas:
Access:
Enabled: true
`)
defer Tester.Done()
SystemCTX = Tester.WithAuthorization(ctx, integration.SystemUser)
IAMOwnerCTX = Tester.WithAuthorization(ctx, integration.IAMOwner)
return m.Run()
}())
}

View File

@ -19,10 +19,11 @@ import (
)
type AccessInterceptor struct {
svc *logstore.Service[*record.AccessLog]
logstoreSvc *logstore.Service[*record.AccessLog]
cookieHandler *http_utils.CookieHandler
limitConfig *AccessConfig
storeOnly bool
redirect string
}
type AccessConfig struct {
@ -32,10 +33,10 @@ type AccessConfig struct {
// NewAccessInterceptor intercepts all requests and stores them to the logstore.
// If storeOnly is false, it also checks if requests are exhausted.
// If requests are exhausted, it also returns http.StatusTooManyRequests and sets a cookie
// If requests are exhausted, it also returns http.StatusTooManyRequests or a redirect to the given path and sets a cookie
func NewAccessInterceptor(svc *logstore.Service[*record.AccessLog], cookieHandler *http_utils.CookieHandler, cookieConfig *AccessConfig) *AccessInterceptor {
return &AccessInterceptor{
svc: svc,
logstoreSvc: svc,
cookieHandler: cookieHandler,
limitConfig: cookieConfig,
}
@ -43,24 +44,61 @@ func NewAccessInterceptor(svc *logstore.Service[*record.AccessLog], cookieHandle
func (a *AccessInterceptor) WithoutLimiting() *AccessInterceptor {
return &AccessInterceptor{
svc: a.svc,
logstoreSvc: a.logstoreSvc,
cookieHandler: a.cookieHandler,
limitConfig: a.limitConfig,
storeOnly: true,
redirect: a.redirect,
}
}
func (a *AccessInterceptor) WithRedirect(redirect string) *AccessInterceptor {
return &AccessInterceptor{
logstoreSvc: a.logstoreSvc,
cookieHandler: a.cookieHandler,
limitConfig: a.limitConfig,
storeOnly: a.storeOnly,
redirect: redirect,
}
}
func (a *AccessInterceptor) AccessService() *logstore.Service[*record.AccessLog] {
return a.svc
return a.logstoreSvc
}
func (a *AccessInterceptor) Limit(ctx context.Context) bool {
if !a.svc.Enabled() || a.storeOnly {
func (a *AccessInterceptor) Limit(w http.ResponseWriter, r *http.Request, publicAuthPathPrefixes ...string) bool {
if a.storeOnly {
return false
}
ctx := r.Context()
instance := authz.GetInstance(ctx)
remaining := a.svc.Limit(ctx, instance.InstanceID())
return remaining != nil && *remaining <= 0
var deleteCookie bool
defer func() {
if deleteCookie {
a.DeleteExhaustedCookie(w)
}
}()
if block := instance.Block(); block != nil {
if *block {
a.SetExhaustedCookie(w, r)
return true
}
deleteCookie = true
}
for _, ignoredPathPrefix := range publicAuthPathPrefixes {
if strings.HasPrefix(r.RequestURI, ignoredPathPrefix) {
return false
}
}
remaining := a.logstoreSvc.Limit(ctx, instance.InstanceID())
if remaining != nil {
if remaining != nil && *remaining > 0 {
a.SetExhaustedCookie(w, r)
return true
}
deleteCookie = true
}
return false
}
func (a *AccessInterceptor) SetExhaustedCookie(writer http.ResponseWriter, request *http.Request) {
@ -81,42 +119,30 @@ func (a *AccessInterceptor) DeleteExhaustedCookie(writer http.ResponseWriter) {
a.cookieHandler.DeleteCookie(writer, a.limitConfig.ExhaustedCookieKey)
}
func (a *AccessInterceptor) HandleIgnorePathPrefixes(ignoredPathPrefixes []string) func(next http.Handler) http.Handler {
return a.handle(ignoredPathPrefixes...)
func (a *AccessInterceptor) HandleWithPublicAuthPathPrefixes(publicPathPrefixes []string) func(next http.Handler) http.Handler {
return a.handle(publicPathPrefixes...)
}
func (a *AccessInterceptor) Handle(next http.Handler) http.Handler {
return a.handle()(next)
}
func (a *AccessInterceptor) handle(ignoredPathPrefixes ...string) func(http.Handler) http.Handler {
func (a *AccessInterceptor) handle(publicAuthPathPrefixes ...string) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
if !a.svc.Enabled() {
return next
}
return http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
ctx := request.Context()
tracingCtx, checkSpan := tracing.NewNamedSpan(ctx, "checkAccessQuota")
wrappedWriter := &statusRecorder{ResponseWriter: writer, status: 0}
for _, ignoredPathPrefix := range ignoredPathPrefixes {
if !strings.HasPrefix(request.RequestURI, ignoredPathPrefix) {
continue
}
checkSpan.End()
next.ServeHTTP(wrappedWriter, request)
a.writeLog(tracingCtx, wrappedWriter, writer, request, true)
return
}
limited := a.Limit(tracingCtx)
limited := a.Limit(wrappedWriter, request.WithContext(tracingCtx), publicAuthPathPrefixes...)
checkSpan.End()
if limited {
a.SetExhaustedCookie(wrappedWriter, request)
http.Error(wrappedWriter, "quota for authenticated requests is exhausted", http.StatusTooManyRequests)
}
if !limited && !a.storeOnly {
a.DeleteExhaustedCookie(wrappedWriter)
}
if !limited {
if a.redirect != "" {
// The console guides the user when the cookie is set
http.Redirect(wrappedWriter, request, a.redirect, http.StatusFound)
} else {
http.Error(wrappedWriter, "Your ZITADEL instance is blocked.", http.StatusTooManyRequests)
}
} else {
next.ServeHTTP(wrappedWriter, request)
}
a.writeLog(tracingCtx, wrappedWriter, writer, request, a.storeOnly)
@ -125,6 +151,9 @@ func (a *AccessInterceptor) handle(ignoredPathPrefixes ...string) func(http.Hand
}
func (a *AccessInterceptor) writeLog(ctx context.Context, wrappedWriter *statusRecorder, writer http.ResponseWriter, request *http.Request, notCountable bool) {
if !a.logstoreSvc.Enabled() {
return
}
ctx, writeSpan := tracing.NewNamedSpan(ctx, "writeAccess")
defer writeSpan.End()
requestURL := request.RequestURI
@ -133,7 +162,7 @@ func (a *AccessInterceptor) writeLog(ctx context.Context, wrappedWriter *statusR
logging.WithError(err).WithField("url", requestURL).Warning("failed to unescape request url")
}
instance := authz.GetInstance(ctx)
a.svc.Handle(ctx, &record.AccessLog{
a.logstoreSvc.Handle(ctx, &record.AccessLog{
LogDate: time.Now(),
Protocol: record.HTTP,
RequestURL: unescapedURL,

View File

@ -7,6 +7,7 @@ import (
"net/http/httptest"
"reflect"
"testing"
"time"
"github.com/stretchr/testify/assert"
"golang.org/x/text/language"
@ -299,6 +300,14 @@ func (m *mockInstanceVerifier) InstanceByID(context.Context) (authz.Instance, er
type mockInstance struct{}
func (m *mockInstance) Block() *bool {
panic("shouldn't be called here")
}
func (m *mockInstance) AuditLogRetention() *time.Duration {
panic("shouldn't be called here")
}
func (m *mockInstance) InstanceID() string {
return "instanceID"
}

View File

@ -148,14 +148,14 @@ func NewServer(
instanceHandler,
userAgentCookie,
http_utils.CopyHeadersToContext,
accessHandler.HandleIgnorePathPrefixes(ignoredQuotaLimitEndpoint(config.CustomEndpoints)),
accessHandler.HandleWithPublicAuthPathPrefixes(publicAuthPathPrefixes(config.CustomEndpoints)),
middleware.ActivityHandler,
))
return server, nil
}
func ignoredQuotaLimitEndpoint(endpoints *EndpointConfig) []string {
func publicAuthPathPrefixes(endpoints *EndpointConfig) []string {
authURL := op.DefaultEndpoints.Authorization.Relative()
keysURL := op.DefaultEndpoints.JwksURI.Relative()
if endpoints == nil {

View File

@ -63,7 +63,7 @@ func NewProvider(
middleware.NoCacheInterceptor().Handler,
instanceHandler,
userAgentCookie,
accessHandler.HandleIgnorePathPrefixes(ignoredQuotaLimitEndpoint(conf.ProviderConfig)),
accessHandler.HandleWithPublicAuthPathPrefixes(publicAuthPathPrefixes(conf.ProviderConfig)),
http_utils.CopyHeadersToContext,
middleware.ActivityHandler,
),
@ -102,7 +102,7 @@ func newStorage(
}, nil
}
func ignoredQuotaLimitEndpoint(config *provider.Config) []string {
func publicAuthPathPrefixes(config *provider.Config) []string {
metadataEndpoint := HandlerPrefix + provider.DefaultMetadataEndpoint
certificateEndpoint := HandlerPrefix + provider.DefaultCertificateEndpoint
ssoEndpoint := HandlerPrefix + provider.DefaultSingleSignOnEndpoint

View File

@ -116,17 +116,12 @@ func Start(config Config, externalSecure bool, issuer op.IssuerFromRequest, call
http.Error(w, fmt.Sprintf("unable to template instance management url for console: %v", err), http.StatusInternalServerError)
return
}
exhausted := limitingAccessInterceptor.Limit(ctx)
environmentJSON, err := createEnvironmentJSON(url, issuer(r), instance.ConsoleClientID(), customerPortal, instanceMgmtURL, exhausted)
limited := limitingAccessInterceptor.Limit(w, r)
environmentJSON, err := createEnvironmentJSON(url, issuer(r), instance.ConsoleClientID(), customerPortal, instanceMgmtURL, limited)
if err != nil {
http.Error(w, fmt.Sprintf("unable to marshal env for console: %v", err), http.StatusInternalServerError)
return
}
if exhausted {
limitingAccessInterceptor.SetExhaustedCookie(w, r)
} else {
limitingAccessInterceptor.DeleteExhaustedCookie(w)
}
_, err = w.Write(environmentJSON)
logging.OnError(err).Error("error serving environment.json")
})))

View File

@ -209,7 +209,7 @@ func (c *Commands) SetUpInstance(ctx context.Context, setup *InstanceSetup) (str
orgAgg := org.NewAggregate(orgID)
userAgg := user.NewAggregate(userID, orgID)
projectAgg := project.NewAggregate(setup.zitadel.projectID, orgID)
limitsAgg := limits.NewAggregate(setup.zitadel.limitsID, instanceID, instanceID)
limitsAgg := limits.NewAggregate(setup.zitadel.limitsID, instanceID)
restrictionsAgg := restrictions.NewAggregate(setup.zitadel.restrictionsID, instanceID, instanceID)
validations := []preparation.Validation{

View File

@ -2,6 +2,7 @@ package command
import (
"context"
"errors"
"time"
"github.com/zitadel/zitadel/internal/api/authz"
@ -14,31 +15,20 @@ import (
type SetLimits struct {
AuditLogRetention *time.Duration
Block *bool
}
// SetLimits creates new limits or updates existing limits.
func (c *Commands) SetLimits(
ctx context.Context,
resourceOwner string,
setLimits *SetLimits,
) (*domain.ObjectDetails, error) {
instanceId := authz.GetInstance(ctx).InstanceID()
wm, err := c.getLimitsWriteModel(ctx, instanceId, resourceOwner)
wm, err := c.getLimitsWriteModel(ctx, instanceId)
if err != nil {
return nil, err
}
aggregateId := wm.AggregateID
if aggregateId == "" {
aggregateId, err = c.idGenerator.Next()
if err != nil {
return nil, err
}
}
createCmds, err := c.SetLimitsCommand(limits.NewAggregate(aggregateId, instanceId, resourceOwner), wm, setLimits)()
if err != nil {
return nil, err
}
cmds, err := createCmds(ctx, nil)
cmds, err := c.setLimitsCommands(ctx, wm, setLimits)
if err != nil {
return nil, err
}
@ -52,19 +42,81 @@ func (c *Commands) SetLimits(
return nil, err
}
}
return writeModelToObjectDetails(&wm.WriteModel), nil
return writeModelToObjectDetails(&wm.WriteModel), err
}
func (c *Commands) ResetLimits(ctx context.Context, resourceOwner string) (*domain.ObjectDetails, error) {
type SetInstanceLimitsBulk struct {
InstanceID string
SetLimits
}
func (c *Commands) SetInstanceLimitsBulk(
ctx context.Context,
bulk []*SetInstanceLimitsBulk,
) (bulkDetails *domain.ObjectDetails, targetsDetails []*domain.ObjectDetails, err error) {
bulkWm, err := c.getBulkInstanceLimitsWriteModel(ctx, bulk)
if err != nil {
return nil, nil, err
}
cmds := make([]eventstore.Command, 0)
for _, t := range bulk {
targetWM, ok := bulkWm.writeModels[t.InstanceID]
if !ok {
return nil, nil, zerrors.ThrowInternal(nil, "COMMAND-5HWA9", "Errors.Limits.NotFound")
}
targetCMDs, setErr := c.setLimitsCommands(ctx, targetWM, &t.SetLimits)
err = errors.Join(err, setErr)
cmds = append(cmds, targetCMDs...)
}
if err != nil {
return nil, nil, err
}
if len(cmds) > 0 {
events, err := c.eventstore.Push(ctx, cmds...)
if err != nil {
return nil, nil, err
}
err = AppendAndReduce(bulkWm, events...)
if err != nil {
return nil, nil, err
}
}
targetDetails := make([]*domain.ObjectDetails, len(bulk))
for i, t := range bulk {
targetDetails[i] = writeModelToObjectDetails(&bulkWm.writeModels[t.InstanceID].WriteModel)
}
details := writeModelToObjectDetails(&bulkWm.WriteModel)
details.ResourceOwner = ""
return details, targetDetails, err
}
func (c *Commands) setLimitsCommands(ctx context.Context, wm *limitsWriteModel, setLimits *SetLimits) (cmds []eventstore.Command, err error) {
aggregateId := wm.AggregateID
if aggregateId == "" {
aggregateId, err = c.idGenerator.Next()
if err != nil {
return nil, err
}
}
aggregate := limits.NewAggregate(aggregateId, wm.InstanceID)
createCmds, err := c.SetLimitsCommand(aggregate, wm, setLimits)()
if err != nil {
return nil, err
}
cmds, err = createCmds(ctx, nil)
return cmds, err
}
func (c *Commands) ResetLimits(ctx context.Context) (*domain.ObjectDetails, error) {
instanceId := authz.GetInstance(ctx).InstanceID()
wm, err := c.getLimitsWriteModel(ctx, instanceId, resourceOwner)
wm, err := c.getLimitsWriteModel(ctx, instanceId)
if err != nil {
return nil, err
}
if wm.AggregateID == "" {
return nil, zerrors.ThrowNotFound(nil, "COMMAND-9JToT", "Errors.Limits.NotFound")
}
aggregate := limits.NewAggregate(wm.AggregateID, instanceId, resourceOwner)
aggregate := limits.NewAggregate(wm.AggregateID, instanceId)
events := []eventstore.Command{limits.NewResetEvent(ctx, &aggregate.Aggregate)}
pushedEvents, err := c.eventstore.Push(ctx, events...)
if err != nil {
@ -77,14 +129,22 @@ func (c *Commands) ResetLimits(ctx context.Context, resourceOwner string) (*doma
return writeModelToObjectDetails(&wm.WriteModel), nil
}
func (c *Commands) getLimitsWriteModel(ctx context.Context, instanceId, resourceOwner string) (*limitsWriteModel, error) {
wm := newLimitsWriteModel(instanceId, resourceOwner)
func (c *Commands) getLimitsWriteModel(ctx context.Context, instanceId string) (*limitsWriteModel, error) {
wm := newLimitsWriteModel(instanceId)
return wm, c.eventstore.FilterToQueryReducer(ctx, wm)
}
func (c *Commands) getBulkInstanceLimitsWriteModel(ctx context.Context, target []*SetInstanceLimitsBulk) (*limitsBulkWriteModel, error) {
wm := newLimitsBulkWriteModel()
for _, t := range target {
wm.addWriteModel(t.InstanceID)
}
return wm, c.eventstore.FilterToQueryReducer(ctx, wm)
}
func (c *Commands) SetLimitsCommand(a *limits.Aggregate, wm *limitsWriteModel, setLimits *SetLimits) preparation.Validation {
return func() (preparation.CreateCommands, error) {
if setLimits == nil || setLimits.AuditLogRetention == nil {
if setLimits == nil || (setLimits.AuditLogRetention == nil && setLimits.Block == nil) {
return nil, zerrors.ThrowInvalidArgument(nil, "COMMAND-4M9vs", "Errors.Limits.NoneSpecified")
}
return func(ctx context.Context, _ preparation.FilterToQueryReducer) ([]eventstore.Command, error) {

View File

@ -0,0 +1,55 @@
package command
import (
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/repository/limits"
)
type limitsBulkWriteModel struct {
eventstore.WriteModel
writeModels map[string]*limitsWriteModel
filterInstanceIDs []string
}
// newLimitsBulkWriteModel should be followed by limitsBulkWriteModel.addWriteModel before querying and reducing it.
func newLimitsBulkWriteModel() *limitsBulkWriteModel {
return &limitsBulkWriteModel{
writeModels: make(map[string]*limitsWriteModel),
filterInstanceIDs: make([]string, 0),
}
}
func (wm *limitsBulkWriteModel) addWriteModel(instanceID string) {
if _, ok := wm.writeModels[instanceID]; !ok {
wm.writeModels[instanceID] = newLimitsWriteModel(instanceID)
}
wm.filterInstanceIDs = append(wm.filterInstanceIDs, instanceID)
}
func (wm *limitsBulkWriteModel) Query() *eventstore.SearchQueryBuilder {
query := eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
InstanceIDs(wm.filterInstanceIDs).
AddQuery().
AggregateTypes(limits.AggregateType).
EventTypes(
limits.SetEventType,
limits.ResetEventType,
)
return query.Builder()
}
func (wm *limitsBulkWriteModel) Reduce() error {
for _, event := range wm.Events {
instanceID := event.Aggregate().InstanceID
limitsWm, ok := wm.writeModels[instanceID]
if !ok {
continue
}
limitsWm.AppendEvents(event)
if err := limitsWm.Reduce(); err != nil {
return err
}
}
return nil
}

View File

@ -11,14 +11,15 @@ type limitsWriteModel struct {
eventstore.WriteModel
rollingAggregateID string
auditLogRetention *time.Duration
block *bool
}
// newLimitsWriteModel aggregateId is filled by reducing unit matching events
func newLimitsWriteModel(instanceId, resourceOwner string) *limitsWriteModel {
func newLimitsWriteModel(instanceId string) *limitsWriteModel {
return &limitsWriteModel{
WriteModel: eventstore.WriteModel{
InstanceID: instanceId,
ResourceOwner: resourceOwner,
ResourceOwner: instanceId,
},
}
}
@ -46,9 +47,13 @@ func (wm *limitsWriteModel) Reduce() error {
if e.AuditLogRetention != nil {
wm.auditLogRetention = e.AuditLogRetention
}
if e.Block != nil {
wm.block = e.Block
}
case *limits.ResetEvent:
wm.rollingAggregateID = ""
wm.auditLogRetention = nil
wm.block = nil
}
}
if err := wm.WriteModel.Reduce(); err != nil {
@ -69,5 +74,8 @@ func (wm *limitsWriteModel) NewChanges(setLimits *SetLimits) (changes []limits.L
if setLimits.AuditLogRetention != nil && (wm.auditLogRetention == nil || *wm.auditLogRetention != *setLimits.AuditLogRetention) {
changes = append(changes, limits.ChangeAuditLogRetention(setLimits.AuditLogRetention))
}
if setLimits.Block != nil && (wm.block == nil || *wm.block != *setLimits.Block) {
changes = append(changes, limits.ChangeBlock(setLimits.Block))
}
return changes
}

View File

@ -21,9 +21,8 @@ import (
func TestLimits_SetLimits(t *testing.T) {
type fields func(*testing.T) (*eventstore.Eventstore, id.Generator)
type args struct {
ctx context.Context
resourceOwner string
setLimits *SetLimits
ctx context.Context
setLimits *SetLimits
}
type res struct {
want *domain.ObjectDetails
@ -47,7 +46,7 @@ func TestLimits_SetLimits(t *testing.T) {
limits.NewSetEvent(
eventstore.NewBaseEventForPush(
context.Background(),
&limits.NewAggregate("limits1", "instance1", "instance1").Aggregate,
&limits.NewAggregate("limits1", "instance1").Aggregate,
limits.SetEventType,
),
limits.ChangeAuditLogRetention(gu.Ptr(time.Hour)),
@ -58,8 +57,7 @@ func TestLimits_SetLimits(t *testing.T) {
id_mock.NewIDGeneratorExpectIDs(t, "limits1")
},
args: args{
ctx: authz.WithInstanceID(context.Background(), "instance1"),
resourceOwner: "instance1",
ctx: authz.WithInstanceID(context.Background(), "instance1"),
setLimits: &SetLimits{
AuditLogRetention: gu.Ptr(time.Hour),
},
@ -71,7 +69,7 @@ func TestLimits_SetLimits(t *testing.T) {
},
},
{
name: "update limits, ok",
name: "update limits audit log retention, ok",
fields: func(*testing.T) (*eventstore.Eventstore, id.Generator) {
return eventstoreExpect(
t,
@ -80,7 +78,7 @@ func TestLimits_SetLimits(t *testing.T) {
limits.NewSetEvent(
eventstore.NewBaseEventForPush(
context.Background(),
&limits.NewAggregate("limits1", "instance1", "instance1").Aggregate,
&limits.NewAggregate("limits1", "instance1").Aggregate,
limits.SetEventType,
),
limits.ChangeAuditLogRetention(gu.Ptr(time.Minute)),
@ -93,7 +91,7 @@ func TestLimits_SetLimits(t *testing.T) {
limits.NewSetEvent(
eventstore.NewBaseEventForPush(
context.Background(),
&limits.NewAggregate("limits1", "instance1", "instance1").Aggregate,
&limits.NewAggregate("limits1", "instance1").Aggregate,
limits.SetEventType,
),
limits.ChangeAuditLogRetention(gu.Ptr(time.Hour)),
@ -104,8 +102,7 @@ func TestLimits_SetLimits(t *testing.T) {
nil
},
args: args{
ctx: authz.WithInstanceID(context.Background(), "instance1"),
resourceOwner: "instance1",
ctx: authz.WithInstanceID(context.Background(), "instance1"),
setLimits: &SetLimits{
AuditLogRetention: gu.Ptr(time.Hour),
},
@ -116,6 +113,52 @@ func TestLimits_SetLimits(t *testing.T) {
},
},
},
{
name: "update limits unblock, ok",
fields: func(*testing.T) (*eventstore.Eventstore, id.Generator) {
return eventstoreExpect(
t,
expectFilter(
eventFromEventPusher(
limits.NewSetEvent(
eventstore.NewBaseEventForPush(
context.Background(),
&limits.NewAggregate("limits1", "instance1").Aggregate,
limits.SetEventType,
),
limits.ChangeAuditLogRetention(gu.Ptr(time.Minute)),
limits.ChangeBlock(gu.Ptr(true)),
),
),
),
expectPush(
eventFromEventPusherWithInstanceID(
"instance1",
limits.NewSetEvent(
eventstore.NewBaseEventForPush(
context.Background(),
&limits.NewAggregate("limits1", "instance1").Aggregate,
limits.SetEventType,
),
limits.ChangeBlock(gu.Ptr(false)),
),
),
),
),
nil
},
args: args{
ctx: authz.WithInstanceID(context.Background(), "instance1"),
setLimits: &SetLimits{
Block: gu.Ptr(false),
},
},
res: res{
want: &domain.ObjectDetails{
ResourceOwner: "instance1",
},
},
},
{
name: "set limits after resetting limits, ok",
fields: func(*testing.T) (*eventstore.Eventstore, id.Generator) {
@ -126,7 +169,7 @@ func TestLimits_SetLimits(t *testing.T) {
limits.NewSetEvent(
eventstore.NewBaseEventForPush(
context.Background(),
&limits.NewAggregate("limits1", "instance1", "instance1").Aggregate,
&limits.NewAggregate("limits1", "instance1").Aggregate,
limits.SetEventType,
),
limits.ChangeAuditLogRetention(gu.Ptr(time.Hour)),
@ -135,7 +178,7 @@ func TestLimits_SetLimits(t *testing.T) {
eventFromEventPusher(
limits.NewResetEvent(
context.Background(),
&limits.NewAggregate("limits1", "instance1", "instance1").Aggregate,
&limits.NewAggregate("limits1", "instance1").Aggregate,
),
),
),
@ -145,7 +188,7 @@ func TestLimits_SetLimits(t *testing.T) {
limits.NewSetEvent(
eventstore.NewBaseEventForPush(
context.Background(),
&limits.NewAggregate("limits2", "instance1", "instance1").Aggregate,
&limits.NewAggregate("limits2", "instance1").Aggregate,
limits.SetEventType,
),
limits.ChangeAuditLogRetention(gu.Ptr(time.Hour)),
@ -156,8 +199,7 @@ func TestLimits_SetLimits(t *testing.T) {
id_mock.NewIDGeneratorExpectIDs(t, "limits2")
},
args: args{
ctx: authz.WithInstanceID(context.Background(), "instance1"),
resourceOwner: "instance1",
ctx: authz.WithInstanceID(context.Background(), "instance1"),
setLimits: &SetLimits{
AuditLogRetention: gu.Ptr(time.Hour),
},
@ -173,7 +215,7 @@ func TestLimits_SetLimits(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
r := new(Commands)
r.eventstore, r.idGenerator = tt.fields(t)
got, err := r.SetLimits(tt.args.ctx, tt.args.resourceOwner, tt.args.setLimits)
got, err := r.SetLimits(tt.args.ctx, tt.args.setLimits)
if tt.res.err == nil {
assert.NoError(t, err)
}
@ -187,11 +229,414 @@ func TestLimits_SetLimits(t *testing.T) {
}
}
func TestLimits_SetLimitsBulk(t *testing.T) {
type fields func(*testing.T) (*eventstore.Eventstore, id.Generator)
type args struct {
ctx context.Context
setLimitsBulk []*SetInstanceLimitsBulk
}
type res struct {
want *domain.ObjectDetails
wantTarget []*domain.ObjectDetails
err func(error) bool
}
tests := []struct {
name string
fields fields
args args
res res
}{
{
name: "create limits, ok",
fields: func(*testing.T) (*eventstore.Eventstore, id.Generator) {
return eventstoreExpect(
t,
expectFilter(),
expectPush(
eventFromEventPusherWithInstanceID(
"instance1",
limits.NewSetEvent(
eventstore.NewBaseEventForPush(
context.Background(),
&limits.NewAggregate("limits1", "instance1").Aggregate,
limits.SetEventType,
),
limits.ChangeAuditLogRetention(gu.Ptr(time.Hour)),
),
),
),
),
id_mock.NewIDGeneratorExpectIDs(t, "limits1")
},
args: args{
ctx: authz.WithInstanceID(context.Background(), "instance1"),
setLimitsBulk: []*SetInstanceLimitsBulk{{
InstanceID: "instance1",
SetLimits: SetLimits{
AuditLogRetention: gu.Ptr(time.Hour),
},
}},
},
res: res{
want: &domain.ObjectDetails{},
wantTarget: []*domain.ObjectDetails{{
ResourceOwner: "instance1",
}},
},
},
{
name: "update limits audit log retention, ok",
fields: func(*testing.T) (*eventstore.Eventstore, id.Generator) {
return eventstoreExpect(
t,
expectFilter(
eventFromEventPusher(
limits.NewSetEvent(
eventstore.NewBaseEventForPush(
context.Background(),
&limits.NewAggregate("limits1", "instance1").Aggregate,
limits.SetEventType,
),
limits.ChangeAuditLogRetention(gu.Ptr(time.Minute)),
),
),
),
expectPush(
eventFromEventPusherWithInstanceID(
"instance1",
limits.NewSetEvent(
eventstore.NewBaseEventForPush(
context.Background(),
&limits.NewAggregate("limits1", "instance1").Aggregate,
limits.SetEventType,
),
limits.ChangeAuditLogRetention(gu.Ptr(time.Hour)),
),
),
),
),
nil
},
args: args{
ctx: authz.WithInstanceID(context.Background(), "instance1"),
setLimitsBulk: []*SetInstanceLimitsBulk{{
InstanceID: "instance1",
SetLimits: SetLimits{
AuditLogRetention: gu.Ptr(time.Hour),
},
}},
},
res: res{
want: &domain.ObjectDetails{},
wantTarget: []*domain.ObjectDetails{{
ResourceOwner: "instance1",
}},
},
},
{
name: "update limits unblock, ok",
fields: func(*testing.T) (*eventstore.Eventstore, id.Generator) {
return eventstoreExpect(
t,
expectFilter(
eventFromEventPusher(
limits.NewSetEvent(
eventstore.NewBaseEventForPush(
context.Background(),
&limits.NewAggregate("limits1", "instance1").Aggregate,
limits.SetEventType,
),
limits.ChangeAuditLogRetention(gu.Ptr(time.Minute)),
limits.ChangeBlock(gu.Ptr(true)),
),
),
),
expectPush(
eventFromEventPusherWithInstanceID(
"instance1",
limits.NewSetEvent(
eventstore.NewBaseEventForPush(
context.Background(),
&limits.NewAggregate("limits1", "instance1").Aggregate,
limits.SetEventType,
),
limits.ChangeBlock(gu.Ptr(false)),
),
),
),
),
nil
},
args: args{
ctx: authz.WithInstanceID(context.Background(), "instance1"),
setLimitsBulk: []*SetInstanceLimitsBulk{{
InstanceID: "instance1",
SetLimits: SetLimits{
Block: gu.Ptr(false),
},
}},
},
res: res{
want: &domain.ObjectDetails{},
wantTarget: []*domain.ObjectDetails{{
ResourceOwner: "instance1",
}},
},
},
{
name: "set limits after resetting limits, ok",
fields: func(*testing.T) (*eventstore.Eventstore, id.Generator) {
return eventstoreExpect(
t,
expectFilter(
eventFromEventPusher(
limits.NewSetEvent(
eventstore.NewBaseEventForPush(
context.Background(),
&limits.NewAggregate("limits1", "instance1").Aggregate,
limits.SetEventType,
),
limits.ChangeAuditLogRetention(gu.Ptr(time.Hour)),
),
),
eventFromEventPusher(
limits.NewResetEvent(
context.Background(),
&limits.NewAggregate("limits1", "instance1").Aggregate,
),
),
),
expectPush(
eventFromEventPusherWithInstanceID(
"instance1",
limits.NewSetEvent(
eventstore.NewBaseEventForPush(
context.Background(),
&limits.NewAggregate("limits2", "instance1").Aggregate,
limits.SetEventType,
),
limits.ChangeAuditLogRetention(gu.Ptr(time.Hour)),
),
),
),
),
id_mock.NewIDGeneratorExpectIDs(t, "limits2")
},
args: args{
ctx: authz.WithInstanceID(context.Background(), "instance1"),
setLimitsBulk: []*SetInstanceLimitsBulk{{
InstanceID: "instance1",
SetLimits: SetLimits{
AuditLogRetention: gu.Ptr(time.Hour),
},
},
},
},
res: res{
want: &domain.ObjectDetails{},
wantTarget: []*domain.ObjectDetails{{
ResourceOwner: "instance1",
}},
},
},
{
name: "set many limits, ok",
fields: func(*testing.T) (*eventstore.Eventstore, id.Generator) {
return eventstoreExpect(
t,
expectFilter(
eventFromEventPusher(
limits.NewSetEvent(
eventstore.NewBaseEventForPush(
context.Background(),
&limits.NewAggregate("add-block", "instance1").Aggregate,
limits.SetEventType,
),
limits.ChangeAuditLogRetention(gu.Ptr(time.Hour)),
),
),
eventFromEventPusher(
limits.NewSetEvent(
eventstore.NewBaseEventForPush(
context.Background(),
&limits.NewAggregate("switch-to-block", "instance2").Aggregate,
limits.SetEventType,
),
limits.ChangeBlock(gu.Ptr(false)),
),
),
eventFromEventPusher(
limits.NewSetEvent(
eventstore.NewBaseEventForPush(
context.Background(),
&limits.NewAggregate("already-blocked", "instance3").Aggregate,
limits.SetEventType,
),
limits.ChangeBlock(gu.Ptr(true)),
),
),
eventFromEventPusher(
limits.NewSetEvent(
eventstore.NewBaseEventForPush(
context.Background(),
&limits.NewAggregate("unblock", "instance4").Aggregate,
limits.SetEventType,
),
limits.ChangeBlock(gu.Ptr(true)),
),
),
eventFromEventPusher(
limits.NewSetEvent(
eventstore.NewBaseEventForPush(
context.Background(),
&limits.NewAggregate("relimit", "instance5").Aggregate,
limits.SetEventType,
),
limits.ChangeBlock(gu.Ptr(true)),
),
),
eventFromEventPusher(
limits.NewResetEvent(
context.Background(),
&limits.NewAggregate("relimit", "instance5").Aggregate,
),
),
),
expectPush(
eventFromEventPusherWithInstanceID(
"instance0",
limits.NewSetEvent(
eventstore.NewBaseEventForPush(
context.Background(),
&limits.NewAggregate("create-limits", "instance0").Aggregate,
limits.SetEventType,
),
limits.ChangeBlock(gu.Ptr(true)),
),
),
eventFromEventPusherWithInstanceID(
"instance1",
limits.NewSetEvent(
eventstore.NewBaseEventForPush(
context.Background(),
&limits.NewAggregate("add-block", "instance1").Aggregate,
limits.SetEventType,
),
limits.ChangeBlock(gu.Ptr(true)),
),
),
eventFromEventPusherWithInstanceID(
"instance2",
limits.NewSetEvent(
eventstore.NewBaseEventForPush(
context.Background(),
&limits.NewAggregate("switch-to-block", "instance2").Aggregate,
limits.SetEventType,
),
limits.ChangeBlock(gu.Ptr(true)),
),
),
eventFromEventPusherWithInstanceID(
"instance4",
limits.NewSetEvent(
eventstore.NewBaseEventForPush(
context.Background(),
&limits.NewAggregate("unblock", "instance4").Aggregate,
limits.SetEventType,
),
limits.ChangeBlock(gu.Ptr(false)),
),
),
eventFromEventPusherWithInstanceID(
"instance5",
limits.NewSetEvent(
eventstore.NewBaseEventForPush(
context.Background(),
&limits.NewAggregate("relimit", "instance5").Aggregate,
limits.SetEventType,
),
limits.ChangeBlock(gu.Ptr(true)),
),
),
),
),
id_mock.NewIDGeneratorExpectIDs(t, "create-limits", "relimit")
},
args: args{
ctx: context.Background(),
setLimitsBulk: []*SetInstanceLimitsBulk{{
InstanceID: "instance0",
SetLimits: SetLimits{
Block: gu.Ptr(true),
},
}, {
InstanceID: "instance1",
SetLimits: SetLimits{
Block: gu.Ptr(true),
},
}, {
InstanceID: "instance2",
SetLimits: SetLimits{
Block: gu.Ptr(true),
},
}, {
InstanceID: "instance3",
SetLimits: SetLimits{
Block: gu.Ptr(true),
},
}, {
InstanceID: "instance4",
SetLimits: SetLimits{
Block: gu.Ptr(false),
},
}, {
InstanceID: "instance5",
SetLimits: SetLimits{
Block: gu.Ptr(true),
},
}},
},
res: res{
want: &domain.ObjectDetails{},
wantTarget: []*domain.ObjectDetails{{
ResourceOwner: "instance0",
}, {
ResourceOwner: "instance1",
}, {
ResourceOwner: "instance2",
}, {
ResourceOwner: "instance3",
}, {
ResourceOwner: "instance4",
}, {
ResourceOwner: "instance5",
}},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := new(Commands)
r.eventstore, r.idGenerator = tt.fields(t)
gotDetails, gotTargetDetails, err := r.SetInstanceLimitsBulk(tt.args.ctx, tt.args.setLimitsBulk)
if tt.res.err == nil {
assert.NoError(t, err)
}
if tt.res.err != nil && !tt.res.err(err) {
t.Errorf("got wrong err: %v ", err)
}
if tt.res.err == nil {
assert.Equal(t, tt.res.want, gotDetails)
assert.Equal(t, tt.res.wantTarget, gotTargetDetails)
}
})
}
}
func TestLimits_ResetLimits(t *testing.T) {
type fields func(*testing.T) *eventstore.Eventstore
type args struct {
ctx context.Context
resourceOwner string
ctx context.Context
}
type res struct {
want *domain.ObjectDetails
@ -212,8 +657,7 @@ func TestLimits_ResetLimits(t *testing.T) {
)
},
args: args{
ctx: authz.WithInstanceID(context.Background(), "instance1"),
resourceOwner: "instance1",
ctx: authz.WithInstanceID(context.Background(), "instance1"),
},
res: res{
err: func(err error) bool {
@ -231,7 +675,7 @@ func TestLimits_ResetLimits(t *testing.T) {
limits.NewSetEvent(
eventstore.NewBaseEventForPush(
context.Background(),
&limits.NewAggregate("limits1", "instance1", "instance1").Aggregate,
&limits.NewAggregate("limits1", "instance1").Aggregate,
limits.SetEventType,
),
limits.ChangeAuditLogRetention(gu.Ptr(time.Hour)),
@ -239,15 +683,14 @@ func TestLimits_ResetLimits(t *testing.T) {
),
eventFromEventPusher(
limits.NewResetEvent(context.Background(),
&limits.NewAggregate("limits1", "instance1", "instance1").Aggregate,
&limits.NewAggregate("limits1", "instance1").Aggregate,
),
),
),
)
},
args: args{
ctx: authz.WithInstanceID(context.Background(), "instance1"),
resourceOwner: "instance1",
ctx: authz.WithInstanceID(context.Background(), "instance1"),
},
res: res{
err: func(err error) bool {
@ -265,7 +708,7 @@ func TestLimits_ResetLimits(t *testing.T) {
limits.NewSetEvent(
eventstore.NewBaseEventForPush(
context.Background(),
&limits.NewAggregate("limits1", "instance1", "instance1").Aggregate,
&limits.NewAggregate("limits1", "instance1").Aggregate,
limits.SetEventType,
),
limits.ChangeAuditLogRetention(gu.Ptr(time.Hour)),
@ -276,15 +719,14 @@ func TestLimits_ResetLimits(t *testing.T) {
eventFromEventPusherWithInstanceID(
"instance1",
limits.NewResetEvent(context.Background(),
&limits.NewAggregate("limits1", "instance1", "instance1").Aggregate,
&limits.NewAggregate("limits1", "instance1").Aggregate,
),
),
),
)
},
args: args{
ctx: authz.WithInstanceID(context.Background(), "instance1"),
resourceOwner: "instance1",
ctx: authz.WithInstanceID(context.Background(), "instance1"),
},
res: res{
want: &domain.ObjectDetails{
@ -298,7 +740,7 @@ func TestLimits_ResetLimits(t *testing.T) {
r := &Commands{
eventstore: tt.fields(t),
}
got, err := r.ResetLimits(tt.args.ctx, tt.args.resourceOwner)
got, err := r.ResetLimits(tt.args.ctx)
if tt.res.err == nil {
assert.NoError(t, err)
}

View File

@ -203,6 +203,14 @@ func GetMockSecretGenerator(t *testing.T) crypto.Generator {
type mockInstance struct{}
func (m *mockInstance) Block() *bool {
panic("shouldn't be called here")
}
func (m *mockInstance) AuditLogRetention() *time.Duration {
panic("shouldn't be called here")
}
func (m *mockInstance) InstanceID() string {
return "INSTANCE"
}

View File

@ -21,6 +21,7 @@ type SearchQuery struct {
Desc bool
InstanceID *Filter
InstanceIDs *Filter
ExcludedInstances *Filter
Creator *Filter
Owner *Filter
@ -132,6 +133,7 @@ func QueryFromBuilder(builder *eventstore.SearchQueryBuilder) (*SearchQuery, err
for _, f := range []func(builder *eventstore.SearchQueryBuilder, query *SearchQuery) *Filter{
instanceIDFilter,
instanceIDsFilter,
excludedInstanceIDFilter,
editorUserFilter,
resourceOwnerFilter,
@ -230,6 +232,14 @@ func instanceIDFilter(builder *eventstore.SearchQueryBuilder, query *SearchQuery
return query.InstanceID
}
func instanceIDsFilter(builder *eventstore.SearchQueryBuilder, query *SearchQuery) *Filter {
if builder.GetInstanceIDs() == nil {
return nil
}
query.InstanceIDs = NewFilter(FieldInstanceID, builder.GetInstanceIDs(), OperationIn)
return query.InstanceIDs
}
func positionAfterFilter(builder *eventstore.SearchQueryBuilder, query *SearchQuery) *Filter {
if builder.GetPositionAfter() == 0 {
return nil

View File

@ -18,6 +18,7 @@ type SearchQueryBuilder struct {
desc bool
resourceOwner string
instanceID *string
instanceIDs []string
excludedInstanceIDs []string
editorUser string
queries []*SearchQuery
@ -54,6 +55,10 @@ func (b *SearchQueryBuilder) GetInstanceID() *string {
return b.instanceID
}
func (b *SearchQueryBuilder) GetInstanceIDs() []string {
return b.instanceIDs
}
func (b *SearchQueryBuilder) GetEditorUser() string {
return b.editorUser
}
@ -96,7 +101,7 @@ func (q SearchQueryBuilder) GetCreationDateBefore() time.Time {
// ensureInstanceID makes sure that the instance id is always set
func (b *SearchQueryBuilder) ensureInstanceID(ctx context.Context) {
if b.instanceID == nil && authz.GetInstance(ctx).InstanceID() != "" {
if b.instanceID == nil && len(b.instanceIDs) == 0 && authz.GetInstance(ctx).InstanceID() != "" {
b.InstanceID(authz.GetInstance(ctx).InstanceID())
}
}
@ -218,7 +223,7 @@ func (builder *SearchQueryBuilder) Offset(offset uint32) *SearchQueryBuilder {
return builder
}
// ResourceOwner defines the resource owner (org) of the events
// ResourceOwner defines the resource owner (org or instance) of the events
func (builder *SearchQueryBuilder) ResourceOwner(resourceOwner string) *SearchQueryBuilder {
builder.resourceOwner = resourceOwner
return builder
@ -230,6 +235,12 @@ func (builder *SearchQueryBuilder) InstanceID(instanceID string) *SearchQueryBui
return builder
}
// InstanceIDs defines the instanceIDs (system) of the events
func (builder *SearchQueryBuilder) InstanceIDs(instanceIDs []string) *SearchQueryBuilder {
builder.instanceIDs = instanceIDs
return builder
}
// OrderDesc changes the sorting order of the returned events to descending
func (builder *SearchQueryBuilder) OrderDesc() *SearchQueryBuilder {
builder.desc = true

View File

@ -14,6 +14,7 @@ services:
- PGUSER=zitadel
- POSTGRES_DB=zitadel
- POSTGRES_HOST_AUTH_METHOD=trust
command: postgres -c shared_preload_libraries=pg_stat_statements -c pg_stat_statements.track=all
healthcheck:
test: ["CMD-SHELL", "pg_isready"]
interval: '10s'

View File

@ -26,14 +26,6 @@ LogStore:
Stdout:
Enabled: true
Quotas:
Access:
Enabled: true
ExhaustedCookieKey: "zitadel.quota.limiting"
ExhaustedCookieMaxAge: "60s"
Execution:
Enabled: true
Projections:
HandleActiveInstances: 60s
Customizations:

View File

@ -261,6 +261,14 @@ func (s *Tester) WithAuthorizationToken(ctx context.Context, token string) conte
return metadata.NewOutgoingContext(ctx, md)
}
func (s *Tester) BearerToken(ctx context.Context) string {
md, ok := metadata.FromOutgoingContext(ctx)
if !ok {
return ""
}
return md.Get("Authorization")[0]
}
func (s *Tester) ensureSystemUser() {
const ISSUER = "tester"
if s.Users.Get(FirstInstanceUsersKey, SystemUser) != nil {
@ -303,16 +311,18 @@ func (s *Tester) Done() {
//
// Note: the database must already be setup and initialized before
// using NewTester. See the CONTRIBUTING.md document for details.
func NewTester(ctx context.Context) *Tester {
func NewTester(ctx context.Context, zitadelConfigYAML ...string) *Tester {
args := strings.Split(commandLine, " ")
sc := make(chan *start.Server)
//nolint:contextcheck
cmd := cmd.New(os.Stdout, os.Stdin, args, sc)
cmd.SetArgs(args)
err := viper.MergeConfig(bytes.NewBuffer(zitadelYAML))
logging.OnError(err).Fatal()
for _, yaml := range append([]string{string(zitadelYAML)}, zitadelConfigYAML...) {
err := viper.MergeConfig(bytes.NewBuffer([]byte(yaml)))
logging.OnError(err).Fatal()
}
var err error
flavor := os.Getenv("INTEGRATION_DB_FLAVOR")
switch flavor {
case "cockroach", "":

View File

@ -23,7 +23,11 @@ func TestMain(m *testing.M) {
defer cancel()
CTX = ctx
Tester = integration.NewTester(ctx)
Tester = integration.NewTester(ctx, `
Quotas:
Access:
Enabled: true
`)
defer Tester.Done()
SystemCTX = Tester.WithAuthorization(ctx, integration.SystemUser)

View File

@ -8,7 +8,6 @@ import (
"github.com/zitadel/zitadel/internal/api/call"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/telemetry/tracing"
"github.com/zitadel/zitadel/internal/zerrors"
)
type Event struct {
@ -44,12 +43,8 @@ func (q *Queries) SearchEvents(ctx context.Context, query *eventstore.SearchQuer
ctx, span := tracing.NewSpan(ctx)
defer func() { span.EndWithError(err) }()
auditLogRetention := q.defaultAuditLogRetention
instanceLimits, err := q.Limits(ctx, authz.GetInstance(ctx).InstanceID())
if err != nil && !zerrors.IsNotFound(err) {
return nil, err
}
if instanceLimits != nil && instanceLimits.AuditLogRetention != nil {
auditLogRetention = *instanceLimits.AuditLogRetention
if instanceAuditLogRetention := authz.GetInstance(ctx).AuditLogRetention(); instanceAuditLogRetention != nil {
auditLogRetention = *instanceAuditLogRetention
}
if auditLogRetention != 0 {
query = filterAuditLogRetention(ctx, auditLogRetention, query)

View File

@ -29,6 +29,10 @@ var (
name: projection.InstanceProjectionTable,
instanceIDCol: projection.InstanceColumnID,
}
limitsTable = table{
name: projection.LimitsProjectionTable,
instanceIDCol: projection.LimitsColumnInstanceID,
}
InstanceColumnID = Column{
name: projection.InstanceColumnID,
table: instanceTable,
@ -69,6 +73,18 @@ var (
name: projection.InstanceColumnDefaultLanguage,
table: instanceTable,
}
LimitsColumnInstanceID = Column{
name: projection.LimitsColumnInstanceID,
table: limitsTable,
}
LimitsColumnAuditLogRetention = Column{
name: projection.LimitsColumnAuditLogRetention,
table: limitsTable,
}
LimitsColumnBlock = Column{
name: projection.LimitsColumnBlock,
table: limitsTable,
}
)
type Instance struct {
@ -78,14 +94,16 @@ type Instance struct {
Sequence uint64
Name string
DefaultOrgID string
IAMProjectID string
ConsoleID string
ConsoleAppID string
DefaultLang language.Tag
Domains []*InstanceDomain
host string
csp csp
DefaultOrgID string
IAMProjectID string
ConsoleID string
ConsoleAppID string
DefaultLang language.Tag
Domains []*InstanceDomain
host string
csp csp
block *bool
auditLogRetention *time.Duration
}
type csp struct {
@ -137,6 +155,14 @@ func (i *Instance) SecurityPolicyAllowedOrigins() []string {
return i.csp.allowedOrigins
}
func (i *Instance) Block() *bool {
return i.block
}
func (i *Instance) AuditLogRetention() *time.Duration {
return i.auditLogRetention
}
type InstanceSearchQueries struct {
SearchRequest
Queries []SearchQuery
@ -260,8 +286,10 @@ func prepareInstanceQuery(ctx context.Context, db prepareDatabase, host string)
From(instanceTable.identifier() + db.Timetravel(call.Took(ctx))).
PlaceholderFormat(sq.Dollar),
func(row *sql.Row) (*Instance, error) {
instance := &Instance{host: host}
lang := ""
var (
instance = &Instance{host: host}
lang = ""
)
err := row.Scan(
&instance.ID,
&instance.CreationDate,
@ -491,10 +519,13 @@ func prepareAuthzInstanceQuery(ctx context.Context, db prepareDatabase, host str
InstanceDomainSequenceCol.identifier(),
SecurityPolicyColumnEnabled.identifier(),
SecurityPolicyColumnAllowedOrigins.identifier(),
LimitsColumnAuditLogRetention.identifier(),
LimitsColumnBlock.identifier(),
).
From(instanceTable.identifier()).
LeftJoin(join(InstanceDomainInstanceIDCol, InstanceColumnID)).
LeftJoin(join(SecurityPolicyColumnInstanceID, InstanceColumnID) + db.Timetravel(call.Took(ctx))).
LeftJoin(join(SecurityPolicyColumnInstanceID, InstanceColumnID)).
LeftJoin(join(LimitsColumnInstanceID, InstanceColumnID) + db.Timetravel(call.Took(ctx))).
PlaceholderFormat(sq.Dollar),
func(rows *sql.Rows) (*Instance, error) {
instance := &Instance{
@ -511,6 +542,8 @@ func prepareAuthzInstanceQuery(ctx context.Context, db prepareDatabase, host str
creationDate sql.NullTime
sequence sql.NullInt64
securityPolicyEnabled sql.NullBool
auditLogRetention database.NullDuration
block sql.NullBool
)
err := rows.Scan(
&instance.ID,
@ -531,6 +564,8 @@ func prepareAuthzInstanceQuery(ctx context.Context, db prepareDatabase, host str
&sequence,
&securityPolicyEnabled,
&instance.csp.allowedOrigins,
&auditLogRetention,
&block,
)
if err != nil {
return nil, zerrors.ThrowInternal(err, "QUERY-d3fas", "Errors.Internal")
@ -547,6 +582,12 @@ func prepareAuthzInstanceQuery(ctx context.Context, db prepareDatabase, host str
IsGenerated: isGenerated.Bool,
InstanceID: instance.ID,
})
if auditLogRetention.Valid {
instance.auditLogRetention = &auditLogRetention.Duration
}
if block.Valid {
instance.block = &block.Bool
}
instance.csp.enabled = securityPolicyEnabled.Bool
}
if instance.ID == "" {

View File

@ -1,119 +0,0 @@
package query
import (
"context"
"database/sql"
"errors"
"time"
sq "github.com/Masterminds/squirrel"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/api/call"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/query/projection"
"github.com/zitadel/zitadel/internal/telemetry/tracing"
"github.com/zitadel/zitadel/internal/zerrors"
)
var (
limitSettingsTable = table{
name: projection.LimitsProjectionTable,
instanceIDCol: projection.LimitsColumnInstanceID,
}
LimitsColumnAggregateID = Column{
name: projection.LimitsColumnAggregateID,
table: limitSettingsTable,
}
LimitsColumnCreationDate = Column{
name: projection.LimitsColumnCreationDate,
table: limitSettingsTable,
}
LimitsColumnChangeDate = Column{
name: projection.LimitsColumnChangeDate,
table: limitSettingsTable,
}
LimitsColumnResourceOwner = Column{
name: projection.LimitsColumnResourceOwner,
table: limitSettingsTable,
}
LimitsColumnInstanceID = Column{
name: projection.LimitsColumnInstanceID,
table: limitSettingsTable,
}
LimitsColumnSequence = Column{
name: projection.LimitsColumnSequence,
table: limitSettingsTable,
}
LimitsColumnAuditLogRetention = Column{
name: projection.LimitsColumnAuditLogRetention,
table: limitSettingsTable,
}
)
type Limits struct {
AggregateID string
CreationDate time.Time
ChangeDate time.Time
ResourceOwner string
Sequence uint64
AuditLogRetention *time.Duration
}
func (q *Queries) Limits(ctx context.Context, resourceOwner string) (limits *Limits, err error) {
ctx, span := tracing.NewSpan(ctx)
defer func() { span.EndWithError(err) }()
stmt, scan := prepareLimitsQuery(ctx, q.client)
query, args, err := stmt.Where(sq.Eq{
LimitsColumnInstanceID.identifier(): authz.GetInstance(ctx).InstanceID(),
LimitsColumnResourceOwner.identifier(): resourceOwner,
}).ToSql()
if err != nil {
return nil, zerrors.ThrowInternal(err, "QUERY-jJe80", "Errors.Query.SQLStatment")
}
err = q.client.QueryRowContext(ctx, func(row *sql.Row) error {
limits, err = scan(row)
return err
}, query, args...)
return limits, err
}
func prepareLimitsQuery(ctx context.Context, db prepareDatabase) (sq.SelectBuilder, func(*sql.Row) (*Limits, error)) {
return sq.Select(
LimitsColumnAggregateID.identifier(),
LimitsColumnCreationDate.identifier(),
LimitsColumnChangeDate.identifier(),
LimitsColumnResourceOwner.identifier(),
LimitsColumnSequence.identifier(),
LimitsColumnAuditLogRetention.identifier(),
).
From(limitSettingsTable.identifier() + db.Timetravel(call.Took(ctx))).
PlaceholderFormat(sq.Dollar),
func(row *sql.Row) (*Limits, error) {
var (
limits = new(Limits)
auditLogRetention database.NullDuration
)
err := row.Scan(
&limits.AggregateID,
&limits.CreationDate,
&limits.ChangeDate,
&limits.ResourceOwner,
&limits.Sequence,
&auditLogRetention,
)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, zerrors.ThrowNotFound(err, "QUERY-GU1em", "Errors.Limits.NotFound")
}
return nil, zerrors.ThrowInternal(err, "QUERY-00jgy", "Errors.Internal")
}
if auditLogRetention.Valid {
limits.AuditLogRetention = &auditLogRetention.Duration
}
return limits, nil
}
}

View File

@ -1,116 +0,0 @@
package query
import (
"database/sql"
"database/sql/driver"
"errors"
"fmt"
"regexp"
"testing"
"time"
"github.com/muhlemmer/gu"
"github.com/zitadel/zitadel/internal/zerrors"
)
var (
expectedLimitsQuery = regexp.QuoteMeta("SELECT projections.limits.aggregate_id," +
" projections.limits.creation_date," +
" projections.limits.change_date," +
" projections.limits.resource_owner," +
" projections.limits.sequence," +
" projections.limits.audit_log_retention" +
" FROM projections.limits" +
" AS OF SYSTEM TIME '-1 ms'",
)
limitsCols = []string{
"aggregate_id",
"creation_date",
"change_date",
"resource_owner",
"sequence",
"audit_log_retention",
}
)
func Test_LimitsPrepare(t *testing.T) {
type want struct {
sqlExpectations sqlExpectation
err checkErr
}
tests := []struct {
name string
prepare interface{}
want want
object interface{}
}{
{
name: "prepareLimitsQuery no result",
prepare: prepareLimitsQuery,
want: want{
sqlExpectations: mockQueriesScanErr(
expectedLimitsQuery,
nil,
nil,
),
err: func(err error) (error, bool) {
if !zerrors.IsNotFound(err) {
return fmt.Errorf("err should be zitadel.NotFoundError got: %w", err), false
}
return nil, true
},
},
object: (*Limits)(nil),
},
{
name: "prepareLimitsQuery",
prepare: prepareLimitsQuery,
want: want{
sqlExpectations: mockQuery(
expectedLimitsQuery,
limitsCols,
[]driver.Value{
"limits1",
testNow,
testNow,
"instance1",
0,
intervalDriverValue(t, time.Hour),
},
),
},
object: &Limits{
AggregateID: "limits1",
CreationDate: testNow,
ChangeDate: testNow,
ResourceOwner: "instance1",
Sequence: 0,
AuditLogRetention: gu.Ptr(time.Hour),
},
},
{
name: "prepareLimitsQuery sql err",
prepare: prepareLimitsQuery,
want: want{
sqlExpectations: mockQueryErr(
expectedLimitsQuery,
sql.ErrConnDone,
),
err: func(err error) (error, bool) {
if !errors.Is(err, sql.ErrConnDone) {
return fmt.Errorf("err should be sql.ErrConnDone got: %w", err), false
}
return nil, true
},
},
object: (*Limits)(nil),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assertPrepare(t, tt.prepare, tt.object, tt.want.sqlExpectations, tt.want.err, defaultPrepareArgs...)
})
}
}

View File

@ -21,6 +21,7 @@ const (
LimitsColumnSequence = "sequence"
LimitsColumnAuditLogRetention = "audit_log_retention"
LimitsColumnBlock = "block"
)
type limitsProjection struct{}
@ -43,6 +44,7 @@ func (*limitsProjection) Init() *old_handler.Check {
handler.NewColumn(LimitsColumnInstanceID, handler.ColumnTypeText),
handler.NewColumn(LimitsColumnSequence, handler.ColumnTypeInt64),
handler.NewColumn(LimitsColumnAuditLogRetention, handler.ColumnTypeInterval, handler.Nullable()),
handler.NewColumn(LimitsColumnBlock, handler.ColumnTypeBool, handler.Nullable()),
},
handler.NewPrimaryKey(LimitsColumnInstanceID, LimitsColumnResourceOwner),
),
@ -96,6 +98,9 @@ func (p *limitsProjection) reduceLimitsSet(event eventstore.Event) (*handler.Sta
if e.AuditLogRetention != nil {
updateCols = append(updateCols, handler.NewCol(LimitsColumnAuditLogRetention, *e.AuditLogRetention))
}
if e.Block != nil {
updateCols = append(updateCols, handler.NewCol(LimitsColumnBlock, *e.Block))
}
return handler.NewUpsertStatement(e, conflictCols, updateCols), nil
}

View File

@ -21,7 +21,7 @@ func TestLimitsProjection_reduces(t *testing.T) {
want wantReduce
}{
{
name: "reduceLimitsSet",
name: "reduceLimitsSet auditLogRetention",
args: args{
event: getEvent(testEvent(
limits.SetEventType,
@ -53,7 +53,107 @@ func TestLimitsProjection_reduces(t *testing.T) {
},
},
},
{
name: "reduceLimitsSet block true",
args: args{
event: getEvent(testEvent(
limits.SetEventType,
limits.AggregateType,
[]byte(`{
"block": true
}`),
), limits.SetEventMapper),
},
reduce: (&limitsProjection{}).reduceLimitsSet,
want: wantReduce{
aggregateType: eventstore.AggregateType("limits"),
sequence: 15,
executer: &testExecuter{
executions: []execution{
{
expectedStmt: "INSERT INTO projections.limits (instance_id, resource_owner, creation_date, change_date, sequence, aggregate_id, block) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (instance_id, resource_owner) DO UPDATE SET (creation_date, change_date, sequence, aggregate_id, block) = (EXCLUDED.creation_date, EXCLUDED.change_date, EXCLUDED.sequence, EXCLUDED.aggregate_id, EXCLUDED.block)",
expectedArgs: []interface{}{
"instance-id",
"ro-id",
anyArg{},
anyArg{},
uint64(15),
"agg-id",
true,
},
},
},
},
},
},
{
name: "reduceLimitsSet block false",
args: args{
event: getEvent(testEvent(
limits.SetEventType,
limits.AggregateType,
[]byte(`{
"block": false
}`),
), limits.SetEventMapper),
},
reduce: (&limitsProjection{}).reduceLimitsSet,
want: wantReduce{
aggregateType: eventstore.AggregateType("limits"),
sequence: 15,
executer: &testExecuter{
executions: []execution{
{
expectedStmt: "INSERT INTO projections.limits (instance_id, resource_owner, creation_date, change_date, sequence, aggregate_id, block) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (instance_id, resource_owner) DO UPDATE SET (creation_date, change_date, sequence, aggregate_id, block) = (EXCLUDED.creation_date, EXCLUDED.change_date, EXCLUDED.sequence, EXCLUDED.aggregate_id, EXCLUDED.block)",
expectedArgs: []interface{}{
"instance-id",
"ro-id",
anyArg{},
anyArg{},
uint64(15),
"agg-id",
false,
},
},
},
},
},
},
{
name: "reduceLimitsSet all",
args: args{
event: getEvent(testEvent(
limits.SetEventType,
limits.AggregateType,
[]byte(`{
"auditLogRetention": 300000000000,
"block": true
}`),
), limits.SetEventMapper),
},
reduce: (&limitsProjection{}).reduceLimitsSet,
want: wantReduce{
aggregateType: eventstore.AggregateType("limits"),
sequence: 15,
executer: &testExecuter{
executions: []execution{
{
expectedStmt: "INSERT INTO projections.limits (instance_id, resource_owner, creation_date, change_date, sequence, aggregate_id, audit_log_retention, block) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (instance_id, resource_owner) DO UPDATE SET (creation_date, change_date, sequence, aggregate_id, audit_log_retention, block) = (EXCLUDED.creation_date, EXCLUDED.change_date, EXCLUDED.sequence, EXCLUDED.aggregate_id, EXCLUDED.audit_log_retention, EXCLUDED.block)",
expectedArgs: []interface{}{
"instance-id",
"ro-id",
anyArg{},
anyArg{},
uint64(15),
"agg-id",
time.Minute * 5,
true,
},
},
},
},
},
},
{
name: "reduceLimitsReset",
args: args{

View File

@ -13,14 +13,14 @@ type Aggregate struct {
eventstore.Aggregate
}
func NewAggregate(id, instanceId, resourceOwner string) *Aggregate {
func NewAggregate(id, instanceId string) *Aggregate {
return &Aggregate{
Aggregate: eventstore.Aggregate{
Type: AggregateType,
Version: AggregateVersion,
ID: id,
InstanceID: instanceId,
ResourceOwner: resourceOwner,
ResourceOwner: instanceId,
},
}
}

View File

@ -17,6 +17,7 @@ const (
type SetEvent struct {
*eventstore.BaseEvent `json:"-"`
AuditLogRetention *time.Duration `json:"auditLogRetention,omitempty"`
Block *bool `json:"block,omitempty"`
}
func (e *SetEvent) Payload() any {
@ -52,6 +53,12 @@ func ChangeAuditLogRetention(auditLogRetention *time.Duration) LimitsChange {
}
}
func ChangeBlock(block *bool) LimitsChange {
return func(e *SetEvent) {
e.Block = block
}
}
var SetEventMapper = eventstore.GenericEventMapper[SetEvent]
type ResetEvent struct {

View File

@ -31,6 +31,8 @@ Errors:
Limits:
NotFound: Лимитът не е намерен
NoneSpecified: Не са посочени лимити
Instance:
Blocked: Инстанцията е блокирана
Restrictions:
NoneSpecified: Не са посочени ограничения
DefaultLanguageMustBeAllowed: Езикът по подразбиране трябва да бъде разрешен

View File

@ -31,6 +31,8 @@ Errors:
Limits:
NotFound: Limity nebyly nalezeny
NoneSpecified: Nebyly určeny žádné limity
Instance:
Blocked: Instance je blokována
Restrictions:
NoneSpecified: Nebyla určena žádná omezení
DefaultLanguageMustBeAllowed: Výchozí jazyk musí být povolen

View File

@ -31,6 +31,8 @@ Errors:
Limits:
NotFound: Limits konnten nicht gefunden werden
NoneSpecified: Keine Limits angegeben
Instance:
Blocked: Instanz ist blockiert
Restrictions:
NoneSpecified: Keine Restriktionen angegeben
DefaultLanguageMustBeAllowed: Default Sprache muss erlaubt sein

View File

@ -31,6 +31,8 @@ Errors:
Limits:
NotFound: Limits not found
NoneSpecified: No limits specified
Instance:
Blocked: Instance is blocked
Restrictions:
NoneSpecified: No restrictions specified
DefaultLanguageMustBeAllowed: The default language must be allowed

View File

@ -31,6 +31,8 @@ Errors:
Limits:
NotFound: Límite no encontrado
NoneSpecified: No se especificaron límites
Instance:
Blocked: La instancia está bloqueada
Restrictions:
NoneSpecified: No se especificaron restricciones
DefaultLanguageMustBeAllowed: El idioma por defecto debe estar permitido

View File

@ -31,6 +31,8 @@ Errors:
Limits:
NotFound: Limites non trouvée
NoneSpecified: Aucune limite spécifiée
Instance:
Blocked: Instance bloquée
Restrictions:
NoneSpecified: Aucune restriction spécifiée
DefaultLanguageMustBeAllowed: La langue par défaut doit être autorisée

View File

@ -31,6 +31,8 @@ Errors:
Limits:
NotFound: Limite non trovato
NoneSpecified: Nessun limite specificato
Instance:
Blocked: L'istanza è bloccata
Restrictions:
NoneSpecified: Nessuna restrizione specificata
DefaultLanguageMustBeAllowed: La lingua predefinita deve essere consentita

View File

@ -31,6 +31,8 @@ Errors:
Limits:
NotFound: 制限が見つかりません
NoneSpecified: 制限が指定されていません
Instance:
Blocked: インスタンスはブロックされています
Restrictions:
NoneSpecified: 制限が指定されていません
DefaultLanguageMustBeAllowed: デフォルト言語は許可されている必要があります

View File

@ -31,6 +31,8 @@ Errors:
Limits:
NotFound: Лимитот не е пронајден
NoneSpecified: Не се наведени лимити
Instance:
Blocked: Инстанцата е блокирана
Restrictions:
NoneSpecified: Не се наведени ограничувања
DefaultLanguageMustBeAllowed: Стандардниот јазик мора да биде дозволен

View File

@ -31,6 +31,8 @@ Errors:
Limits:
NotFound: Limieten niet gevonden
NoneSpecified: Geen limieten gespecificeerd
Instance:
Blocked: Instantie is geblokkeerd
Restrictions:
NoneSpecified: Geen beperkingen gespecificeerd
DefaultLanguageMustBeAllowed: De standaardtaal moet worden toegestaan

View File

@ -31,6 +31,8 @@ Errors:
Limits:
NotFound: Limit nie znaleziony
NoneSpecified: Nie określono limitów
Instance:
Blocked: Instancja jest zablokowana
Restrictions:
NoneSpecified: Nie określono ograniczeń
DefaultLanguageMustBeAllowed: Domyślny język musi być dozwolony

View File

@ -31,6 +31,8 @@ Errors:
Limits:
NotFound: Limite não encontrado
NoneSpecified: Nenhum limite especificado
Instance:
Blocked: A instância está bloqueada
Restrictions:
NoneSpecified: Nenhuma restrição especificada
DefaultLanguageMustBeAllowed: O idioma padrão deve ser permitido

View File

@ -31,6 +31,8 @@ Errors:
Limits:
NotFound: Лимиты не найдены
NoneSpecified: Не указаны лимиты
Instance:
Blocked: Экземпляр заблокирован
Restrictions:
NoneSpecified: Не указаны ограничения
DefaultLanguageMustBeAllowed: Язык по умолчанию должен быть разрешен

View File

@ -31,6 +31,8 @@ Errors:
Limits:
NotFound: 未找到限制
NoneSpecified: 未指定限制
Instance:
Blocked: 实例被阻止
Restrictions:
NoneSpecified: 未指定限制
DefaultLanguageMustBeAllowed: 默认语言必须被允许

View File

@ -458,6 +458,41 @@ service SystemService {
};
}
// Sets many instance level limits
rpc BulkSetLimits(BulkSetLimitsRequest) returns (BulkSetLimitsResponse) {
option (google.api.http) = {
put: "/instances/limits/_bulk"
body: "*"
};
option (zitadel.v1.auth_option) = {
permission: "system.limits.write";
};
option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = {
tags: ["Usage Control", "Limits"];
responses: {
key: "200";
value: {
description: "Instance limits set";
};
};
responses: {
key: "400";
value: {
description: "At least one limit must be specified for each instance";
schema: {
json_schema: {
ref: "#/definitions/rpcStatus";
};
};
};
};
};
}
// Resets instance level limits
rpc ResetLimits(ResetLimitsRequest) returns (ResetLimitsResponse) {
option (google.api.http) = {
@ -769,12 +804,27 @@ message SetLimitsRequest {
description: "auditLogRetention limits the number of events that can be queried via the events API by their age. A value of '0s' means that all events are available. If this value is set, it overwrites the system default.";
}
];
optional bool block = 3 [
(grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = {
description: "if block is true, requests are responded with a resource exhausted error code.";
}
];
}
message SetLimitsResponse {
zitadel.v1.ObjectDetails details = 1;
}
message BulkSetLimitsRequest {
repeated SetLimitsRequest limits = 1;
}
message BulkSetLimitsResponse {
zitadel.v1.ObjectDetails details = 1;
repeated zitadel.v1.ObjectDetails target_details = 2;
}
message ResetLimitsRequest {
string instance_id = 1 [(validate.rules).string = {min_len: 1, max_len: 200}];
}