tailscale/util/zstdframe/options.go
Joe Tsai d4bfe34ba7
util/zstdframe: add package for stateless zstd compression (#11481)
The Go zstd package is not friendly for stateless zstd compression.
Passing around multiple zstd.Encoder just for stateless compression
is a waste of memory since the memory is never freed and seldom
used if no compression operations are happening.

For performance, we pool the relevant Encoder/Decoder
with the specific options set.

Functionally, this package is a wrapper over the Go zstd package
with a more ergonomic API for stateless operations.

This package can be used to cleanup various pre-existing zstd.Encoder
pools or one-off handlers spread throughout our codebases.

Performance:

	BenchmarkEncode/Best               1690        610926 ns/op      25.78 MB/s           1 B/op          0 allocs/op
	    zstd_test.go:137: memory: 50.336 MiB
	    zstd_test.go:138: ratio:  3.269x
	BenchmarkEncode/Better            10000        100939 ns/op     156.04 MB/s           0 B/op          0 allocs/op
	    zstd_test.go:137: memory: 20.399 MiB
	    zstd_test.go:138: ratio:  3.131x
	BenchmarkEncode/Default            15775         74976 ns/op     210.08 MB/s         105 B/op          0 allocs/op
	    zstd_test.go:137: memory: 1.586 MiB
	    zstd_test.go:138: ratio:  3.064x
	BenchmarkEncode/Fastest            23222         53977 ns/op     291.81 MB/s          26 B/op          0 allocs/op
	    zstd_test.go:137: memory: 599.458 KiB
	    zstd_test.go:138: ratio:  2.898x
	BenchmarkEncode/FastestLowMemory                   23361         50789 ns/op     310.13 MB/s          15 B/op          0 allocs/op
	    zstd_test.go:137: memory: 334.458 KiB
	    zstd_test.go:138: ratio:  2.898x
	BenchmarkEncode/FastestNoChecksum                  23086         50253 ns/op     313.44 MB/s          26 B/op          0 allocs/op
	    zstd_test.go:137: memory: 599.458 KiB
	    zstd_test.go:138: ratio:  2.900x

	BenchmarkDecode/Checksum                           70794         17082 ns/op     300.96 MB/s           4 B/op          0 allocs/op
	    zstd_test.go:163: memory: 316.438 KiB
	BenchmarkDecode/NoChecksum                         74935         15990 ns/op     321.51 MB/s           4 B/op          0 allocs/op
	    zstd_test.go:163: memory: 316.438 KiB
	BenchmarkDecode/LowMemory                          71043         16739 ns/op     307.13 MB/s           0 B/op          0 allocs/op
	    zstd_test.go:163: memory: 79.347 KiB

We can see that the options are taking effect where compression ratio improves
with higher levels and compression speed diminishes.
We can also see that LowMemory takes effect where the pooled coder object
references less memory than other cases.
We can see that the pooling is taking effect as there are 0 amortized allocations.

Additional performance:

	BenchmarkEncodeParallel/zstd-24                     1857        619264 ns/op        1796 B/op         49 allocs/op
	BenchmarkEncodeParallel/zstdframe-24                1954        532023 ns/op        4293 B/op         49 allocs/op
	BenchmarkDecodeParallel/zstd-24                     5288        197281 ns/op        2516 B/op         49 allocs/op
	BenchmarkDecodeParallel/zstdframe-24                6441        196254 ns/op        2513 B/op         49 allocs/op

In concurrent usage, handling the pooling in this package
has a marginal benefit over the zstd package,
which relies on a Go channel as the pooling mechanism.
In particular, coders can be freed by the GC when not in use.
Coders can be shared throughout the program if they use this package
instead of multiple independent pools doing the same thing.
The allocations are unrelated to pooling as they're caused by the spawning of goroutines.

Updates #cleanup
Updates tailscale/corp#18514
Updates tailscale/corp#17653
Updates tailscale/corp#18005

Signed-off-by: Joe Tsai <joetsai@digital-static.net>
2024-03-21 11:39:20 -07:00

184 lines
5.1 KiB
Go

// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
package zstdframe
import (
"math/bits"
"sync"
"github.com/klauspost/compress/zstd"
"tailscale.com/util/must"
)
// Option is an option that can be passed to [AppendEncode] or [AppendDecode].
type Option interface{ isOption() }
type encoderLevel int
// Constants that implement [Option] and can be passed to [AppendEncode].
const (
FastestCompression = encoderLevel(zstd.SpeedFastest)
DefaultCompression = encoderLevel(zstd.SpeedDefault)
BetterCompression = encoderLevel(zstd.SpeedBetterCompression)
BestCompression = encoderLevel(zstd.SpeedBestCompression)
)
func (encoderLevel) isOption() {}
// EncoderLevel specifies the compression level when encoding.
//
// This exists for compatibility with [zstd.EncoderLevel] values.
// Most usages should directly use one of the following constants:
// - [FastestCompression]
// - [DefaultCompression]
// - [BetterCompression]
// - [BestCompression]
//
// By default, [DefaultCompression] is chosen.
// This option is ignored when decoding.
func EncoderLevel(level zstd.EncoderLevel) Option { return encoderLevel(level) }
type withChecksum bool
func (withChecksum) isOption() {}
// WithChecksum specifies whether to produce a checksum when encoding,
// or whether to verify the checksum when decoding.
// By default, checksums are produced and verified.
func WithChecksum(check bool) Option { return withChecksum(check) }
type maxDecodedSize uint64
func (maxDecodedSize) isOption() {}
// MaxDecodedSize specifies the maximum decoded size and
// is used to protect against hostile content.
// By default, there is no limit.
// This option is ignored when encoding.
func MaxDecodedSize(maxSize uint64) Option {
return maxDecodedSize(maxSize)
}
type lowMemory bool
func (lowMemory) isOption() {}
// LowMemory specifies that the encoder and decoder should aim to use
// lower amounts of memory at the cost of speed.
// By default, more memory used for better speed.
func LowMemory(low bool) Option { return lowMemory(low) }
var encoderPools sync.Map // map[encoderOptions]*sync.Pool -> *zstd.Encoder
type encoderOptions struct {
level zstd.EncoderLevel
checksum bool
lowMemory bool
}
type encoder struct {
pool *sync.Pool
*zstd.Encoder
}
func getEncoder(opts ...Option) encoder {
eopts := encoderOptions{level: zstd.SpeedDefault, checksum: true}
for _, opt := range opts {
switch opt := opt.(type) {
case encoderLevel:
eopts.level = zstd.EncoderLevel(opt)
case withChecksum:
eopts.checksum = bool(opt)
case lowMemory:
eopts.lowMemory = bool(opt)
}
}
vpool, ok := encoderPools.Load(eopts)
if !ok {
vpool, _ = encoderPools.LoadOrStore(eopts, new(sync.Pool))
}
pool := vpool.(*sync.Pool)
enc, _ := pool.Get().(*zstd.Encoder)
if enc == nil {
enc = must.Get(zstd.NewWriter(nil,
// Set concurrency=1 to ensure synchronous operation.
zstd.WithEncoderConcurrency(1),
// In stateless compression, the data is already in a single buffer,
// so we might as well encode it as a single segment,
// which ensures that the Frame_Content_Size is always populated,
// informing decoders up-front the expected decompressed size.
zstd.WithSingleSegment(true),
// Ensure strict compliance with RFC 8878, section 3.1.,
// where zstandard "is made up of one or more frames".
zstd.WithZeroFrames(true),
zstd.WithEncoderLevel(eopts.level),
zstd.WithEncoderCRC(eopts.checksum),
zstd.WithLowerEncoderMem(eopts.lowMemory)))
}
return encoder{pool, enc}
}
func putEncoder(e encoder) { e.pool.Put(e.Encoder) }
var decoderPools sync.Map // map[decoderOptions]*sync.Pool -> *zstd.Decoder
type decoderOptions struct {
maxSizeLog2 int
checksum bool
lowMemory bool
}
type decoder struct {
pool *sync.Pool
*zstd.Decoder
maxSize uint64
}
func getDecoder(opts ...Option) decoder {
maxSize := uint64(1 << 63)
dopts := decoderOptions{maxSizeLog2: 63, checksum: true}
for _, opt := range opts {
switch opt := opt.(type) {
case maxDecodedSize:
maxSize = uint64(opt)
dopts.maxSizeLog2 = 64 - bits.LeadingZeros64(maxSize-1)
dopts.maxSizeLog2 = min(max(10, dopts.maxSizeLog2), 63)
case withChecksum:
dopts.checksum = bool(opt)
case lowMemory:
dopts.lowMemory = bool(opt)
}
}
vpool, ok := decoderPools.Load(dopts)
if !ok {
vpool, _ = decoderPools.LoadOrStore(dopts, new(sync.Pool))
}
pool := vpool.(*sync.Pool)
dec, _ := pool.Get().(*zstd.Decoder)
if dec == nil {
dec = must.Get(zstd.NewReader(nil,
// Set concurrency=1 to ensure synchronous operation.
zstd.WithDecoderConcurrency(1),
zstd.WithDecoderMaxMemory(1<<dopts.maxSizeLog2),
zstd.IgnoreChecksum(!dopts.checksum),
zstd.WithDecoderLowmem(dopts.lowMemory)))
}
return decoder{pool, dec, maxSize}
}
func putDecoder(d decoder) { d.pool.Put(d.Decoder) }
func (d decoder) DecodeAll(src, dst []byte) ([]byte, error) {
// We only configure DecodeAll to enforce MaxDecodedSize by powers-of-two.
// Perform a more fine grain check based on the exact value.
dst2, err := d.Decoder.DecodeAll(src, dst)
if err == nil && uint64(len(dst2)-len(dst)) > d.maxSize {
err = zstd.ErrDecoderSizeExceeded
}
return dst2, err
}