logtail/filch: rewrite the package

The filch implementation is fairly broken:

* When Filch.cur exceeds MaxFileSize, it calls moveContents
to copy the entirety of cur into alt (while holding the write lock).
By nature, this is the movement of a lot of data in a hot path,
meaning that all log calls will be globally blocked!
It also means that log uploads will be blocked during the move.

* The implementation of moveContents is buggy in that
it copies data from cur into the start of alt,
but fails to truncate alt to the number of bytes copied.
Consequently, there are unrelated lines near the end,
leading to out-of-order lines when being read back.

* Data filched via stderr do not directly respect MaxFileSize,
which is only checked every 100 Filch.Write calls.
This means that it is possible that the file grows far beyond
the specified max file size before moveContents is called.

* If both log files have data when New is called,
it also copies the entirety of cur into alt.
This can block the startup of a process copying lots of data
before the process can do any useful work.

* TryReadLine is implemented using bufio.Scanner.
Unfortunately, it will choke on any lines longer than
bufio.MaxScanTokenSize, rather than gracefully skip over them.

The re-implementation avoids a lot of these problems
by fundamentally eliminating the need for moveContent.
We enforce MaxFileSize by simply rotating the log files
whenever the current file exceeds MaxFileSize/2.
This is a constant-time operation regardless of file size.

To more gracefully handle lines longer than bufio.MaxScanTokenSize,
we skip over these lines (without growing the read buffer)
and report an error. This allows subsequent lines to be read.

In order to improve debugging, we add alot of metrics.

Note that the the mechanism of dup2 with stderr
is inherently racy with a the two file approach.
The order of operations during a rotation is carefully chosen
to reduce the race window to be as short as possible.
Thus, this is slightly less racy than before.

Updates tailscale/corp#21363

Signed-off-by: Joe Tsai <joetsai@digital-static.net>
This commit is contained in:
Joe Tsai
2025-12-05 17:30:46 -08:00
parent 7bc25f77f4
commit 221ca20a00
6 changed files with 740 additions and 323 deletions

View File

@@ -1,148 +1,420 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build !ts_omit_logtail
// Package filch is a file system queue that pilfers your stderr.
// (A FILe CHannel that filches.)
package filch
import (
"bufio"
"bytes"
"cmp"
"errors"
"expvar"
"fmt"
"io"
"os"
"slices"
"sync"
"tailscale.com/util/must"
)
var stderrFD = 2 // a variable for testing
const defaultMaxFileSize = 50 << 20
var errTooLong = errors.New("filch: line too long")
var errClosed = errors.New("filch: buffer is closed")
const DefaultMaxLineSize = 64 << 10
const DefaultMaxFileSize = 50 << 20
type Options struct {
ReplaceStderr bool // dup over fd 2 so everything written to stderr comes here
MaxFileSize int
// ReplaceStderr specifies whether to filch [os.Stderr] such that
// everything written there appears in the [Filch] buffer instead.
// In order to write to stderr instead of writing to [Filch],
// then use [Filch.OrigStderr].
ReplaceStderr bool
// MaxLineSize is the maximum line size that could be encountered,
// including the trailing newline. This is enforced as a hard limit.
// Writes larger than this will be rejected. Reads larger than this
// will report an error and skip over the long line.
// If zero, the [DefaultMaxLineSize] is used.
MaxLineSize int
// MaxFileSize specifies the maximum space on disk to use for logs.
// This is not enforced as a hard limit, but rather a soft limit.
// If zero, then [DefaultMaxFileSize] is used.
MaxFileSize int
}
// A Filch uses two alternating files as a simplistic ring buffer.
type Filch struct {
// OrigStderr is the original [os.Stderr] if [Options.ReplaceStderr] is specified.
// Writing directly to this avoids writing into the Filch buffer.
// Otherwise, it is nil.
OrigStderr *os.File
mu sync.Mutex
cur *os.File
alt *os.File
altscan *bufio.Scanner
recovered int64
// maxLineSize specifies the maximum line size to use.
maxLineSize int // immutable once set
maxFileSize int64
writeCounter int
// maxFileSize specifies the max space either newer and older should use.
maxFileSize int64 // immutable once set
// buf is an initial buffer for altscan.
// As of August 2021, 99.96% of all log lines
// are below 4096 bytes in length.
// Since this cutoff is arbitrary, instead of using 4096,
// we subtract off the size of the rest of the struct
// so that the whole struct takes 4096 bytes
// (less on 32 bit platforms).
// This reduces allocation waste.
buf [4096 - 64]byte
mu sync.Mutex
newer *os.File // newer logs data; writes are appended to the end
older *os.File // older logs data; reads are consumed from the start
newlyWrittenBytes int64 // bytes written directly to newer; reset upon rotation
newlyFilchedBytes int64 // bytes filched indirectly to newer; reset upon rotation
wrBuf []byte // temporary buffer for writing; only used for writes without trailing newline
wrBufMaxLen int // maximum length of wrBuf; reduced upon every rotation
rdBufIdx int // index into rdBuf for the next unread bytes
rdBuf []byte // temporary buffer for reading
rdBufMaxLen int // maximum length of rdBuf; reduced upon every rotation
// Metrics (see [Filch.ExpVar] for details).
writeCalls expvar.Int
readCalls expvar.Int
rotateCalls expvar.Int
callErrors expvar.Int
writeBytes expvar.Int
readBytes expvar.Int
filchedBytes expvar.Int
droppedBytes expvar.Int
storedBytes expvar.Int
}
// ExpVar report metrics about the buffer.
//
// - counter_write_calls: Total number of calls to [Filch.Write]
// (excludes calls when file is closed).
//
// - counter_read_calls: Total number of calls to [Filch.TryReadLine]
// (excludes calls when file is closed or no bytes).
//
// - counter_rotate_calls: Total number of calls to rotate the log files
// (excludes calls when there is nothing to rotate to).
//
// - counter_call_errors: Total number of calls returning errors.
//
// - counter_write_bytes: Total number of bytes written
// (includes bytes filched from stderr).
//
// - counter_read_bytes: Total number of bytes read
// (includes bytes filched from stderr).
//
// - counter_filched_bytes: Total number of bytes filched from stderr.
//
// - counter_dropped_bytes: Total number of bytes dropped
// (includes bytes filched from stderr and lines too long to read).
//
// - gauge_stored_bytes: Current number of bytes stored on disk.
func (f *Filch) ExpVar() expvar.Var {
m := new(expvar.Map)
m.Set("counter_write_calls", &f.writeCalls)
m.Set("counter_read_calls", &f.readCalls)
m.Set("counter_rotate_calls", &f.rotateCalls)
m.Set("counter_call_errors", &f.callErrors)
m.Set("counter_write_bytes", &f.writeBytes)
m.Set("counter_read_bytes", &f.readBytes)
m.Set("counter_filched_bytes", &f.filchedBytes)
m.Set("counter_dropped_bytes", &f.droppedBytes)
m.Set("gauge_stored_bytes", &f.storedBytes)
return m
}
func (f *Filch) unreadReadBuffer() []byte {
return f.rdBuf[f.rdBufIdx:]
}
func (f *Filch) availReadBuffer() []byte {
return f.rdBuf[len(f.rdBuf):cap(f.rdBuf)]
}
func (f *Filch) resetReadBuffer() {
f.rdBufIdx, f.rdBuf = 0, f.rdBuf[:0]
}
func (f *Filch) moveReadBufferToFront() {
f.rdBufIdx, f.rdBuf = 0, f.rdBuf[:copy(f.rdBuf, f.rdBuf[f.rdBufIdx:])]
}
func (f *Filch) growReadBuffer() {
f.rdBuf = slices.Grow(f.rdBuf, cap(f.rdBuf)+1)
}
func (f *Filch) consumeReadBuffer(n int) {
f.rdBufIdx += n
}
func (f *Filch) appendReadBuffer(n int) {
f.rdBuf = f.rdBuf[:len(f.rdBuf)+n]
f.rdBufMaxLen = max(f.rdBufMaxLen, len(f.rdBuf))
}
// TryReadline implements the logtail.Buffer interface.
func (f *Filch) TryReadLine() ([]byte, error) {
func (f *Filch) TryReadLine() (b []byte, err error) {
f.mu.Lock()
defer f.mu.Unlock()
if f.altscan != nil {
if b, err := f.scan(); b != nil || err != nil {
return b, err
}
if f.older == nil {
return nil, io.EOF
}
f.cur, f.alt = f.alt, f.cur
if f.OrigStderr != nil {
if err := dup2Stderr(f.cur); err != nil {
var tooLong bool // whether we are in a line that is too long
defer func() {
f.consumeReadBuffer(len(b))
if tooLong || len(b) > f.maxLineSize {
f.droppedBytes.Add(int64(len(b)))
b, err = nil, cmp.Or(err, errTooLong)
} else {
f.readBytes.Add(int64(len(b)))
}
if len(b) != 0 || err != nil {
f.readCalls.Add(1)
}
if err != nil {
f.callErrors.Add(1)
}
}()
for {
// Check if unread buffer already has the next line.
unread := f.unreadReadBuffer()
if i := bytes.IndexByte(unread, '\n') + len("\n"); i > 0 {
return unread[:i], nil
}
// Check whether to make space for more data to read.
avail := f.availReadBuffer()
if len(avail) == 0 {
switch {
case len(unread) > f.maxLineSize:
tooLong = true
f.droppedBytes.Add(int64(len(unread)))
f.resetReadBuffer()
case len(unread) < cap(f.rdBuf)/10:
f.moveReadBufferToFront()
default:
f.growReadBuffer()
}
avail = f.availReadBuffer() // invariant: len(avail) > 0
}
// Read data into the available buffer.
n, err := f.older.Read(avail)
f.appendReadBuffer(n)
if err != nil {
if err == io.EOF {
unread = f.unreadReadBuffer()
if len(unread) == 0 {
if err := f.rotateLocked(); err != nil {
return nil, err
}
if f.storedBytes.Value() == 0 {
return nil, nil
}
continue
}
return unread, nil
}
return nil, err
}
}
if _, err := f.alt.Seek(0, io.SeekStart); err != nil {
return nil, err
}
f.altscan = bufio.NewScanner(f.alt)
f.altscan.Buffer(f.buf[:], bufio.MaxScanTokenSize)
f.altscan.Split(splitLines)
return f.scan()
}
func (f *Filch) scan() ([]byte, error) {
if f.altscan.Scan() {
return f.altscan.Bytes(), nil
}
err := f.altscan.Err()
err2 := f.alt.Truncate(0)
_, err3 := f.alt.Seek(0, io.SeekStart)
f.altscan = nil
if err != nil {
return nil, err
}
if err2 != nil {
return nil, err2
}
if err3 != nil {
return nil, err3
}
return nil, nil
}
var alwaysStatForTests bool
// Write implements the logtail.Buffer interface.
func (f *Filch) Write(b []byte) (int, error) {
func (f *Filch) Write(b []byte) (n int, err error) {
f.mu.Lock()
defer f.mu.Unlock()
if f.writeCounter == 100 {
// Check the file size every 100 writes.
f.writeCounter = 0
fi, err := f.cur.Stat()
if f.newer == nil {
return 0, errClosed
}
defer func() {
f.writeCalls.Add(1)
if err != nil {
return 0, err
f.callErrors.Add(1)
}
if fi.Size() >= f.maxFileSize {
// This most likely means we are not draining.
// To limit the amount of space we use, throw away the old logs.
if err := moveContents(f.alt, f.cur); err != nil {
}()
// To make sure we do not write data to disk unbounded
// (in the event that we are not draining fast enough)
// check whether we exceeded maxFileSize.
// If so, then force a file rotation.
if f.newlyWrittenBytes+f.newlyFilchedBytes > f.maxFileSize || f.writeCalls.Value()%100 == 0 || alwaysStatForTests {
f.statAndUpdateBytes()
if f.newlyWrittenBytes+f.newlyFilchedBytes > f.maxFileSize {
if err := f.rotateLocked(); err != nil {
return 0, err
}
}
}
f.writeCounter++
// Write the log entry (appending a newline character if needed).
var newline string
if len(b) == 0 || b[len(b)-1] != '\n' {
bnl := make([]byte, len(b)+1)
copy(bnl, b)
bnl[len(bnl)-1] = '\n'
return f.cur.Write(bnl)
newline = "\n"
f.wrBuf = append(append(f.wrBuf[:0], b...), newline...)
f.wrBufMaxLen = max(f.wrBufMaxLen, len(f.wrBuf))
b = f.wrBuf
}
return f.cur.Write(b)
if len(b) > f.maxLineSize {
for line := range bytes.Lines(b) {
if len(line) > f.maxLineSize {
return 0, errTooLong
}
}
}
n, err = f.newer.Write(b)
f.writeBytes.Add(int64(n))
f.storedBytes.Add(int64(n))
f.newlyWrittenBytes += int64(n)
return n - len(newline), err // subtract possibly appended newline
}
// Close closes the Filch, releasing all os resources.
func (f *Filch) Close() (err error) {
func (f *Filch) statAndUpdateBytes() {
if fi, err := f.newer.Stat(); err == nil {
prevSize := f.newlyWrittenBytes + f.newlyFilchedBytes
filchedBytes := max(0, fi.Size()-prevSize)
f.writeBytes.Add(filchedBytes)
f.filchedBytes.Add(filchedBytes)
f.storedBytes.Add(filchedBytes)
f.newlyFilchedBytes += filchedBytes
}
}
func (f *Filch) storedBytesForTest() int64 {
return must.Get(f.newer.Stat()).Size() + must.Get(f.older.Stat()).Size()
}
var activeStderrWriteForTest sync.RWMutex
// stderrWriteForTest calls [os.Stderr.Write], but respects calls to [waitIdleStderrForTest].
func stderrWriteForTest(b []byte) int {
activeStderrWriteForTest.RLock()
defer activeStderrWriteForTest.RUnlock()
return must.Get(os.Stderr.Write(b))
}
// waitIdleStderrForTest waits until there are no active stderrWriteForTest calls.
func waitIdleStderrForTest() {
activeStderrWriteForTest.Lock()
defer activeStderrWriteForTest.Unlock()
}
// rotateLocked swaps f.newer and f.older such that:
//
// - f.newer will be truncated and future writes will be appended to the end.
// - if [Options.ReplaceStderr], then stderr writes will redirect to f.newer
// - f.older will contain historical data, reads will consume from the start.
// - f.older is guaranteed to be immutable.
//
// There are two reasons for rotating:
//
// - The reader finished reading f.older.
// No data should be lost under this condition.
//
// - The writer exceeded a limit for f.newer.
// Data may be lost under this cxondition.
func (f *Filch) rotateLocked() error {
f.rotateCalls.Add(1)
// Truncate the older file.
if fi, err := f.older.Stat(); err != nil {
return err
} else if fi.Size() > 0 {
// Update dropped bytes.
if pos, err := f.older.Seek(0, io.SeekCurrent); err == nil {
rdPos := pos - int64(len(f.unreadReadBuffer())) // adjust for data already read into the read buffer
f.droppedBytes.Add(max(0, fi.Size()-rdPos))
}
f.resetReadBuffer()
// Truncate the older file and write relative to the start.
if err := f.older.Truncate(0); err != nil {
return err
}
if _, err := f.older.Seek(0, io.SeekStart); err != nil {
return err
}
}
// Swap newer and older.
f.newer, f.older = f.older, f.newer
// If necessary, filch stderr into newer instead of older.
// This must be done after truncation otherwise
// we might lose some stderr data asynchronously written
// right in the middle of a rotation.
// Note that mutex does not prevent stderr writes.
prevSize := f.newlyWrittenBytes + f.newlyFilchedBytes
f.newlyWrittenBytes, f.newlyFilchedBytes = 0, 0
if f.OrigStderr != nil {
if err := dup2Stderr(f.newer); err != nil {
return err
}
}
// Update filched bytes and stored bytes metrics.
// This must be done after filching to newer
// so that f.older.Stat is *mostly* stable.
//
// NOTE: Unfortunately, an asynchronous os.Stderr.Write call
// that is already in progress when we called dup2Stderr
// will still write to the previous FD and
// may not be immediately observable by this Stat call.
// This is fundamentally unsolvable with the current design
// as we cannot synchronize all other os.Stderr.Write calls.
// In rare cases, it is possible that [Filch.TryReadLine] consumes
// the entire older file before the write commits,
// leading to dropped stderr lines.
waitIdleStderrForTest()
if fi, err := f.older.Stat(); err != nil {
return err
} else {
filchedBytes := max(0, fi.Size()-prevSize)
f.writeBytes.Add(filchedBytes)
f.filchedBytes.Add(filchedBytes)
f.storedBytes.Set(fi.Size()) // newer has been truncated, so only older matters
}
// Start reading from the start of older.
if _, err := f.older.Seek(0, io.SeekStart); err != nil {
return err
}
// Garbage collect unnecessarily large buffers.
mayGarbageCollect := func(b []byte, maxLen int) ([]byte, int) {
if cap(b)/4 > maxLen { // if less than 25% utilized
b = slices.Grow([]byte(nil), 2*maxLen)
}
maxLen = 3 * (maxLen / 4) // reduce by 25%
return b, maxLen
}
f.wrBuf, f.wrBufMaxLen = mayGarbageCollect(f.wrBuf, f.wrBufMaxLen)
f.rdBuf, f.rdBufMaxLen = mayGarbageCollect(f.rdBuf, f.rdBufMaxLen)
return nil
}
// Close closes the Filch, releasing all resources.
func (f *Filch) Close() error {
f.mu.Lock()
defer f.mu.Unlock()
var errUnsave, errCloseNew, errCloseOld error
if f.OrigStderr != nil {
if err2 := unsaveStderr(f.OrigStderr); err == nil {
err = err2
}
errUnsave = unsaveStderr(f.OrigStderr)
f.OrigStderr = nil
}
if err2 := f.cur.Close(); err == nil {
err = err2
if f.newer != nil {
errCloseNew = f.newer.Close()
f.newer = nil
}
if err2 := f.alt.Close(); err == nil {
err = err2
if f.older != nil {
errCloseOld = f.older.Close()
f.older = nil
}
return err
return errors.Join(errUnsave, errCloseNew, errCloseOld)
}
// New creates a new filch around two log files, each starting with filePrefix.
@@ -181,14 +453,10 @@ func New(filePrefix string, opts Options) (f *Filch, err error) {
return nil, err
}
mfs := defaultMaxFileSize
if opts.MaxFileSize > 0 {
mfs = opts.MaxFileSize
}
f = &Filch{
OrigStderr: os.Stderr, // temporary, for past logs recovery
maxFileSize: int64(mfs),
}
f = new(Filch)
f.maxLineSize = int(cmp.Or(max(0, opts.MaxLineSize), DefaultMaxLineSize))
f.maxFileSize = int64(cmp.Or(max(0, opts.MaxFileSize), DefaultMaxFileSize))
f.maxFileSize /= 2 // since there are two log files that combine to equal MaxFileSize
// Neither, either, or both files may exist and contain logs from
// the last time the process ran. The three cases are:
@@ -198,35 +466,22 @@ func New(filePrefix string, opts Options) (f *Filch, err error) {
// - both: the files were swapped and were starting to be
// read out, while new logs streamed into the other
// file, but the read out did not complete
if n := fi1.Size() + fi2.Size(); n > 0 {
f.recovered = n
}
switch {
case fi1.Size() > 0 && fi2.Size() == 0:
f.cur, f.alt = f2, f1
f.newer, f.older = f2, f1 // use empty file as newer
case fi2.Size() > 0 && fi1.Size() == 0:
f.cur, f.alt = f1, f2
case fi1.Size() > 0 && fi2.Size() > 0: // both
// We need to pick one of the files to be the elder,
// which we do using the mtime.
var older, newer *os.File
if fi1.ModTime().Before(fi2.ModTime()) {
older, newer = f1, f2
} else {
older, newer = f2, f1
}
if err := moveContents(older, newer); err != nil {
fmt.Fprintf(f.OrigStderr, "filch: recover move failed: %v\n", err)
fmt.Fprintf(older, "filch: recover move failed: %v\n", err)
}
f.cur, f.alt = newer, older
f.newer, f.older = f1, f2 // use empty file as newer
case fi1.ModTime().Before(fi2.ModTime()):
f.newer, f.older = f2, f1 // use older file as older
case fi2.ModTime().Before(fi1.ModTime()):
f.newer, f.older = f1, f2 // use newer file as newer
default:
f.cur, f.alt = f1, f2 // does not matter
f.newer, f.older = f1, f2 // does not matter
}
if f.recovered > 0 {
f.altscan = bufio.NewScanner(f.alt)
f.altscan.Buffer(f.buf[:], bufio.MaxScanTokenSize)
f.altscan.Split(splitLines)
f.writeBytes.Set(fi1.Size() + fi2.Size())
f.storedBytes.Set(fi1.Size() + fi2.Size())
if fi, err := f.newer.Stat(); err == nil {
f.newlyWrittenBytes = fi.Size()
}
f.OrigStderr = nil
@@ -235,50 +490,10 @@ func New(filePrefix string, opts Options) (f *Filch, err error) {
if err != nil {
return nil, err
}
if err := dup2Stderr(f.cur); err != nil {
if err := dup2Stderr(f.newer); err != nil {
return nil, err
}
}
return f, nil
}
func moveContents(dst, src *os.File) (err error) {
defer func() {
_, err2 := src.Seek(0, io.SeekStart)
err3 := src.Truncate(0)
_, err4 := dst.Seek(0, io.SeekStart)
if err == nil {
err = err2
}
if err == nil {
err = err3
}
if err == nil {
err = err4
}
}()
if _, err := src.Seek(0, io.SeekStart); err != nil {
return err
}
if _, err := dst.Seek(0, io.SeekStart); err != nil {
return err
}
if _, err := io.Copy(dst, src); err != nil {
return err
}
return nil
}
func splitLines(data []byte, atEOF bool) (advance int, token []byte, err error) {
if atEOF && len(data) == 0 {
return 0, nil, nil
}
if i := bytes.IndexByte(data, '\n'); i >= 0 {
return i + 1, data[0 : i+1], nil
}
if atEOF {
return len(data), data, nil
}
return 0, nil, nil
}

View File

@@ -0,0 +1,34 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build ts_omit_logtail
package filch
import "os"
type Options struct {
ReplaceStderr bool
MaxLineSize int
MaxFileSize int
}
type Filch struct {
OrigStderr *os.File
}
func (*Filch) TryReadLine() ([]byte, error) {
return nil, nil
}
func (*Filch) Write(b []byte) (int, error) {
return len(b), nil
}
func (f *Filch) Close() error {
return nil
}
func New(string, Options) (*Filch, error) {
return new(Filch), nil
}

View File

@@ -1,13 +1,13 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build wasm || plan9 || tamago
//go:build !ts_omit_logtail && (wasm || plan9 || tamago)
package filch
import (
"os"
)
import "os"
const replaceStderrSupportedForTest = false
func saveStderr() (*os.File, error) {
return os.Stderr, nil

View File

@@ -4,207 +4,369 @@
package filch
import (
"bytes"
"encoding/json"
"fmt"
"io"
"math"
"math/rand/v2"
"os"
"path/filepath"
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
"unicode"
"unsafe"
"time"
jsonv2 "github.com/go-json-experiment/json"
"tailscale.com/tstest"
"tailscale.com/util/must"
)
func init() { alwaysStatForTests = true }
type filchTest struct {
*Filch
filePrefix string
}
func newFilchTest(t *testing.T, filePrefix string, opts Options) *filchTest {
func newForTest(t *testing.T, filePrefix string, opts Options) *filchTest {
t.Helper()
if filePrefix == "" {
filePrefix = filepath.Join(t.TempDir(), "testlog")
}
f, err := New(filePrefix, opts)
if err != nil {
t.Fatal(err)
}
return &filchTest{Filch: f}
t.Cleanup(func() {
if err := f.Close(); err != nil {
t.Errorf("Close error: %v", err)
}
})
return &filchTest{Filch: f, filePrefix: filePrefix}
}
func (f *filchTest) write(t *testing.T, s string) {
func (f *filchTest) read(t *testing.T, want []byte) {
t.Helper()
if _, err := f.Write([]byte(s)); err != nil {
t.Fatal(err)
if got, err := f.TryReadLine(); err != nil {
t.Fatalf("TryReadLine error: %v", err)
} else if string(got) != string(want) {
t.Errorf("TryReadLine = %q, want %q", got, want)
}
}
func (f *filchTest) read(t *testing.T, want string) {
t.Helper()
if b, err := f.TryReadLine(); err != nil {
t.Fatalf("r.ReadLine() err=%v", err)
} else if got := strings.TrimRightFunc(string(b), unicode.IsSpace); got != want {
t.Errorf("r.ReadLine()=%q, want %q", got, want)
}
}
func (f *filchTest) readEOF(t *testing.T) {
t.Helper()
if b, err := f.TryReadLine(); b != nil || err != nil {
t.Fatalf("r.ReadLine()=%q err=%v, want nil slice", string(b), err)
}
}
func (f *filchTest) close(t *testing.T) {
t.Helper()
if err := f.Close(); err != nil {
t.Fatal(err)
}
}
func TestDropOldLogs(t *testing.T) {
const line1 = "123456789" // 10 bytes (9+newline)
tests := []struct {
write, read int
}{
{10, 10},
{100, 100},
{200, 200},
{250, 150},
{500, 200},
}
for _, tc := range tests {
t.Run(fmt.Sprintf("w%d-r%d", tc.write, tc.read), func(t *testing.T) {
filePrefix := t.TempDir()
f := newFilchTest(t, filePrefix, Options{ReplaceStderr: false, MaxFileSize: 1000})
defer f.close(t)
// Make filch rotate the logs 3 times
for range tc.write {
f.write(t, line1)
func TestNew(t *testing.T) {
const want1 = "Lorem\nipsum\ndolor\nsit\namet,\nconsectetur\nadipiscing\nelit,\nsed\n"
const want2 = "do\neiusmod\ntempor\nincididunt\nut\nlabore\net\ndolore\nmagna\naliqua.\n"
filePrefix := filepath.Join(t.TempDir(), "testlog")
checkLinesAndCleanup := func() {
t.Helper()
defer os.Remove(filepath.Join(filePrefix + ".log1.txt"))
defer os.Remove(filepath.Join(filePrefix + ".log2.txt"))
f := newForTest(t, filePrefix, Options{})
var got []byte
for {
b := must.Get(f.TryReadLine())
if b == nil {
break
}
// We should only be able to read the last 150 lines
for i := range tc.read {
f.read(t, line1)
if t.Failed() {
t.Logf("could only read %d lines", i)
break
}
}
f.readEOF(t)
})
got = append(got, b...)
}
if string(got) != want1+want2 {
t.Errorf("got %q\nwant %q", got, want1+want2)
}
}
now := time.Now()
must.Do(os.WriteFile(filePrefix+".log1.txt", []byte(want1+want2), 0600))
checkLinesAndCleanup()
must.Do(os.WriteFile(filePrefix+".log2.txt", []byte(want1+want2), 0600))
checkLinesAndCleanup()
must.Do(os.WriteFile(filePrefix+".log1.txt", []byte(want1), 0600))
os.Chtimes(filePrefix+".log1.txt", now.Add(-time.Minute), now.Add(-time.Minute))
must.Do(os.WriteFile(filePrefix+".log2.txt", []byte(want2), 0600))
os.Chtimes(filePrefix+".log2.txt", now.Add(+time.Minute), now.Add(+time.Minute))
checkLinesAndCleanup()
must.Do(os.WriteFile(filePrefix+".log1.txt", []byte(want2), 0600))
os.Chtimes(filePrefix+".log1.txt", now.Add(+time.Minute), now.Add(+time.Minute))
must.Do(os.WriteFile(filePrefix+".log2.txt", []byte(want1), 0600))
os.Chtimes(filePrefix+".log2.txt", now.Add(-time.Minute), now.Add(-time.Minute))
checkLinesAndCleanup()
}
func TestQueue(t *testing.T) {
filePrefix := t.TempDir()
f := newFilchTest(t, filePrefix, Options{ReplaceStderr: false})
f.readEOF(t)
const line1 = "Hello, World!"
const line2 = "This is a test."
const line3 = "Of filch."
f.write(t, line1)
f.write(t, line2)
f.read(t, line1)
f.write(t, line3)
f.read(t, line2)
f.read(t, line3)
f.readEOF(t)
f.write(t, line1)
f.read(t, line1)
f.readEOF(t)
f.close(t)
}
func TestRecover(t *testing.T) {
t.Run("empty", func(t *testing.T) {
filePrefix := t.TempDir()
f := newFilchTest(t, filePrefix, Options{ReplaceStderr: false})
f.write(t, "hello")
f.read(t, "hello")
f.readEOF(t)
f.close(t)
f = newFilchTest(t, filePrefix, Options{ReplaceStderr: false})
f.readEOF(t)
f.close(t)
})
t.Run("cur", func(t *testing.T) {
filePrefix := t.TempDir()
f := newFilchTest(t, filePrefix, Options{ReplaceStderr: false})
f.write(t, "hello")
f.close(t)
f = newFilchTest(t, filePrefix, Options{ReplaceStderr: false})
f.read(t, "hello")
f.readEOF(t)
f.close(t)
})
t.Run("alt", func(t *testing.T) {
t.Skip("currently broken on linux, passes on macOS")
/* --- FAIL: TestRecover/alt (0.00s)
filch_test.go:128: r.ReadLine()="world", want "hello"
filch_test.go:129: r.ReadLine()="hello", want "world"
*/
filePrefix := t.TempDir()
f := newFilchTest(t, filePrefix, Options{ReplaceStderr: false})
f.write(t, "hello")
f.read(t, "hello")
f.write(t, "world")
f.close(t)
f = newFilchTest(t, filePrefix, Options{ReplaceStderr: false})
// TODO(crawshaw): The "hello" log is replayed in recovery.
// We could reduce replays by risking some logs loss.
// What should our policy here be?
f.read(t, "hello")
f.read(t, "world")
f.readEOF(t)
f.close(t)
})
}
func TestFilchStderr(t *testing.T) {
if runtime.GOOS == "windows" {
// TODO(bradfitz): this is broken on Windows but not
// fully sure why. Investigate. But notably, the
// stderrFD variable (defined in filch.go) and set
// below is only ever read in filch_unix.go. So just
// skip this for test for now.
t.Skip("test broken on Windows")
}
func TestConcurrentWriteAndRead(t *testing.T) {
pipeR, pipeW, err := os.Pipe()
if err != nil {
t.Fatal(err)
}
defer pipeR.Close()
defer pipeW.Close()
defer func() {
pipeW.Close()
switch b, err := io.ReadAll(pipeR); {
case err != nil:
t.Fatalf("ReadAll error: %v", err)
case len(b) > 0:
t.Errorf("unexpected write to fake stderr: %s", b)
}
}()
tstest.Replace(t, &stderrFD, int(pipeW.Fd()))
filePrefix := t.TempDir()
f := newFilchTest(t, filePrefix, Options{ReplaceStderr: true})
f.write(t, "hello")
if _, err := fmt.Fprintf(pipeW, "filch\n"); err != nil {
t.Fatal(err)
}
f.read(t, "hello")
f.read(t, "filch")
f.readEOF(t)
f.close(t)
const numWriters = 10
const linesPerWriter = 1000
opts := Options{ReplaceStderr: replaceStderrSupportedForTest, MaxFileSize: math.MaxInt32}
f := newForTest(t, "", opts)
pipeW.Close()
b, err := io.ReadAll(pipeR)
if err != nil {
t.Fatal(err)
// Concurrently write many lines.
var draining sync.RWMutex
var group sync.WaitGroup
defer group.Wait()
data := bytes.Repeat([]byte("X"), 1000)
var runningWriters atomic.Int64
for i := range numWriters {
runningWriters.Add(+1)
group.Go(func() {
defer runningWriters.Add(-1)
var b []byte
for j := range linesPerWriter {
b = fmt.Appendf(b[:0], `{"Index":%d,"Count":%d,"Data":"%s"}`+"\n", i+1, j+1, data[:rand.IntN(len(data))])
draining.RLock()
if i%2 == 0 && opts.ReplaceStderr {
stderrWriteForTest(b)
} else {
must.Get(f.Write(b))
}
draining.RUnlock()
runtime.Gosched()
}
})
}
if len(b) > 0 {
t.Errorf("unexpected write to fake stderr: %s", b)
// Verify that we can read back the lines in an ordered manner.
var lines int
var entry struct{ Index, Count int }
state := make(map[int]int)
checkLine := func() (ok bool) {
b := must.Get(f.TryReadLine())
if len(b) == 0 {
return false
}
entry.Index, entry.Count = 0, 0
if err := jsonv2.Unmarshal(b, &entry); err != nil {
t.Fatalf("json.Unmarshal error: %v", err)
}
if wantCount := state[entry.Index] + 1; entry.Count != wantCount {
t.Fatalf("Index:%d, Count = %d, want %d", entry.Index, entry.Count, wantCount)
}
state[entry.Index] = entry.Count
lines++
return true
}
for lines < numWriters*linesPerWriter {
writersDone := runningWriters.Load() == 0
for range rand.IntN(100) {
runtime.Gosched() // bias towards more writer operations
}
if rand.IntN(100) == 0 {
// Asynchronous read of a single line.
if !checkLine() && writersDone {
t.Fatal("failed to read all lines after all writers done")
}
} else {
// Synchronous reading of all lines.
draining.Lock()
for checkLine() {
}
draining.Unlock()
}
}
}
func TestSizeOf(t *testing.T) {
s := unsafe.Sizeof(Filch{})
if s > 4096 {
t.Fatalf("Filch{} has size %d on %v, decrease size of buf field", s, runtime.GOARCH)
// Test that the
func TestBufferCapacity(t *testing.T) {
f := newForTest(t, "", Options{})
b := bytes.Repeat([]byte("X"), 1000)
for range 1000 {
must.Get(f.Write(b[:rand.IntN(len(b))]))
}
for must.Get(f.TryReadLine()) != nil {
}
if !(10*len(b) < cap(f.rdBuf) && cap(f.rdBuf) < 20*len(b)) {
t.Errorf("cap(rdBuf) = %v, want within [%v:%v]", cap(f.rdBuf), 10*len(b), 20*len(b))
}
must.Get(f.Write(bytes.Repeat([]byte("X"), DefaultMaxLineSize-1)))
must.Get(f.TryReadLine())
wrCap, rdCap := cap(f.wrBuf), cap(f.rdBuf)
// Force another rotation. Buffers should not be GC'd yet.
must.Get(f.TryReadLine())
if cap(f.wrBuf) != wrCap {
t.Errorf("cap(f.wrBuf) = %v, want %v", cap(f.wrBuf), wrCap)
}
if cap(f.rdBuf) != rdCap {
t.Errorf("cap(f.rdBuf) = %v, want %v", cap(f.rdBuf), rdCap)
}
// Force many rotations. Buffers should be GC'd.
for range 64 {
t.Logf("cap(f.wrBuf), cap(f.rdBuf) = %d, %d", cap(f.wrBuf), cap(f.rdBuf))
must.Get(f.TryReadLine())
}
if cap(f.wrBuf) != 0 {
t.Errorf("cap(f.wrBuf) = %v, want %v", cap(f.wrBuf), 0)
}
if cap(f.rdBuf) != 0 {
t.Errorf("cap(f.rdBuf) = %v, want %v", cap(f.rdBuf), 0)
}
}
func TestMaxLineSize(t *testing.T) {
const maxLineSize = 1000
f := newForTest(t, "", Options{MaxLineSize: maxLineSize})
// Test writing.
b0 := []byte(strings.Repeat("X", maxLineSize-len("\n")) + "\n")
must.Get(f.Write(b0))
b1 := []byte(strings.Repeat("X", maxLineSize))
if _, err := f.Write(b1); err != errTooLong {
t.Errorf("Write error = %v, want errTooLong", err)
}
b2 := bytes.Repeat(b0, 2)
must.Get(f.Write(b2))
if f.storedBytesForTest() != int64(len(b0)+len(b2)) {
t.Errorf("storedBytes = %v, want %v", f.storedBytesForTest(), int64(len(b0)+len(b2)))
}
// Test reading.
f.read(t, b0)
f.read(t, b0)
f.read(t, b0)
f.read(t, nil) // should trigger rotate
if f.storedBytesForTest() != 0 {
t.Errorf("storedBytes = %v, want 0", f.storedBytesForTest())
}
// Test writing
must.Get(f.Write([]byte("hello")))
must.Get(f.Write(b0))
must.Get(f.Write([]byte("goodbye")))
// Test reading.
f.Close()
f = newForTest(t, f.filePrefix, Options{MaxLineSize: 10})
f.read(t, []byte("hello\n"))
if _, err := f.TryReadLine(); err != errTooLong {
t.Errorf("Write error = %v, want errTooLong", err)
}
f.read(t, []byte("goodbye\n"))
// Check that the read buffer does not need to be as long
// as the overly long line to skip over it.
if cap(f.rdBuf) >= maxLineSize/2 {
t.Errorf("cap(rdBuf) = %v, want <%v", cap(f.rdBuf), maxLineSize/2)
}
}
func TestMaxFileSize(t *testing.T) {
if replaceStderrSupportedForTest {
t.Run("ReplaceStderr:true", func(t *testing.T) { testMaxFileSize(t, true) })
}
t.Run("ReplaceStderr:false", func(t *testing.T) { testMaxFileSize(t, false) })
}
func testMaxFileSize(t *testing.T, replaceStderr bool) {
opts := Options{ReplaceStderr: replaceStderr, MaxFileSize: 1000}
f := newForTest(t, "", opts)
// Write lots of data.
const calls = 1000
var group sync.WaitGroup
var filchedBytes, writeBytes int64
group.Go(func() {
if !opts.ReplaceStderr {
return
}
var b []byte
for i := range calls {
b = fmt.Appendf(b[:0], `{"FilchIndex":%d}`+"\n", i+1)
filchedBytes += int64(stderrWriteForTest(b))
}
})
group.Go(func() {
var b []byte
for i := range calls {
b = fmt.Appendf(b[:0], `{"WriteIndex":%d}`+"\n", i+1)
writeBytes += int64(must.Get(f.Write(b)))
}
})
group.Wait()
f.statAndUpdateBytes()
droppedBytes := filchedBytes + writeBytes - f.storedBytes.Value()
switch {
case f.writeCalls.Value() != calls:
t.Errorf("writeCalls = %v, want %d", f.writeCalls.Value(), calls)
case f.readCalls.Value() != 0:
t.Errorf("readCalls = %v, want 0", f.readCalls.Value())
case f.rotateCalls.Value() == 0:
t.Errorf("rotateCalls = 0, want >0")
case f.callErrors.Value() != 0:
t.Errorf("callErrors = %v, want 0", f.callErrors.Value())
case f.writeBytes.Value() != writeBytes+filchedBytes:
t.Errorf("writeBytes = %v, want %d", f.writeBytes.Value(), writeBytes+filchedBytes)
case f.readBytes.Value() != 0:
t.Errorf("readBytes = %v, want 0", f.readBytes.Value())
case f.filchedBytes.Value() != filchedBytes:
t.Errorf("filchedBytes = %v, want %d", f.filchedBytes.Value(), filchedBytes)
case f.droppedBytes.Value() != droppedBytes:
t.Errorf("droppedBytes = %v, want %d", f.droppedBytes.Value(), droppedBytes)
case f.droppedBytes.Value() == 0:
t.Errorf("droppedBytes = 0, want >0")
case f.storedBytes.Value() != f.storedBytesForTest():
t.Errorf("storedBytes = %v, want %d", f.storedBytes.Value(), f.storedBytesForTest())
case f.storedBytes.Value() > int64(opts.MaxFileSize) && !opts.ReplaceStderr:
// If ReplaceStderr, it is impossible for MaxFileSize to be
// strictly adhered to since asynchronous os.Stderr.Write calls
// do not trigger any checks to enforce maximum file size.
t.Errorf("storedBytes = %v, want <=%d", f.storedBytes.Value(), opts.MaxFileSize)
}
// Read back the data and verify that the entries are in order.
var readBytes, lastFilchIndex, lastWriteIndex int64
for {
b := must.Get(f.TryReadLine())
if len(b) == 0 {
break
}
var entry struct{ FilchIndex, WriteIndex int64 }
must.Do(json.Unmarshal(b, &entry))
if entry.FilchIndex == 0 && entry.WriteIndex == 0 {
t.Errorf("both indexes are zero")
}
if entry.FilchIndex > 0 {
if entry.FilchIndex <= lastFilchIndex {
t.Errorf("FilchIndex = %d, want >%d", entry.FilchIndex, lastFilchIndex)
}
lastFilchIndex = entry.FilchIndex
}
if entry.WriteIndex > 0 {
if entry.WriteIndex <= lastWriteIndex {
t.Errorf("WriteIndex = %d, want >%d", entry.WriteIndex, lastWriteIndex)
}
lastWriteIndex = entry.WriteIndex
}
readBytes += int64(len(b))
}
if f.readBytes.Value() != readBytes {
t.Errorf("readBytes = %v, want %v", f.readBytes.Value(), readBytes)
}
}

View File

@@ -1,7 +1,7 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build !windows && !wasm && !plan9 && !tamago
//go:build !ts_omit_logtail && !windows && !wasm && !plan9 && !tamago
package filch
@@ -11,6 +11,8 @@ import (
"golang.org/x/sys/unix"
)
const replaceStderrSupportedForTest = true
func saveStderr() (*os.File, error) {
fd, err := unix.Dup(stderrFD)
if err != nil {

View File

@@ -1,6 +1,8 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build !ts_omit_logtail && windows
package filch
import (
@@ -9,6 +11,8 @@ import (
"syscall"
)
const replaceStderrSupportedForTest = true
var kernel32 = syscall.MustLoadDLL("kernel32.dll")
var procSetStdHandle = kernel32.MustFindProc("SetStdHandle")