Files
tailscale/logtail/filch/filch.go
Joe Tsai 91c7a511b5 logtail: add metrics
Add metrics about logtail uploading and underlying buffer.
Add metrics to the in-memory buffer implementation.

Updates tailscale/corp#21363

Signed-off-by: Joe Tsai <joetsai@digital-static.net>
2025-12-10 17:43:53 -08:00

501 lines
14 KiB
Go

// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build !ts_omit_logtail
// Package filch is a file system queue that pilfers your stderr.
// (A FILe CHannel that filches.)
package filch
import (
"bytes"
"cmp"
"errors"
"expvar"
"fmt"
"io"
"os"
"slices"
"sync"
"tailscale.com/metrics"
"tailscale.com/util/must"
)
// stderrFD is the descriptor number treated as stderr (2).
// It is a variable rather than a constant so that tests can change it.
var stderrFD = 2

var (
	errTooLong = errors.New("filch: line too long")
	errClosed  = errors.New("filch: buffer is closed")
)

const (
	// DefaultMaxLineSize (64 KiB) is used when [Options.MaxLineSize] is zero.
	DefaultMaxLineSize = 64 << 10

	// DefaultMaxFileSize (50 MiB) is used when [Options.MaxFileSize] is zero.
	DefaultMaxFileSize = 50 << 20
)
// Options configures a [Filch] created by [New].
type Options struct {
	// ReplaceStderr, when true, redirects everything written to [os.Stderr]
	// into the [Filch] buffer. Output that must bypass the buffer can be
	// written to [Filch.OrigStderr] instead.
	ReplaceStderr bool

	// MaxLineSize is a hard upper bound on the length of one line,
	// counting its trailing newline. Writes containing a longer line are
	// rejected; a longer line met while reading is skipped and reported
	// as an error. A zero or negative value selects [DefaultMaxLineSize].
	MaxLineSize int

	// MaxFileSize is a soft (not strictly enforced) bound on the disk space
	// used for logs, shared evenly by the two underlying files.
	// A zero or negative value selects [DefaultMaxFileSize].
	MaxFileSize int
}
// A Filch is a log buffer built from two files that alternate roles as a
// simple ring buffer: writes append to the newer file, reads consume the
// older file from its start, and the two swap places on rotation.
type Filch struct {
	// OrigStderr holds the original [os.Stderr] when [Options.ReplaceStderr]
	// was requested, and is nil otherwise. Writing to it bypasses the
	// Filch buffer.
	OrigStderr *os.File

	// maxLineSize is the maximum accepted line length, newline included.
	maxLineSize int // immutable once set
	// maxFileSize is the soft size limit for each of newer and older.
	maxFileSize int64 // immutable once set

	// mu serializes Write, TryReadLine, and Close, and guards the file and
	// buffer state that follows.
	mu sync.Mutex

	newer *os.File // destination of new log data; writes append at the end
	older *os.File // source of historical log data; reads consume from the start

	newlyWrittenBytes int64 // bytes Write put into newer since the last rotation
	newlyFilchedBytes int64 // bytes filched from stderr into newer since the last rotation

	wrBuf       []byte // scratch space, used only when Write must append a newline
	wrBufMaxLen int    // high-water mark of len(wrBuf); decayed on every rotation

	rdBufIdx    int    // position in rdBuf of the first byte not yet returned
	rdBuf       []byte // scratch space holding data read from older
	rdBufMaxLen int    // high-water mark of len(rdBuf); decayed on every rotation

	// Metrics published through [Filch.ExpVar]; see its documentation.
	writeCalls   expvar.Int
	readCalls    expvar.Int
	rotateCalls  expvar.Int
	callErrors   expvar.Int
	writeBytes   expvar.Int
	readBytes    expvar.Int
	filchedBytes expvar.Int
	droppedBytes expvar.Int
	storedBytes  expvar.Int
}
// ExpVar returns a [metrics.Set] with metrics about the buffer.
//
//   - counter_write_calls: Total number of calls to [Filch.Write]
//     (excludes calls when file is closed).
//
//   - counter_read_calls: Total number of calls to [Filch.TryReadLine]
//     that returned a line or an error (excludes calls when the file is
//     closed or there was nothing to read).
//
//   - counter_rotate_calls: Total number of calls to rotate the log files,
//     including rotations where the newer file held no data
//     (e.g., each time a reader finds the older file fully drained).
//
//   - counter_call_errors: Total number of calls to [Filch.Write] and
//     [Filch.TryReadLine] that returned an error.
//
//   - counter_write_bytes: Total number of bytes written
//     (includes bytes filched from stderr and bytes already present
//     in the files when the Filch was created).
//
//   - counter_read_bytes: Total number of bytes read
//     (includes bytes filched from stderr).
//
//   - counter_filched_bytes: Total number of bytes filched from stderr.
//
//   - counter_dropped_bytes: Total number of bytes dropped
//     (includes bytes filched from stderr, lines too long to read,
//     and unread data discarded when the files are rotated).
//
//   - gauge_stored_bytes: Current number of bytes stored on disk,
//     as last observed by the Filch.
func (f *Filch) ExpVar() expvar.Var {
	m := new(metrics.Set)
	m.Set("counter_write_calls", &f.writeCalls)
	m.Set("counter_read_calls", &f.readCalls)
	m.Set("counter_rotate_calls", &f.rotateCalls)
	m.Set("counter_call_errors", &f.callErrors)
	m.Set("counter_write_bytes", &f.writeBytes)
	m.Set("counter_read_bytes", &f.readBytes)
	m.Set("counter_filched_bytes", &f.filchedBytes)
	m.Set("counter_dropped_bytes", &f.droppedBytes)
	m.Set("gauge_stored_bytes", &f.storedBytes)
	return m
}
// unreadReadBuffer returns the bytes already pulled from older into rdBuf
// that have not yet been handed out by TryReadLine.
func (f *Filch) unreadReadBuffer() []byte {
	start := f.rdBufIdx
	return f.rdBuf[start:]
}
// availReadBuffer returns the spare capacity of rdBuf past its length,
// i.e. the space into which more data from older can be read.
func (f *Filch) availReadBuffer() []byte {
	used, total := len(f.rdBuf), cap(f.rdBuf)
	return f.rdBuf[used:total]
}
// resetReadBuffer empties rdBuf (keeping its capacity) and rewinds the
// read index.
func (f *Filch) resetReadBuffer() {
	f.rdBuf = f.rdBuf[:0]
	f.rdBufIdx = 0
}
// moveReadBufferToFront shifts the unread bytes of rdBuf to its start,
// reclaiming the space occupied by already-consumed bytes.
func (f *Filch) moveReadBufferToFront() {
	kept := copy(f.rdBuf, f.rdBuf[f.rdBufIdx:])
	f.rdBuf = f.rdBuf[:kept]
	f.rdBufIdx = 0
}
// growReadBuffer enlarges rdBuf, asking for room for cap(rdBuf)+1 more
// bytes beyond its current length; existing contents are preserved.
func (f *Filch) growReadBuffer() {
	extra := cap(f.rdBuf) + 1
	f.rdBuf = slices.Grow(f.rdBuf, extra)
}
// consumeReadBuffer marks the next n unread bytes of rdBuf as consumed.
func (f *Filch) consumeReadBuffer(n int) {
	f.rdBufIdx = f.rdBufIdx + n
}
// appendReadBuffer extends rdBuf by n bytes that were just read into its
// spare capacity, and records the largest length rdBuf has reached.
func (f *Filch) appendReadBuffer(n int) {
	newLen := len(f.rdBuf) + n
	f.rdBuf = f.rdBuf[:newLen]
	if newLen > f.rdBufMaxLen {
		f.rdBufMaxLen = newLen
	}
}
// TryReadLine implements the logtail.Buffer interface.
//
// It returns the next line stored in the older file, including its
// trailing newline. Bytes at the end of the older file that lack a
// trailing newline are returned as a final partial line. Once the older
// file is exhausted, the files are rotated and reading continues with the
// newly rotated file; (nil, nil) is returned when nothing is stored.
//
// A line longer than the maximum line size is skipped: its bytes are
// counted as dropped and errTooLong is returned in its place. Like any
// other line, an overlong line ends at the end of the older file, so the
// first line of the next file is never treated as its continuation.
// If the Filch is closed, it returns io.EOF.
//
// The returned slice aliases an internal read buffer and may be
// overwritten by the next call.
func (f *Filch) TryReadLine() (b []byte, err error) {
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.older == nil {
		return nil, io.EOF
	}

	var tooLong bool // whether we are in a line that is too long
	defer func() {
		// Whatever is returned is consumed from the read buffer, whether it
		// reaches the caller or is dropped as part of an overlong line.
		f.consumeReadBuffer(len(b))
		if tooLong || len(b) > f.maxLineSize {
			f.droppedBytes.Add(int64(len(b)))
			b, err = nil, cmp.Or(err, errTooLong)
		} else {
			f.readBytes.Add(int64(len(b)))
		}
		if len(b) != 0 || err != nil {
			f.readCalls.Add(1)
		}
		if err != nil {
			f.callErrors.Add(1)
		}
	}()

	for {
		// Check if unread buffer already has the next line.
		unread := f.unreadReadBuffer()
		if i := bytes.IndexByte(unread, '\n') + len("\n"); i > 0 {
			return unread[:i], nil
		}

		// Check whether to make space for more data to read.
		avail := f.availReadBuffer()
		if len(avail) == 0 {
			switch {
			case len(unread) > f.maxLineSize:
				// Discard the overlong partial line and keep skipping
				// until its newline (or the end of the file).
				tooLong = true
				f.droppedBytes.Add(int64(len(unread)))
				f.resetReadBuffer()
			case len(unread) < cap(f.rdBuf)/10:
				// Over 90% of the buffer is consumed; reclaim that space.
				f.moveReadBufferToFront()
			default:
				f.growReadBuffer()
			}
			avail = f.availReadBuffer() // invariant: len(avail) > 0
		}

		// Read data into the available buffer.
		n, err := f.older.Read(avail)
		f.appendReadBuffer(n)
		if err == nil {
			continue
		}
		if err != io.EOF {
			return nil, err
		}

		// At EOF, leftover bytes without a trailing newline form the
		// final (partial) line of this file.
		if unread = f.unreadReadBuffer(); len(unread) > 0 {
			return unread, nil
		}
		// An overlong line that ended exactly at EOF must be reported now.
		// Rotating with tooLong still set would drop the first (valid)
		// line of the next file as if it were the tail of this one.
		if tooLong {
			return nil, errTooLong
		}
		if err := f.rotateLocked(); err != nil {
			return nil, err
		}
		if f.storedBytes.Value() == 0 {
			return nil, nil
		}
	}
}
// alwaysStatForTests makes every Write stat the newer file, instead of only
// every 100th call or once the size limit appears to be exceeded.
var alwaysStatForTests bool

// Write implements the logtail.Buffer interface.
//
// It appends b to the newer file, adding a trailing newline if b does not
// already end with one. If any line of the data to be written (counting its
// newline) is longer than the maximum line size, nothing is written and
// errTooLong is returned. If the Filch is closed, it returns errClosed.
//
// Before writing, the newer file is re-stat'ed periodically so that bytes
// filched from stderr are accounted for, and the files are rotated if the
// newer file has grown beyond its size limit.
//
// The returned count never includes the appended newline, so it always
// lies within [0, len(b)] as [io.Writer] requires.
func (f *Filch) Write(b []byte) (n int, err error) {
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.newer == nil {
		return 0, errClosed
	}
	defer func() {
		f.writeCalls.Add(1)
		if err != nil {
			f.callErrors.Add(1)
		}
	}()

	// To make sure we do not write data to disk unbounded
	// (in the event that we are not draining fast enough)
	// check whether we exceeded maxFileSize.
	// If so, then force a file rotation.
	if f.newlyWrittenBytes+f.newlyFilchedBytes > f.maxFileSize || f.writeCalls.Value()%100 == 0 || alwaysStatForTests {
		f.statAndUpdateBytes()
		if f.newlyWrittenBytes+f.newlyFilchedBytes > f.maxFileSize {
			if err := f.rotateLocked(); err != nil {
				return 0, err
			}
		}
	}

	// Write the log entry (appending a newline character if needed).
	origLen := len(b)
	if len(b) == 0 || b[len(b)-1] != '\n' {
		f.wrBuf = append(append(f.wrBuf[:0], b...), '\n')
		f.wrBufMaxLen = max(f.wrBufMaxLen, len(f.wrBuf))
		b = f.wrBuf
	}
	if len(b) > f.maxLineSize {
		for line := range bytes.Lines(b) {
			if len(line) > f.maxLineSize {
				return 0, errTooLong
			}
		}
	}
	n, err = f.newer.Write(b)
	f.writeBytes.Add(int64(n))
	f.storedBytes.Add(int64(n))
	f.newlyWrittenBytes += int64(n)
	// Report only bytes of the caller's b. Subtracting the appended newline
	// from a short write would go negative when nothing was written, or
	// under-report when all of b but not the newline made it to disk.
	return min(n, origLen), err
}
// statAndUpdateBytes reconciles the byte counters with the actual size of
// the newer file. Any growth beyond what this Filch already accounts for
// (bytes written plus bytes previously filched) is attributed to filched
// stderr output and added to the write, filched, and stored metrics.
// A Stat failure is ignored and leaves every counter unchanged.
func (f *Filch) statAndUpdateBytes() {
	info, statErr := f.newer.Stat()
	if statErr != nil {
		return
	}
	accounted := f.newlyWrittenBytes + f.newlyFilchedBytes
	extra := info.Size() - accounted
	if extra < 0 {
		extra = 0
	}
	f.newlyFilchedBytes += extra
	f.writeBytes.Add(extra)
	f.filchedBytes.Add(extra)
	f.storedBytes.Add(extra)
}
// storedBytesForTest returns the combined on-disk size of the newer and
// older files as reported by Stat, failing via must.Get if either Stat fails.
func (f *Filch) storedBytesForTest() int64 {
	newerInfo := must.Get(f.newer.Stat())
	olderInfo := must.Get(f.older.Stat())
	return newerInfo.Size() + olderInfo.Size()
}
// activeStderrWriteForTest is read-locked by every in-flight
// stderrWriteForTest call so that waitIdleStderrForTest can wait them out.
var activeStderrWriteForTest sync.RWMutex

// stderrWriteForTest writes b to [os.Stderr] and returns the number of bytes
// written, failing via must.Get on error. The write happens under a read
// lock so that [waitIdleStderrForTest] can wait for it to complete.
func stderrWriteForTest(b []byte) int {
	activeStderrWriteForTest.RLock()
	defer activeStderrWriteForTest.RUnlock()
	written := must.Get(os.Stderr.Write(b))
	return written
}

// waitIdleStderrForTest blocks until no stderrWriteForTest call is in
// progress, by momentarily taking the exclusive lock.
func waitIdleStderrForTest() {
	activeStderrWriteForTest.Lock()
	activeStderrWriteForTest.Unlock()
}
// rotateLocked swaps f.newer and f.older such that:
//
//   - f.newer will be truncated and future writes will be appended to the end.
//   - if [Options.ReplaceStderr], then stderr writes will redirect to f.newer
//   - f.older will contain historical data, reads will consume from the start.
//   - f.older is guaranteed to be immutable.
//
// There are two reasons for rotating:
//
//   - The reader finished reading f.older.
//     No data should be lost under this condition.
//
//   - The writer exceeded a limit for f.newer.
//     Data may be lost under this condition.
//
// Any bytes of the outgoing f.older that were not yet returned by
// [Filch.TryReadLine] are discarded and counted in droppedBytes.
// Every call is counted in rotateCalls, including calls where f.newer
// held no data. It also shrinks wrBuf and rdBuf when they are mostly unused.
//
// The caller must hold f.mu.
func (f *Filch) rotateLocked() error {
	f.rotateCalls.Add(1)
	// Truncate the older file (only needed if it holds any data).
	if fi, err := f.older.Stat(); err != nil {
		return err
	} else if fi.Size() > 0 {
		// Count the part of older that no reader received as dropped.
		// The file offset is how far older was read into rdBuf; subtracting
		// the bytes still unread in rdBuf gives how far callers consumed.
		// A Seek failure only skips this metric update.
		if pos, err := f.older.Seek(0, io.SeekCurrent); err == nil {
			rdPos := pos - int64(len(f.unreadReadBuffer())) // adjust for data already read into the read buffer
			f.droppedBytes.Add(max(0, fi.Size()-rdPos))
		}
		f.resetReadBuffer()
		// Truncate the older file and write relative to the start.
		if err := f.older.Truncate(0); err != nil {
			return err
		}
		if _, err := f.older.Seek(0, io.SeekStart); err != nil {
			return err
		}
	}
	// Swap newer and older.
	f.newer, f.older = f.older, f.newer
	// If necessary, filch stderr into newer instead of older.
	// This must be done after truncation otherwise
	// we might lose some stderr data asynchronously written
	// right in the middle of a rotation.
	// Note that mutex does not prevent stderr writes.
	// prevSize is what this Filch accounted for in the file that is now older.
	prevSize := f.newlyWrittenBytes + f.newlyFilchedBytes
	f.newlyWrittenBytes, f.newlyFilchedBytes = 0, 0
	if f.OrigStderr != nil {
		if err := dup2Stderr(f.newer); err != nil {
			return err
		}
	}
	// Update filched bytes and stored bytes metrics.
	// This must be done after filching to newer
	// so that f.older.Stat is *mostly* stable.
	//
	// NOTE: Unfortunately, an asynchronous os.Stderr.Write call
	// that is already in progress when we called dup2Stderr
	// will still write to the previous FD and
	// may not be immediately observable by this Stat call.
	// This is fundamentally unsolvable with the current design
	// as we cannot synchronize all other os.Stderr.Write calls.
	// In rare cases, it is possible that [Filch.TryReadLine] consumes
	// the entire older file before the write commits,
	// leading to dropped stderr lines.
	//
	// Test hook: wait for in-flight stderrWriteForTest calls so that the
	// Stat below observes them.
	waitIdleStderrForTest()
	if fi, err := f.older.Stat(); err != nil {
		return err
	} else {
		// Growth beyond prevSize came from stderr writes, not from Write.
		filchedBytes := max(0, fi.Size()-prevSize)
		f.writeBytes.Add(filchedBytes)
		f.filchedBytes.Add(filchedBytes)
		f.storedBytes.Set(fi.Size()) // newer has been truncated, so only older matters
	}
	// Start reading from the start of older.
	if _, err := f.older.Seek(0, io.SeekStart); err != nil {
		return err
	}
	// Garbage collect unnecessarily large buffers.
	// If a buffer's capacity exceeds 4x its recent high-water mark, replace it
	// with an empty one sized for 2x that mark, then decay the mark by 25% so
	// it tracks recent usage.
	//
	// NOTE(review): replacing rdBuf discards its contents without resetting
	// rdBufIdx. This relies on the read buffer being empty here (it is reset
	// above whenever older held data); confirm for the case where older was
	// already empty.
	mayGarbageCollect := func(b []byte, maxLen int) ([]byte, int) {
		if cap(b)/4 > maxLen { // if less than 25% utilized
			b = slices.Grow([]byte(nil), 2*maxLen)
		}
		maxLen = 3 * (maxLen / 4) // reduce by 25%
		return b, maxLen
	}
	f.wrBuf, f.wrBufMaxLen = mayGarbageCollect(f.wrBuf, f.wrBufMaxLen)
	f.rdBuf, f.rdBufMaxLen = mayGarbageCollect(f.rdBuf, f.rdBufMaxLen)
	return nil
}
// Close releases the Filch's files. If stderr was replaced, the saved
// original is first passed to unsaveStderr (presumably restoring it) and
// OrigStderr is cleared. Errors from each step are combined with
// [errors.Join]. After Close, Write reports errClosed and TryReadLine
// reports io.EOF.
func (f *Filch) Close() error {
	f.mu.Lock()
	defer f.mu.Unlock()

	var errs []error
	if saved := f.OrigStderr; saved != nil {
		errs = append(errs, unsaveStderr(saved))
		f.OrigStderr = nil
	}
	if newer := f.newer; newer != nil {
		errs = append(errs, newer.Close())
		f.newer = nil
	}
	if older := f.older; older != nil {
		errs = append(errs, older.Close())
		f.older = nil
	}
	return errors.Join(errs...)
}
// New creates a new filch around two log files, each starting with
// filePrefix: filePrefix+".log1.txt" and filePrefix+".log2.txt". The files
// are created if missing. Logs left over from a previous run are kept: the
// older file is read from its start, and new writes are appended after any
// data already in the newer file.
//
// If opts.ReplaceStderr is set, stderr is redirected into the newer file and
// the original is kept in [Filch.OrigStderr].
//
// Returned errors are prefixed with "filch: " and wrap the underlying error,
// so they can be inspected with [errors.Is] and [errors.As].
func New(filePrefix string, opts Options) (f *Filch, err error) {
	var f1, f2 *os.File
	defer func() {
		if err != nil {
			if f1 != nil {
				f1.Close()
			}
			if f2 != nil {
				f2.Close()
			}
			err = fmt.Errorf("filch: %w", err)
		}
	}()

	path1 := filePrefix + ".log1.txt"
	path2 := filePrefix + ".log2.txt"
	f1, err = os.OpenFile(path1, os.O_CREATE|os.O_RDWR, 0600)
	if err != nil {
		return nil, err
	}
	f2, err = os.OpenFile(path2, os.O_CREATE|os.O_RDWR, 0600)
	if err != nil {
		return nil, err
	}
	fi1, err := f1.Stat()
	if err != nil {
		return nil, err
	}
	fi2, err := f2.Stat()
	if err != nil {
		return nil, err
	}

	f = new(Filch)
	f.maxLineSize = int(cmp.Or(max(0, opts.MaxLineSize), DefaultMaxLineSize))
	f.maxFileSize = int64(cmp.Or(max(0, opts.MaxFileSize), DefaultMaxFileSize))
	f.maxFileSize /= 2 // since there are two log files that combine to equal MaxFileSize

	// Neither, either, or both files may exist and contain logs from
	// the last time the process ran. The three cases are:
	//
	//   - neither: all logs were read out and files were truncated
	//   - either: logs were being written into one of the files
	//   - both: the files were swapped and were starting to be
	//     read out, while new logs streamed into the other
	//     file, but the read out did not complete
	switch {
	case fi1.Size() > 0 && fi2.Size() == 0:
		f.newer, f.older = f2, f1 // use empty file as newer
	case fi2.Size() > 0 && fi1.Size() == 0:
		f.newer, f.older = f1, f2 // use empty file as newer
	case fi1.ModTime().Before(fi2.ModTime()):
		f.newer, f.older = f2, f1 // use older file as older
	case fi2.ModTime().Before(fi1.ModTime()):
		f.newer, f.older = f1, f2 // use newer file as newer
	default:
		f.newer, f.older = f1, f2 // does not matter
	}
	f.writeBytes.Set(fi1.Size() + fi2.Size())
	f.storedBytes.Set(fi1.Size() + fi2.Size())

	// The files are opened without O_APPEND, so both offsets start at 0.
	// That is right for older (reads consume from the start), but when both
	// files hold data, writing newer at offset 0 would overwrite the logs it
	// already contains. Position newer at its end so new writes append.
	end, err := f.newer.Seek(0, io.SeekEnd)
	if err != nil {
		return nil, err
	}
	f.newlyWrittenBytes = end

	f.OrigStderr = nil
	if opts.ReplaceStderr {
		f.OrigStderr, err = saveStderr()
		if err != nil {
			return nil, err
		}
		if err := dup2Stderr(f.newer); err != nil {
			return nil, err
		}
	}
	return f, nil
}