From 0b7087c4012096ea5329b021606b0fc012ef6856 Mon Sep 17 00:00:00 2001
From: Joe Tsai
Date: Tue, 4 Feb 2025 12:51:27 -0800
Subject: [PATCH] logpolicy: expose MaxBufferSize and MaxUploadSize options
 (#14903)

Updates tailscale/corp#26342

Signed-off-by: Joe Tsai
---
 logpolicy/logpolicy.go | 26 ++++++++++++++++++++------
 logtail/logtail.go     | 11 ++++++++---
 2 files changed, 28 insertions(+), 9 deletions(-)

diff --git a/logpolicy/logpolicy.go b/logpolicy/logpolicy.go
index b9b813718..1419fff65 100644
--- a/logpolicy/logpolicy.go
+++ b/logpolicy/logpolicy.go
@@ -503,6 +503,18 @@ type Options struct {
 	// If nil, [TransportOptions.New] is used to construct a new client
 	// with that particular transport sending logs to the default logs server.
 	HTTPC *http.Client
+
+	// MaxBufferSize is the maximum size of the log buffer.
+	// This controls the amount of logs that can be temporarily stored
+	// before the logs can be successfully uploaded.
+	// If zero, a default buffer size is chosen.
+	MaxBufferSize int
+
+	// MaxUploadSize is the maximum size per upload.
+	// This should only be set by clients that have been authenticated
+	// with the logging service as having a higher upload limit.
+	// If zero, a default upload size is chosen.
+	MaxUploadSize int
 }
 
 // New returns a new log policy (a logger and its instance ID).
@@ -603,10 +615,11 @@ func (opts Options) New() *Policy {
 	}
 
 	conf := logtail.Config{
-		Collection:   newc.Collection,
-		PrivateID:    newc.PrivateID,
-		Stderr:       logWriter{console},
-		CompressLogs: true,
+		Collection:    newc.Collection,
+		PrivateID:     newc.PrivateID,
+		Stderr:        logWriter{console},
+		CompressLogs:  true,
+		MaxUploadSize: opts.MaxUploadSize,
 	}
 	if opts.Collection == logtail.CollectionNode {
 		conf.MetricsDelta = clientmetric.EncodeLogTailMetricsDelta
@@ -620,7 +633,7 @@
 	} else {
 		// Only attach an on-disk filch buffer if we are going to be sending logs.
 		// No reason to persist them locally just to drop them later.
-		attachFilchBuffer(&conf, opts.Dir, opts.CmdName, opts.Logf)
+		attachFilchBuffer(&conf, opts.Dir, opts.CmdName, opts.MaxBufferSize, opts.Logf)
 
 		conf.HTTPC = opts.HTTPC
 		if conf.HTTPC == nil {
@@ -676,9 +689,10 @@
 // attachFilchBuffer creates an on-disk ring buffer using filch and attaches
 // it to the logtail config. Note that this is optional; if no buffer is set,
 // logtail will use an in-memory buffer.
-func attachFilchBuffer(conf *logtail.Config, dir, cmdName string, logf logger.Logf) {
+func attachFilchBuffer(conf *logtail.Config, dir, cmdName string, maxFileSize int, logf logger.Logf) {
 	filchOptions := filch.Options{
 		ReplaceStderr: redirectStderrToLogPanics(),
+		MaxFileSize:   maxFileSize,
 	}
 
 	filchPrefix := filepath.Join(dir, cmdName)
diff --git a/logtail/logtail.go b/logtail/logtail.go
index 0e9c4f288..a617397f9 100644
--- a/logtail/logtail.go
+++ b/logtail/logtail.go
@@ -6,6 +6,7 @@ package logtail
 
 import (
 	"bytes"
+	"cmp"
 	"context"
 	"crypto/rand"
 	"encoding/binary"
@@ -78,6 +79,7 @@ type Config struct {
 	StderrLevel    int    // max verbosity level to write to stderr; 0 means the non-verbose messages only
 	Buffer         Buffer // temp storage, if nil a MemoryBuffer
 	CompressLogs   bool   // whether to compress the log uploads
+	MaxUploadSize  int    // maximum upload size; 0 means using the default
 
 	// MetricsDelta, if non-nil, is a func that returns an encoding
 	// delta in clientmetrics to upload alongside existing logs.
@@ -157,6 +159,7 @@ func NewLogger(cfg Config, logf tslogger.Logf) *Logger {
 		url:            cfg.BaseURL + "/c/" + cfg.Collection + "/" + cfg.PrivateID.String() + urlSuffix,
 		lowMem:         cfg.LowMemory,
 		buffer:         cfg.Buffer,
+		maxUploadSize:  cfg.MaxUploadSize,
 		skipClientTime: cfg.SkipClientTime,
 		drainWake:      make(chan struct{}, 1),
 		sentinel:       make(chan int32, 16),
@@ -192,6 +195,7 @@ type Logger struct {
 	skipClientTime bool
 	netMonitor     *netmon.Monitor
 	buffer         Buffer
+	maxUploadSize  int
 	drainWake      chan struct{}        // signal to speed up drain
 	drainBuf       []byte               // owned by drainPending for reuse
 	flushDelayFn   func() time.Duration // negative or zero return value to upload aggressively, or >0 to batch at this delay
@@ -325,7 +329,7 @@ func (l *Logger) drainPending() (b []byte) {
 		}
 	}()
 
-	maxLen := maxSize
+	maxLen := cmp.Or(l.maxUploadSize, maxSize)
 	if l.lowMem {
 		// When operating in a low memory environment, it is better to upload
 		// in multiple operations than it is to allocate a large body and OOM.
@@ -775,9 +779,10 @@ func (l *Logger) appendTextOrJSONLocked(dst, src []byte, level int) []byte {
 	// That's okay as the Tailscale log service limit is actually 2*maxSize.
 	// However, so long as logging applications aim to target the maxSize limit,
 	// there should be no trouble eventually uploading logs.
-	if len(src) > maxSize {
+	maxLen := cmp.Or(l.maxUploadSize, maxSize)
+	if len(src) > maxLen {
 		errDetail := fmt.Sprintf("entry too large: %d bytes", len(src))
-		errData := appendTruncatedString(nil, src, maxSize/len(`\uffff`)) // escaping could increase size
+		errData := appendTruncatedString(nil, src, maxLen/len(`\uffff`)) // escaping could increase size
 		dst = append(dst, '{')
 		dst = l.appendMetadata(dst, l.skipClientTime, true, l.procID, l.procSequence, errDetail, errData, level)
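
Not part of the patch itself, but a minimal usage sketch of the two new knobs. It assumes only what this patch shows of logpolicy.Options (Collection, MaxBufferSize, MaxUploadSize, and New); the collection name and byte sizes are placeholders, not recommended values:

package main

import (
	"log"

	"tailscale.com/logpolicy"
)

func main() {
	// Sketch only: the collection name and sizes below are placeholders.
	// Per the new doc comments, MaxUploadSize should only be raised by
	// clients that the logging service has authorized for larger uploads.
	opts := logpolicy.Options{
		Collection:    "example.log.tailscale.io", // hypothetical collection name
		MaxBufferSize: 8 << 20,                     // allow up to ~8 MiB of buffered logs awaiting upload
		MaxUploadSize: 4 << 20,                     // cap each upload body at ~4 MiB
	}
	policy := opts.New()
	log.Printf("log policy created: %T", policy)
}

Leaving either field zero preserves the pre-patch behavior, since both fall back to the package defaults.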
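
The clamping in drainPending and appendTextOrJSONLocked leans on cmp.Or from the standard library (Go 1.22+), which returns its first non-zero argument. A standalone sketch of that fallback, where defaultMaxSize is a hypothetical stand-in for logtail's unexported maxSize constant (its real value is not shown in this patch):

package main

import (
	"cmp"
	"fmt"
)

// defaultMaxSize is a hypothetical stand-in for logtail's unexported
// maxSize constant; the real value is not part of this patch.
const defaultMaxSize = 256 << 10

// effectiveUploadLimit mirrors the maxLen := cmp.Or(l.maxUploadSize, maxSize)
// pattern above: a zero MaxUploadSize falls back to the package default.
func effectiveUploadLimit(maxUploadSize int) int {
	return cmp.Or(maxUploadSize, defaultMaxSize)
}

func main() {
	fmt.Println(effectiveUploadLimit(0))       // 262144: default applies
	fmt.Println(effectiveUploadLimit(4 << 20)) // 4194304: caller-specified limit wins
}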