Merge pull request #3927 from MichaelEischer/faster-index-each

Speed up MasterIndex.Each
This commit is contained in:
Michael Eischer
2022-09-24 12:35:23 +02:00
committed by GitHub
14 changed files with 80 additions and 89 deletions

View File

@@ -124,14 +124,14 @@ func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) {
debug.Log("process blobs")
cnt := 0
for blob := range index.Each(ctx) {
index.Each(ctx, func(blob restic.PackedBlob) {
cnt++
if _, ok := packToIndex[blob.PackID]; !ok {
packToIndex[blob.PackID] = restic.NewIDSet()
}
packToIndex[blob.PackID].Insert(id)
}
})
debug.Log("%d blobs processed", cnt)
return nil
@@ -458,13 +458,13 @@ func (c *Checker) UnusedBlobs(ctx context.Context) (blobs restic.BlobHandles) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
for blob := range c.repo.Index().Each(ctx) {
c.repo.Index().Each(ctx, func(blob restic.PackedBlob) {
h := restic.BlobHandle{ID: blob.ID, Type: blob.Type}
if !c.blobRefs.M.Has(h) {
debug.Log("blob %v not referenced", h)
blobs = append(blobs, h)
}
}
})
return blobs
}

View File

@@ -370,7 +370,7 @@ func CalculateHeaderSize(blobs []restic.Blob) int {
func Size(ctx context.Context, mi restic.MasterIndex, onlyHdr bool) map[restic.ID]int64 {
packSize := make(map[restic.ID]int64)
for blob := range mi.Each(ctx) {
mi.Each(ctx, func(blob restic.PackedBlob) {
size, ok := packSize[blob.PackID]
if !ok {
size = headerSize
@@ -379,7 +379,7 @@ func Size(ctx context.Context, mi restic.MasterIndex, onlyHdr bool) map[restic.I
size += int64(blob.Length)
}
packSize[blob.PackID] = size + int64(CalculateEntrySize(blob.Blob))
}
})
return packSize
}

View File

@@ -217,34 +217,22 @@ func (idx *Index) AddToSupersedes(ids ...restic.ID) error {
return nil
}
// Each returns a channel that yields all blobs known to the index. When the
// context is cancelled, the background goroutine terminates. This blocks any
// Each passes all blobs known to the index to the callback fn. This blocks any
// modification of the index.
func (idx *Index) Each(ctx context.Context) <-chan restic.PackedBlob {
func (idx *Index) Each(ctx context.Context, fn func(restic.PackedBlob)) {
idx.m.Lock()
defer idx.m.Unlock()
ch := make(chan restic.PackedBlob)
go func() {
defer idx.m.Unlock()
defer func() {
close(ch)
}()
for typ := range idx.byType {
m := &idx.byType[typ]
m.foreach(func(e *indexEntry) bool {
select {
case <-ctx.Done():
return false
case ch <- idx.toPackedBlob(e, restic.BlobType(typ)):
return true
}
})
}
}()
return ch
for typ := range idx.byType {
m := &idx.byType[typ]
m.foreach(func(e *indexEntry) bool {
if ctx.Err() != nil {
return false
}
fn(idx.toPackedBlob(e, restic.BlobType(typ)))
return true
})
}
}
type EachByPackResult struct {

View File

@@ -355,11 +355,11 @@ func TestIndexUnserialize(t *testing.T) {
}
func listPack(idx *repository.Index, id restic.ID) (pbs []restic.PackedBlob) {
for pb := range idx.Each(context.TODO()) {
idx.Each(context.TODO(), func(pb restic.PackedBlob) {
if pb.PackID.Equal(id) {
pbs = append(pbs, pb)
}
}
})
return pbs
}

View File

@@ -234,30 +234,15 @@ func (mi *MasterIndex) finalizeFullIndexes() []*Index {
return list
}
// Each returns a channel that yields all blobs known to the index. When the
// context is cancelled, the background goroutine terminates. This blocks any
// modification of the index.
func (mi *MasterIndex) Each(ctx context.Context) <-chan restic.PackedBlob {
// Each runs fn on all blobs known to the index. When the context is cancelled,
// the index iteration return immediately. This blocks any modification of the index.
func (mi *MasterIndex) Each(ctx context.Context, fn func(restic.PackedBlob)) {
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
ch := make(chan restic.PackedBlob)
go func() {
defer mi.idxMutex.RUnlock()
defer close(ch)
for _, idx := range mi.idx {
for pb := range idx.Each(ctx) {
select {
case <-ctx.Done():
return
case ch <- pb:
}
}
}
}()
return ch
for _, idx := range mi.idx {
idx.Each(ctx, fn)
}
}
// MergeFinalIndexes merges all final indexes together.
@@ -450,11 +435,11 @@ func (mi *MasterIndex) ListPacks(ctx context.Context, packs restic.IDSet) <-chan
if len(packBlob) == 0 {
continue
}
for pb := range mi.Each(ctx) {
mi.Each(ctx, func(pb restic.PackedBlob) {
if packs.Has(pb.PackID) && pb.PackID[0]&0xf == i {
packBlob[pb.PackID] = append(packBlob[pb.PackID], pb.Blob)
}
}
})
// pass on packs
for packID, pbs := range packBlob {

View File

@@ -163,9 +163,9 @@ func TestMasterMergeFinalIndexes(t *testing.T) {
rtest.Equals(t, 1, idxCount)
blobCount := 0
for range mIdx.Each(context.TODO()) {
mIdx.Each(context.TODO(), func(pb restic.PackedBlob) {
blobCount++
}
})
rtest.Equals(t, 2, blobCount)
blobs := mIdx.Lookup(bhInIdx1)
@@ -195,9 +195,9 @@ func TestMasterMergeFinalIndexes(t *testing.T) {
rtest.Equals(t, []restic.PackedBlob{blob2}, blobs)
blobCount = 0
for range mIdx.Each(context.TODO()) {
mIdx.Each(context.TODO(), func(pb restic.PackedBlob) {
blobCount++
}
})
rtest.Equals(t, 2, blobCount)
}
@@ -308,6 +308,20 @@ func BenchmarkMasterIndexLookupBlobSize(b *testing.B) {
}
}
func BenchmarkMasterIndexEach(b *testing.B) {
rng := rand.New(rand.NewSource(0))
mIdx, _ := createRandomMasterIndex(b, rand.New(rng), 5, 200000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
entries := 0
mIdx.Each(context.TODO(), func(pb restic.PackedBlob) {
entries++
})
}
}
var (
snapshotTime = time.Unix(1470492820, 207401672)
depth = 3

View File

@@ -595,10 +595,15 @@ func (r *Repository) LoadIndex(ctx context.Context) error {
// sanity check
ctx, cancel := context.WithCancel(ctx)
defer cancel()
for blob := range r.idx.Each(ctx) {
invalidIndex := false
r.idx.Each(ctx, func(blob restic.PackedBlob) {
if blob.IsCompressed() {
return errors.Fatal("index uses feature not supported by repository version 1")
invalidIndex = true
}
})
if invalidIndex {
return errors.Fatal("index uses feature not supported by repository version 1")
}
}

View File

@@ -362,13 +362,13 @@ func testRepositoryIncrementalIndex(t *testing.T, version uint) {
idx, err := loadIndex(context.TODO(), repo, id)
rtest.OK(t, err)
for pb := range idx.Each(context.TODO()) {
idx.Each(context.TODO(), func(pb restic.PackedBlob) {
if _, ok := packEntries[pb.PackID]; !ok {
packEntries[pb.PackID] = make(map[restic.ID]struct{})
}
packEntries[pb.PackID][id] = struct{}{}
}
})
return nil
})
if err != nil {

View File

@@ -83,10 +83,9 @@ type MasterIndex interface {
Has(BlobHandle) bool
Lookup(BlobHandle) []PackedBlob
// Each returns a channel that yields all blobs known to the index. When
// the context is cancelled, the background goroutine terminates. This
// blocks any modification of the index.
Each(ctx context.Context) <-chan PackedBlob
// Each runs fn on all blobs known to the index. When the context is cancelled,
// the index iteration return immediately. This blocks any modification of the index.
Each(ctx context.Context, fn func(PackedBlob))
ListPacks(ctx context.Context, packs IDSet) <-chan PackBlobs
Save(ctx context.Context, repo SaverUnpacked, packBlacklist IDSet, extraObsolete IDs, p *progress.Counter) (obsolete IDSet, err error)