Mirror of https://github.com/restic/restic.git, synced 2025-05-28 10:18:36 +00:00
repository: randomly distribute blobs over two pack files
This commit is contained in:
parent c617364d15
commit 62453f9356
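In short: instead of appending every blob to a single in-progress pack file, the packer manager now keeps a small pool of packers (defaultPackerCount = 2) and selects one uniformly at random for each blob, so consecutive chunks of a file no longer land back-to-back in one pack. A minimal standalone sketch of the selection primitive, built on the same crypto/rand and math/big calls the diff introduces (randomIndex and packerCount are illustrative names, not the commit's identifiers):

package main

import (
	"crypto/rand"
	"fmt"
	"math/big"
)

// randomIndex returns a uniform random int in [0, n) from crypto/rand,
// mirroring the randomInt helper added by this commit.
func randomIndex(n int) (int, error) {
	v, err := rand.Int(rand.Reader, big.NewInt(int64(n)))
	if err != nil {
		return 0, err
	}
	return int(v.Int64()), nil
}

func main() {
	// distribute ten blobs over a pool of two packers
	const packerCount = 2
	for blob := 0; blob < 10; blob++ {
		idx, err := randomIndex(packerCount)
		if err != nil {
			panic(err)
		}
		fmt.Printf("blob %d -> packer %d\n", blob, idx)
	}
}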
@@ -3,8 +3,10 @@ package repository
 import (
 	"bufio"
 	"context"
+	"crypto/rand"
 	"crypto/sha256"
 	"io"
+	"math/big"
 	"os"
 	"sync"
 
@@ -33,17 +35,20 @@ type packerManager struct {
 	queueFn func(ctx context.Context, t restic.BlobType, p *packer) error
 
 	pm       sync.Mutex
-	packer   *packer
+	packers  []*packer
 	packSize uint
 }
 
+const defaultPackerCount = 2
+
 // newPackerManager returns a new packer manager which writes temporary files
 // to a temporary directory
-func newPackerManager(key *crypto.Key, tpe restic.BlobType, packSize uint, queueFn func(ctx context.Context, t restic.BlobType, p *packer) error) *packerManager {
+func newPackerManager(key *crypto.Key, tpe restic.BlobType, packSize uint, packerCount int, queueFn func(ctx context.Context, t restic.BlobType, p *packer) error) *packerManager {
 	return &packerManager{
 		tpe:      tpe,
 		key:      key,
 		queueFn:  queueFn,
+		packers:  make([]*packer, packerCount),
 		packSize: packSize,
 	}
 }
@@ -52,13 +57,15 @@ func (r *packerManager) Flush(ctx context.Context) error {
 	r.pm.Lock()
 	defer r.pm.Unlock()
 
-	if r.packer != nil {
-		debug.Log("manually flushing pending pack")
-		err := r.queueFn(ctx, r.tpe, r.packer)
-		if err != nil {
-			return err
+	for i, packer := range r.packers {
+		if packer != nil {
+			debug.Log("manually flushing pending pack")
+			err := r.queueFn(ctx, r.tpe, packer)
+			if err != nil {
+				return err
+			}
+			r.packers[i] = nil
 		}
-		r.packer = nil
 	}
 	return nil
 }
@@ -67,20 +74,9 @@ func (r *packerManager) SaveBlob(ctx context.Context, t restic.BlobType, id rest
 	r.pm.Lock()
 	defer r.pm.Unlock()
 
-	var err error
-	packer := r.packer
-	// use separate packer if compressed length is larger than the packsize
-	// this speeds up the garbage collection of oversized blobs and reduces the cache size
-	// as the oversize blobs are only downloaded if necessary
-	if len(ciphertext) >= int(r.packSize) || r.packer == nil {
-		packer, err = r.newPacker()
-		if err != nil {
-			return 0, err
-		}
-		// don't store packer for oversized blob
-		if r.packer == nil {
-			r.packer = packer
-		}
+	packer, err := r.pickPacker(len(ciphertext))
+	if err != nil {
+		return 0, err
 	}
 
 	// save ciphertext
@@ -95,10 +91,9 @@ func (r *packerManager) SaveBlob(ctx context.Context, t restic.BlobType, id rest
 		debug.Log("pack is not full enough (%d bytes)", packer.Size())
 		return size, nil
 	}
-	if packer == r.packer {
-		// forget full packer
-		r.packer = nil
-	}
+
+	// forget full packer
+	r.forgetPacker(packer)
 
 	// call while holding lock to prevent findPacker from creating new packers if the uploaders are busy
 	// else write the pack to the backend
@@ -110,6 +105,55 @@ func (r *packerManager) SaveBlob(ctx context.Context, t restic.BlobType, id rest
 	return size + packer.HeaderOverhead(), nil
 }
 
+func randomInt(max int) (int, error) {
+	rangeSize := big.NewInt(int64(max))
+	randomInt, err := rand.Int(rand.Reader, rangeSize)
+	if err != nil {
+		return 0, err
+	}
+	return int(randomInt.Int64()), nil
+}
+
+// pickPacker returns or creates a randomly selected packer into which the blob should be stored. If the
+// ciphertext is larger than the packSize, a new packer is returned.
+func (r *packerManager) pickPacker(ciphertextLen int) (*packer, error) {
+	// use separate packer if compressed length is larger than the packsize
+	// this speeds up the garbage collection of oversized blobs and reduces the cache size
+	// as the oversize blobs are only downloaded if necessary
+	if ciphertextLen >= int(r.packSize) {
+		return r.newPacker()
+	}
+
+	// randomly distribute blobs onto multiple packer instances. This makes it harder for
+	// an attacker to learn at which points a file was chunked and therefore mitigates the attack described in
+	// https://www.daemonology.net/blog/chunking-attacks.pdf .
+	// See https://github.com/restic/restic/issues/5291#issuecomment-2746146193 for details on the mitigation.
+	idx, err := randomInt(len(r.packers))
+	if err != nil {
+		return nil, err
+	}
+
+	// retrieve packer or get a new one
+	packer := r.packers[idx]
+	if packer == nil {
+		packer, err = r.newPacker()
+		if err != nil {
+			return nil, err
+		}
+		r.packers[idx] = packer
+	}
+	return packer, nil
+}
+
+// forgetPacker drops the given packer from the internal list. This is used to forget full packers.
+func (r *packerManager) forgetPacker(packer *packer) {
+	for i, p := range r.packers {
+		if packer == p {
+			r.packers[i] = nil
+		}
+	}
+}
+
 // findPacker returns a packer for a new blob of size bytes. Either a new one is
 // created or one is returned that already has some blobs.
 func (r *packerManager) newPacker() (pck *packer, err error) {
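The randomness in pickPacker is the heart of the mitigation: with blobs assigned independently and uniformly, two consecutive blobs of a file end up adjacent in the same pack only about 1/defaultPackerCount of the time, so pack file contents reveal much less about where files were chunked. A quick standalone simulation of that property (illustrative only, not part of the commit):

package main

import (
	"crypto/rand"
	"fmt"
	"math/big"
)

func main() {
	const packers = 2 // mirrors defaultPackerCount in the diff
	const trials = 100000
	same := 0
	prev := int64(-1)
	for i := 0; i < trials; i++ {
		// uniform choice in [0, packers), as pickPacker does via randomInt
		n, err := rand.Int(rand.Reader, big.NewInt(packers))
		if err != nil {
			panic(err)
		}
		if n.Int64() == prev {
			same++
		}
		prev = n.Int64()
	}
	// expect roughly 50% for a pool of two packers
	fmt.Printf("consecutive blobs shared a packer in %.1f%% of trials\n",
		100*float64(same)/float64(trials))
}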
@@ -63,7 +63,7 @@ func testPackerManager(t testing.TB) int64 {
 	rnd := rand.New(rand.NewSource(randomSeed))
 
 	savedBytes := 0
-	pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, DefaultPackSize, func(ctx context.Context, tp restic.BlobType, p *packer) error {
+	pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, DefaultPackSize, defaultPackerCount, func(ctx context.Context, tp restic.BlobType, p *packer) error {
 		err := p.Finalize()
 		if err != nil {
 			return err
@@ -75,8 +75,8 @@ func testPackerManager(t testing.TB) int64 {
 	blobBuf := make([]byte, maxBlobSize)
 
 	bytes := fillPacks(t, rnd, pm, blobBuf)
-	// bytes does not include the last packs header
-	test.Equals(t, savedBytes, bytes+36)
+	// bytes does not include the last pack headers
+	test.Assert(t, savedBytes == (bytes+36) || savedBytes == (bytes+72), "unexpected number of saved bytes, got %v, expected %v", savedBytes, bytes)
 
 	t.Logf("saved %d bytes", bytes)
 	return int64(bytes)
@@ -85,7 +85,7 @@ func testPackerManager(t testing.TB) int64 {
 func TestPackerManagerWithOversizeBlob(t *testing.T) {
 	packFiles := 0
 	sizeLimit := uint(512 * 1024)
-	pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, sizeLimit, func(ctx context.Context, tp restic.BlobType, p *packer) error {
+	pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, sizeLimit, defaultPackerCount, func(ctx context.Context, tp restic.BlobType, p *packer) error {
 		packFiles++
 		return nil
 	})
@@ -97,7 +97,7 @@ func TestPackerManagerWithOversizeBlob(t *testing.T) {
 	test.OK(t, pm.Flush(context.TODO()))
 
 	// oversized blob must be stored in a separate packfile
-	test.Equals(t, packFiles, 2)
+	test.Assert(t, packFiles == 2 || packFiles == 3, "unexpected number of packfiles %v, expected 2 or 3", packFiles)
 }
 
 func BenchmarkPackerManager(t *testing.B) {
@@ -115,7 +115,7 @@ func BenchmarkPackerManager(t *testing.B) {
 
 	for i := 0; i < t.N; i++ {
 		rnd.Seed(randomSeed)
-		pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, DefaultPackSize, func(ctx context.Context, t restic.BlobType, p *packer) error {
+		pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, DefaultPackSize, defaultPackerCount, func(ctx context.Context, t restic.BlobType, p *packer) error {
 			return nil
 		})
 		fillPacks(t, rnd, pm, blobBuf)
internal/repository/prune_internal_test.go (new file, 95 lines)
@@ -0,0 +1,95 @@
+package repository
+
+import (
+	"context"
+	"math"
+	"math/rand"
+	"testing"
+	"time"
+
+	"github.com/restic/restic/internal/restic"
+	rtest "github.com/restic/restic/internal/test"
+	"github.com/restic/restic/internal/ui/progress"
+	"golang.org/x/sync/errgroup"
+)
+
+// TestPruneMaxUnusedDuplicate checks that MaxUnused correctly accounts for duplicates.
+//
+// Create a repository containing blobs a to d that are stored in packs as follows:
+// - a, d
+// - b, d
+// - c, d
+// All blobs should be kept during prune, but the duplicates should be gone afterwards.
+// The special construction ensures that each pack contains a used, non-duplicate blob.
+// This ensures that special cases that delete completely duplicate packs files do not
+// apply.
+func TestPruneMaxUnusedDuplicate(t *testing.T) {
+	seed := time.Now().UnixNano()
+	random := rand.New(rand.NewSource(seed))
+	t.Logf("rand initialized with seed %d", seed)
+
+	repo, _, _ := TestRepositoryWithVersion(t, 0)
+	// ensure blobs are assembled into packs as expected
+	repo.packerCount = 1
+	// large blobs to prevent repacking due to too small packsize
+	const blobSize = 1024 * 1024
+
+	bufs := [][]byte{}
+	for i := 0; i < 4; i++ {
+		// use uniform length for simpler control via MaxUnusedBytes
+		buf := make([]byte, blobSize)
+		random.Read(buf)
+		bufs = append(bufs, buf)
+	}
+	keep := restic.NewBlobSet()
+
+	for _, blobs := range [][][]byte{
+		{bufs[0], bufs[3]},
+		{bufs[1], bufs[3]},
+		{bufs[2], bufs[3]},
+	} {
+		var wg errgroup.Group
+		repo.StartPackUploader(context.TODO(), &wg)
+
+		for _, blob := range blobs {
+			id, _, _, err := repo.SaveBlob(context.TODO(), restic.DataBlob, blob, restic.ID{}, true)
+			keep.Insert(restic.BlobHandle{Type: restic.DataBlob, ID: id})
+			rtest.OK(t, err)
+		}
+
+		rtest.OK(t, repo.Flush(context.Background()))
+	}
+
+	opts := PruneOptions{
+		MaxRepackBytes: math.MaxUint64,
+		// non-zero number of unused bytes, that is nevertheless smaller than a single blob
+		// setting this to zero would bypass the unused/duplicate size accounting that should
+		// be tested here
+		MaxUnusedBytes: func(used uint64) (unused uint64) { return blobSize / 2 },
+	}
+
+	plan, err := PlanPrune(context.TODO(), opts, repo, func(ctx context.Context, repo restic.Repository, usedBlobs restic.FindBlobSet) error {
+		for blob := range keep {
+			usedBlobs.Insert(blob)
+		}
+		return nil
+	}, &progress.NoopPrinter{})
+	rtest.OK(t, err)
+
+	rtest.OK(t, plan.Execute(context.TODO(), &progress.NoopPrinter{}))
+
+	rsize := plan.Stats().Size
+	remainingUnusedSize := rsize.Duplicate + rsize.Unused - rsize.Remove - rsize.Repackrm
+	maxUnusedSize := opts.MaxUnusedBytes(rsize.Used)
+	rtest.Assert(t, remainingUnusedSize <= maxUnusedSize, "too much unused data remains got %v, expected less than %v", remainingUnusedSize, maxUnusedSize)
+
+	// divide by blobSize to ignore pack file overhead
+	rtest.Equals(t, rsize.Used/blobSize, uint64(4))
+	rtest.Equals(t, rsize.Duplicate/blobSize, uint64(2))
+	rtest.Equals(t, rsize.Unused, uint64(0))
+	rtest.Equals(t, rsize.Remove, uint64(0))
+	rtest.Equals(t, rsize.Repack/blobSize, uint64(4))
+	rtest.Equals(t, rsize.Repackrm/blobSize, uint64(2))
+	rtest.Equals(t, rsize.Unref, uint64(0))
+	rtest.Equals(t, rsize.Uncompressed, uint64(0))
+}
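For reference, the accounting these assertions encode, worked through from the setup above: six blob copies are written into three packs; one copy each of a, b, c and d counts as used (4 × blobSize) and the two extra copies of d as duplicate (2 × blobSize). Since MaxUnusedBytes allows only blobSize/2, prune must eliminate the duplicates: it repacks the two packs holding the redundant copies of d (4 blobs rewritten, hence Repack/blobSize == 4) and drops those copies in the process (Repackrm/blobSize == 2). Every pack also contains a used blob, so none can simply be deleted (Remove == 0), and

remainingUnusedSize = Duplicate + Unused - Remove - Repackrm = (2 + 0 - 0 - 2) × blobSize = 0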
@@ -115,85 +115,6 @@ func TestPrune(t *testing.T) {
 	}
 }
 
-// TestPruneMaxUnusedDuplicate checks that MaxUnused correctly accounts for duplicates.
-//
-// Create a repository containing blobs a to d that are stored in packs as follows:
-// - a, d
-// - b, d
-// - c, d
-// All blobs should be kept during prune, but the duplicates should be gone afterwards.
-// The special construction ensures that each pack contains a used, non-duplicate blob.
-// This ensures that special cases that delete completely duplicate packs files do not
-// apply.
-func TestPruneMaxUnusedDuplicate(t *testing.T) {
-	seed := time.Now().UnixNano()
-	random := rand.New(rand.NewSource(seed))
-	t.Logf("rand initialized with seed %d", seed)
-
-	repo, _, _ := repository.TestRepositoryWithVersion(t, 0)
-	// large blobs to prevent repacking due to too small packsize
-	const blobSize = 1024 * 1024
-
-	bufs := [][]byte{}
-	for i := 0; i < 4; i++ {
-		// use uniform length for simpler control via MaxUnusedBytes
-		buf := make([]byte, blobSize)
-		random.Read(buf)
-		bufs = append(bufs, buf)
-	}
-	keep := restic.NewBlobSet()
-
-	for _, blobs := range [][][]byte{
-		{bufs[0], bufs[3]},
-		{bufs[1], bufs[3]},
-		{bufs[2], bufs[3]},
-	} {
-		var wg errgroup.Group
-		repo.StartPackUploader(context.TODO(), &wg)
-
-		for _, blob := range blobs {
-			id, _, _, err := repo.SaveBlob(context.TODO(), restic.DataBlob, blob, restic.ID{}, true)
-			keep.Insert(restic.BlobHandle{Type: restic.DataBlob, ID: id})
-			rtest.OK(t, err)
-		}
-
-		rtest.OK(t, repo.Flush(context.Background()))
-	}
-
-	opts := repository.PruneOptions{
-		MaxRepackBytes: math.MaxUint64,
-		// non-zero number of unused bytes, that is nevertheless smaller than a single blob
-		// setting this to zero would bypass the unused/duplicate size accounting that should
-		// be tested here
-		MaxUnusedBytes: func(used uint64) (unused uint64) { return blobSize / 2 },
-	}
-
-	plan, err := repository.PlanPrune(context.TODO(), opts, repo, func(ctx context.Context, repo restic.Repository, usedBlobs restic.FindBlobSet) error {
-		for blob := range keep {
-			usedBlobs.Insert(blob)
-		}
-		return nil
-	}, &progress.NoopPrinter{})
-	rtest.OK(t, err)
-
-	rtest.OK(t, plan.Execute(context.TODO(), &progress.NoopPrinter{}))
-
-	rsize := plan.Stats().Size
-	remainingUnusedSize := rsize.Duplicate + rsize.Unused - rsize.Remove - rsize.Repackrm
-	maxUnusedSize := opts.MaxUnusedBytes(rsize.Used)
-	rtest.Assert(t, remainingUnusedSize <= maxUnusedSize, "too much unused data remains got %v, expected less than %v", remainingUnusedSize, maxUnusedSize)
-
-	// divide by blobSize to ignore pack file overhead
-	rtest.Equals(t, rsize.Used/blobSize, uint64(4))
-	rtest.Equals(t, rsize.Duplicate/blobSize, uint64(2))
-	rtest.Equals(t, rsize.Unused, uint64(0))
-	rtest.Equals(t, rsize.Remove, uint64(0))
-	rtest.Equals(t, rsize.Repack/blobSize, uint64(4))
-	rtest.Equals(t, rsize.Repackrm/blobSize, uint64(2))
-	rtest.Equals(t, rsize.Unref, uint64(0))
-	rtest.Equals(t, rsize.Uncompressed, uint64(0))
-}
-
 /*
 1.) create repository with packsize of 2M.
 2.) create enough data for 11 packfiles (31 packs)
@@ -42,10 +42,11 @@ type Repository struct {
 
 	opts Options
 
-	packerWg *errgroup.Group
-	uploader *packerUploader
-	treePM   *packerManager
-	dataPM   *packerManager
+	packerWg    *errgroup.Group
+	uploader    *packerUploader
+	treePM      *packerManager
+	dataPM      *packerManager
+	packerCount int
 
 	allocEnc sync.Once
 	allocDec sync.Once
@@ -125,9 +126,10 @@ func New(be backend.Backend, opts Options) (*Repository, error) {
 	}
 
 	repo := &Repository{
-		be:   be,
-		opts: opts,
-		idx:  index.NewMasterIndex(),
+		be:          be,
+		opts:        opts,
+		idx:         index.NewMasterIndex(),
+		packerCount: defaultPackerCount,
 	}
 
 	return repo, nil
@@ -553,8 +555,8 @@ func (r *Repository) StartPackUploader(ctx context.Context, wg *errgroup.Group)
 	innerWg, ctx := errgroup.WithContext(ctx)
 	r.packerWg = innerWg
 	r.uploader = newPackerUploader(ctx, innerWg, r, r.Connections())
-	r.treePM = newPackerManager(r.key, restic.TreeBlob, r.packSize(), r.uploader.QueuePacker)
-	r.dataPM = newPackerManager(r.key, restic.DataBlob, r.packSize(), r.uploader.QueuePacker)
+	r.treePM = newPackerManager(r.key, restic.TreeBlob, r.packSize(), r.packerCount, r.uploader.QueuePacker)
+	r.dataPM = newPackerManager(r.key, restic.DataBlob, r.packSize(), r.packerCount, r.uploader.QueuePacker)
 
 	wg.Go(func() error {
 		return innerWg.Wait()