diff --git a/logtail/buffer.go b/logtail/buffer.go index 499dea1c7..e33427fc5 100644 --- a/logtail/buffer.go +++ b/logtail/buffer.go @@ -15,6 +15,9 @@ type Buffer interface { // TryReadLine tries to read a log line from the ring buffer. // If no line is available it returns a nil slice. // If the ring buffer is closed it returns io.EOF. + // + // The returned slice may point to data that will be overwritten + // by a subsequent call to TryReadLine. TryReadLine() ([]byte, error) // Write writes a log line into the ring buffer. diff --git a/logtail/logtail.go b/logtail/logtail.go index f63948e48..1793c8b78 100644 --- a/logtail/logtail.go +++ b/logtail/logtail.go @@ -62,8 +62,11 @@ type Config struct { TimeNow func() time.Time // if set, subsitutes uses of time.Now Stderr io.Writer // if set, logs are sent here instead of os.Stderr Buffer Buffer // temp storage, if nil a MemoryBuffer - CheckLogs <-chan struct{} // signals Logger to check for filched logs to upload NewZstdEncoder func() Encoder // if set, used to compress logs for transmission + + // DrainLogs, if non-nil, disables autmatic uploading of new logs, + // so that logs are only uploaded when a token is sent to DrainLogs. + DrainLogs <-chan struct{} } func Log(cfg Config) Logger { @@ -86,9 +89,6 @@ func Log(cfg Config) Logger { } cfg.Buffer = NewMemoryBuffer(pendingSize) } - if cfg.CheckLogs == nil { - cfg.CheckLogs = make(chan struct{}) - } l := &logger{ stderr: cfg.Stderr, httpc: cfg.HTTPC, @@ -98,7 +98,7 @@ func Log(cfg Config) Logger { skipClientTime: cfg.SkipClientTime, sent: make(chan struct{}, 1), sentinel: make(chan int32, 16), - checkLogs: cfg.CheckLogs, + drainLogs: cfg.DrainLogs, timeNow: cfg.TimeNow, bo: backoff.Backoff{ Name: "logtail", @@ -127,7 +127,7 @@ type logger struct { skipClientTime bool buffer Buffer sent chan struct{} // signal to speed up drain - checkLogs <-chan struct{} // external signal to attempt a drain + drainLogs <-chan struct{} // if non-nil, external signal to attempt a drain sentinel chan int32 timeNow func() time.Time bo backoff.Backoff @@ -164,6 +164,32 @@ func (l *logger) Close() { l.Shutdown(context.Background()) } +// drainBlock is called by drainPending when there are no logs to drain. +// +// In typical operation, every call to the Write method unblocks and triggers +// a buffer.TryReadline, so logs are written with very low latency. +// +// If the caller provides a DrainLogs channel, then unblock-drain-on-Write +// is disabled, and it is up to the caller to trigger unblock the drain. +func (l *logger) drainBlock() (shuttingDown bool) { + if l.drainLogs == nil { + select { + case <-l.shutdownStart: + return true + case <-l.sent: + } + } else { + select { + case <-l.shutdownStart: + return true + case <-l.drainLogs: + } + } + return false +} + +// drainPending drains and encodes a batch of logs from the buffer for upload. +// If no logs are available, drainPending blocks until logs are available. func (l *logger) drainPending() (res []byte) { buf := new(bytes.Buffer) entries := 0 @@ -182,12 +208,7 @@ func (l *logger) drainPending() (res []byte) { break } - select { - case <-l.shutdownStart: - batchDone = true - case <-l.checkLogs: - case <-l.sent: - } + batchDone = l.drainBlock() continue } @@ -304,9 +325,11 @@ func (l *logger) Flush() error { func (l *logger) send(jsonBlob []byte) (int, error) { n, err := l.buffer.Write(jsonBlob) - select { - case l.sent <- struct{}{}: - default: + if l.drainLogs == nil { + select { + case l.sent <- struct{}{}: + default: + } } return n, err }