Mirror of https://github.com/zitadel/zitadel.git, synced 2025-08-11 18:17:35 +00:00
perf: project quotas and usages (#6441)
* project quota added
* project quota removed
* add periods table
* make log record generic
* accumulate usage
* query usage
* count action run seconds
* fix filter in ReportQuotaUsage
* fix existing tests
* fix logstore tests
* fix typo
* fix: add quota unit tests command side
* fix: add quota unit tests command side
* fix: add quota unit tests command side
* move notifications into debouncer and improve limit querying
* cleanup
* comment
* fix: add quota unit tests command side
* fix remaining quota usage query
* implement InmemLogStorage
* cleanup and linting
* improve test
* fix: add quota unit tests command side
* fix: add quota unit tests command side
* fix: add quota unit tests command side
* fix: add quota unit tests command side
* action notifications and fixes for notifications query
* revert console prefix
* fix: add quota unit tests command side
* fix: add quota integration tests
* improve accountable requests
* improve accountable requests
* fix: add quota integration tests
* fix: add quota integration tests
* fix: add quota integration tests
* comment
* remove ability to store logs in db and other changes requested from review
* changes requested from review
* changes requested from review
* Update internal/api/http/middleware/access_interceptor.go
Co-authored-by: Silvan <silvan.reusser@gmail.com>
* tests: fix quotas integration tests
* improve incrementUsageStatement
* linting
* fix: delete e2e tests as integration tests cover functionality
* Update internal/api/http/middleware/access_interceptor.go
Co-authored-by: Silvan <silvan.reusser@gmail.com>
* backup
* fix conflict
* create rc
* create prerelease
* remove issue release labeling
* fix tracing
---------
Co-authored-by: Livio Spring <livio.a@gmail.com>
Co-authored-by: Stefan Benz <stefan@caos.ch>
Co-authored-by: adlerhurst <silvan.reusser@gmail.com>
(cherry picked from commit 1a49b7d298)
Committed by Livio Spring
Parent: b688d6f842
Commit: 5823fdbef9
@@ -6,6 +6,9 @@ type Configs struct {
 }
 
 type Config struct {
 	Database *EmitterConfig
-	Stdout   *EmitterConfig
+	Stdout   *StdConfig
+}
+
+type StdConfig struct {
+	Enabled bool
 }
@@ -9,19 +9,17 @@ import (
 	"github.com/zitadel/logging"
 )
 
-type bulkSink interface {
-	sendBulk(ctx context.Context, bulk []LogRecord) error
+type bulkSink[T LogRecord[T]] interface {
+	SendBulk(ctx context.Context, bulk []T) error
 }
 
-var _ bulkSink = bulkSinkFunc(nil)
-
-type bulkSinkFunc func(ctx context.Context, items []LogRecord) error
+type bulkSinkFunc[T LogRecord[T]] func(ctx context.Context, bulk []T) error
 
-func (s bulkSinkFunc) sendBulk(ctx context.Context, items []LogRecord) error {
-	return s(ctx, items)
+func (s bulkSinkFunc[T]) SendBulk(ctx context.Context, bulk []T) error {
+	return s(ctx, bulk)
 }
 
-type debouncer struct {
+type debouncer[T LogRecord[T]] struct {
 	// Storing context.Context in a struct is generally bad practice
 	// https://go.dev/blog/context-and-structs
 	// However, debouncer starts a go routine that triggers side effects itself.
@@ -33,8 +31,8 @@ type debouncer struct {
 	ticker   *clock.Ticker
 	mux      sync.Mutex
 	cfg      DebouncerConfig
-	storage  bulkSink
-	cache    []LogRecord
+	storage  bulkSink[T]
+	cache    []T
 	cacheLen uint
 }
@@ -43,8 +41,8 @@ type DebouncerConfig struct {
 	MaxBulkSize uint
 }
 
-func newDebouncer(binarySignaledCtx context.Context, cfg DebouncerConfig, clock clock.Clock, ship bulkSink) *debouncer {
-	a := &debouncer{
+func newDebouncer[T LogRecord[T]](binarySignaledCtx context.Context, cfg DebouncerConfig, clock clock.Clock, ship bulkSink[T]) *debouncer[T] {
+	a := &debouncer[T]{
 		binarySignaledCtx: binarySignaledCtx,
 		clock:             clock,
 		cfg:               cfg,
@@ -58,7 +56,7 @@ func newDebouncer(binarySignaledCtx context.Context, cfg DebouncerConfig, clock
 	return a
 }
 
-func (d *debouncer) add(item LogRecord) {
+func (d *debouncer[T]) add(item T) {
 	d.mux.Lock()
 	defer d.mux.Unlock()
 	d.cache = append(d.cache, item)
@@ -69,13 +67,13 @@ func (d *debouncer) add(item LogRecord) {
 	}
 }
 
-func (d *debouncer) ship() {
+func (d *debouncer[T]) ship() {
 	if d.cacheLen == 0 {
 		return
 	}
 	d.mux.Lock()
 	defer d.mux.Unlock()
-	if err := d.storage.sendBulk(d.binarySignaledCtx, d.cache); err != nil {
+	if err := d.storage.SendBulk(d.binarySignaledCtx, d.cache); err != nil {
 		logging.WithError(err).WithField("size", len(d.cache)).Error("storing bulk failed")
 	}
 	d.cache = nil
@@ -85,7 +83,7 @@ func (d *debouncer) ship() {
 	}
 }
 
-func (d *debouncer) shipOnTicks() {
+func (d *debouncer[T]) shipOnTicks() {
 	for range d.ticker.C {
 		d.ship()
 	}
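To see the shape of this change in isolation: the debouncer is now generic over the record type, so sinks receive a typed `[]T` and no longer need type assertions. The following is a minimal, self-contained sketch of that pattern; names like `miniDebouncer` and `note` are illustrative only and not part of the zitadel codebase.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

type sink[T any] func(ctx context.Context, bulk []T) error

type miniDebouncer[T any] struct {
	mu          sync.Mutex
	cache       []T
	maxBulkSize int
	ship        sink[T]
}

func (d *miniDebouncer[T]) add(ctx context.Context, item T) error {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.cache = append(d.cache, item)
	if len(d.cache) < d.maxBulkSize {
		return nil
	}
	bulk := d.cache
	d.cache = nil
	return d.ship(ctx, bulk) // flush once the bulk-size threshold is reached
}

type note struct{ msg string }

func main() {
	d := &miniDebouncer[*note]{
		maxBulkSize: 2,
		ship: func(ctx context.Context, bulk []*note) error {
			fmt.Println("shipping", len(bulk), "records") // bulk is []*note, no assertions needed
			return nil
		},
	}
	ctx := context.Background()
	_ = d.add(ctx, &note{msg: "a"})
	_ = d.add(ctx, &note{msg: "b"}) // triggers the flush
}
```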
@@ -2,56 +2,52 @@ package logstore
 
 import (
 	"context"
 	"fmt"
 	"time"
 
 	"github.com/benbjohnson/clock"
	"github.com/zitadel/logging"
 )
 
 type EmitterConfig struct {
-	Enabled         bool
-	Keep            time.Duration
-	CleanupInterval time.Duration
-	Debounce        *DebouncerConfig
+	Enabled  bool
+	Debounce *DebouncerConfig
 }
 
-type emitter struct {
+type emitter[T LogRecord[T]] struct {
 	enabled   bool
 	ctx       context.Context
-	debouncer *debouncer
-	emitter   LogEmitter
+	debouncer *debouncer[T]
+	emitter   LogEmitter[T]
 	clock     clock.Clock
 }
 
-type LogRecord interface {
-	Normalize() LogRecord
+type LogRecord[T any] interface {
+	Normalize() T
 }
 
-type LogRecordFunc func() LogRecord
+type LogRecordFunc[T any] func() T
 
-func (r LogRecordFunc) Normalize() LogRecord {
+func (r LogRecordFunc[T]) Normalize() T {
 	return r()
 }
 
-type LogEmitter interface {
-	Emit(ctx context.Context, bulk []LogRecord) error
+type LogEmitter[T LogRecord[T]] interface {
+	Emit(ctx context.Context, bulk []T) error
 }
 
-type LogEmitterFunc func(ctx context.Context, bulk []LogRecord) error
+type LogEmitterFunc[T LogRecord[T]] func(ctx context.Context, bulk []T) error
 
-func (l LogEmitterFunc) Emit(ctx context.Context, bulk []LogRecord) error {
+func (l LogEmitterFunc[T]) Emit(ctx context.Context, bulk []T) error {
 	return l(ctx, bulk)
 }
 
-type LogCleanupper interface {
-	LogEmitter
+type LogCleanupper[T LogRecord[T]] interface {
+	LogEmitter[T]
 	Cleanup(ctx context.Context, keep time.Duration) error
 }
 
 // NewEmitter accepts Clock from github.com/benbjohnson/clock so we can control timers and tickers in the unit tests
-func NewEmitter(ctx context.Context, clock clock.Clock, cfg *EmitterConfig, logger LogEmitter) (*emitter, error) {
-	svc := &emitter{
+func NewEmitter[T LogRecord[T]](ctx context.Context, clock clock.Clock, cfg *EmitterConfig, logger LogEmitter[T]) (*emitter[T], error) {
+	svc := &emitter[T]{
 		enabled: cfg != nil && cfg.Enabled,
 		ctx:     ctx,
 		emitter: logger,
@@ -63,36 +59,12 @@ func NewEmitter(ctx context.Context, clock clock.Clock, cfg *EmitterConfig, logg
 	}
 
 	if cfg.Debounce != nil && (cfg.Debounce.MinFrequency > 0 || cfg.Debounce.MaxBulkSize > 0) {
-		svc.debouncer = newDebouncer(ctx, *cfg.Debounce, clock, newStorageBulkSink(svc.emitter))
-	}
-
-	cleanupper, ok := logger.(LogCleanupper)
-	if !ok {
-		if cfg.Keep != 0 {
-			return nil, fmt.Errorf("cleaning up for this storage type is not supported, so keep duration must be 0, but is %d", cfg.Keep)
-		}
-		if cfg.CleanupInterval != 0 {
-			return nil, fmt.Errorf("cleaning up for this storage type is not supported, so cleanup interval duration must be 0, but is %d", cfg.Keep)
-		}
-		return svc, nil
-	}
-
-	if cfg.Keep != 0 && cfg.CleanupInterval != 0 {
-		go svc.startCleanupping(cleanupper, cfg.CleanupInterval, cfg.Keep)
+		svc.debouncer = newDebouncer[T](ctx, *cfg.Debounce, clock, newStorageBulkSink(svc.emitter))
 	}
 	return svc, nil
 }
 
-func (s *emitter) startCleanupping(cleanupper LogCleanupper, cleanupInterval, keep time.Duration) {
-	for range s.clock.Tick(cleanupInterval) {
-		if err := cleanupper.Cleanup(s.ctx, keep); err != nil {
-			logging.WithError(err).Error("cleaning up logs failed")
-		}
-	}
-}
-
-func (s *emitter) Emit(ctx context.Context, record LogRecord) (err error) {
+func (s *emitter[T]) Emit(ctx context.Context, record T) (err error) {
 	if !s.enabled {
 		return nil
 	}
@@ -102,11 +74,11 @@ func (s *emitter) Emit(ctx context.Context, record LogRecord) (err error) {
 		return nil
 	}
 
-	return s.emitter.Emit(ctx, []LogRecord{record})
+	return s.emitter.Emit(ctx, []T{record})
 }
 
-func newStorageBulkSink(emitter LogEmitter) bulkSinkFunc {
-	return func(ctx context.Context, bulk []LogRecord) error {
+func newStorageBulkSink[T LogRecord[T]](emitter LogEmitter[T]) bulkSinkFunc[T] {
+	return func(ctx context.Context, bulk []T) error {
 		return emitter.Emit(ctx, bulk)
 	}
 }
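The new `LogRecord[T any] interface { Normalize() T }` is a self-referential ("F-bounded") constraint: `emitter[T LogRecord[T]]` requires each record type to normalize to its own concrete type, so generic code never loses type information. A standalone sketch of why that works (the `myLog` type is illustrative only):

```go
package main

import "fmt"

type LogRecord[T any] interface {
	Normalize() T
}

// handle accepts any record type that normalizes to itself.
func handle[T LogRecord[T]](r T) T {
	return r.Normalize()
}

type myLog struct{ msg string }

func (l myLog) Normalize() myLog {
	if len(l.msg) > 3 {
		l.msg = l.msg[:3] // value receiver: truncation happens on a copy
	}
	return l
}

func main() {
	fmt.Println(handle(myLog{msg: "hello"}).msg) // prints "hel", still typed as myLog
}
```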
@@ -3,167 +3,91 @@ package access
 
 import (
 	"context"
 	"database/sql"
-	"fmt"
-	"net/http"
-	"strings"
+	"errors"
 	"time"
 
 	"github.com/Masterminds/squirrel"
 	"github.com/zitadel/logging"
-	"google.golang.org/grpc/codes"
 
-	"github.com/zitadel/zitadel/internal/api/call"
-	zitadel_http "github.com/zitadel/zitadel/internal/api/http"
+	"github.com/zitadel/zitadel/internal/api/authz"
+	"github.com/zitadel/zitadel/internal/command"
 	"github.com/zitadel/zitadel/internal/database"
 	caos_errors "github.com/zitadel/zitadel/internal/errors"
 	"github.com/zitadel/zitadel/internal/logstore"
+	"github.com/zitadel/zitadel/internal/logstore/record"
+	"github.com/zitadel/zitadel/internal/query"
+	"github.com/zitadel/zitadel/internal/query/projection"
 	"github.com/zitadel/zitadel/internal/repository/quota"
+	"github.com/zitadel/zitadel/internal/telemetry/tracing"
 )
 
 const (
 	accessLogsTable          = "logstore.access"
 	accessTimestampCol       = "log_date"
 	accessProtocolCol        = "protocol"
 	accessRequestURLCol      = "request_url"
 	accessResponseStatusCol  = "response_status"
 	accessRequestHeadersCol  = "request_headers"
 	accessResponseHeadersCol = "response_headers"
 	accessInstanceIdCol      = "instance_id"
 	accessProjectIdCol       = "project_id"
 	accessRequestedDomainCol = "requested_domain"
 	accessRequestedHostCol   = "requested_host"
 )
 
-var _ logstore.UsageQuerier = (*databaseLogStorage)(nil)
-var _ logstore.LogCleanupper = (*databaseLogStorage)(nil)
+var _ logstore.UsageStorer[*record.AccessLog] = (*databaseLogStorage)(nil)
 
 type databaseLogStorage struct {
 	dbClient *database.DB
+	commands *command.Commands
+	queries  *query.Queries
 }
 
-func NewDatabaseLogStorage(dbClient *database.DB) *databaseLogStorage {
-	return &databaseLogStorage{dbClient: dbClient}
+func NewDatabaseLogStorage(dbClient *database.DB, commands *command.Commands, queries *query.Queries) *databaseLogStorage {
+	return &databaseLogStorage{dbClient: dbClient, commands: commands, queries: queries}
 }
 
 func (l *databaseLogStorage) QuotaUnit() quota.Unit {
 	return quota.RequestsAllAuthenticated
 }
 
-func (l *databaseLogStorage) Emit(ctx context.Context, bulk []logstore.LogRecord) error {
+func (l *databaseLogStorage) Emit(ctx context.Context, bulk []*record.AccessLog) error {
 	if len(bulk) == 0 {
 		return nil
 	}
 	builder := squirrel.Insert(accessLogsTable).
 		Columns(
 			accessTimestampCol,
 			accessProtocolCol,
 			accessRequestURLCol,
 			accessResponseStatusCol,
 			accessRequestHeadersCol,
 			accessResponseHeadersCol,
 			accessInstanceIdCol,
 			accessProjectIdCol,
 			accessRequestedDomainCol,
 			accessRequestedHostCol,
 		).
 		PlaceholderFormat(squirrel.Dollar)
 
 	for idx := range bulk {
-		item := bulk[idx].(*Record)
+		item := bulk[idx]
 		builder = builder.Values(
 			item.LogDate,
 			item.Protocol,
 			item.RequestURL,
 			item.ResponseStatus,
 			item.RequestHeaders,
 			item.ResponseHeaders,
 			item.InstanceID,
 			item.ProjectID,
 			item.RequestedDomain,
 			item.RequestedHost,
 		)
 	}
 
 	stmt, args, err := builder.ToSql()
 	if err != nil {
 		return caos_errors.ThrowInternal(err, "ACCESS-KOS7I", "Errors.Internal")
 	}
 
 	result, err := l.dbClient.ExecContext(ctx, stmt, args...)
 	if err != nil {
 		return caos_errors.ThrowInternal(err, "ACCESS-alnT9", "Errors.Access.StorageFailed")
 	}
 
 	rows, err := result.RowsAffected()
 	if err != nil {
 		return caos_errors.ThrowInternal(err, "ACCESS-7KIpL", "Errors.Internal")
 	}
 
 	logging.WithFields("rows", rows).Debug("successfully stored access logs")
-	return nil
+	return l.incrementUsage(ctx, bulk)
 }
 
-func (l *databaseLogStorage) QueryUsage(ctx context.Context, instanceId string, start time.Time) (uint64, error) {
-	stmt, args, err := squirrel.Select(
-		fmt.Sprintf("count(%s)", accessInstanceIdCol),
-	).
-		From(accessLogsTable + l.dbClient.Timetravel(call.Took(ctx))).
-		Where(squirrel.And{
-			squirrel.Eq{accessInstanceIdCol: instanceId},
-			squirrel.GtOrEq{accessTimestampCol: start},
-			squirrel.Expr(fmt.Sprintf(`%s #>> '{%s,0}' = '[REDACTED]'`, accessRequestHeadersCol, strings.ToLower(zitadel_http.Authorization))),
-			squirrel.NotLike{accessRequestURLCol: "%/zitadel.system.v1.SystemService/%"},
-			squirrel.NotLike{accessRequestURLCol: "%/system/v1/%"},
-			squirrel.Or{
-				squirrel.And{
-					squirrel.Eq{accessProtocolCol: HTTP},
-					squirrel.NotEq{accessResponseStatusCol: http.StatusForbidden},
-					squirrel.NotEq{accessResponseStatusCol: http.StatusInternalServerError},
-					squirrel.NotEq{accessResponseStatusCol: http.StatusTooManyRequests},
-				},
-				squirrel.And{
-					squirrel.Eq{accessProtocolCol: GRPC},
-					squirrel.NotEq{accessResponseStatusCol: codes.PermissionDenied},
-					squirrel.NotEq{accessResponseStatusCol: codes.Internal},
-					squirrel.NotEq{accessResponseStatusCol: codes.ResourceExhausted},
-				},
-			},
-		}).
-		PlaceholderFormat(squirrel.Dollar).
-		ToSql()
-	if err != nil {
-		return 0, caos_errors.ThrowInternal(err, "ACCESS-V9Sde", "Errors.Internal")
-	}
-
-	var count uint64
-	err = l.dbClient.
-		QueryRowContext(ctx,
-			func(row *sql.Row) error {
-				return row.Scan(&count)
-			},
-			stmt, args...,
-		)
-	if err != nil {
-		return 0, caos_errors.ThrowInternal(err, "ACCESS-pBPrM", "Errors.Logstore.Access.ScanFailed")
-	}
-
-	return count, nil
-}
+func (l *databaseLogStorage) incrementUsage(ctx context.Context, bulk []*record.AccessLog) (err error) {
+	ctx, span := tracing.NewSpan(ctx)
+	defer func() { span.EndWithError(err) }()
+
+	byInstance := make(map[string][]*record.AccessLog)
+	for _, r := range bulk {
+		if r.InstanceID != "" {
+			byInstance[r.InstanceID] = append(byInstance[r.InstanceID], r)
+		}
+	}
+
+	for instanceID, instanceBulk := range byInstance {
+		q, getQuotaErr := l.queries.GetQuota(ctx, instanceID, quota.RequestsAllAuthenticated)
+		if errors.Is(getQuotaErr, sql.ErrNoRows) {
+			continue
+		}
+		err = errors.Join(err, getQuotaErr)
+		if getQuotaErr != nil {
+			continue
+		}
+		sum, incrementErr := l.incrementUsageFromAccessLogs(ctx, instanceID, q.CurrentPeriodStart, instanceBulk)
+		err = errors.Join(err, incrementErr)
+		if incrementErr != nil {
+			continue
+		}
+		notifications, getNotificationErr := l.queries.GetDueQuotaNotifications(ctx, instanceID, quota.RequestsAllAuthenticated, q, q.CurrentPeriodStart, sum)
+		err = errors.Join(err, getNotificationErr)
+		if getNotificationErr != nil || len(notifications) == 0 {
+			continue
+		}
+		ctx = authz.WithInstanceID(ctx, instanceID)
+		reportErr := l.commands.ReportQuotaUsage(ctx, notifications)
+		err = errors.Join(err, reportErr)
+		if reportErr != nil {
+			continue
+		}
+	}
+	return err
+}
 
 func (l *databaseLogStorage) Cleanup(ctx context.Context, keep time.Duration) error {
 	stmt, args, err := squirrel.Delete(accessLogsTable).
 		Where(squirrel.LtOrEq{accessTimestampCol: time.Now().Add(-keep)}).
 		PlaceholderFormat(squirrel.Dollar).
 		ToSql()
 	if err != nil {
 		return caos_errors.ThrowInternal(err, "ACCESS-2oTh6", "Errors.Internal")
 	}
 
 	execCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
 	defer cancel()
 	_, err = l.dbClient.ExecContext(execCtx, stmt, args...)
 	return err
 }
 
+func (l *databaseLogStorage) incrementUsageFromAccessLogs(ctx context.Context, instanceID string, periodStart time.Time, records []*record.AccessLog) (sum uint64, err error) {
+	ctx, span := tracing.NewSpan(ctx)
+	defer func() { span.EndWithError(err) }()
+
+	var count uint64
+	for _, r := range records {
+		if r.IsAuthenticated() {
+			count++
+		}
+	}
+	return projection.QuotaProjection.IncrementUsage(ctx, quota.RequestsAllAuthenticated, instanceID, periodStart, count)
+}
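`incrementUsage` processes each instance independently and uses `errors.Join` (Go 1.20+) to collect failures without aborting the loop, so one broken instance cannot block usage accounting for the others. A standalone sketch of this accumulate-and-continue pattern; `processInstance` is a hypothetical stand-in for the quota and notification steps:

```go
package main

import (
	"errors"
	"fmt"
)

func processInstance(id string) error {
	if id == "bad" {
		return fmt.Errorf("instance %s: quota lookup failed", id)
	}
	return nil
}

func processAll(ids []string) (err error) {
	for _, id := range ids {
		stepErr := processInstance(id)
		err = errors.Join(err, stepErr) // joining nil is a no-op
		if stepErr != nil {
			continue // skip follow-up steps for this instance only
		}
		// ... further steps (increment usage, report notifications) ...
	}
	return err
}

func main() {
	fmt.Println(processAll([]string{"good", "bad", "also-good"}))
}
```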
@@ -1,97 +0,0 @@
-package access
-
-import (
-	"strings"
-	"time"
-
-	zitadel_http "github.com/zitadel/zitadel/internal/api/http"
-	"github.com/zitadel/zitadel/internal/logstore"
-)
-
-var _ logstore.LogRecord = (*Record)(nil)
-
-type Record struct {
-	LogDate        time.Time `json:"logDate"`
-	Protocol       Protocol  `json:"protocol"`
-	RequestURL     string    `json:"requestUrl"`
-	ResponseStatus uint32    `json:"responseStatus"`
-	// RequestHeaders are plain maps so varying implementation
-	// between HTTP and gRPC don't interfere with each other
-	RequestHeaders map[string][]string `json:"requestHeaders"`
-	// ResponseHeaders are plain maps so varying implementation
-	// between HTTP and gRPC don't interfere with each other
-	ResponseHeaders map[string][]string `json:"responseHeaders"`
-	InstanceID      string              `json:"instanceId"`
-	ProjectID       string              `json:"projectId"`
-	RequestedDomain string              `json:"requestedDomain"`
-	RequestedHost   string              `json:"requestedHost"`
-}
-
-type Protocol uint8
-
-const (
-	GRPC Protocol = iota
-	HTTP
-
-	redacted = "[REDACTED]"
-)
-
-func (a Record) Normalize() logstore.LogRecord {
-	a.RequestedDomain = cutString(a.RequestedDomain, 200)
-	a.RequestURL = cutString(a.RequestURL, 200)
-	a.RequestHeaders = normalizeHeaders(a.RequestHeaders, strings.ToLower(zitadel_http.Authorization), "grpcgateway-authorization", "cookie", "grpcgateway-cookie")
-	a.ResponseHeaders = normalizeHeaders(a.ResponseHeaders, "set-cookie")
-	return &a
-}
-
-// normalizeHeaders lowers all header keys and redacts secrets
-func normalizeHeaders(header map[string][]string, redactKeysLower ...string) map[string][]string {
-	return pruneKeys(redactKeys(lowerKeys(header), redactKeysLower...))
-}
-
-func lowerKeys(header map[string][]string) map[string][]string {
-	lower := make(map[string][]string, len(header))
-	for k, v := range header {
-		lower[strings.ToLower(k)] = v
-	}
-	return lower
-}
-
-func redactKeys(header map[string][]string, redactKeysLower ...string) map[string][]string {
-	redactedKeys := make(map[string][]string, len(header))
-	for k, v := range header {
-		redactedKeys[k] = v
-	}
-	for _, redactKey := range redactKeysLower {
-		if _, ok := redactedKeys[redactKey]; ok {
-			redactedKeys[redactKey] = []string{redacted}
-		}
-	}
-	return redactedKeys
-}
-
-const maxValuesPerKey = 10
-
-func pruneKeys(header map[string][]string) map[string][]string {
-	prunedKeys := make(map[string][]string, len(header))
-	for key, value := range header {
-		valueItems := make([]string, 0, maxValuesPerKey)
-		for i, valueItem := range value {
-			// Max 10 header values per key
-			if i > maxValuesPerKey {
-				break
-			}
-			// Max 200 value length
-			valueItems = append(valueItems, cutString(valueItem, 200))
-		}
-		prunedKeys[key] = valueItems
-	}
-	return prunedKeys
-}
-
-func cutString(str string, pos int) string {
-	if len(str) <= pos {
-		return str
-	}
-	return str[:pos-1]
-}
@@ -3,142 +3,84 @@ package execution
 
 import (
 	"context"
 	"database/sql"
-	"fmt"
+	"errors"
+	"math"
 	"time"
 
 	"github.com/Masterminds/squirrel"
 	"github.com/zitadel/logging"
 
-	"github.com/zitadel/zitadel/internal/api/call"
+	"github.com/zitadel/zitadel/internal/api/authz"
+	"github.com/zitadel/zitadel/internal/command"
 	"github.com/zitadel/zitadel/internal/database"
 	caos_errors "github.com/zitadel/zitadel/internal/errors"
 	"github.com/zitadel/zitadel/internal/logstore"
+	"github.com/zitadel/zitadel/internal/logstore/record"
+	"github.com/zitadel/zitadel/internal/query"
+	"github.com/zitadel/zitadel/internal/query/projection"
 	"github.com/zitadel/zitadel/internal/repository/quota"
 )
 
 const (
 	executionLogsTable     = "logstore.execution"
 	executionTimestampCol  = "log_date"
 	executionTookCol       = "took"
 	executionMessageCol    = "message"
 	executionLogLevelCol   = "loglevel"
 	executionInstanceIdCol = "instance_id"
 	executionActionIdCol   = "action_id"
 	executionMetadataCol   = "metadata"
 )
 
-var _ logstore.UsageQuerier = (*databaseLogStorage)(nil)
-var _ logstore.LogCleanupper = (*databaseLogStorage)(nil)
+var _ logstore.UsageStorer[*record.ExecutionLog] = (*databaseLogStorage)(nil)
 
 type databaseLogStorage struct {
 	dbClient *database.DB
+	commands *command.Commands
+	queries  *query.Queries
 }
 
-func NewDatabaseLogStorage(dbClient *database.DB) *databaseLogStorage {
-	return &databaseLogStorage{dbClient: dbClient}
+func NewDatabaseLogStorage(dbClient *database.DB, commands *command.Commands, queries *query.Queries) *databaseLogStorage {
+	return &databaseLogStorage{dbClient: dbClient, commands: commands, queries: queries}
 }
 
 func (l *databaseLogStorage) QuotaUnit() quota.Unit {
 	return quota.ActionsAllRunsSeconds
 }
 
-func (l *databaseLogStorage) Emit(ctx context.Context, bulk []logstore.LogRecord) error {
+func (l *databaseLogStorage) Emit(ctx context.Context, bulk []*record.ExecutionLog) error {
 	if len(bulk) == 0 {
 		return nil
 	}
-	builder := squirrel.Insert(executionLogsTable).
-		Columns(
-			executionTimestampCol,
-			executionTookCol,
-			executionMessageCol,
-			executionLogLevelCol,
-			executionInstanceIdCol,
-			executionActionIdCol,
-			executionMetadataCol,
-		).
-		PlaceholderFormat(squirrel.Dollar)
-
-	for idx := range bulk {
-		item := bulk[idx].(*Record)
-
-		var took interface{}
-		if item.Took > 0 {
-			took = item.Took
-		}
-
-		builder = builder.Values(
-			item.LogDate,
-			took,
-			item.Message,
-			item.LogLevel,
-			item.InstanceID,
-			item.ActionID,
-			item.Metadata,
-		)
-	}
-
-	stmt, args, err := builder.ToSql()
-	if err != nil {
-		return caos_errors.ThrowInternal(err, "EXEC-KOS7I", "Errors.Internal")
-	}
-
-	result, err := l.dbClient.ExecContext(ctx, stmt, args...)
-	if err != nil {
-		return caos_errors.ThrowInternal(err, "EXEC-0j6i5", "Errors.Access.StorageFailed")
-	}
-
-	rows, err := result.RowsAffected()
-	if err != nil {
-		return caos_errors.ThrowInternal(err, "EXEC-MGchJ", "Errors.Internal")
-	}
-
-	logging.WithFields("rows", rows).Debug("successfully stored execution logs")
-	return nil
+	return l.incrementUsage(ctx, bulk)
 }
 
-func (l *databaseLogStorage) QueryUsage(ctx context.Context, instanceId string, start time.Time) (uint64, error) {
-	stmt, args, err := squirrel.Select(
-		fmt.Sprintf("COALESCE(SUM(%s)::INT,0)", executionTookCol),
-	).
-		From(executionLogsTable + l.dbClient.Timetravel(call.Took(ctx))).
-		Where(squirrel.And{
-			squirrel.Eq{executionInstanceIdCol: instanceId},
-			squirrel.GtOrEq{executionTimestampCol: start},
-			squirrel.NotEq{executionTookCol: nil},
-		}).
-		PlaceholderFormat(squirrel.Dollar).
-		ToSql()
-	if err != nil {
-		return 0, caos_errors.ThrowInternal(err, "EXEC-DXtzg", "Errors.Internal")
-	}
-
-	var durationSeconds uint64
-	err = l.dbClient.
-		QueryRowContext(ctx,
-			func(row *sql.Row) error {
-				return row.Scan(&durationSeconds)
-			},
-			stmt, args...,
-		)
-	if err != nil {
-		return 0, caos_errors.ThrowInternal(err, "EXEC-Ad8nP", "Errors.Logstore.Execution.ScanFailed")
-	}
-	return durationSeconds, nil
+func (l *databaseLogStorage) incrementUsage(ctx context.Context, bulk []*record.ExecutionLog) (err error) {
+	byInstance := make(map[string][]*record.ExecutionLog)
+	for _, r := range bulk {
+		if r.InstanceID != "" {
+			byInstance[r.InstanceID] = append(byInstance[r.InstanceID], r)
+		}
+	}
+
+	for instanceID, instanceBulk := range byInstance {
+		q, getQuotaErr := l.queries.GetQuota(ctx, instanceID, quota.ActionsAllRunsSeconds)
+		if errors.Is(getQuotaErr, sql.ErrNoRows) {
+			continue
+		}
+		err = errors.Join(err, getQuotaErr)
+		if getQuotaErr != nil {
+			continue
+		}
+		sum, incrementErr := l.incrementUsageFromExecutionLogs(ctx, instanceID, q.CurrentPeriodStart, instanceBulk)
+		err = errors.Join(err, incrementErr)
+		if incrementErr != nil {
+			continue
+		}
+		notifications, getNotificationErr := l.queries.GetDueQuotaNotifications(ctx, instanceID, quota.ActionsAllRunsSeconds, q, q.CurrentPeriodStart, sum)
+		err = errors.Join(err, getNotificationErr)
+		if getNotificationErr != nil || len(notifications) == 0 {
+			continue
+		}
+		ctx = authz.WithInstanceID(ctx, instanceID)
+		reportErr := l.commands.ReportQuotaUsage(ctx, notifications)
+		err = errors.Join(err, reportErr)
+		if reportErr != nil {
+			continue
+		}
+	}
+	return err
+}
 
 func (l *databaseLogStorage) Cleanup(ctx context.Context, keep time.Duration) error {
 	stmt, args, err := squirrel.Delete(executionLogsTable).
 		Where(squirrel.LtOrEq{executionTimestampCol: time.Now().Add(-keep)}).
 		PlaceholderFormat(squirrel.Dollar).
 		ToSql()
 	if err != nil {
 		return caos_errors.ThrowInternal(err, "EXEC-Bja8V", "Errors.Internal")
 	}
 
 	execCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
 	defer cancel()
 	_, err = l.dbClient.ExecContext(execCtx, stmt, args...)
 	return err
 }
 
+func (l *databaseLogStorage) incrementUsageFromExecutionLogs(ctx context.Context, instanceID string, periodStart time.Time, records []*record.ExecutionLog) (sum uint64, err error) {
+	var total time.Duration
+	for _, r := range records {
+		total += r.Took
+	}
+	return projection.QuotaProjection.IncrementUsage(ctx, quota.ActionsAllRunsSeconds, instanceID, periodStart, uint64(math.Floor(total.Seconds())))
+}
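Action run time is accumulated as a `time.Duration` and only then floored to whole seconds, so sub-second runs within one bulk still add up before truncation. A small self-contained sketch of that arithmetic:

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// billableSeconds sums durations first and floors once at the end,
// mirroring incrementUsageFromExecutionLogs above.
func billableSeconds(took []time.Duration) uint64 {
	var total time.Duration
	for _, t := range took {
		total += t
	}
	return uint64(math.Floor(total.Seconds()))
}

func main() {
	runs := []time.Duration{700 * time.Millisecond, 800 * time.Millisecond}
	fmt.Println(billableSeconds(runs)) // 1: summed to 1.5s, then floored
}
```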
@@ -1,89 +0,0 @@
-package mock
-
-import (
-	"context"
-	"sync"
-	"time"
-
-	"github.com/benbjohnson/clock"
-
-	"github.com/zitadel/zitadel/internal/logstore"
-	"github.com/zitadel/zitadel/internal/repository/quota"
-)
-
-var _ logstore.UsageQuerier = (*InmemLogStorage)(nil)
-var _ logstore.LogCleanupper = (*InmemLogStorage)(nil)
-
-type InmemLogStorage struct {
-	mux     sync.Mutex
-	clock   clock.Clock
-	emitted []*record
-	bulks   []int
-}
-
-func NewInMemoryStorage(clock clock.Clock) *InmemLogStorage {
-	return &InmemLogStorage{
-		clock:   clock,
-		emitted: make([]*record, 0),
-		bulks:   make([]int, 0),
-	}
-}
-
-func (l *InmemLogStorage) QuotaUnit() quota.Unit {
-	return quota.Unimplemented
-}
-
-func (l *InmemLogStorage) Emit(_ context.Context, bulk []logstore.LogRecord) error {
-	if len(bulk) == 0 {
-		return nil
-	}
-	l.mux.Lock()
-	defer l.mux.Unlock()
-	for idx := range bulk {
-		l.emitted = append(l.emitted, bulk[idx].(*record))
-	}
-	l.bulks = append(l.bulks, len(bulk))
-	return nil
-}
-
-func (l *InmemLogStorage) QueryUsage(_ context.Context, _ string, start time.Time) (uint64, error) {
-	l.mux.Lock()
-	defer l.mux.Unlock()
-
-	var count uint64
-	for _, r := range l.emitted {
-		if r.ts.After(start) {
-			count++
-		}
-	}
-	return count, nil
-}
-
-func (l *InmemLogStorage) Cleanup(_ context.Context, keep time.Duration) error {
-	l.mux.Lock()
-	defer l.mux.Unlock()
-
-	clean := make([]*record, 0)
-	from := l.clock.Now().Add(-(keep + 1))
-	for _, r := range l.emitted {
-		if r.ts.After(from) {
-			clean = append(clean, r)
-		}
-	}
-	l.emitted = clean
-	return nil
-}
-
-func (l *InmemLogStorage) Bulks() []int {
-	l.mux.Lock()
-	defer l.mux.Unlock()
-
-	return l.bulks
-}
-
-func (l *InmemLogStorage) Len() int {
-	l.mux.Lock()
-	defer l.mux.Unlock()
-
-	return len(l.emitted)
-}
@@ -9,8 +9,8 @@ import (
 	"github.com/zitadel/zitadel/internal/logstore"
 )
 
-func NewStdoutEmitter() logstore.LogEmitter {
-	return logstore.LogEmitterFunc(func(ctx context.Context, bulk []logstore.LogRecord) error {
+func NewStdoutEmitter[T logstore.LogRecord[T]]() logstore.LogEmitter[T] {
+	return logstore.LogEmitterFunc[T](func(ctx context.Context, bulk []T) error {
 		for idx := range bulk {
 			bytes, err := json.Marshal(bulk[idx])
 			if err != nil {
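Because the constructor now takes no value arguments, callers cannot rely on type inference and must name the type parameter explicitly, e.g. `NewStdoutEmitter[*record.AccessLog]()`. A standalone illustration of that constraint (the `newStdoutEmitter` and `accessLog` names here are illustrative, not the repository's API):

```go
package main

import (
	"encoding/json"
	"fmt"
)

type emitFunc[T any] func(bulk []T) error

// newStdoutEmitter mirrors the shape of the changed constructor: with no
// arguments, callers must write newStdoutEmitter[someType]().
func newStdoutEmitter[T any]() emitFunc[T] {
	return func(bulk []T) error {
		for _, item := range bulk {
			bytes, err := json.Marshal(item)
			if err != nil {
				return err
			}
			fmt.Println(string(bytes))
		}
		return nil
	}
}

type accessLog struct {
	URL string `json:"requestUrl"`
}

func main() {
	emit := newStdoutEmitter[accessLog]() // explicit type argument required
	_ = emit([]accessLog{{URL: "https://example.com/"}})
}
```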
@@ -4,16 +4,14 @@ import (
 	"time"
 
 	"github.com/zitadel/zitadel/internal/logstore"
-	"github.com/zitadel/zitadel/internal/repository/quota"
+	"github.com/zitadel/zitadel/internal/query"
 )
 
 type emitterOption func(config *logstore.EmitterConfig)
 
 func emitterConfig(options ...emitterOption) *logstore.EmitterConfig {
 	cfg := &logstore.EmitterConfig{
-		Enabled:         true,
-		Keep:            time.Hour,
-		CleanupInterval: time.Hour,
+		Enabled: true,
 		Debounce: &logstore.DebouncerConfig{
 			MinFrequency: 0,
 			MaxBulkSize:  0,
@@ -37,17 +35,10 @@ func withDisabled() emitterOption {
 	}
 }
 
-func withCleanupping(keep, interval time.Duration) emitterOption {
-	return func(c *logstore.EmitterConfig) {
-		c.Keep = keep
-		c.CleanupInterval = interval
-	}
-}
-
-type quotaOption func(config *quota.AddedEvent)
+type quotaOption func(config *query.Quota)
 
-func quotaConfig(quotaOptions ...quotaOption) quota.AddedEvent {
-	q := &quota.AddedEvent{
+func quotaConfig(quotaOptions ...quotaOption) *query.Quota {
+	q := &query.Quota{
 		Amount:        90,
 		Limit:         false,
 		ResetInterval: 90 * time.Second,
@@ -56,18 +47,18 @@ func quotaConfig(quotaOptions ...quotaOption) quota.AddedEvent {
 	for _, opt := range quotaOptions {
 		opt(q)
 	}
-	return *q
+	return q
 }
 
 func withAmountAndInterval(n uint64) quotaOption {
-	return func(c *quota.AddedEvent) {
+	return func(c *query.Quota) {
 		c.Amount = n
 		c.ResetInterval = time.Duration(n) * time.Second
 	}
 }
 
 func withLimiting() quotaOption {
-	return func(c *quota.AddedEvent) {
+	return func(c *query.Quota) {
 		c.Limit = true
 	}
 }
internal/logstore/mock/inmem.go (new file, 120 lines)
@@ -0,0 +1,120 @@
+package mock
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"github.com/benbjohnson/clock"
+
+	"github.com/zitadel/zitadel/internal/logstore"
+	"github.com/zitadel/zitadel/internal/query"
+	"github.com/zitadel/zitadel/internal/repository/quota"
+)
+
+var _ logstore.UsageStorer[*Record] = (*InmemLogStorage)(nil)
+var _ logstore.LogCleanupper[*Record] = (*InmemLogStorage)(nil)
+var _ logstore.Queries = (*InmemLogStorage)(nil)
+
+type InmemLogStorage struct {
+	mux     sync.Mutex
+	clock   clock.Clock
+	emitted []*Record
+	bulks   []int
+	quota   *query.Quota
+}
+
+func NewInMemoryStorage(clock clock.Clock, quota *query.Quota) *InmemLogStorage {
+	return &InmemLogStorage{
+		clock:   clock,
+		emitted: make([]*Record, 0),
+		bulks:   make([]int, 0),
+		quota:   quota,
+	}
+}
+
+func (l *InmemLogStorage) QuotaUnit() quota.Unit {
+	return quota.Unimplemented
+}
+
+func (l *InmemLogStorage) Emit(_ context.Context, bulk []*Record) error {
+	if len(bulk) == 0 {
+		return nil
+	}
+	l.mux.Lock()
+	defer l.mux.Unlock()
+	l.emitted = append(l.emitted, bulk...)
+	l.bulks = append(l.bulks, len(bulk))
+	return nil
+}
+
+func (l *InmemLogStorage) QueryUsage(_ context.Context, _ string, start time.Time) (uint64, error) {
+	l.mux.Lock()
+	defer l.mux.Unlock()
+
+	var count uint64
+	for _, r := range l.emitted {
+		if r.ts.After(start) {
+			count++
+		}
+	}
+	return count, nil
+}
+
+func (l *InmemLogStorage) Cleanup(_ context.Context, keep time.Duration) error {
+	l.mux.Lock()
+	defer l.mux.Unlock()
+
+	clean := make([]*Record, 0)
+	from := l.clock.Now().Add(-(keep + 1))
+	for _, r := range l.emitted {
+		if r.ts.After(from) {
+			clean = append(clean, r)
+		}
+	}
+	l.emitted = clean
+	return nil
+}
+
+func (l *InmemLogStorage) Bulks() []int {
+	l.mux.Lock()
+	defer l.mux.Unlock()
+
+	return l.bulks
+}
+
+func (l *InmemLogStorage) Len() int {
+	l.mux.Lock()
+	defer l.mux.Unlock()
+
+	return len(l.emitted)
+}
+
+func (l *InmemLogStorage) GetQuota(ctx context.Context, instanceID string, unit quota.Unit) (qu *query.Quota, err error) {
+	return l.quota, nil
+}
+
+func (l *InmemLogStorage) GetQuotaUsage(ctx context.Context, instanceID string, unit quota.Unit, periodStart time.Time) (usage uint64, err error) {
+	return uint64(l.Len()), nil
+}
+
+func (l *InmemLogStorage) GetRemainingQuotaUsage(ctx context.Context, instanceID string, unit quota.Unit) (remaining *uint64, err error) {
+	if !l.quota.Limit {
+		return nil, nil
+	}
+	var r uint64
+	used := uint64(l.Len())
+	if used > l.quota.Amount {
+		return &r, nil
+	}
+	r = l.quota.Amount - used
+	return &r, nil
+}
+
+func (l *InmemLogStorage) GetDueQuotaNotifications(ctx context.Context, instanceID string, unit quota.Unit, qu *query.Quota, periodStart time.Time, usedAbs uint64) (dueNotifications []*quota.NotificationDueEvent, err error) {
+	return nil, nil
+}
+
+func (l *InmemLogStorage) ReportQuotaUsage(ctx context.Context, dueNotifications []*quota.NotificationDueEvent) error {
+	return nil
+}
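The mock now doubles as both storage and `Queries` implementation, and all of its time-dependent behavior is driven by an injectable clock. A self-contained sketch of how github.com/benbjohnson/clock makes the cleanup logic deterministic in tests (the `rec`/`inmem` types here are simplified stand-ins):

```go
package main

import (
	"fmt"
	"time"

	"github.com/benbjohnson/clock"
)

type rec struct{ ts time.Time }

type inmem struct {
	clock   clock.Clock
	emitted []*rec
}

// cleanup drops records older than keep, relative to the injected clock.
func (l *inmem) cleanup(keep time.Duration) {
	from := l.clock.Now().Add(-(keep + 1))
	kept := make([]*rec, 0, len(l.emitted))
	for _, r := range l.emitted {
		if r.ts.After(from) {
			kept = append(kept, r)
		}
	}
	l.emitted = kept
}

func main() {
	mock := clock.NewMock() // manual time; no real timers fire
	store := &inmem{clock: mock}
	store.emitted = append(store.emitted, &rec{ts: mock.Now()})
	mock.Add(2 * time.Hour) // advance the mock clock instantly
	store.emitted = append(store.emitted, &rec{ts: mock.Now()})
	store.cleanup(time.Hour)
	fmt.Println(len(store.emitted)) // 1: only the recent record survives
}
```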
@@ -8,18 +8,18 @@ import (
 	"github.com/zitadel/zitadel/internal/logstore"
 )
 
-var _ logstore.LogRecord = (*record)(nil)
+var _ logstore.LogRecord[*Record] = (*Record)(nil)
 
-func NewRecord(clock clock.Clock) *record {
-	return &record{ts: clock.Now()}
+func NewRecord(clock clock.Clock) *Record {
+	return &Record{ts: clock.Now()}
 }
 
-type record struct {
+type Record struct {
 	ts       time.Time
 	redacted bool
 }
 
-func (r record) Normalize() logstore.LogRecord {
+func (r Record) Normalize() *Record {
 	r.redacted = true
 	return &r
 }
@@ -1,28 +0,0 @@
-package mock
-
-import (
-	"context"
-	"time"
-
-	"github.com/zitadel/zitadel/internal/logstore"
-	"github.com/zitadel/zitadel/internal/repository/quota"
-)
-
-var _ logstore.QuotaQuerier = (*inmemReporter)(nil)
-
-type inmemReporter struct {
-	config      *quota.AddedEvent
-	startPeriod time.Time
-}
-
-func NewNoopQuerier(quota *quota.AddedEvent, startPeriod time.Time) *inmemReporter {
-	return &inmemReporter{config: quota, startPeriod: startPeriod}
-}
-
-func (i *inmemReporter) GetCurrentQuotaPeriod(context.Context, string, quota.Unit) (*quota.AddedEvent, time.Time, error) {
-	return i.config, i.startPeriod, nil
-}
-
-func (*inmemReporter) GetDueQuotaNotifications(context.Context, *quota.AddedEvent, time.Time, uint64) ([]*quota.NotificationDueEvent, error) {
-	return nil, nil
-}
internal/logstore/record/access.go (new file, 135 lines)
@@ -0,0 +1,135 @@
+package record
+
+import (
+	"net/http"
+	"strings"
+	"time"
+
+	"google.golang.org/grpc/codes"
+
+	zitadel_http "github.com/zitadel/zitadel/internal/api/http"
+)
+
+type AccessLog struct {
+	LogDate        time.Time      `json:"logDate"`
+	Protocol       AccessProtocol `json:"protocol"`
+	RequestURL     string         `json:"requestUrl"`
+	ResponseStatus uint32         `json:"responseStatus"`
+	// RequestHeaders and ResponseHeaders are plain maps so varying implementations
+	// between HTTP and gRPC don't interfere with each other
+	RequestHeaders  map[string][]string `json:"requestHeaders"`
+	ResponseHeaders map[string][]string `json:"responseHeaders"`
+	InstanceID      string              `json:"instanceId"`
+	ProjectID       string              `json:"projectId"`
+	RequestedDomain string              `json:"requestedDomain"`
+	RequestedHost   string              `json:"requestedHost"`
+	// NotCountable can be used by the logging service to explicitly state
+	// that the request must not increase the amount of countable (authenticated) requests
+	NotCountable bool `json:"-"`
+	normalized   bool `json:"-"`
+}
+
+type AccessProtocol uint8
+
+const (
+	GRPC AccessProtocol = iota
+	HTTP
+
+	redacted = "[REDACTED]"
+)
+
+var (
+	unaccountableEndpoints = []string{
+		"/zitadel.system.v1.SystemService/",
+		"/zitadel.admin.v1.AdminService/Healthz",
+		"/zitadel.management.v1.ManagementService/Healthz",
+		"/zitadel.management.v1.ManagementService/GetOIDCInformation",
+		"/zitadel.auth.v1.AuthService/Healthz",
+	}
+)
+
+func (a AccessLog) IsAuthenticated() bool {
+	if a.NotCountable {
+		return false
+	}
+	if !a.normalized {
+		panic("access log not normalized, Normalize() must be called before IsAuthenticated()")
+	}
+	_, hasHTTPAuthHeader := a.RequestHeaders[strings.ToLower(zitadel_http.Authorization)]
+	// ignore requests, which were unauthorized or do not require an authorization (even if one was sent)
+	// also ignore if the limit was already reached or if the server returned an internal error
+	// note that endpoint paths are only checked with the gRPC representation as HTTP (gateway) will not log them
+	return hasHTTPAuthHeader &&
+		(a.Protocol == HTTP &&
+			a.ResponseStatus != http.StatusInternalServerError &&
+			a.ResponseStatus != http.StatusTooManyRequests &&
+			a.ResponseStatus != http.StatusUnauthorized) ||
+		(a.Protocol == GRPC &&
+			a.ResponseStatus != uint32(codes.Internal) &&
+			a.ResponseStatus != uint32(codes.ResourceExhausted) &&
+			a.ResponseStatus != uint32(codes.Unauthenticated) &&
+			!a.isUnaccountableEndpoint())
+}
+
+func (a AccessLog) isUnaccountableEndpoint() bool {
+	for _, endpoint := range unaccountableEndpoints {
+		if strings.HasPrefix(a.RequestURL, endpoint) {
+			return true
+		}
+	}
+	return false
+}
+
+func (a AccessLog) Normalize() *AccessLog {
+	a.RequestedDomain = cutString(a.RequestedDomain, 200)
+	a.RequestURL = cutString(a.RequestURL, 200)
+	a.RequestHeaders = normalizeHeaders(a.RequestHeaders, strings.ToLower(zitadel_http.Authorization), "grpcgateway-authorization", "cookie", "grpcgateway-cookie")
+	a.ResponseHeaders = normalizeHeaders(a.ResponseHeaders, "set-cookie")
+	a.normalized = true
+	return &a
+}
+
+// normalizeHeaders lowers all header keys and redacts secrets
+func normalizeHeaders(header map[string][]string, redactKeysLower ...string) map[string][]string {
+	return pruneKeys(redactKeys(lowerKeys(header), redactKeysLower...))
+}
+
+func lowerKeys(header map[string][]string) map[string][]string {
+	lower := make(map[string][]string, len(header))
+	for k, v := range header {
+		lower[strings.ToLower(k)] = v
+	}
+	return lower
+}
+
+func redactKeys(header map[string][]string, redactKeysLower ...string) map[string][]string {
+	redactedKeys := make(map[string][]string, len(header))
+	for k, v := range header {
+		redactedKeys[k] = v
+	}
+	for _, redactKey := range redactKeysLower {
+		if _, ok := redactedKeys[redactKey]; ok {
+			redactedKeys[redactKey] = []string{redacted}
+		}
+	}
+	return redactedKeys
+}
+
+const maxValuesPerKey = 10
+
+func pruneKeys(header map[string][]string) map[string][]string {
+	prunedKeys := make(map[string][]string, len(header))
+	for key, value := range header {
+		valueItems := make([]string, 0, maxValuesPerKey)
+		for i, valueItem := range value {
+			// Max 10 header values per key
+			if i > maxValuesPerKey {
+				break
+			}
+			// Max 200 value length
+			valueItems = append(valueItems, cutString(valueItem, 200))
+		}
+		prunedKeys[key] = valueItems
+	}
+	return prunedKeys
+}
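The new `AccessLog` enforces an ordering contract: `Normalize()` must run before `IsAuthenticated()`, which panics otherwise, because the authorization-header check relies on the lower-cased, redacted header keys. An illustrative sketch of the intended call order, assuming this repository's import paths:

```go
package main

import (
	"fmt"
	"time"

	"github.com/zitadel/zitadel/internal/logstore/record"
)

func main() {
	log := record.AccessLog{
		LogDate:        time.Now(),
		Protocol:       record.HTTP,
		RequestURL:     "https://my.zitadel.cloud/management/v1/users",
		ResponseStatus: 200,
		RequestHeaders: map[string][]string{"Authorization": {"Bearer token"}},
	}
	normalized := log.Normalize()             // lower-cases keys, redacts the auth header
	fmt.Println(normalized.IsAuthenticated()) // true: auth header present, status not excluded
}
```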
@@ -1,20 +1,18 @@
-package access_test
+package record
 
 import (
 	"reflect"
 	"testing"
-
-	"github.com/zitadel/zitadel/internal/logstore/emitters/access"
 )
 
 func TestRecord_Normalize(t *testing.T) {
 	tests := []struct {
 		name   string
-		record access.Record
-		want   *access.Record
+		record AccessLog
+		want   *AccessLog
 	}{{
 		name: "headers with certain keys should be redacted",
-		record: access.Record{
+		record: AccessLog{
 			RequestHeaders: map[string][]string{
 				"authorization":             {"AValue"},
 				"grpcgateway-authorization": {"AValue"},
@@ -24,7 +22,7 @@ func TestRecord_Normalize(t *testing.T) {
 				"set-cookie": {"AValue"},
 			},
 		},
-		want: &access.Record{
+		want: &AccessLog{
 			RequestHeaders: map[string][]string{
 				"authorization":             {"[REDACTED]"},
 				"grpcgateway-authorization": {"[REDACTED]"},
@@ -36,22 +34,22 @@ func TestRecord_Normalize(t *testing.T) {
 		},
 	}, {
 		name: "header keys should be lower cased",
-		record: access.Record{
+		record: AccessLog{
 			RequestHeaders:  map[string][]string{"AKey": {"AValue"}},
 			ResponseHeaders: map[string][]string{"AKey": {"AValue"}}},
-		want: &access.Record{
+		want: &AccessLog{
 			RequestHeaders:  map[string][]string{"akey": {"AValue"}},
 			ResponseHeaders: map[string][]string{"akey": {"AValue"}}},
 	}, {
 		name: "an already prune record should stay unchanged",
-		record: access.Record{
+		record: AccessLog{
 			RequestURL: "https://my.zitadel.cloud/",
 			RequestHeaders: map[string][]string{
 				"authorization": {"[REDACTED]"},
 			},
 			ResponseHeaders: map[string][]string{},
 		},
-		want: &access.Record{
+		want: &AccessLog{
 			RequestURL: "https://my.zitadel.cloud/",
 			RequestHeaders: map[string][]string{
 				"authorization": {"[REDACTED]"},
@@ -60,17 +58,18 @@ func TestRecord_Normalize(t *testing.T) {
 		},
 	}, {
 		name: "empty record should stay empty",
-		record: access.Record{
+		record: AccessLog{
 			RequestHeaders:  map[string][]string{},
 			ResponseHeaders: map[string][]string{},
 		},
-		want: &access.Record{
+		want: &AccessLog{
 			RequestHeaders:  map[string][]string{},
 			ResponseHeaders: map[string][]string{},
 		},
 	}}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
+			tt.want.normalized = true
 			if got := tt.record.Normalize(); !reflect.DeepEqual(got, tt.want) {
 				t.Errorf("Normalize() = %v, want %v", got, tt.want)
 			}
@@ -1,16 +1,12 @@
-package execution
+package record
 
 import (
 	"time"
 
 	"github.com/sirupsen/logrus"
-
-	"github.com/zitadel/zitadel/internal/logstore"
 )
 
-var _ logstore.LogRecord = (*Record)(nil)
-
-type Record struct {
+type ExecutionLog struct {
 	LogDate  time.Time     `json:"logDate"`
 	Took     time.Duration `json:"took"`
 	Message  string        `json:"message"`
@@ -20,14 +16,7 @@ type Record struct {
 	Metadata map[string]interface{} `json:"metadata,omitempty"`
 }
 
-func (e Record) Normalize() logstore.LogRecord {
+func (e ExecutionLog) Normalize() *ExecutionLog {
 	e.Message = cutString(e.Message, 2000)
 	return &e
 }
-
-func cutString(str string, pos int) string {
-	if len(str) <= pos {
-		return str
-	}
-	return str[:pos]
-}
internal/logstore/record/prune.go (new file, 8 lines)
@@ -0,0 +1,8 @@
+package record
+
+func cutString(str string, pos int) string {
+	if len(str) <= pos {
+		return str
+	}
+	return str[:pos-1]
+}
@@ -2,124 +2,70 @@ package logstore
 
 import (
 	"context"
-	"math"
-	"time"
 
 	"github.com/zitadel/logging"
 
-	"github.com/zitadel/zitadel/internal/api/authz"
 	"github.com/zitadel/zitadel/internal/repository/quota"
 )
 
-const handleThresholdTimeout = time.Minute
-
-type QuotaQuerier interface {
-	GetCurrentQuotaPeriod(ctx context.Context, instanceID string, unit quota.Unit) (config *quota.AddedEvent, periodStart time.Time, err error)
-	GetDueQuotaNotifications(ctx context.Context, config *quota.AddedEvent, periodStart time.Time, used uint64) ([]*quota.NotificationDueEvent, error)
-}
-
-type UsageQuerier interface {
-	LogEmitter
+type UsageStorer[T LogRecord[T]] interface {
+	LogEmitter[T]
 	QuotaUnit() quota.Unit
-	QueryUsage(ctx context.Context, instanceId string, start time.Time) (uint64, error)
 }
 
-type UsageReporter interface {
-	Report(ctx context.Context, notifications []*quota.NotificationDueEvent) (err error)
-}
-
-type UsageReporterFunc func(context.Context, []*quota.NotificationDueEvent) (err error)
-
-func (u UsageReporterFunc) Report(ctx context.Context, notifications []*quota.NotificationDueEvent) (err error) {
-	return u(ctx, notifications)
-}
-
-type Service struct {
-	usageQuerier     UsageQuerier
-	quotaQuerier     QuotaQuerier
-	usageReporter    UsageReporter
-	enabledSinks     []*emitter
+type Service[T LogRecord[T]] struct {
+	queries          Queries
+	usageStorer      UsageStorer[T]
+	enabledSinks     []*emitter[T]
 	sinkEnabled      bool
 	reportingEnabled bool
 }
 
-func New(quotaQuerier QuotaQuerier, usageReporter UsageReporter, usageQuerierSink *emitter, additionalSink ...*emitter) *Service {
-	var usageQuerier UsageQuerier
+type Queries interface {
+	GetRemainingQuotaUsage(ctx context.Context, instanceID string, unit quota.Unit) (remaining *uint64, err error)
+}
+
+func New[T LogRecord[T]](queries Queries, usageQuerierSink *emitter[T], additionalSink ...*emitter[T]) *Service[T] {
+	var usageStorer UsageStorer[T]
 	if usageQuerierSink != nil {
-		usageQuerier = usageQuerierSink.emitter.(UsageQuerier)
+		usageStorer = usageQuerierSink.emitter.(UsageStorer[T])
 	}
 
-	svc := &Service{
+	svc := &Service[T]{
+		queries:          queries,
 		reportingEnabled: usageQuerierSink != nil && usageQuerierSink.enabled,
-		usageQuerier:     usageQuerier,
-		quotaQuerier:     quotaQuerier,
-		usageReporter:    usageReporter,
+		usageStorer:      usageStorer,
 	}
 
-	for _, s := range append([]*emitter{usageQuerierSink}, additionalSink...) {
+	for _, s := range append([]*emitter[T]{usageQuerierSink}, additionalSink...) {
 		if s != nil && s.enabled {
 			svc.enabledSinks = append(svc.enabledSinks, s)
 		}
 	}
 
 	svc.sinkEnabled = len(svc.enabledSinks) > 0
 
 	return svc
 }
 
-func (s *Service) Enabled() bool {
+func (s *Service[T]) Enabled() bool {
 	return s.sinkEnabled
 }
 
-func (s *Service) Handle(ctx context.Context, record LogRecord) {
+func (s *Service[T]) Handle(ctx context.Context, record T) {
 	for _, sink := range s.enabledSinks {
 		logging.OnError(sink.Emit(ctx, record.Normalize())).WithField("record", record).Warn("failed to emit log record")
 	}
 }
 
-func (s *Service) Limit(ctx context.Context, instanceID string) *uint64 {
+func (s *Service[T]) Limit(ctx context.Context, instanceID string) *uint64 {
 	var err error
 	defer func() {
-		logging.OnError(err).Warn("failed to check is usage should be limited")
+		logging.OnError(err).Warn("failed to check if usage should be limited")
 	}()
 
 	if !s.reportingEnabled || instanceID == "" {
 		return nil
 	}
 
-	quota, periodStart, err := s.quotaQuerier.GetCurrentQuotaPeriod(ctx, instanceID, s.usageQuerier.QuotaUnit())
-	if err != nil || quota == nil {
-		return nil
-	}
-
-	usage, err := s.usageQuerier.QueryUsage(ctx, instanceID, periodStart)
+	remaining, err := s.queries.GetRemainingQuotaUsage(ctx, instanceID, s.usageStorer.QuotaUnit())
 	if err != nil {
 		// TODO: shouldn't we just limit then or return the error and decide there?
 		return nil
 	}
 
-	go s.handleThresholds(ctx, quota, periodStart, usage)
-
-	var remaining *uint64
-	if quota.Limit {
-		r := uint64(math.Max(0, float64(quota.Amount)-float64(usage)))
-		remaining = &r
-	}
 	return remaining
 }
-
-func (s *Service) handleThresholds(ctx context.Context, quota *quota.AddedEvent, periodStart time.Time, usage uint64) {
-	var err error
-	defer func() {
-		logging.OnError(err).Warn("handling quota thresholds failed")
-	}()
-
-	detatchedCtx, cancel := context.WithTimeout(authz.Detach(ctx), handleThresholdTimeout)
-	defer cancel()
-
-	notifications, err := s.quotaQuerier.GetDueQuotaNotifications(detatchedCtx, quota, periodStart, usage)
-	if err != nil || len(notifications) == 0 {
-		return
-	}
-
-	err = s.usageReporter.Report(detatchedCtx, notifications)
-}
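`Limit` now delegates the whole computation to `Queries.GetRemainingQuotaUsage` and returns the remaining units, where `nil` means "no limit configured" and a pointer to 0 means the quota is exhausted. A standalone sketch of how an interceptor can act on that contract; the `limiter` interface and handler wiring here are illustrative, not the repository's middleware:

```go
package main

import (
	"fmt"
	"net/http"
)

type limiter interface {
	Limit(instanceID string) *uint64 // nil = unlimited
}

func quotaMiddleware(l limiter, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		remaining := l.Limit(r.Header.Get("x-zitadel-instance"))
		if remaining != nil && *remaining == 0 {
			http.Error(w, "quota exhausted", http.StatusTooManyRequests)
			return
		}
		next.ServeHTTP(w, r)
	})
}

// fixed always reports the same remaining amount, for demonstration.
type fixed uint64

func (f fixed) Limit(string) *uint64 { v := uint64(f); return &v }

func main() {
	h := quotaMiddleware(fixed(0), http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "ok")
	}))
	_ = h // wire into an http.Server in real usage
}
```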
@@ -14,9 +14,8 @@ import (
 	"github.com/benbjohnson/clock"
 
 	"github.com/zitadel/zitadel/internal/logstore"
-	emittermock "github.com/zitadel/zitadel/internal/logstore/emitters/mock"
-	quotaqueriermock "github.com/zitadel/zitadel/internal/logstore/quotaqueriers/mock"
-	"github.com/zitadel/zitadel/internal/repository/quota"
+	emittermock "github.com/zitadel/zitadel/internal/logstore/mock"
+	"github.com/zitadel/zitadel/internal/query"
 )
 
 const (
@@ -27,7 +26,7 @@ const (
 type args struct {
 	mainSink      *logstore.EmitterConfig
 	secondarySink *logstore.EmitterConfig
-	config        quota.AddedEvent
+	config        *query.Quota
 }
 
 type want struct {
@@ -137,28 +136,6 @@ func TestService(t *testing.T) {
 				len: 0,
 			},
 		},
-	}, {
-		name: "cleanupping works",
-		args: args{
-			mainSink: emitterConfig(withCleanupping(17*time.Second, 28*time.Second)),
-			secondarySink: emitterConfig(withDebouncerConfig(&logstore.DebouncerConfig{
-				MinFrequency: 0,
-				MaxBulkSize:  15,
-			}), withCleanupping(5*time.Second, 47*time.Second)),
-			config: quotaConfig(),
-		},
-		want: want{
-			enabled:   true,
-			remaining: nil,
-			mainSink: wantSink{
-				bulks: repeat(1, 60),
-				len:   21,
-			},
-			secondarySink: wantSink{
-				bulks: repeat(15, 4),
-				len:   18,
-			},
-		},
 	}, {
 		name: "when quota has a limit of 90, 30 are remaining",
 		args: args{
@@ -232,27 +209,24 @@ func runTest(t *testing.T, name string, args args, want want) bool {
 	})
 }
 
-func given(t *testing.T, args args, want want) (context.Context, *clock.Mock, *emittermock.InmemLogStorage, *emittermock.InmemLogStorage, *logstore.Service) {
+func given(t *testing.T, args args, want want) (context.Context, *clock.Mock, *emittermock.InmemLogStorage, *emittermock.InmemLogStorage, *logstore.Service[*emittermock.Record]) {
 	ctx := context.Background()
 	clock := clock.NewMock()
 
-	periodStart := time.Time{}
 	clock.Set(args.config.From)
 
-	mainStorage := emittermock.NewInMemoryStorage(clock)
-	mainEmitter, err := logstore.NewEmitter(ctx, clock, args.mainSink, mainStorage)
+	mainStorage := emittermock.NewInMemoryStorage(clock, args.config)
+	mainEmitter, err := logstore.NewEmitter[*emittermock.Record](ctx, clock, args.mainSink, mainStorage)
 	if err != nil {
 		t.Errorf("expected no error but got %v", err)
 	}
-	secondaryStorage := emittermock.NewInMemoryStorage(clock)
-	secondaryEmitter, err := logstore.NewEmitter(ctx, clock, args.secondarySink, secondaryStorage)
+	secondaryStorage := emittermock.NewInMemoryStorage(clock, args.config)
+	secondaryEmitter, err := logstore.NewEmitter[*emittermock.Record](ctx, clock, args.secondarySink, secondaryStorage)
 	if err != nil {
 		t.Errorf("expected no error but got %v", err)
 	}
 
-	svc := logstore.New(
-		quotaqueriermock.NewNoopQuerier(&args.config, periodStart),
-		logstore.UsageReporterFunc(func(context.Context, []*quota.NotificationDueEvent) error { return nil }),
+	svc := logstore.New[*emittermock.Record](
+		mainStorage,
 		mainEmitter,
 		secondaryEmitter)
 
@@ -262,7 +236,7 @@ func given(t *testing.T, args args, want want) (context.Context, *clock.Mock, *e
 	return ctx, clock, mainStorage, secondaryStorage, svc
 }
 
-func when(svc *logstore.Service, ctx context.Context, clock *clock.Mock) *uint64 {
+func when(svc *logstore.Service[*emittermock.Record], ctx context.Context, clock *clock.Mock) *uint64 {
 	var remaining *uint64
 	for i := 0; i < ticks; i++ {
 		svc.Handle(ctx, emittermock.NewRecord(clock))