perf: project quotas and usages (#6441)

* project quota added

* project quota removed

* add periods table

* make log record generic

* accumulate usage

* query usage

* count action run seconds

* fix filter in ReportQuotaUsage

* fix existing tests

* fix logstore tests

* fix typo

* fix: add quota unit tests command side

* fix: add quota unit tests command side

* fix: add quota unit tests command side

* move notifications into debouncer and improve limit querying

* cleanup

* comment

* fix: add quota unit tests command side

* fix remaining quota usage query

* implement InmemLogStorage

* cleanup and linting

* improve test

* fix: add quota unit tests command side

* fix: add quota unit tests command side

* fix: add quota unit tests command side

* fix: add quota unit tests command side

* action notifications and fixes for notifications query

* revert console prefix

* fix: add quota unit tests command side

* fix: add quota integration tests

* improve accountable requests

* improve accountable requests

* fix: add quota integration tests

* fix: add quota integration tests

* fix: add quota integration tests

* comment

* remove ability to store logs in db and other changes requested from review

* changes requested from review

* changes requested from review

* Update internal/api/http/middleware/access_interceptor.go

Co-authored-by: Silvan <silvan.reusser@gmail.com>

* tests: fix quotas integration tests

* improve incrementUsageStatement

* linting

* fix: delete e2e tests as intergation tests cover functionality

* Update internal/api/http/middleware/access_interceptor.go

Co-authored-by: Silvan <silvan.reusser@gmail.com>

* backup

* fix conflict

* create rc

* create prerelease

* remove issue release labeling

* fix tracing

---------

Co-authored-by: Livio Spring <livio.a@gmail.com>
Co-authored-by: Stefan Benz <stefan@caos.ch>
Co-authored-by: adlerhurst <silvan.reusser@gmail.com>
This commit is contained in:
Elio Bischof
2023-09-15 16:58:45 +02:00
committed by GitHub
parent b4d0d2c9a7
commit 1a49b7d298
66 changed files with 3423 additions and 1413 deletions

View File

@@ -3,167 +3,91 @@ package access
import (
"context"
"database/sql"
"fmt"
"net/http"
"strings"
"errors"
"time"
"github.com/Masterminds/squirrel"
"github.com/zitadel/logging"
"google.golang.org/grpc/codes"
"github.com/zitadel/zitadel/internal/api/call"
zitadel_http "github.com/zitadel/zitadel/internal/api/http"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/command"
"github.com/zitadel/zitadel/internal/database"
caos_errors "github.com/zitadel/zitadel/internal/errors"
"github.com/zitadel/zitadel/internal/logstore"
"github.com/zitadel/zitadel/internal/logstore/record"
"github.com/zitadel/zitadel/internal/query"
"github.com/zitadel/zitadel/internal/query/projection"
"github.com/zitadel/zitadel/internal/repository/quota"
"github.com/zitadel/zitadel/internal/telemetry/tracing"
)
const (
accessLogsTable = "logstore.access"
accessTimestampCol = "log_date"
accessProtocolCol = "protocol"
accessRequestURLCol = "request_url"
accessResponseStatusCol = "response_status"
accessRequestHeadersCol = "request_headers"
accessResponseHeadersCol = "response_headers"
accessInstanceIdCol = "instance_id"
accessProjectIdCol = "project_id"
accessRequestedDomainCol = "requested_domain"
accessRequestedHostCol = "requested_host"
)
var _ logstore.UsageQuerier = (*databaseLogStorage)(nil)
var _ logstore.LogCleanupper = (*databaseLogStorage)(nil)
var _ logstore.UsageStorer[*record.AccessLog] = (*databaseLogStorage)(nil)
type databaseLogStorage struct {
dbClient *database.DB
commands *command.Commands
queries *query.Queries
}
func NewDatabaseLogStorage(dbClient *database.DB) *databaseLogStorage {
return &databaseLogStorage{dbClient: dbClient}
func NewDatabaseLogStorage(dbClient *database.DB, commands *command.Commands, queries *query.Queries) *databaseLogStorage {
return &databaseLogStorage{dbClient: dbClient, commands: commands, queries: queries}
}
func (l *databaseLogStorage) QuotaUnit() quota.Unit {
return quota.RequestsAllAuthenticated
}
func (l *databaseLogStorage) Emit(ctx context.Context, bulk []logstore.LogRecord) error {
func (l *databaseLogStorage) Emit(ctx context.Context, bulk []*record.AccessLog) error {
if len(bulk) == 0 {
return nil
}
builder := squirrel.Insert(accessLogsTable).
Columns(
accessTimestampCol,
accessProtocolCol,
accessRequestURLCol,
accessResponseStatusCol,
accessRequestHeadersCol,
accessResponseHeadersCol,
accessInstanceIdCol,
accessProjectIdCol,
accessRequestedDomainCol,
accessRequestedHostCol,
).
PlaceholderFormat(squirrel.Dollar)
for idx := range bulk {
item := bulk[idx].(*Record)
builder = builder.Values(
item.LogDate,
item.Protocol,
item.RequestURL,
item.ResponseStatus,
item.RequestHeaders,
item.ResponseHeaders,
item.InstanceID,
item.ProjectID,
item.RequestedDomain,
item.RequestedHost,
)
}
stmt, args, err := builder.ToSql()
if err != nil {
return caos_errors.ThrowInternal(err, "ACCESS-KOS7I", "Errors.Internal")
}
result, err := l.dbClient.ExecContext(ctx, stmt, args...)
if err != nil {
return caos_errors.ThrowInternal(err, "ACCESS-alnT9", "Errors.Access.StorageFailed")
}
rows, err := result.RowsAffected()
if err != nil {
return caos_errors.ThrowInternal(err, "ACCESS-7KIpL", "Errors.Internal")
}
logging.WithFields("rows", rows).Debug("successfully stored access logs")
return nil
return l.incrementUsage(ctx, bulk)
}
func (l *databaseLogStorage) QueryUsage(ctx context.Context, instanceId string, start time.Time) (uint64, error) {
stmt, args, err := squirrel.Select(
fmt.Sprintf("count(%s)", accessInstanceIdCol),
).
From(accessLogsTable + l.dbClient.Timetravel(call.Took(ctx))).
Where(squirrel.And{
squirrel.Eq{accessInstanceIdCol: instanceId},
squirrel.GtOrEq{accessTimestampCol: start},
squirrel.Expr(fmt.Sprintf(`%s #>> '{%s,0}' = '[REDACTED]'`, accessRequestHeadersCol, strings.ToLower(zitadel_http.Authorization))),
squirrel.NotLike{accessRequestURLCol: "%/zitadel.system.v1.SystemService/%"},
squirrel.NotLike{accessRequestURLCol: "%/system/v1/%"},
squirrel.Or{
squirrel.And{
squirrel.Eq{accessProtocolCol: HTTP},
squirrel.NotEq{accessResponseStatusCol: http.StatusForbidden},
squirrel.NotEq{accessResponseStatusCol: http.StatusInternalServerError},
squirrel.NotEq{accessResponseStatusCol: http.StatusTooManyRequests},
},
squirrel.And{
squirrel.Eq{accessProtocolCol: GRPC},
squirrel.NotEq{accessResponseStatusCol: codes.PermissionDenied},
squirrel.NotEq{accessResponseStatusCol: codes.Internal},
squirrel.NotEq{accessResponseStatusCol: codes.ResourceExhausted},
},
},
}).
PlaceholderFormat(squirrel.Dollar).
ToSql()
func (l *databaseLogStorage) incrementUsage(ctx context.Context, bulk []*record.AccessLog) (err error) {
ctx, span := tracing.NewSpan(ctx)
defer func() { span.EndWithError(err) }()
if err != nil {
return 0, caos_errors.ThrowInternal(err, "ACCESS-V9Sde", "Errors.Internal")
byInstance := make(map[string][]*record.AccessLog)
for _, r := range bulk {
if r.InstanceID != "" {
byInstance[r.InstanceID] = append(byInstance[r.InstanceID], r)
}
}
var count uint64
err = l.dbClient.
QueryRowContext(ctx,
func(row *sql.Row) error {
return row.Scan(&count)
},
stmt, args...,
)
if err != nil {
return 0, caos_errors.ThrowInternal(err, "ACCESS-pBPrM", "Errors.Logstore.Access.ScanFailed")
for instanceID, instanceBulk := range byInstance {
q, getQuotaErr := l.queries.GetQuota(ctx, instanceID, quota.RequestsAllAuthenticated)
if errors.Is(getQuotaErr, sql.ErrNoRows) {
continue
}
err = errors.Join(err, getQuotaErr)
if getQuotaErr != nil {
continue
}
sum, incrementErr := l.incrementUsageFromAccessLogs(ctx, instanceID, q.CurrentPeriodStart, instanceBulk)
err = errors.Join(err, incrementErr)
if incrementErr != nil {
continue
}
notifications, getNotificationErr := l.queries.GetDueQuotaNotifications(ctx, instanceID, quota.RequestsAllAuthenticated, q, q.CurrentPeriodStart, sum)
err = errors.Join(err, getNotificationErr)
if getNotificationErr != nil || len(notifications) == 0 {
continue
}
ctx = authz.WithInstanceID(ctx, instanceID)
reportErr := l.commands.ReportQuotaUsage(ctx, notifications)
err = errors.Join(err, reportErr)
if reportErr != nil {
continue
}
}
return count, nil
}
func (l *databaseLogStorage) Cleanup(ctx context.Context, keep time.Duration) error {
stmt, args, err := squirrel.Delete(accessLogsTable).
Where(squirrel.LtOrEq{accessTimestampCol: time.Now().Add(-keep)}).
PlaceholderFormat(squirrel.Dollar).
ToSql()
if err != nil {
return caos_errors.ThrowInternal(err, "ACCESS-2oTh6", "Errors.Internal")
}
execCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
_, err = l.dbClient.ExecContext(execCtx, stmt, args...)
return err
}
func (l *databaseLogStorage) incrementUsageFromAccessLogs(ctx context.Context, instanceID string, periodStart time.Time, records []*record.AccessLog) (sum uint64, err error) {
ctx, span := tracing.NewSpan(ctx)
defer func() { span.EndWithError(err) }()
var count uint64
for _, r := range records {
if r.IsAuthenticated() {
count++
}
}
return projection.QuotaProjection.IncrementUsage(ctx, quota.RequestsAllAuthenticated, instanceID, periodStart, count)
}

View File

@@ -1,97 +0,0 @@
package access
import (
"strings"
"time"
zitadel_http "github.com/zitadel/zitadel/internal/api/http"
"github.com/zitadel/zitadel/internal/logstore"
)
var _ logstore.LogRecord = (*Record)(nil)
type Record struct {
LogDate time.Time `json:"logDate"`
Protocol Protocol `json:"protocol"`
RequestURL string `json:"requestUrl"`
ResponseStatus uint32 `json:"responseStatus"`
// RequestHeaders are plain maps so varying implementation
// between HTTP and gRPC don't interfere with each other
RequestHeaders map[string][]string `json:"requestHeaders"`
// ResponseHeaders are plain maps so varying implementation
// between HTTP and gRPC don't interfere with each other
ResponseHeaders map[string][]string `json:"responseHeaders"`
InstanceID string `json:"instanceId"`
ProjectID string `json:"projectId"`
RequestedDomain string `json:"requestedDomain"`
RequestedHost string `json:"requestedHost"`
}
type Protocol uint8
const (
GRPC Protocol = iota
HTTP
redacted = "[REDACTED]"
)
func (a Record) Normalize() logstore.LogRecord {
a.RequestedDomain = cutString(a.RequestedDomain, 200)
a.RequestURL = cutString(a.RequestURL, 200)
a.RequestHeaders = normalizeHeaders(a.RequestHeaders, strings.ToLower(zitadel_http.Authorization), "grpcgateway-authorization", "cookie", "grpcgateway-cookie")
a.ResponseHeaders = normalizeHeaders(a.ResponseHeaders, "set-cookie")
return &a
}
// normalizeHeaders lowers all header keys and redacts secrets
func normalizeHeaders(header map[string][]string, redactKeysLower ...string) map[string][]string {
return pruneKeys(redactKeys(lowerKeys(header), redactKeysLower...))
}
func lowerKeys(header map[string][]string) map[string][]string {
lower := make(map[string][]string, len(header))
for k, v := range header {
lower[strings.ToLower(k)] = v
}
return lower
}
func redactKeys(header map[string][]string, redactKeysLower ...string) map[string][]string {
redactedKeys := make(map[string][]string, len(header))
for k, v := range header {
redactedKeys[k] = v
}
for _, redactKey := range redactKeysLower {
if _, ok := redactedKeys[redactKey]; ok {
redactedKeys[redactKey] = []string{redacted}
}
}
return redactedKeys
}
const maxValuesPerKey = 10
func pruneKeys(header map[string][]string) map[string][]string {
prunedKeys := make(map[string][]string, len(header))
for key, value := range header {
valueItems := make([]string, 0, maxValuesPerKey)
for i, valueItem := range value {
// Max 10 header values per key
if i > maxValuesPerKey {
break
}
// Max 200 value length
valueItems = append(valueItems, cutString(valueItem, 200))
}
prunedKeys[key] = valueItems
}
return prunedKeys
}
func cutString(str string, pos int) string {
if len(str) <= pos {
return str
}
return str[:pos-1]
}

View File

@@ -1,79 +0,0 @@
package access_test
import (
"reflect"
"testing"
"github.com/zitadel/zitadel/internal/logstore/emitters/access"
)
func TestRecord_Normalize(t *testing.T) {
tests := []struct {
name string
record access.Record
want *access.Record
}{{
name: "headers with certain keys should be redacted",
record: access.Record{
RequestHeaders: map[string][]string{
"authorization": {"AValue"},
"grpcgateway-authorization": {"AValue"},
"cookie": {"AValue"},
"grpcgateway-cookie": {"AValue"},
}, ResponseHeaders: map[string][]string{
"set-cookie": {"AValue"},
},
},
want: &access.Record{
RequestHeaders: map[string][]string{
"authorization": {"[REDACTED]"},
"grpcgateway-authorization": {"[REDACTED]"},
"cookie": {"[REDACTED]"},
"grpcgateway-cookie": {"[REDACTED]"},
}, ResponseHeaders: map[string][]string{
"set-cookie": {"[REDACTED]"},
},
},
}, {
name: "header keys should be lower cased",
record: access.Record{
RequestHeaders: map[string][]string{"AKey": {"AValue"}},
ResponseHeaders: map[string][]string{"AKey": {"AValue"}}},
want: &access.Record{
RequestHeaders: map[string][]string{"akey": {"AValue"}},
ResponseHeaders: map[string][]string{"akey": {"AValue"}}},
}, {
name: "an already prune record should stay unchanged",
record: access.Record{
RequestURL: "https://my.zitadel.cloud/",
RequestHeaders: map[string][]string{
"authorization": {"[REDACTED]"},
},
ResponseHeaders: map[string][]string{},
},
want: &access.Record{
RequestURL: "https://my.zitadel.cloud/",
RequestHeaders: map[string][]string{
"authorization": {"[REDACTED]"},
},
ResponseHeaders: map[string][]string{},
},
}, {
name: "empty record should stay empty",
record: access.Record{
RequestHeaders: map[string][]string{},
ResponseHeaders: map[string][]string{},
},
want: &access.Record{
RequestHeaders: map[string][]string{},
ResponseHeaders: map[string][]string{},
},
}}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := tt.record.Normalize(); !reflect.DeepEqual(got, tt.want) {
t.Errorf("Normalize() = %v, want %v", got, tt.want)
}
})
}
}

View File

@@ -3,142 +3,84 @@ package execution
import (
"context"
"database/sql"
"fmt"
"errors"
"math"
"time"
"github.com/Masterminds/squirrel"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/api/call"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/command"
"github.com/zitadel/zitadel/internal/database"
caos_errors "github.com/zitadel/zitadel/internal/errors"
"github.com/zitadel/zitadel/internal/logstore"
"github.com/zitadel/zitadel/internal/logstore/record"
"github.com/zitadel/zitadel/internal/query"
"github.com/zitadel/zitadel/internal/query/projection"
"github.com/zitadel/zitadel/internal/repository/quota"
)
const (
executionLogsTable = "logstore.execution"
executionTimestampCol = "log_date"
executionTookCol = "took"
executionMessageCol = "message"
executionLogLevelCol = "loglevel"
executionInstanceIdCol = "instance_id"
executionActionIdCol = "action_id"
executionMetadataCol = "metadata"
)
var _ logstore.UsageQuerier = (*databaseLogStorage)(nil)
var _ logstore.LogCleanupper = (*databaseLogStorage)(nil)
var _ logstore.UsageStorer[*record.ExecutionLog] = (*databaseLogStorage)(nil)
type databaseLogStorage struct {
dbClient *database.DB
commands *command.Commands
queries *query.Queries
}
func NewDatabaseLogStorage(dbClient *database.DB) *databaseLogStorage {
return &databaseLogStorage{dbClient: dbClient}
func NewDatabaseLogStorage(dbClient *database.DB, commands *command.Commands, queries *query.Queries) *databaseLogStorage {
return &databaseLogStorage{dbClient: dbClient, commands: commands, queries: queries}
}
func (l *databaseLogStorage) QuotaUnit() quota.Unit {
return quota.ActionsAllRunsSeconds
}
func (l *databaseLogStorage) Emit(ctx context.Context, bulk []logstore.LogRecord) error {
func (l *databaseLogStorage) Emit(ctx context.Context, bulk []*record.ExecutionLog) error {
if len(bulk) == 0 {
return nil
}
builder := squirrel.Insert(executionLogsTable).
Columns(
executionTimestampCol,
executionTookCol,
executionMessageCol,
executionLogLevelCol,
executionInstanceIdCol,
executionActionIdCol,
executionMetadataCol,
).
PlaceholderFormat(squirrel.Dollar)
return l.incrementUsage(ctx, bulk)
}
for idx := range bulk {
item := bulk[idx].(*Record)
var took interface{}
if item.Took > 0 {
took = item.Took
func (l *databaseLogStorage) incrementUsage(ctx context.Context, bulk []*record.ExecutionLog) (err error) {
byInstance := make(map[string][]*record.ExecutionLog)
for _, r := range bulk {
if r.InstanceID != "" {
byInstance[r.InstanceID] = append(byInstance[r.InstanceID], r)
}
}
for instanceID, instanceBulk := range byInstance {
q, getQuotaErr := l.queries.GetQuota(ctx, instanceID, quota.ActionsAllRunsSeconds)
if errors.Is(getQuotaErr, sql.ErrNoRows) {
continue
}
err = errors.Join(err, getQuotaErr)
if getQuotaErr != nil {
continue
}
sum, incrementErr := l.incrementUsageFromExecutionLogs(ctx, instanceID, q.CurrentPeriodStart, instanceBulk)
err = errors.Join(err, incrementErr)
if incrementErr != nil {
continue
}
builder = builder.Values(
item.LogDate,
took,
item.Message,
item.LogLevel,
item.InstanceID,
item.ActionID,
item.Metadata,
)
notifications, getNotificationErr := l.queries.GetDueQuotaNotifications(ctx, instanceID, quota.ActionsAllRunsSeconds, q, q.CurrentPeriodStart, sum)
err = errors.Join(err, getNotificationErr)
if getNotificationErr != nil || len(notifications) == 0 {
continue
}
ctx = authz.WithInstanceID(ctx, instanceID)
reportErr := l.commands.ReportQuotaUsage(ctx, notifications)
err = errors.Join(err, reportErr)
if reportErr != nil {
continue
}
}
stmt, args, err := builder.ToSql()
if err != nil {
return caos_errors.ThrowInternal(err, "EXEC-KOS7I", "Errors.Internal")
}
result, err := l.dbClient.ExecContext(ctx, stmt, args...)
if err != nil {
return caos_errors.ThrowInternal(err, "EXEC-0j6i5", "Errors.Access.StorageFailed")
}
rows, err := result.RowsAffected()
if err != nil {
return caos_errors.ThrowInternal(err, "EXEC-MGchJ", "Errors.Internal")
}
logging.WithFields("rows", rows).Debug("successfully stored execution logs")
return nil
}
func (l *databaseLogStorage) QueryUsage(ctx context.Context, instanceId string, start time.Time) (uint64, error) {
stmt, args, err := squirrel.Select(
fmt.Sprintf("COALESCE(SUM(%s)::INT,0)", executionTookCol),
).
From(executionLogsTable + l.dbClient.Timetravel(call.Took(ctx))).
Where(squirrel.And{
squirrel.Eq{executionInstanceIdCol: instanceId},
squirrel.GtOrEq{executionTimestampCol: start},
squirrel.NotEq{executionTookCol: nil},
}).
PlaceholderFormat(squirrel.Dollar).
ToSql()
if err != nil {
return 0, caos_errors.ThrowInternal(err, "EXEC-DXtzg", "Errors.Internal")
}
var durationSeconds uint64
err = l.dbClient.
QueryRowContext(ctx,
func(row *sql.Row) error {
return row.Scan(&durationSeconds)
},
stmt, args...,
)
if err != nil {
return 0, caos_errors.ThrowInternal(err, "EXEC-Ad8nP", "Errors.Logstore.Execution.ScanFailed")
}
return durationSeconds, nil
}
func (l *databaseLogStorage) Cleanup(ctx context.Context, keep time.Duration) error {
stmt, args, err := squirrel.Delete(executionLogsTable).
Where(squirrel.LtOrEq{executionTimestampCol: time.Now().Add(-keep)}).
PlaceholderFormat(squirrel.Dollar).
ToSql()
if err != nil {
return caos_errors.ThrowInternal(err, "EXEC-Bja8V", "Errors.Internal")
}
execCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
_, err = l.dbClient.ExecContext(execCtx, stmt, args...)
return err
}
func (l *databaseLogStorage) incrementUsageFromExecutionLogs(ctx context.Context, instanceID string, periodStart time.Time, records []*record.ExecutionLog) (sum uint64, err error) {
var total time.Duration
for _, r := range records {
total += r.Took
}
return projection.QuotaProjection.IncrementUsage(ctx, quota.ActionsAllRunsSeconds, instanceID, periodStart, uint64(math.Floor(total.Seconds())))
}

View File

@@ -1,33 +0,0 @@
package execution
import (
"time"
"github.com/sirupsen/logrus"
"github.com/zitadel/zitadel/internal/logstore"
)
var _ logstore.LogRecord = (*Record)(nil)
type Record struct {
LogDate time.Time `json:"logDate"`
Took time.Duration `json:"took"`
Message string `json:"message"`
LogLevel logrus.Level `json:"logLevel"`
InstanceID string `json:"instanceId"`
ActionID string `json:"actionId,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}
func (e Record) Normalize() logstore.LogRecord {
e.Message = cutString(e.Message, 2000)
return &e
}
func cutString(str string, pos int) string {
if len(str) <= pos {
return str
}
return str[:pos]
}

View File

@@ -1,89 +0,0 @@
package mock
import (
"context"
"sync"
"time"
"github.com/benbjohnson/clock"
"github.com/zitadel/zitadel/internal/logstore"
"github.com/zitadel/zitadel/internal/repository/quota"
)
var _ logstore.UsageQuerier = (*InmemLogStorage)(nil)
var _ logstore.LogCleanupper = (*InmemLogStorage)(nil)
type InmemLogStorage struct {
mux sync.Mutex
clock clock.Clock
emitted []*record
bulks []int
}
func NewInMemoryStorage(clock clock.Clock) *InmemLogStorage {
return &InmemLogStorage{
clock: clock,
emitted: make([]*record, 0),
bulks: make([]int, 0),
}
}
func (l *InmemLogStorage) QuotaUnit() quota.Unit {
return quota.Unimplemented
}
func (l *InmemLogStorage) Emit(_ context.Context, bulk []logstore.LogRecord) error {
if len(bulk) == 0 {
return nil
}
l.mux.Lock()
defer l.mux.Unlock()
for idx := range bulk {
l.emitted = append(l.emitted, bulk[idx].(*record))
}
l.bulks = append(l.bulks, len(bulk))
return nil
}
func (l *InmemLogStorage) QueryUsage(_ context.Context, _ string, start time.Time) (uint64, error) {
l.mux.Lock()
defer l.mux.Unlock()
var count uint64
for _, r := range l.emitted {
if r.ts.After(start) {
count++
}
}
return count, nil
}
func (l *InmemLogStorage) Cleanup(_ context.Context, keep time.Duration) error {
l.mux.Lock()
defer l.mux.Unlock()
clean := make([]*record, 0)
from := l.clock.Now().Add(-(keep + 1))
for _, r := range l.emitted {
if r.ts.After(from) {
clean = append(clean, r)
}
}
l.emitted = clean
return nil
}
func (l *InmemLogStorage) Bulks() []int {
l.mux.Lock()
defer l.mux.Unlock()
return l.bulks
}
func (l *InmemLogStorage) Len() int {
l.mux.Lock()
defer l.mux.Unlock()
return len(l.emitted)
}

View File

@@ -1,25 +0,0 @@
package mock
import (
"time"
"github.com/benbjohnson/clock"
"github.com/zitadel/zitadel/internal/logstore"
)
var _ logstore.LogRecord = (*record)(nil)
func NewRecord(clock clock.Clock) *record {
return &record{ts: clock.Now()}
}
type record struct {
ts time.Time
redacted bool
}
func (r record) Normalize() logstore.LogRecord {
r.redacted = true
return &r
}

View File

@@ -9,8 +9,8 @@ import (
"github.com/zitadel/zitadel/internal/logstore"
)
func NewStdoutEmitter() logstore.LogEmitter {
return logstore.LogEmitterFunc(func(ctx context.Context, bulk []logstore.LogRecord) error {
func NewStdoutEmitter[T logstore.LogRecord[T]]() logstore.LogEmitter[T] {
return logstore.LogEmitterFunc[T](func(ctx context.Context, bulk []T) error {
for idx := range bulk {
bytes, err := json.Marshal(bulk[idx])
if err != nil {