mirror of
https://github.com/zitadel/zitadel.git
synced 2025-10-11 00:55:50 +00:00
project limits to instances
This commit is contained in:
@@ -45,7 +45,6 @@ import (
|
||||
http_util "github.com/zitadel/zitadel/internal/api/http"
|
||||
"github.com/zitadel/zitadel/internal/api/http/middleware"
|
||||
"github.com/zitadel/zitadel/internal/api/idp"
|
||||
"github.com/zitadel/zitadel/internal/api/limits"
|
||||
"github.com/zitadel/zitadel/internal/api/oidc"
|
||||
"github.com/zitadel/zitadel/internal/api/robots_txt"
|
||||
"github.com/zitadel/zitadel/internal/api/saml"
|
||||
@@ -353,9 +352,8 @@ func startAPIs(
|
||||
http_util.WithNonHttpOnly(),
|
||||
http_util.WithMaxAge(int(math.Floor(config.Quotas.Access.ExhaustedCookieMaxAge.Seconds()))),
|
||||
)
|
||||
limitsLoader := limits.NewLoader(queries)
|
||||
limitingAccessInterceptor := middleware.NewAccessInterceptor(accessSvc, limitsLoader, exhaustedCookieHandler, &config.Quotas.Access.AccessConfig)
|
||||
apis, err := api.New(ctx, config.Port, router, queries, verifier, config.InternalAuthZ, tlsConfig, config.HTTP2HostHeader, config.HTTP1HostHeader, limitingAccessInterceptor, limitsLoader)
|
||||
limitingAccessInterceptor := middleware.NewAccessInterceptor(accessSvc, exhaustedCookieHandler, &config.Quotas.Access.AccessConfig)
|
||||
apis, err := api.New(ctx, config.Port, router, queries, verifier, config.InternalAuthZ, tlsConfig, config.HTTP2HostHeader, config.HTTP1HostHeader, limitingAccessInterceptor)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error creating api %w", err)
|
||||
}
|
||||
|
@@ -18,7 +18,6 @@ import (
|
||||
"github.com/zitadel/zitadel/internal/api/grpc/server"
|
||||
http_util "github.com/zitadel/zitadel/internal/api/http"
|
||||
http_mw "github.com/zitadel/zitadel/internal/api/http/middleware"
|
||||
"github.com/zitadel/zitadel/internal/api/limits"
|
||||
"github.com/zitadel/zitadel/internal/api/ui/login"
|
||||
"github.com/zitadel/zitadel/internal/query"
|
||||
"github.com/zitadel/zitadel/internal/telemetry/metrics"
|
||||
@@ -52,7 +51,6 @@ func New(
|
||||
authZ internal_authz.Config,
|
||||
tlsConfig *tls.Config, http2HostName, http1HostName string,
|
||||
accessInterceptor *http_mw.AccessInterceptor,
|
||||
limitsLoader *limits.Loader,
|
||||
) (_ *API, err error) {
|
||||
api := &API{
|
||||
port: port,
|
||||
@@ -64,7 +62,7 @@ func New(
|
||||
accessInterceptor: accessInterceptor,
|
||||
}
|
||||
|
||||
api.grpcServer = server.CreateServer(api.verifier, authZ, queries, http2HostName, tlsConfig, accessInterceptor.AccessService(), limitsLoader)
|
||||
api.grpcServer = server.CreateServer(api.verifier, authZ, queries, http2HostName, tlsConfig, accessInterceptor.AccessService())
|
||||
api.grpcGateway, err = server.CreateGateway(ctx, port, http1HostName, accessInterceptor, tlsConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@@ -2,6 +2,7 @@ package authz
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"golang.org/x/text/language"
|
||||
)
|
||||
@@ -20,6 +21,8 @@ type Instance interface {
|
||||
DefaultLanguage() language.Tag
|
||||
DefaultOrganisationID() string
|
||||
SecurityPolicyAllowedOrigins() []string
|
||||
Block() *bool
|
||||
AuditLogRetention() *time.Duration
|
||||
}
|
||||
|
||||
type InstanceVerifier interface {
|
||||
@@ -36,6 +39,14 @@ type instance struct {
|
||||
orgID string
|
||||
}
|
||||
|
||||
func (i *instance) Block() *bool {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *instance) AuditLogRetention() *time.Duration {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *instance) InstanceID() string {
|
||||
return i.id
|
||||
}
|
||||
|
@@ -7,31 +7,25 @@ import (
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/zitadel/zitadel/internal/api/authz"
|
||||
"github.com/zitadel/zitadel/internal/api/limits"
|
||||
"github.com/zitadel/zitadel/internal/telemetry/tracing"
|
||||
"github.com/zitadel/zitadel/internal/zerrors"
|
||||
)
|
||||
|
||||
func LimitsInterceptor(limitsLoader *limits.Loader, ignoreService ...string) grpc.UnaryServerInterceptor {
|
||||
func LimitsInterceptor(ignoreService ...string) grpc.UnaryServerInterceptor {
|
||||
for idx, service := range ignoreService {
|
||||
if !strings.HasPrefix(service, "/") {
|
||||
ignoreService[idx] = "/" + service
|
||||
}
|
||||
}
|
||||
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (_ interface{}, err error) {
|
||||
interceptorCtx, span := tracing.NewServerInterceptorSpan(ctx)
|
||||
defer func() { span.EndWithError(err) }()
|
||||
for _, service := range ignoreService {
|
||||
if strings.HasPrefix(info.FullMethod, service) {
|
||||
return handler(ctx, req)
|
||||
}
|
||||
}
|
||||
instance := authz.GetInstance(ctx)
|
||||
ctx, l := limitsLoader.Load(interceptorCtx, instance.InstanceID())
|
||||
if l.Block != nil && *l.Block {
|
||||
if block := instance.Block(); block != nil && *block {
|
||||
return nil, zerrors.ThrowResourceExhausted(nil, "LIMITS-molsj", "Errors.Limits.Instance.Blocked")
|
||||
}
|
||||
span.End()
|
||||
return handler(ctx, req)
|
||||
}
|
||||
}
|
||||
|
@@ -11,7 +11,6 @@ import (
|
||||
"github.com/zitadel/zitadel/internal/api/authz"
|
||||
grpc_api "github.com/zitadel/zitadel/internal/api/grpc"
|
||||
"github.com/zitadel/zitadel/internal/api/grpc/server/middleware"
|
||||
"github.com/zitadel/zitadel/internal/api/limits"
|
||||
"github.com/zitadel/zitadel/internal/logstore"
|
||||
"github.com/zitadel/zitadel/internal/logstore/record"
|
||||
"github.com/zitadel/zitadel/internal/query"
|
||||
@@ -42,7 +41,6 @@ func CreateServer(
|
||||
hostHeaderName string,
|
||||
tlsConfig *tls.Config,
|
||||
accessSvc *logstore.Service[*record.AccessLog],
|
||||
limitsLoader *limits.Loader,
|
||||
) *grpc.Server {
|
||||
metricTypes := []metrics.MetricType{metrics.MetricTypeTotalCount, metrics.MetricTypeRequestCount, metrics.MetricTypeStatusCode}
|
||||
serverOptions := []grpc.ServerOption{
|
||||
@@ -55,9 +53,9 @@ func CreateServer(
|
||||
middleware.InstanceInterceptor(queries, hostHeaderName, system_pb.SystemService_ServiceDesc.ServiceName, healthpb.Health_ServiceDesc.ServiceName),
|
||||
middleware.AccessStorageInterceptor(accessSvc),
|
||||
middleware.ErrorHandler(),
|
||||
middleware.TranslationHandler(),
|
||||
middleware.LimitsInterceptor(limitsLoader, system_pb.SystemService_ServiceDesc.ServiceName),
|
||||
middleware.LimitsInterceptor(system_pb.SystemService_ServiceDesc.ServiceName),
|
||||
middleware.AuthorizationInterceptor(verifier, authConfig),
|
||||
middleware.TranslationHandler(),
|
||||
middleware.QuotaExhaustedInterceptor(accessSvc, system_pb.SystemService_ServiceDesc.ServiceName),
|
||||
middleware.ValidationHandler(),
|
||||
middleware.ServiceHandler(),
|
||||
|
@@ -13,7 +13,6 @@ import (
|
||||
"github.com/zitadel/zitadel/internal/api/authz"
|
||||
"github.com/zitadel/zitadel/internal/api/grpc/server/middleware"
|
||||
http_utils "github.com/zitadel/zitadel/internal/api/http"
|
||||
"github.com/zitadel/zitadel/internal/api/limits"
|
||||
"github.com/zitadel/zitadel/internal/logstore"
|
||||
"github.com/zitadel/zitadel/internal/logstore/record"
|
||||
"github.com/zitadel/zitadel/internal/telemetry/tracing"
|
||||
@@ -23,7 +22,6 @@ type AccessInterceptor struct {
|
||||
logstoreSvc *logstore.Service[*record.AccessLog]
|
||||
cookieHandler *http_utils.CookieHandler
|
||||
limitConfig *AccessConfig
|
||||
limitsLoader *limits.Loader
|
||||
storeOnly bool
|
||||
redirect string
|
||||
}
|
||||
@@ -36,12 +34,11 @@ type AccessConfig struct {
|
||||
// NewAccessInterceptor intercepts all requests and stores them to the logstore.
|
||||
// If storeOnly is false, it also checks if requests are exhausted.
|
||||
// If requests are exhausted, it also returns http.StatusTooManyRequests or a redirect to the given path and sets a cookie
|
||||
func NewAccessInterceptor(svc *logstore.Service[*record.AccessLog], limitsLoader *limits.Loader, cookieHandler *http_utils.CookieHandler, cookieConfig *AccessConfig) *AccessInterceptor {
|
||||
func NewAccessInterceptor(svc *logstore.Service[*record.AccessLog], cookieHandler *http_utils.CookieHandler, cookieConfig *AccessConfig) *AccessInterceptor {
|
||||
return &AccessInterceptor{
|
||||
logstoreSvc: svc,
|
||||
cookieHandler: cookieHandler,
|
||||
limitConfig: cookieConfig,
|
||||
limitsLoader: limitsLoader,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -50,7 +47,6 @@ func (a *AccessInterceptor) WithoutLimiting() *AccessInterceptor {
|
||||
logstoreSvc: a.logstoreSvc,
|
||||
cookieHandler: a.cookieHandler,
|
||||
limitConfig: a.limitConfig,
|
||||
limitsLoader: a.limitsLoader,
|
||||
storeOnly: true,
|
||||
redirect: a.redirect,
|
||||
}
|
||||
@@ -61,7 +57,6 @@ func (a *AccessInterceptor) WithRedirect(redirect string) *AccessInterceptor {
|
||||
logstoreSvc: a.logstoreSvc,
|
||||
cookieHandler: a.cookieHandler,
|
||||
limitConfig: a.limitConfig,
|
||||
limitsLoader: a.limitsLoader,
|
||||
storeOnly: a.storeOnly,
|
||||
redirect: redirect,
|
||||
}
|
||||
@@ -71,41 +66,39 @@ func (a *AccessInterceptor) AccessService() *logstore.Service[*record.AccessLog]
|
||||
return a.logstoreSvc
|
||||
}
|
||||
|
||||
func (a *AccessInterceptor) Limit(w http.ResponseWriter, r *http.Request, publicAuthPathPrefixes ...string) (*http.Request, bool) {
|
||||
func (a *AccessInterceptor) Limit(w http.ResponseWriter, r *http.Request, publicAuthPathPrefixes ...string) bool {
|
||||
if a.storeOnly {
|
||||
return r, false
|
||||
return false
|
||||
}
|
||||
ctx := r.Context()
|
||||
instanceID := authz.GetInstance(ctx).InstanceID()
|
||||
newCtx, instanceLimits := a.limitsLoader.Load(ctx, instanceID)
|
||||
newR := r.WithContext(newCtx)
|
||||
instance := authz.GetInstance(ctx)
|
||||
var deleteCookie bool
|
||||
defer func() {
|
||||
if deleteCookie {
|
||||
a.DeleteExhaustedCookie(w)
|
||||
}
|
||||
}()
|
||||
if instanceLimits.Block != nil {
|
||||
if *instanceLimits.Block {
|
||||
if block := instance.Block(); block != nil {
|
||||
if *block {
|
||||
a.SetExhaustedCookie(w, r)
|
||||
return newR, true
|
||||
return true
|
||||
}
|
||||
deleteCookie = true
|
||||
}
|
||||
for _, ignoredPathPrefix := range publicAuthPathPrefixes {
|
||||
if strings.HasPrefix(r.RequestURI, ignoredPathPrefix) {
|
||||
return newR, false
|
||||
return false
|
||||
}
|
||||
}
|
||||
remaining := a.logstoreSvc.Limit(ctx, instanceID)
|
||||
remaining := a.logstoreSvc.Limit(ctx, instance.InstanceID())
|
||||
if remaining != nil {
|
||||
if remaining != nil && *remaining > 0 {
|
||||
a.SetExhaustedCookie(w, r)
|
||||
return newR, true
|
||||
return true
|
||||
}
|
||||
deleteCookie = true
|
||||
}
|
||||
return newR, false
|
||||
return false
|
||||
}
|
||||
|
||||
func (a *AccessInterceptor) SetExhaustedCookie(writer http.ResponseWriter, request *http.Request) {
|
||||
@@ -140,7 +133,7 @@ func (a *AccessInterceptor) handle(publicAuthPathPrefixes ...string) func(http.H
|
||||
ctx := request.Context()
|
||||
tracingCtx, checkSpan := tracing.NewNamedSpan(ctx, "checkAccessQuota")
|
||||
wrappedWriter := &statusRecorder{ResponseWriter: writer, status: 0}
|
||||
r, limited := a.Limit(wrappedWriter, request.WithContext(tracingCtx), publicAuthPathPrefixes...)
|
||||
limited := a.Limit(wrappedWriter, request.WithContext(tracingCtx), publicAuthPathPrefixes...)
|
||||
checkSpan.End()
|
||||
if limited {
|
||||
if a.redirect != "" {
|
||||
@@ -150,7 +143,7 @@ func (a *AccessInterceptor) handle(publicAuthPathPrefixes ...string) func(http.H
|
||||
http.Error(wrappedWriter, "Your ZITADEL instance is blocked.", http.StatusTooManyRequests)
|
||||
}
|
||||
} else {
|
||||
next.ServeHTTP(wrappedWriter, r)
|
||||
next.ServeHTTP(wrappedWriter, request)
|
||||
}
|
||||
a.writeLog(tracingCtx, wrappedWriter, writer, request, a.storeOnly)
|
||||
})
|
||||
|
@@ -1,3 +0,0 @@
|
||||
package limits
|
||||
|
||||
//go:generate mockgen -source limits.go -destination ./mock_limits/limits.go
|
@@ -1,60 +0,0 @@
|
||||
package limits
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/zitadel/logging"
|
||||
|
||||
"github.com/zitadel/zitadel/internal/zerrors"
|
||||
)
|
||||
|
||||
type limitsKey struct{}
|
||||
|
||||
var key = (*limitsKey)(nil)
|
||||
|
||||
type Loader struct {
|
||||
querier Querier
|
||||
}
|
||||
|
||||
// NewLoader makes it easy to deduplicate multiple limit queries while handling a single request.
|
||||
// The returned limitsLoader itself is stateless.
|
||||
// Once the limits are loaded, they are attached to the context.
|
||||
// Within a contexts lifetime, the Loader also guarantees that even in error cases, the limits are only queried once.
|
||||
// Therefore, there won't be any circular calls as long as the passed context is a child of a previously passed context.
|
||||
func NewLoader(querier Querier) *Loader {
|
||||
return &Loader{querier}
|
||||
}
|
||||
|
||||
// Querier abstracts query.Queries.Limits to avoid circular dependencies.
|
||||
type Querier interface {
|
||||
Limits(ctx context.Context, resourceOwner string) (limits *Limits, err error)
|
||||
}
|
||||
|
||||
type Limits struct {
|
||||
AggregateID string
|
||||
CreationDate time.Time
|
||||
ChangeDate time.Time
|
||||
ResourceOwner string
|
||||
Sequence uint64
|
||||
|
||||
AuditLogRetention *time.Duration
|
||||
Block *bool
|
||||
}
|
||||
|
||||
// Load ensures that if limits are already attached to the context, they are not queried again.
|
||||
// Use the returned context for further calls to Load.
|
||||
func (l *Loader) Load(ctx context.Context, instanceID string) (context.Context, Limits) {
|
||||
ctxLimits, ok := ctx.Value(key).(*Limits)
|
||||
if ok {
|
||||
return ctx, *ctxLimits
|
||||
}
|
||||
queriedLimits, err := l.querier.Limits(ctx, instanceID)
|
||||
if err != nil && !zerrors.IsNotFound(err) {
|
||||
logging.WithFields("instance id", instanceID).OnError(err).Error("unable to load limits")
|
||||
}
|
||||
if queriedLimits == nil {
|
||||
queriedLimits = &Limits{}
|
||||
}
|
||||
return context.WithValue(ctx, key, queriedLimits), *queriedLimits
|
||||
}
|
@@ -1,76 +0,0 @@
|
||||
package limits_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/muhlemmer/gu"
|
||||
"go.uber.org/mock/gomock"
|
||||
|
||||
"github.com/zitadel/zitadel/internal/api/limits"
|
||||
"github.com/zitadel/zitadel/internal/api/limits/mock_limits"
|
||||
)
|
||||
|
||||
func TestLoader_Load(t *testing.T) {
|
||||
type fields struct {
|
||||
querier limits.Querier
|
||||
}
|
||||
type args struct {
|
||||
// If detachContext is false, the context returned from the last call to Load is passed with this call to Load.
|
||||
// If detachContext is true, a new context.Background() is passed with this call to Load.
|
||||
detachContext bool
|
||||
}
|
||||
minuteLimits := limits.Limits{AuditLogRetention: gu.Ptr(time.Minute), Block: gu.Ptr(true)}
|
||||
hourLimits := limits.Limits{AuditLogRetention: gu.Ptr(time.Hour), Block: gu.Ptr(false)}
|
||||
tests := []struct {
|
||||
name string
|
||||
// The load function is called for each entry in the returned args slice.
|
||||
input func(controller *gomock.Controller) (fields, []args)
|
||||
want []limits.Limits
|
||||
}{{
|
||||
name: "limits on the context are reused",
|
||||
input: func(controller *gomock.Controller) (fields, []args) {
|
||||
querier := mock_limits.NewMockQuerier(controller)
|
||||
querier.EXPECT().Limits(gomock.Any(), gomock.Any()).Return(&minuteLimits, nil)
|
||||
return fields{querier: querier}, []args{{detachContext: true}, {detachContext: false}}
|
||||
},
|
||||
want: []limits.Limits{minuteLimits, minuteLimits},
|
||||
}, {
|
||||
name: "limits are queried for each unrelated context",
|
||||
input: func(controller *gomock.Controller) (fields, []args) {
|
||||
querier := mock_limits.NewMockQuerier(controller)
|
||||
querier.EXPECT().Limits(gomock.Any(), gomock.Any()).Return(&minuteLimits, nil)
|
||||
querier.EXPECT().Limits(gomock.Any(), gomock.Any()).Return(&hourLimits, nil)
|
||||
return fields{querier: querier}, []args{{detachContext: true}, {detachContext: true}}
|
||||
},
|
||||
want: []limits.Limits{minuteLimits, hourLimits},
|
||||
}, {
|
||||
name: "limits are queried once per context even if the querier returns an error",
|
||||
input: func(controller *gomock.Controller) (fields, []args) {
|
||||
querier := mock_limits.NewMockQuerier(controller)
|
||||
querier.EXPECT().Limits(gomock.Any(), gomock.Any()).Return(nil, errors.New("error from querier"))
|
||||
return fields{querier: querier}, []args{{detachContext: true}, {detachContext: false}}
|
||||
},
|
||||
want: []limits.Limits{{}, {}},
|
||||
}}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
f, calls := tt.input(gomock.NewController(t))
|
||||
ll := limits.NewLoader(f.querier)
|
||||
var ctx context.Context
|
||||
for i, a := range calls {
|
||||
if a.detachContext {
|
||||
ctx = context.Background()
|
||||
}
|
||||
var l limits.Limits
|
||||
ctx, l = ll.Load(ctx, "instanceID")
|
||||
if !reflect.DeepEqual(l, tt.want[i]) {
|
||||
t.Errorf("Load() got = %v, want %v", l, tt.want[i])
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
@@ -1,55 +0,0 @@
|
||||
// Code generated by MockGen. DO NOT EDIT.
|
||||
// Source: limits.go
|
||||
//
|
||||
// Generated by this command:
|
||||
//
|
||||
// mockgen -source limits.go -destination ./mock_limits/limits.go
|
||||
//
|
||||
// Package mock_limits is a generated GoMock package.
|
||||
package mock_limits
|
||||
|
||||
import (
|
||||
context "context"
|
||||
reflect "reflect"
|
||||
|
||||
limits "github.com/zitadel/zitadel/internal/api/limits"
|
||||
gomock "go.uber.org/mock/gomock"
|
||||
)
|
||||
|
||||
// MockQuerier is a mock of Querier interface.
|
||||
type MockQuerier struct {
|
||||
ctrl *gomock.Controller
|
||||
recorder *MockQuerierMockRecorder
|
||||
}
|
||||
|
||||
// MockQuerierMockRecorder is the mock recorder for MockQuerier.
|
||||
type MockQuerierMockRecorder struct {
|
||||
mock *MockQuerier
|
||||
}
|
||||
|
||||
// NewMockQuerier creates a new mock instance.
|
||||
func NewMockQuerier(ctrl *gomock.Controller) *MockQuerier {
|
||||
mock := &MockQuerier{ctrl: ctrl}
|
||||
mock.recorder = &MockQuerierMockRecorder{mock}
|
||||
return mock
|
||||
}
|
||||
|
||||
// EXPECT returns an object that allows the caller to indicate expected use.
|
||||
func (m *MockQuerier) EXPECT() *MockQuerierMockRecorder {
|
||||
return m.recorder
|
||||
}
|
||||
|
||||
// Limits mocks base method.
|
||||
func (m *MockQuerier) Limits(ctx context.Context, resourceOwner string) (*limits.Limits, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "Limits", ctx, resourceOwner)
|
||||
ret0, _ := ret[0].(*limits.Limits)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// Limits indicates an expected call of Limits.
|
||||
func (mr *MockQuerierMockRecorder) Limits(ctx, resourceOwner any) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Limits", reflect.TypeOf((*MockQuerier)(nil).Limits), ctx, resourceOwner)
|
||||
}
|
@@ -116,7 +116,7 @@ func Start(config Config, externalSecure bool, issuer op.IssuerFromRequest, call
|
||||
http.Error(w, fmt.Sprintf("unable to template instance management url for console: %v", err), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
r, limited := limitingAccessInterceptor.Limit(w, r)
|
||||
limited := limitingAccessInterceptor.Limit(w, r)
|
||||
environmentJSON, err := createEnvironmentJSON(url, issuer(r), instance.ConsoleClientID(), customerPortal, instanceMgmtURL, limited)
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("unable to marshal env for console: %v", err), http.StatusInternalServerError)
|
||||
|
@@ -8,7 +8,6 @@ import (
|
||||
"github.com/zitadel/zitadel/internal/api/call"
|
||||
"github.com/zitadel/zitadel/internal/eventstore"
|
||||
"github.com/zitadel/zitadel/internal/telemetry/tracing"
|
||||
"github.com/zitadel/zitadel/internal/zerrors"
|
||||
)
|
||||
|
||||
type Event struct {
|
||||
@@ -44,12 +43,8 @@ func (q *Queries) SearchEvents(ctx context.Context, query *eventstore.SearchQuer
|
||||
ctx, span := tracing.NewSpan(ctx)
|
||||
defer func() { span.EndWithError(err) }()
|
||||
auditLogRetention := q.defaultAuditLogRetention
|
||||
ctx, instanceLimits := q.limitsLoader.Load(ctx, authz.GetInstance(ctx).InstanceID())
|
||||
if err != nil && !zerrors.IsNotFound(err) {
|
||||
return nil, err
|
||||
}
|
||||
if instanceLimits.AuditLogRetention != nil {
|
||||
auditLogRetention = *instanceLimits.AuditLogRetention
|
||||
if instanceAuditLogRetention := authz.GetInstance(ctx).AuditLogRetention(); instanceAuditLogRetention != nil {
|
||||
auditLogRetention = *instanceAuditLogRetention
|
||||
}
|
||||
if auditLogRetention != 0 {
|
||||
query = filterAuditLogRetention(ctx, auditLogRetention, query)
|
||||
|
@@ -69,6 +69,14 @@ var (
|
||||
name: projection.InstanceColumnDefaultLanguage,
|
||||
table: instanceTable,
|
||||
}
|
||||
InstanceColumnAuditLogRetention = Column{
|
||||
name: projection.InstanceColumnAuditLogRetention,
|
||||
table: instanceTable,
|
||||
}
|
||||
InstanceColumnBlock = Column{
|
||||
name: projection.InstanceColumnBlock,
|
||||
table: instanceTable,
|
||||
}
|
||||
)
|
||||
|
||||
type Instance struct {
|
||||
@@ -78,14 +86,16 @@ type Instance struct {
|
||||
Sequence uint64
|
||||
Name string
|
||||
|
||||
DefaultOrgID string
|
||||
IAMProjectID string
|
||||
ConsoleID string
|
||||
ConsoleAppID string
|
||||
DefaultLang language.Tag
|
||||
Domains []*InstanceDomain
|
||||
host string
|
||||
csp csp
|
||||
DefaultOrgID string
|
||||
IAMProjectID string
|
||||
ConsoleID string
|
||||
ConsoleAppID string
|
||||
DefaultLang language.Tag
|
||||
Domains []*InstanceDomain
|
||||
host string
|
||||
csp csp
|
||||
block *bool
|
||||
auditLogRetention *time.Duration
|
||||
}
|
||||
|
||||
type csp struct {
|
||||
@@ -137,6 +147,14 @@ func (i *Instance) SecurityPolicyAllowedOrigins() []string {
|
||||
return i.csp.allowedOrigins
|
||||
}
|
||||
|
||||
func (i *Instance) Block() *bool {
|
||||
return i.block
|
||||
}
|
||||
|
||||
func (i *Instance) AuditLogRetention() *time.Duration {
|
||||
return i.auditLogRetention
|
||||
}
|
||||
|
||||
type InstanceSearchQueries struct {
|
||||
SearchRequest
|
||||
Queries []SearchQuery
|
||||
@@ -256,12 +274,18 @@ func prepareInstanceQuery(ctx context.Context, db prepareDatabase, host string)
|
||||
InstanceColumnConsoleID.identifier(),
|
||||
InstanceColumnConsoleAppID.identifier(),
|
||||
InstanceColumnDefaultLanguage.identifier(),
|
||||
InstanceColumnAuditLogRetention.identifier(),
|
||||
InstanceColumnBlock.identifier(),
|
||||
).
|
||||
From(instanceTable.identifier() + db.Timetravel(call.Took(ctx))).
|
||||
PlaceholderFormat(sq.Dollar),
|
||||
func(row *sql.Row) (*Instance, error) {
|
||||
instance := &Instance{host: host}
|
||||
lang := ""
|
||||
var (
|
||||
instance = &Instance{host: host}
|
||||
lang = ""
|
||||
auditLogRetention database.NullDuration
|
||||
block sql.NullBool
|
||||
)
|
||||
err := row.Scan(
|
||||
&instance.ID,
|
||||
&instance.CreationDate,
|
||||
@@ -272,6 +296,8 @@ func prepareInstanceQuery(ctx context.Context, db prepareDatabase, host string)
|
||||
&instance.ConsoleID,
|
||||
&instance.ConsoleAppID,
|
||||
&lang,
|
||||
&auditLogRetention,
|
||||
&block,
|
||||
)
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
@@ -279,6 +305,12 @@ func prepareInstanceQuery(ctx context.Context, db prepareDatabase, host string)
|
||||
}
|
||||
return nil, zerrors.ThrowInternal(err, "QUERY-3j9sf", "Errors.Internal")
|
||||
}
|
||||
if auditLogRetention.Valid {
|
||||
instance.auditLogRetention = &auditLogRetention.Duration
|
||||
}
|
||||
if block.Valid {
|
||||
instance.block = &block.Bool
|
||||
}
|
||||
instance.DefaultLang = language.Make(lang)
|
||||
return instance, nil
|
||||
}
|
||||
|
@@ -1,5 +1,6 @@
|
||||
package query
|
||||
|
||||
/*
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
@@ -46,11 +47,11 @@ var (
|
||||
table: limitSettingsTable,
|
||||
}
|
||||
LimitsColumnAuditLogRetention = Column{
|
||||
name: projection.LimitsColumnAuditLogRetention,
|
||||
name: projection.InstanceColumnAuditLogRetention,
|
||||
table: limitSettingsTable,
|
||||
}
|
||||
LimitsColumnBlock = Column{
|
||||
name: projection.LimitsColumnBlock,
|
||||
name: projection.InstanceColumnBlock,
|
||||
table: limitSettingsTable,
|
||||
}
|
||||
)
|
||||
@@ -117,3 +118,4 @@ func prepareLimitsQuery(ctx context.Context, db prepareDatabase) (sq.SelectBuild
|
||||
return limits, nil
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
@@ -2,6 +2,7 @@ package projection
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/zitadel/zitadel/internal/repository/limits"
|
||||
|
||||
"github.com/zitadel/zitadel/internal/eventstore"
|
||||
old_handler "github.com/zitadel/zitadel/internal/eventstore/handler"
|
||||
@@ -13,16 +14,18 @@ import (
|
||||
const (
|
||||
InstanceProjectionTable = "projections.instances"
|
||||
|
||||
InstanceColumnID = "id"
|
||||
InstanceColumnName = "name"
|
||||
InstanceColumnChangeDate = "change_date"
|
||||
InstanceColumnCreationDate = "creation_date"
|
||||
InstanceColumnDefaultOrgID = "default_org_id"
|
||||
InstanceColumnProjectID = "iam_project_id"
|
||||
InstanceColumnConsoleID = "console_client_id"
|
||||
InstanceColumnConsoleAppID = "console_app_id"
|
||||
InstanceColumnSequence = "sequence"
|
||||
InstanceColumnDefaultLanguage = "default_language"
|
||||
InstanceColumnID = "id"
|
||||
InstanceColumnName = "name"
|
||||
InstanceColumnChangeDate = "change_date"
|
||||
InstanceColumnCreationDate = "creation_date"
|
||||
InstanceColumnDefaultOrgID = "default_org_id"
|
||||
InstanceColumnProjectID = "iam_project_id"
|
||||
InstanceColumnConsoleID = "console_client_id"
|
||||
InstanceColumnConsoleAppID = "console_app_id"
|
||||
InstanceColumnSequence = "sequence"
|
||||
InstanceColumnDefaultLanguage = "default_language"
|
||||
InstanceColumnAuditLogRetention = "audit_log_retention"
|
||||
InstanceColumnBlock = "block"
|
||||
)
|
||||
|
||||
type instanceProjection struct{}
|
||||
@@ -48,6 +51,8 @@ func (*instanceProjection) Init() *old_handler.Check {
|
||||
handler.NewColumn(InstanceColumnConsoleAppID, handler.ColumnTypeText, handler.Default("")),
|
||||
handler.NewColumn(InstanceColumnSequence, handler.ColumnTypeInt64),
|
||||
handler.NewColumn(InstanceColumnDefaultLanguage, handler.ColumnTypeText, handler.Default("")),
|
||||
handler.NewColumn(InstanceColumnAuditLogRetention, handler.ColumnTypeInterval, handler.Nullable()),
|
||||
handler.NewColumn(InstanceColumnBlock, handler.ColumnTypeBool, handler.Nullable()),
|
||||
},
|
||||
handler.NewPrimaryKey(InstanceColumnID),
|
||||
),
|
||||
@@ -89,6 +94,19 @@ func (p *instanceProjection) Reducers() []handler.AggregateReducer {
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Aggregate: limits.AggregateType,
|
||||
EventReducers: []handler.EventReducer{
|
||||
{
|
||||
Event: limits.SetEventType,
|
||||
Reduce: p.reduceLimitsSet,
|
||||
},
|
||||
{
|
||||
Event: limits.ResetEventType,
|
||||
Reduce: p.reduceLimitsReset,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -214,3 +232,46 @@ func (p *instanceProjection) reduceDefaultLanguageSet(event eventstore.Event) (*
|
||||
},
|
||||
), nil
|
||||
}
|
||||
|
||||
func (p *instanceProjection) reduceLimitsSet(event eventstore.Event) (*handler.Statement, error) {
|
||||
e, err := assertEvent[*limits.SetEvent](event)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
updateCols := []handler.Column{
|
||||
handler.NewCol(InstanceColumnChangeDate, e.CreationDate()),
|
||||
handler.NewCol(InstanceColumnSequence, e.Sequence()),
|
||||
}
|
||||
if e.AuditLogRetention != nil {
|
||||
updateCols = append(updateCols, handler.NewCol(InstanceColumnAuditLogRetention, *e.AuditLogRetention))
|
||||
}
|
||||
if e.Block != nil {
|
||||
updateCols = append(updateCols, handler.NewCol(InstanceColumnBlock, *e.Block))
|
||||
}
|
||||
return handler.NewUpdateStatement(
|
||||
e,
|
||||
updateCols,
|
||||
[]handler.Condition{
|
||||
handler.NewCond(InstanceColumnID, e.Aggregate().InstanceID),
|
||||
},
|
||||
), nil
|
||||
}
|
||||
|
||||
func (p *instanceProjection) reduceLimitsReset(event eventstore.Event) (*handler.Statement, error) {
|
||||
e, err := assertEvent[*limits.ResetEvent](event)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return handler.NewUpdateStatement(
|
||||
e,
|
||||
[]handler.Column{
|
||||
handler.NewCol(InstanceColumnChangeDate, e.CreationDate()),
|
||||
handler.NewCol(InstanceColumnSequence, e.Sequence()),
|
||||
handler.NewCol(InstanceColumnAuditLogRetention, nil),
|
||||
handler.NewCol(InstanceColumnBlock, nil),
|
||||
},
|
||||
[]handler.Condition{
|
||||
handler.NewCond(InstanceColumnID, e.Aggregate().InstanceID),
|
||||
},
|
||||
), nil
|
||||
}
|
||||
|
@@ -1,119 +0,0 @@
|
||||
package projection
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/zitadel/zitadel/internal/eventstore"
|
||||
old_handler "github.com/zitadel/zitadel/internal/eventstore/handler"
|
||||
"github.com/zitadel/zitadel/internal/eventstore/handler/v2"
|
||||
"github.com/zitadel/zitadel/internal/repository/instance"
|
||||
"github.com/zitadel/zitadel/internal/repository/limits"
|
||||
)
|
||||
|
||||
const (
|
||||
LimitsProjectionTable = "projections.limits2"
|
||||
|
||||
LimitsColumnAggregateID = "aggregate_id"
|
||||
LimitsColumnCreationDate = "creation_date"
|
||||
LimitsColumnChangeDate = "change_date"
|
||||
LimitsColumnResourceOwner = "resource_owner"
|
||||
LimitsColumnInstanceID = "instance_id"
|
||||
LimitsColumnSequence = "sequence"
|
||||
|
||||
LimitsColumnAuditLogRetention = "audit_log_retention"
|
||||
LimitsColumnBlock = "block"
|
||||
)
|
||||
|
||||
type limitsProjection struct{}
|
||||
|
||||
func newLimitsProjection(ctx context.Context, config handler.Config) *handler.Handler {
|
||||
return handler.NewHandler(ctx, &config, &limitsProjection{})
|
||||
}
|
||||
|
||||
func (*limitsProjection) Name() string {
|
||||
return LimitsProjectionTable
|
||||
}
|
||||
|
||||
func (*limitsProjection) Init() *old_handler.Check {
|
||||
return handler.NewTableCheck(
|
||||
handler.NewTable([]*handler.InitColumn{
|
||||
handler.NewColumn(LimitsColumnAggregateID, handler.ColumnTypeText),
|
||||
handler.NewColumn(LimitsColumnCreationDate, handler.ColumnTypeTimestamp),
|
||||
handler.NewColumn(LimitsColumnChangeDate, handler.ColumnTypeTimestamp),
|
||||
handler.NewColumn(LimitsColumnResourceOwner, handler.ColumnTypeText),
|
||||
handler.NewColumn(LimitsColumnInstanceID, handler.ColumnTypeText),
|
||||
handler.NewColumn(LimitsColumnSequence, handler.ColumnTypeInt64),
|
||||
handler.NewColumn(LimitsColumnAuditLogRetention, handler.ColumnTypeInterval, handler.Nullable()),
|
||||
handler.NewColumn(LimitsColumnBlock, handler.ColumnTypeBool, handler.Nullable()),
|
||||
},
|
||||
handler.NewPrimaryKey(LimitsColumnInstanceID, LimitsColumnResourceOwner),
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
func (p *limitsProjection) Reducers() []handler.AggregateReducer {
|
||||
return []handler.AggregateReducer{
|
||||
{
|
||||
Aggregate: limits.AggregateType,
|
||||
EventReducers: []handler.EventReducer{
|
||||
{
|
||||
Event: limits.SetEventType,
|
||||
Reduce: p.reduceLimitsSet,
|
||||
},
|
||||
{
|
||||
Event: limits.ResetEventType,
|
||||
Reduce: p.reduceLimitsReset,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Aggregate: instance.AggregateType,
|
||||
EventReducers: []handler.EventReducer{
|
||||
{
|
||||
Event: instance.InstanceRemovedEventType,
|
||||
Reduce: reduceInstanceRemovedHelper(LimitsColumnInstanceID),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (p *limitsProjection) reduceLimitsSet(event eventstore.Event) (*handler.Statement, error) {
|
||||
e, err := assertEvent[*limits.SetEvent](event)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conflictCols := []handler.Column{
|
||||
handler.NewCol(LimitsColumnInstanceID, e.Aggregate().InstanceID),
|
||||
handler.NewCol(LimitsColumnResourceOwner, e.Aggregate().ResourceOwner),
|
||||
}
|
||||
updateCols := []handler.Column{
|
||||
handler.NewCol(LimitsColumnInstanceID, e.Aggregate().InstanceID),
|
||||
handler.NewCol(LimitsColumnResourceOwner, e.Aggregate().ResourceOwner),
|
||||
handler.NewCol(LimitsColumnCreationDate, e.CreationDate()),
|
||||
handler.NewCol(LimitsColumnChangeDate, e.CreationDate()),
|
||||
handler.NewCol(LimitsColumnSequence, e.Sequence()),
|
||||
handler.NewCol(LimitsColumnAggregateID, e.Aggregate().ID),
|
||||
}
|
||||
if e.AuditLogRetention != nil {
|
||||
updateCols = append(updateCols, handler.NewCol(LimitsColumnAuditLogRetention, *e.AuditLogRetention))
|
||||
}
|
||||
if e.Block != nil {
|
||||
updateCols = append(updateCols, handler.NewCol(LimitsColumnBlock, *e.Block))
|
||||
}
|
||||
return handler.NewUpsertStatement(e, conflictCols, updateCols), nil
|
||||
}
|
||||
|
||||
func (p *limitsProjection) reduceLimitsReset(event eventstore.Event) (*handler.Statement, error) {
|
||||
e, err := assertEvent[*limits.ResetEvent](event)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return handler.NewDeleteStatement(
|
||||
e,
|
||||
[]handler.Condition{
|
||||
handler.NewCond(LimitsColumnInstanceID, e.Aggregate().InstanceID),
|
||||
handler.NewCond(LimitsColumnResourceOwner, e.Aggregate().ResourceOwner),
|
||||
},
|
||||
), nil
|
||||
}
|
@@ -1,5 +1,6 @@
|
||||
package projection
|
||||
|
||||
/*
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
@@ -194,3 +195,5 @@ func TestLimitsProjection_reduces(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
*/
|
||||
|
@@ -143,7 +143,6 @@ func Create(ctx context.Context, sqlClient *database.DB, es handler.EventStore,
|
||||
AuthRequestProjection = newAuthRequestProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["auth_requests"]))
|
||||
MilestoneProjection = newMilestoneProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["milestones"]), systemUsers)
|
||||
QuotaProjection = newQuotaProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["quotas"]))
|
||||
LimitsProjection = newLimitsProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["limits"]))
|
||||
RestrictionsProjection = newRestrictionsProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["restrictions"]))
|
||||
newProjectionsList()
|
||||
return nil
|
||||
|
@@ -11,7 +11,6 @@ import (
|
||||
"golang.org/x/text/language"
|
||||
|
||||
"github.com/zitadel/zitadel/internal/api/authz"
|
||||
limits_loader "github.com/zitadel/zitadel/internal/api/limits"
|
||||
sd "github.com/zitadel/zitadel/internal/config/systemdefaults"
|
||||
"github.com/zitadel/zitadel/internal/crypto"
|
||||
"github.com/zitadel/zitadel/internal/database"
|
||||
@@ -54,7 +53,6 @@ type Queries struct {
|
||||
zitadelRoles []authz.RoleMapping
|
||||
multifactors domain.MultifactorConfigs
|
||||
defaultAuditLogRetention time.Duration
|
||||
limitsLoader *limits_loader.Loader
|
||||
}
|
||||
|
||||
func StartQueries(
|
||||
|
Reference in New Issue
Block a user