mirror of
https://github.com/tailscale/tailscale.git
synced 2025-12-28 14:08:26 +00:00
The filch implementation is fairly broken: * When Filch.cur exceeds MaxFileSize, it calls moveContents to copy the entirety of cur into alt (while holding the write lock). By nature, this is the movement of a lot of data in a hot path, meaning that all log calls will be globally blocked! It also means that log uploads will be blocked during the move. * The implementation of moveContents is buggy in that it copies data from cur into the start of alt, but fails to truncate alt to the number of bytes copied. Consequently, there are unrelated lines near the end, leading to out-of-order lines when being read back. * Data filched via stderr do not directly respect MaxFileSize, which is only checked every 100 Filch.Write calls. This means that it is possible that the file grows far beyond the specified max file size before moveContents is called. * If both log files have data when New is called, it also copies the entirety of cur into alt. This can block the startup of a process, copying lots of data before the process can do any useful work. * TryReadLine is implemented using bufio.Scanner. Unfortunately, it will choke on any lines longer than bufio.MaxScanTokenSize, rather than gracefully skip over them. The re-implementation avoids a lot of these problems by fundamentally eliminating the need for moveContents. We enforce MaxFileSize by simply rotating the log files whenever the current file exceeds MaxFileSize/2. This is a constant-time operation regardless of file size. To more gracefully handle lines longer than bufio.MaxScanTokenSize, we skip over these lines (without growing the read buffer) and report an error. This allows subsequent lines to be read. In order to improve debugging, we add a lot of metrics. Note that the mechanism of dup2 with stderr is inherently racy with the two-file approach. The order of operations during a rotation is carefully chosen to reduce the race window to be as short as possible. Thus, this is slightly less racy than before. 
Updates tailscale/corp#21363 Signed-off-by: Joe Tsai <joetsai@digital-static.net>
384 lines
11 KiB
Go
384 lines
11 KiB
Go
// Copyright (c) Tailscale Inc & AUTHORS
|
|
// SPDX-License-Identifier: BSD-3-Clause
|
|
|
|
package filch
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"math"
|
|
"math/rand/v2"
|
|
"os"
|
|
"path/filepath"
|
|
"runtime"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
jsonv2 "github.com/go-json-experiment/json"
|
|
"tailscale.com/tstest"
|
|
"tailscale.com/util/must"
|
|
)
|
|
|
|
// Tests always stat the backing files so byte-accounting helpers
// (e.g. storedBytesForTest) reflect actual on-disk state — presumably
// bypassing any sampling/caching in the implementation; see filch.go.
func init() {
	alwaysStatForTests = true
}
|
|
|
|
// filchTest wraps a *Filch together with the file prefix it was opened
// with, so tests can close it and reopen the same underlying log files
// (filePrefix+".log1.txt" and filePrefix+".log2.txt").
type filchTest struct {
	*Filch

	// filePrefix is the path prefix that was passed to New.
	filePrefix string
}
|
|
|
|
func newForTest(t *testing.T, filePrefix string, opts Options) *filchTest {
|
|
t.Helper()
|
|
if filePrefix == "" {
|
|
filePrefix = filepath.Join(t.TempDir(), "testlog")
|
|
}
|
|
f, err := New(filePrefix, opts)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
t.Cleanup(func() {
|
|
if err := f.Close(); err != nil {
|
|
t.Errorf("Close error: %v", err)
|
|
}
|
|
})
|
|
return &filchTest{Filch: f, filePrefix: filePrefix}
|
|
}
|
|
|
|
func (f *filchTest) read(t *testing.T, want []byte) {
|
|
t.Helper()
|
|
if got, err := f.TryReadLine(); err != nil {
|
|
t.Fatalf("TryReadLine error: %v", err)
|
|
} else if string(got) != string(want) {
|
|
t.Errorf("TryReadLine = %q, want %q", got, want)
|
|
}
|
|
}
|
|
|
|
func TestNew(t *testing.T) {
|
|
const want1 = "Lorem\nipsum\ndolor\nsit\namet,\nconsectetur\nadipiscing\nelit,\nsed\n"
|
|
const want2 = "do\neiusmod\ntempor\nincididunt\nut\nlabore\net\ndolore\nmagna\naliqua.\n"
|
|
filePrefix := filepath.Join(t.TempDir(), "testlog")
|
|
checkLinesAndCleanup := func() {
|
|
t.Helper()
|
|
defer os.Remove(filepath.Join(filePrefix + ".log1.txt"))
|
|
defer os.Remove(filepath.Join(filePrefix + ".log2.txt"))
|
|
f := newForTest(t, filePrefix, Options{})
|
|
var got []byte
|
|
for {
|
|
b := must.Get(f.TryReadLine())
|
|
if b == nil {
|
|
break
|
|
}
|
|
got = append(got, b...)
|
|
}
|
|
if string(got) != want1+want2 {
|
|
t.Errorf("got %q\nwant %q", got, want1+want2)
|
|
}
|
|
}
|
|
now := time.Now()
|
|
|
|
must.Do(os.WriteFile(filePrefix+".log1.txt", []byte(want1+want2), 0600))
|
|
checkLinesAndCleanup()
|
|
|
|
must.Do(os.WriteFile(filePrefix+".log2.txt", []byte(want1+want2), 0600))
|
|
checkLinesAndCleanup()
|
|
|
|
must.Do(os.WriteFile(filePrefix+".log1.txt", []byte(want1), 0600))
|
|
os.Chtimes(filePrefix+".log1.txt", now.Add(-time.Minute), now.Add(-time.Minute))
|
|
must.Do(os.WriteFile(filePrefix+".log2.txt", []byte(want2), 0600))
|
|
os.Chtimes(filePrefix+".log2.txt", now.Add(+time.Minute), now.Add(+time.Minute))
|
|
checkLinesAndCleanup()
|
|
|
|
must.Do(os.WriteFile(filePrefix+".log1.txt", []byte(want2), 0600))
|
|
os.Chtimes(filePrefix+".log1.txt", now.Add(+time.Minute), now.Add(+time.Minute))
|
|
must.Do(os.WriteFile(filePrefix+".log2.txt", []byte(want1), 0600))
|
|
os.Chtimes(filePrefix+".log2.txt", now.Add(-time.Minute), now.Add(-time.Minute))
|
|
checkLinesAndCleanup()
|
|
}
|
|
|
|
func setupStderr(t *testing.T) {
|
|
t.Helper()
|
|
pipeR, pipeW, err := os.Pipe()
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
t.Cleanup(func() { pipeR.Close() })
|
|
t.Cleanup(func() {
|
|
switch b, err := io.ReadAll(pipeR); {
|
|
case err != nil:
|
|
t.Fatalf("ReadAll error: %v", err)
|
|
case len(b) > 0:
|
|
t.Errorf("unexpected write to fake stderr: %s", b)
|
|
}
|
|
})
|
|
t.Cleanup(func() { pipeW.Close() })
|
|
tstest.Replace(t, &stderrFD, int(pipeW.Fd()))
|
|
tstest.Replace(t, &os.Stderr, pipeW)
|
|
}
|
|
|
|
// TestConcurrentWriteAndRead stresses Filch with many concurrent writers
// (half via the replaced stderr when supported, half via Write) while a
// reader drains lines, verifying that each writer's lines are read back
// in order even though lines from different writers may interleave.
func TestConcurrentWriteAndRead(t *testing.T) {
	if replaceStderrSupportedForTest {
		setupStderr(t)
	}

	const numWriters = 10
	const linesPerWriter = 1000
	// MaxFileSize is effectively unbounded so that no lines are dropped.
	opts := Options{ReplaceStderr: replaceStderrSupportedForTest, MaxFileSize: math.MaxInt32}
	f := newForTest(t, "", opts)

	// Concurrently write many lines.
	// draining lets the reader quiesce all writers (Lock) while each
	// individual write holds only a shared RLock around its call.
	var draining sync.RWMutex
	var group sync.WaitGroup
	defer group.Wait()
	data := bytes.Repeat([]byte("X"), 1000)
	var runningWriters atomic.Int64
	for i := range numWriters {
		runningWriters.Add(+1)
		group.Go(func() {
			defer runningWriters.Add(-1)
			var b []byte
			for j := range linesPerWriter {
				// Index and Count are 1-based so a zero value is detectably absent.
				b = fmt.Appendf(b[:0], `{"Index":%d,"Count":%d,"Data":"%s"}`+"\n", i+1, j+1, data[:rand.IntN(len(data))])
				draining.RLock()
				if i%2 == 0 && opts.ReplaceStderr {
					// Even-numbered writers exercise the filched-stderr path.
					stderrWriteForTest(b)
				} else {
					must.Get(f.Write(b))
				}
				draining.RUnlock()
				runtime.Gosched()
			}
		})
	}

	// Verify that we can read back the lines in an ordered manner.
	var lines int
	var entry struct{ Index, Count int }
	state := make(map[int]int) // writer Index -> last Count observed
	checkLine := func() (ok bool) {
		b := must.Get(f.TryReadLine())
		if len(b) == 0 {
			return false // no line currently available
		}
		entry.Index, entry.Count = 0, 0
		if err := jsonv2.Unmarshal(b, &entry); err != nil {
			t.Fatalf("json.Unmarshal error: %v", err)
		}
		// Each writer's Count must advance by exactly one:
		// interleaving across writers is fine, reordering is not.
		if wantCount := state[entry.Index] + 1; entry.Count != wantCount {
			t.Fatalf("Index:%d, Count = %d, want %d", entry.Index, entry.Count, wantCount)
		}
		state[entry.Index] = entry.Count
		lines++
		return true
	}
	for lines < numWriters*linesPerWriter {
		// Snapshot writer completion BEFORE attempting a read, so a failed
		// read afterwards proves data was lost rather than not yet written.
		writersDone := runningWriters.Load() == 0
		for range rand.IntN(100) {
			runtime.Gosched() // bias towards more writer operations
		}

		if rand.IntN(100) == 0 {
			// Asynchronous read of a single line.
			if !checkLine() && writersDone {
				t.Fatal("failed to read all lines after all writers done")
			}
		} else {
			// Synchronous reading of all lines.
			draining.Lock()
			for checkLine() {
			}
			draining.Unlock()
		}
	}
}
|
|
|
|
// Test that the internal write and read buffers grow to a bounded
// steady-state capacity and are eventually released (capacity drops to
// zero) after many rotations that no longer need large buffers.
func TestBufferCapacity(t *testing.T) {
	f := newForTest(t, "", Options{})
	b := bytes.Repeat([]byte("X"), 1000)
	// Write many lines of random length up to 1000 bytes, then drain them.
	for range 1000 {
		must.Get(f.Write(b[:rand.IntN(len(b))]))
	}
	for must.Get(f.TryReadLine()) != nil {
	}
	// The read buffer should have grown to a small multiple of the typical
	// line length — large enough to amortize, but not unbounded.
	if !(10*len(b) < cap(f.rdBuf) && cap(f.rdBuf) < 20*len(b)) {
		t.Errorf("cap(rdBuf) = %v, want within [%v:%v]", cap(f.rdBuf), 10*len(b), 20*len(b))
	}

	// A near-maximum-length line forces both buffers to grow large.
	must.Get(f.Write(bytes.Repeat([]byte("X"), DefaultMaxLineSize-1)))
	must.Get(f.TryReadLine())
	wrCap, rdCap := cap(f.wrBuf), cap(f.rdBuf)

	// Force another rotation. Buffers should not be GC'd yet.
	must.Get(f.TryReadLine())
	if cap(f.wrBuf) != wrCap {
		t.Errorf("cap(f.wrBuf) = %v, want %v", cap(f.wrBuf), wrCap)
	}
	if cap(f.rdBuf) != rdCap {
		t.Errorf("cap(f.rdBuf) = %v, want %v", cap(f.rdBuf), rdCap)
	}

	// Force many rotations. Buffers should be GC'd.
	for range 64 {
		t.Logf("cap(f.wrBuf), cap(f.rdBuf) = %d, %d", cap(f.wrBuf), cap(f.rdBuf))
		must.Get(f.TryReadLine())
	}
	if cap(f.wrBuf) != 0 {
		t.Errorf("cap(f.wrBuf) = %v, want %v", cap(f.wrBuf), 0)
	}
	if cap(f.rdBuf) != 0 {
		t.Errorf("cap(f.rdBuf) = %v, want %v", cap(f.rdBuf), 0)
	}
}
|
|
|
|
func TestMaxLineSize(t *testing.T) {
|
|
const maxLineSize = 1000
|
|
f := newForTest(t, "", Options{MaxLineSize: maxLineSize})
|
|
|
|
// Test writing.
|
|
b0 := []byte(strings.Repeat("X", maxLineSize-len("\n")) + "\n")
|
|
must.Get(f.Write(b0))
|
|
b1 := []byte(strings.Repeat("X", maxLineSize))
|
|
if _, err := f.Write(b1); err != errTooLong {
|
|
t.Errorf("Write error = %v, want errTooLong", err)
|
|
}
|
|
b2 := bytes.Repeat(b0, 2)
|
|
must.Get(f.Write(b2))
|
|
if f.storedBytesForTest() != int64(len(b0)+len(b2)) {
|
|
t.Errorf("storedBytes = %v, want %v", f.storedBytesForTest(), int64(len(b0)+len(b2)))
|
|
}
|
|
|
|
// Test reading.
|
|
f.read(t, b0)
|
|
f.read(t, b0)
|
|
f.read(t, b0)
|
|
f.read(t, nil) // should trigger rotate
|
|
if f.storedBytesForTest() != 0 {
|
|
t.Errorf("storedBytes = %v, want 0", f.storedBytesForTest())
|
|
}
|
|
|
|
// Test writing
|
|
must.Get(f.Write([]byte("hello")))
|
|
must.Get(f.Write(b0))
|
|
must.Get(f.Write([]byte("goodbye")))
|
|
|
|
// Test reading.
|
|
f.Close()
|
|
f = newForTest(t, f.filePrefix, Options{MaxLineSize: 10})
|
|
f.read(t, []byte("hello\n"))
|
|
if _, err := f.TryReadLine(); err != errTooLong {
|
|
t.Errorf("Write error = %v, want errTooLong", err)
|
|
}
|
|
f.read(t, []byte("goodbye\n"))
|
|
|
|
// Check that the read buffer does not need to be as long
|
|
// as the overly long line to skip over it.
|
|
if cap(f.rdBuf) >= maxLineSize/2 {
|
|
t.Errorf("cap(rdBuf) = %v, want <%v", cap(f.rdBuf), maxLineSize/2)
|
|
}
|
|
}
|
|
|
|
func TestMaxFileSize(t *testing.T) {
|
|
if replaceStderrSupportedForTest {
|
|
t.Run("ReplaceStderr:true", func(t *testing.T) { testMaxFileSize(t, true) })
|
|
}
|
|
t.Run("ReplaceStderr:false", func(t *testing.T) { testMaxFileSize(t, false) })
|
|
}
|
|
|
|
// testMaxFileSize writes far more data than Options.MaxFileSize allows
// (concurrently via Write and, optionally, via the filched stderr), then
// checks the per-Filch metrics for exact byte accounting and verifies
// that whatever data survived on disk reads back in per-stream order.
func testMaxFileSize(t *testing.T, replaceStderr bool) {
	if replaceStderr {
		setupStderr(t)
	}

	opts := Options{ReplaceStderr: replaceStderr, MaxFileSize: 1000}
	f := newForTest(t, "", opts)

	// Write lots of data.
	const calls = 1000
	var group sync.WaitGroup
	// Each counter is written by exactly one goroutine and only read after
	// group.Wait, so no additional synchronization is needed.
	var filchedBytes, writeBytes int64
	group.Go(func() {
		if !opts.ReplaceStderr {
			return
		}
		var b []byte
		for i := range calls {
			b = fmt.Appendf(b[:0], `{"FilchIndex":%d}`+"\n", i+1)
			filchedBytes += int64(stderrWriteForTest(b))
		}
	})
	group.Go(func() {
		var b []byte
		for i := range calls {
			b = fmt.Appendf(b[:0], `{"WriteIndex":%d}`+"\n", i+1)
			writeBytes += int64(must.Get(f.Write(b)))
		}
	})
	group.Wait()
	f.statAndUpdateBytes() // refresh storedBytes from the actual file sizes
	// Anything written but not retained on disk must have been dropped.
	droppedBytes := filchedBytes + writeBytes - f.storedBytes.Value()

	// Checks run in order; only the first failing check is reported.
	switch {
	case f.writeCalls.Value() != calls:
		t.Errorf("writeCalls = %v, want %d", f.writeCalls.Value(), calls)
	case f.readCalls.Value() != 0:
		t.Errorf("readCalls = %v, want 0", f.readCalls.Value())
	case f.rotateCalls.Value() == 0:
		t.Errorf("rotateCalls = 0, want >0")
	case f.callErrors.Value() != 0:
		t.Errorf("callErrors = %v, want 0", f.callErrors.Value())
	case f.writeBytes.Value() != writeBytes+filchedBytes:
		t.Errorf("writeBytes = %v, want %d", f.writeBytes.Value(), writeBytes+filchedBytes)
	case f.readBytes.Value() != 0:
		t.Errorf("readBytes = %v, want 0", f.readBytes.Value())
	case f.filchedBytes.Value() != filchedBytes:
		t.Errorf("filchedBytes = %v, want %d", f.filchedBytes.Value(), filchedBytes)
	case f.droppedBytes.Value() != droppedBytes:
		t.Errorf("droppedBytes = %v, want %d", f.droppedBytes.Value(), droppedBytes)
	case f.droppedBytes.Value() == 0:
		t.Errorf("droppedBytes = 0, want >0")
	case f.storedBytes.Value() != f.storedBytesForTest():
		t.Errorf("storedBytes = %v, want %d", f.storedBytes.Value(), f.storedBytesForTest())
	case f.storedBytes.Value() > int64(opts.MaxFileSize) && !opts.ReplaceStderr:
		// If ReplaceStderr, it is impossible for MaxFileSize to be
		// strictly adhered to since asynchronous os.Stderr.Write calls
		// do not trigger any checks to enforce maximum file size.
		t.Errorf("storedBytes = %v, want <=%d", f.storedBytes.Value(), opts.MaxFileSize)
	}

	// Read back the data and verify that the entries are in order.
	var readBytes, lastFilchIndex, lastWriteIndex int64
	for {
		b := must.Get(f.TryReadLine())
		if len(b) == 0 {
			break
		}
		var entry struct{ FilchIndex, WriteIndex int64 }
		must.Do(json.Unmarshal(b, &entry))
		if entry.FilchIndex == 0 && entry.WriteIndex == 0 {
			t.Errorf("both indexes are zero")
		}
		// Indexes within each stream must strictly increase: gaps are fine
		// (lines may have been dropped) but reordering is not.
		if entry.FilchIndex > 0 {
			if entry.FilchIndex <= lastFilchIndex {
				t.Errorf("FilchIndex = %d, want >%d", entry.FilchIndex, lastFilchIndex)
			}
			lastFilchIndex = entry.FilchIndex
		}
		if entry.WriteIndex > 0 {
			if entry.WriteIndex <= lastWriteIndex {
				t.Errorf("WriteIndex = %d, want >%d", entry.WriteIndex, lastWriteIndex)
			}
			lastWriteIndex = entry.WriteIndex
		}
		readBytes += int64(len(b))
	}

	if f.readBytes.Value() != readBytes {
		t.Errorf("readBytes = %v, want %v", f.readBytes.Value(), readBytes)
	}
}
|