logtail: perform zstd compression with single-segment mode

The "single-segment" mode encodes the expected uncompressed size
in the zstd frame, which the server can use as an optimization.

Also, add a new Config.CompressTransport flag.
Compression is an implementation detail of log uploading.
It could be zstd today or a different format in the future.
Most users of logtail should not care what compression is used.

Signed-off-by: Joe Tsai <joetsai@digital-static.net>
This commit is contained in:
Joe Tsai 2022-07-25 16:53:37 -07:00
parent 96f73b3894
commit 7b93b5f80b
4 changed files with 39 additions and 36 deletions

View File

@ -81,7 +81,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de
github.com/klauspost/compress/huff0 from github.com/klauspost/compress/zstd
github.com/klauspost/compress/internal/cpuinfo from github.com/klauspost/compress/zstd
github.com/klauspost/compress/internal/snapref from github.com/klauspost/compress/zstd
github.com/klauspost/compress/zstd from tailscale.com/smallzstd
github.com/klauspost/compress/zstd from tailscale.com/smallzstd+
github.com/klauspost/compress/zstd/internal/xxhash from github.com/klauspost/compress/zstd
github.com/kortschak/wol from tailscale.com/ipn/ipnlocal
LD github.com/kr/fs from github.com/pkg/sftp

View File

@ -43,7 +43,6 @@
"tailscale.com/net/tshttpproxy"
"tailscale.com/paths"
"tailscale.com/safesocket"
"tailscale.com/smallzstd"
"tailscale.com/types/logger"
"tailscale.com/util/clientmetric"
"tailscale.com/util/racebuild"
@ -520,17 +519,11 @@ func New(collection string) *Policy {
}
c := logtail.Config{
Collection: newc.Collection,
PrivateID: newc.PrivateID,
Stderr: logWriter{console},
NewZstdEncoder: func() logtail.Encoder {
w, err := smallzstd.NewEncoder(nil)
if err != nil {
panic(err)
}
return w
},
HTTPC: &http.Client{Transport: NewLogtailTransport(logtail.DefaultHost)},
Collection: newc.Collection,
PrivateID: newc.PrivateID,
Stderr: logWriter{console},
CompressTransport: true,
HTTPC: &http.Client{Transport: NewLogtailTransport(logtail.DefaultHost)},
}
if collection == logtail.CollectionNode {
c.MetricsDelta = clientmetric.EncodeLogTailMetricsDelta

View File

@ -21,8 +21,10 @@
"sync/atomic"
"time"
"github.com/klauspost/compress/zstd"
"tailscale.com/logtail/backoff"
"tailscale.com/net/interfaces"
"tailscale.com/smallzstd"
"tailscale.com/syncs"
tslogger "tailscale.com/types/logger"
"tailscale.com/wgengine/monitor"
@ -44,17 +46,18 @@ type Encoder interface {
}
type Config struct {
Collection string // collection name, a domain name
PrivateID PrivateID // machine-specific private identifier
BaseURL string // if empty defaults to "https://log.tailscale.io"
HTTPC *http.Client // if empty defaults to http.DefaultClient
SkipClientTime bool // if true, client_time is not written to logs
LowMemory bool // if true, logtail minimizes memory use
TimeNow func() time.Time // if set, subsitutes uses of time.Now
Stderr io.Writer // if set, logs are sent here instead of os.Stderr
StderrLevel int // max verbosity level to write to stderr; 0 means the non-verbose messages only
Buffer Buffer // temp storage, if nil a MemoryBuffer
NewZstdEncoder func() Encoder // if set, used to compress logs for transmission
Collection string // collection name, a domain name
PrivateID PrivateID // machine-specific private identifier
BaseURL string // if empty defaults to "https://log.tailscale.io"
HTTPC *http.Client // if empty defaults to http.DefaultClient
SkipClientTime bool // if true, client_time is not written to logs
LowMemory bool // if true, logtail minimizes memory use
TimeNow func() time.Time // if set, subsitutes uses of time.Now
Stderr io.Writer // if set, logs are sent here instead of os.Stderr
StderrLevel int // max verbosity level to write to stderr; 0 means the non-verbose messages only
Buffer Buffer // temp storage, if nil a MemoryBuffer
NewZstdEncoder func() Encoder // if set, used to compress logs for transmission
CompressTransport bool // if true, uses compression for logs transmission
// MetricsDelta, if non-nil, is a func that returns an encoding
// delta in clientmetrics to upload alongside existing logs.
@ -131,6 +134,19 @@ func NewLogger(cfg Config, logf tslogger.Logf) *Logger {
shutdownStart: make(chan struct{}),
shutdownDone: make(chan struct{}),
}
if cfg.CompressTransport && cfg.NewZstdEncoder == nil {
cfg.NewZstdEncoder = func() Encoder {
// Per RFC 8478, section 3.1.1.1.1.1, the "SingleSegment" format
// ensures that the frame contains the uncompressed content size.
// This is a useful signal to the server for what
// uncompressed output size to expect.
w, err := smallzstd.NewEncoder(nil, zstd.WithSingleSegment(true))
if err != nil {
panic(err)
}
return w
}
}
if cfg.NewZstdEncoder != nil {
l.zstdEncoder = cfg.NewZstdEncoder()
}

View File

@ -210,18 +210,12 @@ func (s *Server) start() error {
return fmt.Errorf("error creating filch: %w", err)
}
c := logtail.Config{
Collection: lpc.Collection,
PrivateID: lpc.PrivateID,
Stderr: ioutil.Discard, // log everything to Buffer
Buffer: f,
NewZstdEncoder: func() logtail.Encoder {
w, err := smallzstd.NewEncoder(nil)
if err != nil {
panic(err)
}
return w
},
HTTPC: &http.Client{Transport: logpolicy.NewLogtailTransport(logtail.DefaultHost)},
Collection: lpc.Collection,
PrivateID: lpc.PrivateID,
Stderr: ioutil.Discard, // log everything to Buffer
Buffer: f,
CompressTransport: true,
HTTPC: &http.Client{Transport: logpolicy.NewLogtailTransport(logtail.DefaultHost)},
}
s.logtail = logtail.NewLogger(c, logf)