From 62453f93568725db7a0fa78db58f9ca146fe65b2 Mon Sep 17 00:00:00 2001
From: Michael Eischer
Date: Sat, 22 Mar 2025 19:43:07 +0100
Subject: [PATCH] repository: randomly distribute blobs over two pack files

---
 internal/repository/packer_manager.go      | 96 ++++++++++++++++------
 internal/repository/packer_manager_test.go | 12 +--
 internal/repository/prune_internal_test.go | 95 +++++++++++++++++++++
 internal/repository/prune_test.go          | 79 ------------------
 internal/repository/repository.go          | 20 +++--
 5 files changed, 182 insertions(+), 120 deletions(-)
 create mode 100644 internal/repository/prune_internal_test.go

diff --git a/internal/repository/packer_manager.go b/internal/repository/packer_manager.go
index d5c340e39..6ffec8352 100644
--- a/internal/repository/packer_manager.go
+++ b/internal/repository/packer_manager.go
@@ -3,8 +3,10 @@ package repository
 import (
     "bufio"
     "context"
+    "crypto/rand"
     "crypto/sha256"
     "io"
+    "math/big"
     "os"
     "sync"
 
@@ -33,17 +35,20 @@ type packerManager struct {
     queueFn func(ctx context.Context, t restic.BlobType, p *packer) error
 
     pm       sync.Mutex
-    packer   *packer
+    packers  []*packer
     packSize uint
 }
 
+const defaultPackerCount = 2
+
 // newPackerManager returns a new packer manager which writes temporary files
 // to a temporary directory
-func newPackerManager(key *crypto.Key, tpe restic.BlobType, packSize uint, queueFn func(ctx context.Context, t restic.BlobType, p *packer) error) *packerManager {
+func newPackerManager(key *crypto.Key, tpe restic.BlobType, packSize uint, packerCount int, queueFn func(ctx context.Context, t restic.BlobType, p *packer) error) *packerManager {
     return &packerManager{
         tpe:      tpe,
         key:      key,
         queueFn:  queueFn,
+        packers:  make([]*packer, packerCount),
         packSize: packSize,
     }
 }
@@ -52,13 +57,15 @@ func (r *packerManager) Flush(ctx context.Context) error {
     r.pm.Lock()
     defer r.pm.Unlock()
 
-    if r.packer != nil {
-        debug.Log("manually flushing pending pack")
-        err := r.queueFn(ctx, r.tpe, r.packer)
-        if err != nil {
-            return err
+    for i, packer := range r.packers {
+        if packer != nil {
+            debug.Log("manually flushing pending pack")
+            err := r.queueFn(ctx, r.tpe, packer)
+            if err != nil {
+                return err
+            }
+            r.packers[i] = nil
         }
-        r.packer = nil
     }
     return nil
 }
@@ -67,20 +74,9 @@ func (r *packerManager) SaveBlob(ctx context.Context, t restic.BlobType, id rest
     r.pm.Lock()
     defer r.pm.Unlock()
 
-    var err error
-    packer := r.packer
-    // use separate packer if compressed length is larger than the packsize
-    // this speeds up the garbage collection of oversized blobs and reduces the cache size
-    // as the oversize blobs are only downloaded if necessary
-    if len(ciphertext) >= int(r.packSize) || r.packer == nil {
-        packer, err = r.newPacker()
-        if err != nil {
-            return 0, err
-        }
-        // don't store packer for oversized blob
-        if r.packer == nil {
-            r.packer = packer
-        }
+    packer, err := r.pickPacker(len(ciphertext))
+    if err != nil {
+        return 0, err
     }
 
     // save ciphertext
@@ -95,10 +91,9 @@ func (r *packerManager) SaveBlob(ctx context.Context, t restic.BlobType, id rest
         debug.Log("pack is not full enough (%d bytes)", packer.Size())
         return size, nil
     }
-    if packer == r.packer {
-        // forget full packer
-        r.packer = nil
-    }
+
+    // forget full packer
+    r.forgetPacker(packer)
 
     // call while holding lock to prevent findPacker from creating new packers if the uploaders are busy
     // else write the pack to the backend
@@ -110,6 +105,55 @@ func (r *packerManager) SaveBlob(ctx context.Context, t restic.BlobType, id rest
     return size + packer.HeaderOverhead(), nil
 }
 
+func randomInt(max int) (int, error) {
+    rangeSize := big.NewInt(int64(max))
+    randomInt, err := rand.Int(rand.Reader, rangeSize)
+    if err != nil {
+        return 0, err
+    }
+    return int(randomInt.Int64()), nil
+}
+
+// pickPacker returns or creates a randomly selected packer into which the blob should be stored. If the
+// ciphertext is larger than the packSize, a new packer is returned.
+func (r *packerManager) pickPacker(ciphertextLen int) (*packer, error) {
+    // use a separate packer if the compressed length is larger than the pack size;
+    // this speeds up the garbage collection of oversized blobs and reduces the cache size,
+    // as the oversized blobs are only downloaded if necessary
+    if ciphertextLen >= int(r.packSize) {
+        return r.newPacker()
+    }
+
+    // randomly distribute blobs onto multiple packer instances. This makes it harder for
+    // an attacker to learn at which points a file was chunked and therefore mitigates the attack described in
+    // https://www.daemonology.net/blog/chunking-attacks.pdf .
+    // See https://github.com/restic/restic/issues/5291#issuecomment-2746146193 for details on the mitigation.
+    idx, err := randomInt(len(r.packers))
+    if err != nil {
+        return nil, err
+    }
+
+    // retrieve the packer or create a new one
+    packer := r.packers[idx]
+    if packer == nil {
+        packer, err = r.newPacker()
+        if err != nil {
+            return nil, err
+        }
+        r.packers[idx] = packer
+    }
+    return packer, nil
+}
+
+// forgetPacker drops the given packer from the internal list. This is used to forget full packers.
+func (r *packerManager) forgetPacker(packer *packer) {
+    for i, p := range r.packers {
+        if packer == p {
+            r.packers[i] = nil
+        }
+    }
+}
+
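To make the mitigation tangible, here is a minimal, self-contained sketch of the same idea. It is not part of the patch; packerCount, the chunk labels, and the chunk count are illustrative assumptions. It simulates how consecutive chunks of a single file are scattered over two in-progress pack files by a crypto/rand selection, mirroring the randomInt and pickPacker helpers above:

package main

import (
    "crypto/rand"
    "fmt"
    "math/big"
)

const packerCount = 2 // mirrors defaultPackerCount in the patch

// randomInt mirrors the patch's helper: a uniform random index in [0, max)
// drawn from crypto/rand.
func randomInt(max int) (int, error) {
    n, err := rand.Int(rand.Reader, big.NewInt(int64(max)))
    if err != nil {
        return 0, err
    }
    return int(n.Int64()), nil
}

func main() {
    // ten consecutive chunks of a hypothetical file
    packs := make([][]string, packerCount)
    for i := 0; i < 10; i++ {
        idx, err := randomInt(packerCount)
        if err != nil {
            panic(err)
        }
        packs[idx] = append(packs[idx], fmt.Sprintf("chunk-%02d", i))
    }
    // each pack now holds a random subsequence of the chunks, so the run of
    // blob sizes in a single pack no longer traces the file's chunk boundaries
    for i, p := range packs {
        fmt.Printf("pack %d: %v\n", i, p)
    }
}

With two packers, adjacent chunks land in the same pack file only about half of the time; raising the count would dilute the signal further, at the cost of more concurrently open pack files. The diff continues with the pre-existing helper that creates packers:

 
 // findPacker returns a packer for a new blob of size bytes. Either a new one is
 // created or one is returned that already has some blobs.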
 func (r *packerManager) newPacker() (pck *packer, err error) {
diff --git a/internal/repository/packer_manager_test.go b/internal/repository/packer_manager_test.go
index 44bc8cfab..3dd6a079d 100644
--- a/internal/repository/packer_manager_test.go
+++ b/internal/repository/packer_manager_test.go
@@ -63,7 +63,7 @@ func testPackerManager(t testing.TB) int64 {
     rnd := rand.New(rand.NewSource(randomSeed))
 
     savedBytes := 0
-    pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, DefaultPackSize, func(ctx context.Context, tp restic.BlobType, p *packer) error {
+    pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, DefaultPackSize, defaultPackerCount, func(ctx context.Context, tp restic.BlobType, p *packer) error {
         err := p.Finalize()
         if err != nil {
             return err
@@ -75,8 +75,8 @@ func testPackerManager(t testing.TB) int64 {
     blobBuf := make([]byte, maxBlobSize)
     bytes := fillPacks(t, rnd, pm, blobBuf)
 
-    // bytes does not include the last packs header
-    test.Equals(t, savedBytes, bytes+36)
+    // bytes does not include the last pack headers
+    test.Assert(t, savedBytes == (bytes+36) || savedBytes == (bytes+72), "unexpected number of saved bytes, got %v, expected %v or %v", savedBytes, bytes+36, bytes+72)
 
     t.Logf("saved %d bytes", bytes)
     return int64(bytes)
@@ -85,7 +85,7 @@ func TestPackerManagerWithOversizeBlob(t *testing.T) {
     packFiles := 0
     sizeLimit := uint(512 * 1024)
-    pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, sizeLimit, func(ctx context.Context, tp restic.BlobType, p *packer) error {
+    pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, sizeLimit, defaultPackerCount, func(ctx context.Context, tp restic.BlobType, p *packer) error {
         packFiles++
         return nil
     })
@@ -97,7 +97,7 @@ func TestPackerManagerWithOversizeBlob(t *testing.T) {
     test.OK(t, pm.Flush(context.TODO()))
 
     // oversized blob must be stored in a separate packfile
-    test.Equals(t, packFiles, 2)
+    test.Assert(t, packFiles == 2 || packFiles == 3, "unexpected number of packfiles %v, expected 2 or 3", packFiles)
 }
 
 func BenchmarkPackerManager(t *testing.B) {
@@ -115,7 +115,7 @@ func BenchmarkPackerManager(t *testing.B) {
     for i := 0; i < t.N; i++ {
         rnd.Seed(randomSeed)
-        pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, DefaultPackSize, func(ctx context.Context, t restic.BlobType, p *packer) error {
+        pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, DefaultPackSize, defaultPackerCount, func(ctx context.Context, t restic.BlobType, p *packer) error {
             return nil
         })
         fillPacks(t, rnd, pm, blobBuf)
diff --git a/internal/repository/prune_internal_test.go b/internal/repository/prune_internal_test.go
new file mode 100644
index 000000000..5c9d0572e
--- /dev/null
+++ b/internal/repository/prune_internal_test.go
@@ -0,0 +1,95 @@
+package repository
+
+import (
+    "context"
+    "math"
+    "math/rand"
+    "testing"
+    "time"
+
+    "github.com/restic/restic/internal/restic"
+    rtest "github.com/restic/restic/internal/test"
+    "github.com/restic/restic/internal/ui/progress"
+    "golang.org/x/sync/errgroup"
+)
+
+// TestPruneMaxUnusedDuplicate checks that MaxUnused correctly accounts for duplicates.
+//
+// Create a repository containing blobs a to d that are stored in packs as follows:
+// - a, d
+// - b, d
+// - c, d
+// All blobs should be kept during prune, but the duplicates should be gone afterwards.
+// The special construction ensures that each pack contains a used, non-duplicate blob.
+// This ensures that special cases that delete completely duplicate pack files do not
+// apply.
+func TestPruneMaxUnusedDuplicate(t *testing.T) {
+    seed := time.Now().UnixNano()
+    random := rand.New(rand.NewSource(seed))
+    t.Logf("rand initialized with seed %d", seed)
+
+    repo, _, _ := TestRepositoryWithVersion(t, 0)
+    // ensure blobs are assembled into packs as expected
+    repo.packerCount = 1
+    // large blobs to prevent repacking due to too small packsize
+    const blobSize = 1024 * 1024
+
+    bufs := [][]byte{}
+    for i := 0; i < 4; i++ {
+        // use uniform length for simpler control via MaxUnusedBytes
+        buf := make([]byte, blobSize)
+        random.Read(buf)
+        bufs = append(bufs, buf)
+    }
+    keep := restic.NewBlobSet()
+
+    for _, blobs := range [][][]byte{
+        {bufs[0], bufs[3]},
+        {bufs[1], bufs[3]},
+        {bufs[2], bufs[3]},
+    } {
+        var wg errgroup.Group
+        repo.StartPackUploader(context.TODO(), &wg)
+
+        for _, blob := range blobs {
+            id, _, _, err := repo.SaveBlob(context.TODO(), restic.DataBlob, blob, restic.ID{}, true)
+            keep.Insert(restic.BlobHandle{Type: restic.DataBlob, ID: id})
+            rtest.OK(t, err)
+        }
+
+        rtest.OK(t, repo.Flush(context.Background()))
+    }
+
+    opts := PruneOptions{
+        MaxRepackBytes: math.MaxUint64,
+        // a non-zero number of unused bytes that is nevertheless smaller than a single blob;
+        // setting this to zero would bypass the unused/duplicate size accounting that should
+        // be tested here
+        MaxUnusedBytes: func(used uint64) (unused uint64) { return blobSize / 2 },
+    }
+
+    plan, err := PlanPrune(context.TODO(), opts, repo, func(ctx context.Context, repo restic.Repository, usedBlobs restic.FindBlobSet) error {
+        for blob := range keep {
+            usedBlobs.Insert(blob)
+        }
+        return nil
+    }, &progress.NoopPrinter{})
+    rtest.OK(t, err)
+
+    rtest.OK(t, plan.Execute(context.TODO(), &progress.NoopPrinter{}))
+
+    rsize := plan.Stats().Size
+    remainingUnusedSize := rsize.Duplicate + rsize.Unused - rsize.Remove - rsize.Repackrm
+    maxUnusedSize := opts.MaxUnusedBytes(rsize.Used)
+    rtest.Assert(t, remainingUnusedSize <= maxUnusedSize, "too much unused data remains: got %v, expected at most %v", remainingUnusedSize, maxUnusedSize)
+
+    // divide by blobSize to ignore pack file overhead
+    rtest.Equals(t, rsize.Used/blobSize, uint64(4))
+    rtest.Equals(t, rsize.Duplicate/blobSize, uint64(2))
+    rtest.Equals(t, rsize.Unused, uint64(0))
+    rtest.Equals(t, rsize.Remove, uint64(0))
+    rtest.Equals(t, rsize.Repack/blobSize, uint64(4))
+    rtest.Equals(t, rsize.Repackrm/blobSize, uint64(2))
+    rtest.Equals(t, rsize.Unref, uint64(0))
+    rtest.Equals(t, rsize.Uncompressed, uint64(0))
+}
diff --git a/internal/repository/prune_test.go b/internal/repository/prune_test.go
index 63aa939cf..2b6bdc2f2 100644
--- a/internal/repository/prune_test.go
+++ b/internal/repository/prune_test.go
@@ -115,85 +115,6 @@ func TestPrune(t *testing.T) {
     }
 }
 
-// TestPruneMaxUnusedDuplicate checks that MaxUnused correctly accounts for duplicates.
-//
-// Create a repository containing blobs a to d that are stored in packs as follows:
-// - a, d
-// - b, d
-// - c, d
-// All blobs should be kept during prune, but the duplicates should be gone afterwards.
-// The special construction ensures that each pack contains a used, non-duplicate blob.
-// This ensures that special cases that delete completely duplicate packs files do not
-// apply.
-func TestPruneMaxUnusedDuplicate(t *testing.T) {
-    seed := time.Now().UnixNano()
-    random := rand.New(rand.NewSource(seed))
-    t.Logf("rand initialized with seed %d", seed)
-
-    repo, _, _ := repository.TestRepositoryWithVersion(t, 0)
-    // large blobs to prevent repacking due to too small packsize
-    const blobSize = 1024 * 1024
-
-    bufs := [][]byte{}
-    for i := 0; i < 4; i++ {
-        // use uniform length for simpler control via MaxUnusedBytes
-        buf := make([]byte, blobSize)
-        random.Read(buf)
-        bufs = append(bufs, buf)
-    }
-    keep := restic.NewBlobSet()
-
-    for _, blobs := range [][][]byte{
-        {bufs[0], bufs[3]},
-        {bufs[1], bufs[3]},
-        {bufs[2], bufs[3]},
-    } {
-        var wg errgroup.Group
-        repo.StartPackUploader(context.TODO(), &wg)
-
-        for _, blob := range blobs {
-            id, _, _, err := repo.SaveBlob(context.TODO(), restic.DataBlob, blob, restic.ID{}, true)
-            keep.Insert(restic.BlobHandle{Type: restic.DataBlob, ID: id})
-            rtest.OK(t, err)
-        }
-
-        rtest.OK(t, repo.Flush(context.Background()))
-    }
-
-    opts := repository.PruneOptions{
-        MaxRepackBytes: math.MaxUint64,
-        // non-zero number of unused bytes, that is nevertheless smaller than a single blob
-        // setting this to zero would bypass the unused/duplicate size accounting that should
-        // be tested here
-        MaxUnusedBytes: func(used uint64) (unused uint64) { return blobSize / 2 },
-    }
-
-    plan, err := repository.PlanPrune(context.TODO(), opts, repo, func(ctx context.Context, repo restic.Repository, usedBlobs restic.FindBlobSet) error {
-        for blob := range keep {
-            usedBlobs.Insert(blob)
-        }
-        return nil
-    }, &progress.NoopPrinter{})
-    rtest.OK(t, err)
-
-    rtest.OK(t, plan.Execute(context.TODO(), &progress.NoopPrinter{}))
-
-    rsize := plan.Stats().Size
-    remainingUnusedSize := rsize.Duplicate + rsize.Unused - rsize.Remove - rsize.Repackrm
-    maxUnusedSize := opts.MaxUnusedBytes(rsize.Used)
-    rtest.Assert(t, remainingUnusedSize <= maxUnusedSize, "too much unused data remains got %v, expected less than %v", remainingUnusedSize, maxUnusedSize)
-
-    // divide by blobSize to ignore pack file overhead
-    rtest.Equals(t, rsize.Used/blobSize, uint64(4))
-    rtest.Equals(t, rsize.Duplicate/blobSize, uint64(2))
-    rtest.Equals(t, rsize.Unused, uint64(0))
-    rtest.Equals(t, rsize.Remove, uint64(0))
-    rtest.Equals(t, rsize.Repack/blobSize, uint64(4))
-    rtest.Equals(t, rsize.Repackrm/blobSize, uint64(2))
-    rtest.Equals(t, rsize.Unref, uint64(0))
-    rtest.Equals(t, rsize.Uncompressed, uint64(0))
-}
-
 /*
 1.) create repository with packsize of 2M.
 2.) create enough data for 11 packfiles (31 packs)
diff --git a/internal/repository/repository.go b/internal/repository/repository.go
index 259f879be..066ba5ed5 100644
--- a/internal/repository/repository.go
+++ b/internal/repository/repository.go
@@ -42,10 +42,11 @@ type Repository struct {
     opts Options
 
-    packerWg *errgroup.Group
-    uploader *packerUploader
-    treePM   *packerManager
-    dataPM   *packerManager
+    packerWg    *errgroup.Group
+    uploader    *packerUploader
+    treePM      *packerManager
+    dataPM      *packerManager
+    packerCount int
 
     allocEnc sync.Once
     allocDec sync.Once
@@ -125,9 +126,10 @@ func New(be backend.Backend, opts Options) (*Repository, error) {
     }
 
     repo := &Repository{
-        be:   be,
-        opts: opts,
-        idx:  index.NewMasterIndex(),
+        be:          be,
+        opts:        opts,
+        idx:         index.NewMasterIndex(),
+        packerCount: defaultPackerCount,
     }
 
     return repo, nil
@@ -553,8 +555,8 @@ func (r *Repository) StartPackUploader(ctx context.Context, wg *errgroup.Group)
     innerWg, ctx := errgroup.WithContext(ctx)
     r.packerWg = innerWg
     r.uploader = newPackerUploader(ctx, innerWg, r, r.Connections())
-    r.treePM = newPackerManager(r.key, restic.TreeBlob, r.packSize(), r.uploader.QueuePacker)
-    r.dataPM = newPackerManager(r.key, restic.DataBlob, r.packSize(), r.uploader.QueuePacker)
+    r.treePM = newPackerManager(r.key, restic.TreeBlob, r.packSize(), r.packerCount, r.uploader.QueuePacker)
+    r.dataPM = newPackerManager(r.key, restic.DataBlob, r.packSize(), r.packerCount, r.uploader.QueuePacker)
 
     wg.Go(func() error {
         return innerWg.Wait()
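
As a sanity check on the statistics asserted at the end of TestPruneMaxUnusedDuplicate, the following back-of-the-envelope sketch (not part of the patch) redoes the accounting in units of blobSize, ignoring pack header overhead exactly as the test's "/ blobSize" divisions do. Which two of the three packs get repacked is an assumption chosen to be consistent with the asserted numbers:

package main

import "fmt"

func main() {
    // All sizes in units of blobSize. Packs are {a,d}, {b,d}, {c,d}; every
    // blob is referenced, and d is stored three times (two duplicate copies).
    const (
        used      = 4 // a, b, c, d, each counted once
        duplicate = 2 // the two extra copies of d
        unused    = 0 // no unreferenced blobs exist
        remove    = 0 // no pack consists solely of unused blobs
        repack    = 4 // assumption: two of the three 2-blob packs are rewritten
        repackrm  = 2 // the duplicate copies of d vanish during repacking
    )
    // the quantity that prune bounds by MaxUnusedBytes
    remaining := duplicate + unused - remove - repackrm
    fmt.Printf("remaining unused: %d (limit: 0.5)\n", remaining) // prints 0
}

Repacking two of the three packs rewrites four blobs (Repack = 4) and drops both duplicate copies of d (Repackrm = 2), leaving Duplicate + Unused - Remove - Repackrm = 2 + 0 - 0 - 2 = 0 bytes of unused data, comfortably below the MaxUnusedBytes limit of half a blob.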