2023-02-15 01:52:11 +00:00
|
|
|
package logstore
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/benbjohnson/clock"
|
|
|
|
)
|
|
|
|
|
|
|
|
type EmitterConfig struct {
|
2023-09-15 14:58:45 +00:00
|
|
|
Enabled bool
|
|
|
|
Debounce *DebouncerConfig
|
2023-02-15 01:52:11 +00:00
|
|
|
}
|
|
|
|
|
2023-09-15 14:58:45 +00:00
|
|
|
type emitter[T LogRecord[T]] struct {
|
2023-02-15 01:52:11 +00:00
|
|
|
enabled bool
|
|
|
|
ctx context.Context
|
2023-09-15 14:58:45 +00:00
|
|
|
debouncer *debouncer[T]
|
|
|
|
emitter LogEmitter[T]
|
2023-02-15 01:52:11 +00:00
|
|
|
clock clock.Clock
|
|
|
|
}
|
|
|
|
|
2023-09-15 14:58:45 +00:00
|
|
|
// LogRecord is the constraint for record types handled by this package.
// It is parameterized by T so that Normalize can return the implementing
// type itself (the other declarations here use it as T LogRecord[T]).
type LogRecord[T any] interface {
	// Normalize returns a value of type T.
	Normalize() T
}
|
|
|
|
|
2023-09-15 14:58:45 +00:00
|
|
|
// LogRecordFunc adapts a plain func() T so that it provides a Normalize
// method.
type LogRecordFunc[T any] func() T

// Normalize calls the underlying function and returns its result.
func (fn LogRecordFunc[T]) Normalize() T {
	normalized := fn()
	return normalized
}
|
|
|
|
|
2023-09-15 14:58:45 +00:00
|
|
|
type LogEmitter[T LogRecord[T]] interface {
|
|
|
|
Emit(ctx context.Context, bulk []T) error
|
2023-02-15 01:52:11 +00:00
|
|
|
}
|
|
|
|
|
2023-09-15 14:58:45 +00:00
|
|
|
type LogEmitterFunc[T LogRecord[T]] func(ctx context.Context, bulk []T) error
|
2023-02-15 01:52:11 +00:00
|
|
|
|
2023-09-15 14:58:45 +00:00
|
|
|
func (l LogEmitterFunc[T]) Emit(ctx context.Context, bulk []T) error {
|
2023-02-15 01:52:11 +00:00
|
|
|
return l(ctx, bulk)
|
|
|
|
}
|
|
|
|
|
2023-09-15 14:58:45 +00:00
|
|
|
type LogCleanupper[T LogRecord[T]] interface {
|
2023-02-15 01:52:11 +00:00
|
|
|
Cleanup(ctx context.Context, keep time.Duration) error
|
2023-09-15 14:58:45 +00:00
|
|
|
LogEmitter[T]
|
2023-02-15 01:52:11 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// NewEmitter accepts Clock from github.com/benbjohnson/clock so we can control timers and tickers in the unit tests
|
2023-09-15 14:58:45 +00:00
|
|
|
func NewEmitter[T LogRecord[T]](ctx context.Context, clock clock.Clock, cfg *EmitterConfig, logger LogEmitter[T]) (*emitter[T], error) {
|
|
|
|
svc := &emitter[T]{
|
2023-02-15 01:52:11 +00:00
|
|
|
enabled: cfg != nil && cfg.Enabled,
|
|
|
|
ctx: ctx,
|
|
|
|
emitter: logger,
|
|
|
|
clock: clock,
|
|
|
|
}
|
|
|
|
|
|
|
|
if !svc.enabled {
|
|
|
|
return svc, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
if cfg.Debounce != nil && (cfg.Debounce.MinFrequency > 0 || cfg.Debounce.MaxBulkSize > 0) {
|
2023-09-15 14:58:45 +00:00
|
|
|
svc.debouncer = newDebouncer[T](ctx, *cfg.Debounce, clock, newStorageBulkSink(svc.emitter))
|
2023-02-15 01:52:11 +00:00
|
|
|
}
|
|
|
|
return svc, nil
|
|
|
|
}
|
|
|
|
|
2023-09-15 14:58:45 +00:00
|
|
|
func (s *emitter[T]) Emit(ctx context.Context, record T) (err error) {
|
2023-02-15 01:52:11 +00:00
|
|
|
if !s.enabled {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
if s.debouncer != nil {
|
|
|
|
s.debouncer.add(record)
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2023-09-15 14:58:45 +00:00
|
|
|
return s.emitter.Emit(ctx, []T{record})
|
2023-02-15 01:52:11 +00:00
|
|
|
}
|
|
|
|
|
2023-09-15 14:58:45 +00:00
|
|
|
func newStorageBulkSink[T LogRecord[T]](emitter LogEmitter[T]) bulkSinkFunc[T] {
|
|
|
|
return func(ctx context.Context, bulk []T) error {
|
2023-02-15 01:52:11 +00:00
|
|
|
return emitter.Emit(ctx, bulk)
|
|
|
|
}
|
|
|
|
}
|