logpolicy: expose MaxBufferSize and MaxUploadSize options (#14903)
Updates tailscale/corp#26342

Signed-off-by: Joe Tsai <joetsai@digital-static.net>
commit 0b7087c401
parent 00fe8845b1
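For context, a minimal sketch of how a caller might use the two new knobs once this change lands, assuming the Options.New API shown in the diff below; the command name and byte sizes are illustrative, and Policy.Close is the pre-existing shutdown method, not part of this commit:

	package main

	import (
		"log"

		"tailscale.com/logpolicy"
	)

	func main() {
		opts := logpolicy.Options{
			CmdName:       "examplecmd", // hypothetical command name
			MaxBufferSize: 8 << 20,      // cap temporarily stored logs at ~8 MiB (illustrative value)
			// Per the new field docs, only set MaxUploadSize if the logging
			// service has authenticated this client for a higher upload
			// limit; leaving either field zero keeps the default.
			MaxUploadSize: 4 << 20, // illustrative value
		}
		policy := opts.New()
		defer policy.Close()
		log.Printf("log policy configured")
	}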
@@ -503,6 +503,18 @@ type Options struct {
 	// If nil, [TransportOptions.New] is used to construct a new client
 	// with that particular transport sending logs to the default logs server.
 	HTTPC *http.Client
+
+	// MaxBufferSize is the maximum size of the log buffer.
+	// This controls the amount of logs that can be temporarily stored
+	// before the logs can be successfully uploaded.
+	// If zero, a default buffer size is chosen.
+	MaxBufferSize int
+
+	// MaxUploadSize is the maximum size per upload.
+	// This should only be set by clients that have been authenticated
+	// with the logging service as having a higher upload limit.
+	// If zero, a default upload size is chosen.
+	MaxUploadSize int
 }
 
 // New returns a new log policy (a logger and its instance ID).
@@ -603,10 +615,11 @@ func (opts Options) New() *Policy {
 	}
 
 	conf := logtail.Config{
 		Collection:   newc.Collection,
 		PrivateID:    newc.PrivateID,
 		Stderr:       logWriter{console},
 		CompressLogs: true,
+		MaxUploadSize: opts.MaxUploadSize,
 	}
 	if opts.Collection == logtail.CollectionNode {
 		conf.MetricsDelta = clientmetric.EncodeLogTailMetricsDelta
@@ -620,7 +633,7 @@ func (opts Options) New() *Policy {
 	} else {
 		// Only attach an on-disk filch buffer if we are going to be sending logs.
 		// No reason to persist them locally just to drop them later.
-		attachFilchBuffer(&conf, opts.Dir, opts.CmdName, opts.Logf)
+		attachFilchBuffer(&conf, opts.Dir, opts.CmdName, opts.MaxBufferSize, opts.Logf)
 		conf.HTTPC = opts.HTTPC
 
 		if conf.HTTPC == nil {
@@ -676,9 +689,10 @@ func (opts Options) New() *Policy {
 // attachFilchBuffer creates an on-disk ring buffer using filch and attaches
 // it to the logtail config. Note that this is optional; if no buffer is set,
 // logtail will use an in-memory buffer.
-func attachFilchBuffer(conf *logtail.Config, dir, cmdName string, logf logger.Logf) {
+func attachFilchBuffer(conf *logtail.Config, dir, cmdName string, maxFileSize int, logf logger.Logf) {
 	filchOptions := filch.Options{
 		ReplaceStderr: redirectStderrToLogPanics(),
+		MaxFileSize:   maxFileSize,
 	}
 	filchPrefix := filepath.Join(dir, cmdName)
 
@@ -6,6 +6,7 @@ package logtail
 
 import (
 	"bytes"
+	"cmp"
 	"context"
 	"crypto/rand"
 	"encoding/binary"
@@ -78,6 +79,7 @@ type Config struct {
 	StderrLevel   int    // max verbosity level to write to stderr; 0 means the non-verbose messages only
 	Buffer        Buffer // temp storage, if nil a MemoryBuffer
 	CompressLogs  bool   // whether to compress the log uploads
+	MaxUploadSize int    // maximum upload size; 0 means using the default
 
 	// MetricsDelta, if non-nil, is a func that returns an encoding
 	// delta in clientmetrics to upload alongside existing logs.
@@ -157,6 +159,7 @@ func NewLogger(cfg Config, logf tslogger.Logf) *Logger {
 		url:            cfg.BaseURL + "/c/" + cfg.Collection + "/" + cfg.PrivateID.String() + urlSuffix,
 		lowMem:         cfg.LowMemory,
 		buffer:         cfg.Buffer,
+		maxUploadSize:  cfg.MaxUploadSize,
 		skipClientTime: cfg.SkipClientTime,
 		drainWake:      make(chan struct{}, 1),
 		sentinel:       make(chan int32, 16),
@@ -192,6 +195,7 @@ type Logger struct {
 	skipClientTime bool
 	netMonitor     *netmon.Monitor
 	buffer         Buffer
+	maxUploadSize  int
 	drainWake      chan struct{}        // signal to speed up drain
 	drainBuf       []byte               // owned by drainPending for reuse
 	flushDelayFn   func() time.Duration // negative or zero return value to upload aggressively, or >0 to batch at this delay
@@ -325,7 +329,7 @@ func (l *Logger) drainPending() (b []byte) {
 		}
 	}()
 
-	maxLen := maxSize
+	maxLen := cmp.Or(l.maxUploadSize, maxSize)
 	if l.lowMem {
 		// When operating in a low memory environment, it is better to upload
 		// in multiple operations than it is to allocate a large body and OOM.
@@ -775,9 +779,10 @@ func (l *Logger) appendTextOrJSONLocked(dst, src []byte, level int) []byte {
 	// That's okay as the Tailscale log service limit is actually 2*maxSize.
 	// However, so long as logging applications aim to target the maxSize limit,
 	// there should be no trouble eventually uploading logs.
-	if len(src) > maxSize {
+	maxLen := cmp.Or(l.maxUploadSize, maxSize)
+	if len(src) > maxLen {
 		errDetail := fmt.Sprintf("entry too large: %d bytes", len(src))
-		errData := appendTruncatedString(nil, src, maxSize/len(`\uffff`)) // escaping could increase size
+		errData := appendTruncatedString(nil, src, maxLen/len(`\uffff`)) // escaping could increase size
 
 		dst = append(dst, '{')
 		dst = l.appendMetadata(dst, l.skipClientTime, true, l.procID, l.procSequence, errDetail, errData, level)
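A note on the cmp.Or calls introduced above: cmp.Or (standard library, Go 1.22+) returns the first of its arguments that is not the zero value, so cmp.Or(l.maxUploadSize, maxSize) yields the configured limit when one was set and falls back to the package default otherwise. A standalone sketch of that fallback pattern; defaultMax here is a stand-in constant, not logtail's actual unexported maxSize:

	package main

	import (
		"cmp"
		"fmt"
	)

	func main() {
		const defaultMax = 256 << 10 // stand-in for the package default; value assumed
		for _, configured := range []int{0, 1 << 20} {
			// A zero (unset) configured limit falls back to defaultMax;
			// any non-zero value wins.
			fmt.Println(cmp.Or(configured, defaultMax))
		}
	}

This prints 262144 (the default) for the unset case and 1048576 for the configured one, which is exactly the behavior the diff relies on for both the drain and entry-size limits.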