logtail/filch: limit buffer file size to 50MB

Signed-off-by: Maisem Ali <maisem@tailscale.com>
This commit is contained in:
Maisem Ali 2021-10-29 12:04:57 -07:00 committed by Maisem Ali
parent 55b6753c11
commit 05e55f4a0b
2 changed files with 66 additions and 2 deletions

View File

@ -17,8 +17,11 @@
var stderrFD = 2 // a variable for testing var stderrFD = 2 // a variable for testing
const defaultMaxFileSize = 50 << 20
type Options struct { type Options struct {
ReplaceStderr bool // dup over fd 2 so everything written to stderr comes here ReplaceStderr bool // dup over fd 2 so everything written to stderr comes here
MaxFileSize int
} }
// A Filch uses two alternating files as a simplistic ring buffer. // A Filch uses two alternating files as a simplistic ring buffer.
@ -30,6 +33,10 @@ type Filch struct {
alt *os.File alt *os.File
altscan *bufio.Scanner altscan *bufio.Scanner
recovered int64 recovered int64
maxFileSize int64
writeCounter int
// buf is an initial buffer for altscan. // buf is an initial buffer for altscan.
// As of August 2021, 99.96% of all log lines // As of August 2021, 99.96% of all log lines
// are below 4096 bytes in length. // are below 4096 bytes in length.
@ -38,7 +45,7 @@ type Filch struct {
// so that the whole struct takes 4096 bytes // so that the whole struct takes 4096 bytes
// (less on 32 bit platforms). // (less on 32 bit platforms).
// This reduces allocation waste. // This reduces allocation waste.
buf [4096 - 48]byte buf [4096 - 64]byte
} }
// TryReadline implements the logtail.Buffer interface. // TryReadline implements the logtail.Buffer interface.
@ -91,6 +98,22 @@ func (f *Filch) scan() ([]byte, error) {
func (f *Filch) Write(b []byte) (int, error) { func (f *Filch) Write(b []byte) (int, error) {
f.mu.Lock() f.mu.Lock()
defer f.mu.Unlock() defer f.mu.Unlock()
if f.writeCounter == 100 {
// Check the file size every 100 writes.
f.writeCounter = 0
fi, err := f.cur.Stat()
if err != nil {
return 0, err
}
if fi.Size() >= f.maxFileSize {
// This most likely means we are not draining.
// To limit the amount of space we use, throw away the old logs.
if err := moveContents(f.alt, f.cur); err != nil {
return 0, err
}
}
}
f.writeCounter++
if len(b) == 0 || b[len(b)-1] != '\n' { if len(b) == 0 || b[len(b)-1] != '\n' {
bnl := make([]byte, len(b)+1) bnl := make([]byte, len(b)+1)
@ -159,8 +182,13 @@ func New(filePrefix string, opts Options) (f *Filch, err error) {
return nil, err return nil, err
} }
mfs := defaultMaxFileSize
if opts.MaxFileSize > 0 {
mfs = opts.MaxFileSize
}
f = &Filch{ f = &Filch{
OrigStderr: os.Stderr, // temporary, for past logs recovery OrigStderr: os.Stderr, // temporary, for past logs recovery
maxFileSize: int64(mfs),
} }
// Neither, either, or both files may exist and contain logs from // Neither, either, or both files may exist and contain logs from
@ -234,6 +262,9 @@ func moveContents(dst, src *os.File) (err error) {
if _, err := src.Seek(0, io.SeekStart); err != nil { if _, err := src.Seek(0, io.SeekStart); err != nil {
return err return err
} }
if _, err := dst.Seek(0, io.SeekStart); err != nil {
return err
}
if _, err := io.Copy(dst, src); err != nil { if _, err := io.Copy(dst, src); err != nil {
return err return err
} }

View File

@ -57,6 +57,39 @@ func (f *filchTest) close(t *testing.T) {
} }
} }
func TestDropOldLogs(t *testing.T) {
const line1 = "123456789" // 10 bytes (9+newline)
tests := []struct {
write, read int
}{
{10, 10},
{100, 100},
{200, 200},
{250, 150},
{500, 200},
}
for _, tc := range tests {
t.Run(fmt.Sprintf("w%d-r%d", tc.write, tc.read), func(t *testing.T) {
filePrefix := t.TempDir()
f := newFilchTest(t, filePrefix, Options{ReplaceStderr: false, MaxFileSize: 1000})
defer f.close(t)
// Make filch rotate the logs 3 times
for i := 0; i < tc.write; i++ {
f.write(t, line1)
}
// We should only be able to read the last 150 lines
for i := 0; i < tc.read; i++ {
f.read(t, line1)
if t.Failed() {
t.Logf("could only read %d lines", i)
break
}
}
f.readEOF(t)
})
}
}
func TestQueue(t *testing.T) { func TestQueue(t *testing.T) {
filePrefix := t.TempDir() filePrefix := t.TempDir()
f := newFilchTest(t, filePrefix, Options{ReplaceStderr: false}) f := newFilchTest(t, filePrefix, Options{ReplaceStderr: false})