logtail: move a scratch buffer to Logger

Rather than pass around a scratch buffer, put it on the Logger.

This is a baby step towards removing the background uploading
goroutine and starting it as needed.

Updates tailscale/corp#18514 (insofar as it led me to look at this code)

Change-Id: I6fd94581c28bde40fdb9fca788eb9590bcedae1b
Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
This commit is contained in:
Brad Fitzpatrick 2024-03-21 06:15:47 -07:00 committed by Brad Fitzpatrick
parent 06e22a96b1
commit 6d90966c1f

View File

@ -185,6 +185,7 @@ type Logger struct {
netMonitor *netmon.Monitor netMonitor *netmon.Monitor
buffer Buffer buffer Buffer
drainWake chan struct{} // signal to speed up drain drainWake chan struct{} // signal to speed up drain
drainBuf bytes.Buffer // owned by drainPending for reuse
flushDelayFn func() time.Duration // negative or zero return value to upload aggressively, or >0 to batch at this delay flushDelayFn func() time.Duration // negative or zero return value to upload aggressively, or >0 to batch at this delay
flushPending atomic.Bool flushPending atomic.Bool
sentinel chan int32 sentinel chan int32
@ -302,10 +303,10 @@ func (l *Logger) drainBlock() (shuttingDown bool) {
} }
// drainPending drains and encodes a batch of logs from the buffer for upload. // drainPending drains and encodes a batch of logs from the buffer for upload.
// It uses scratch as its initial buffer.
// If no logs are available, drainPending blocks until logs are available. // If no logs are available, drainPending blocks until logs are available.
func (l *Logger) drainPending(scratch []byte) (res []byte) { func (l *Logger) drainPending() (res []byte) {
buf := bytes.NewBuffer(scratch[:0]) buf := &l.drainBuf
buf.Reset()
buf.WriteByte('[') buf.WriteByte('[')
entries := 0 entries := 0
@ -323,6 +324,14 @@ func (l *Logger) drainPending(scratch []byte) (res []byte) {
break break
} }
// We're about to block. If we're holding on to too much memory
// in our buffer from a previous large write, let it go.
if buf.Available() > 4<<10 {
cur := buf.Bytes()
l.drainBuf = bytes.Buffer{}
buf.Write(cur)
}
batchDone = l.drainBlock() batchDone = l.drainBlock()
continue continue
} }
@ -365,9 +374,8 @@ func (l *Logger) drainPending(scratch []byte) (res []byte) {
func (l *Logger) uploading(ctx context.Context) { func (l *Logger) uploading(ctx context.Context) {
defer close(l.shutdownDone) defer close(l.shutdownDone)
scratch := make([]byte, 4096) // reusable buffer to write into
for { for {
body := l.drainPending(scratch) body := l.drainPending()
origlen := -1 // sentinel value: uncompressed origlen := -1 // sentinel value: uncompressed
// Don't attempt to compress tiny bodies; not worth the CPU cycles. // Don't attempt to compress tiny bodies; not worth the CPU cycles.
if (l.compressLogs || l.zstdEncoder != nil) && len(body) > 256 { if (l.compressLogs || l.zstdEncoder != nil) && len(body) > 256 {