Fix calls to repo/backend.List() everywhere

This commit is contained in:
Alexander Neumann
2018-01-21 17:25:36 +01:00
parent e9ea268847
commit b0c6e53241
28 changed files with 318 additions and 254 deletions

View File

@@ -12,6 +12,7 @@ import (
"github.com/restic/restic/internal/fs"
"github.com/restic/restic/internal/hashing"
"github.com/restic/restic/internal/restic"
"golang.org/x/sync/errgroup"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/pack"
@@ -192,13 +193,14 @@ func (c *Checker) Packs(ctx context.Context, errChan chan<- error) {
debug.Log("listing repository packs")
repoPacks := restic.NewIDSet()
for id := range c.repo.List(ctx, restic.DataFile) {
select {
case <-ctx.Done():
return
default:
}
err := c.repo.List(ctx, restic.DataFile, func(id restic.ID, size int64) error {
repoPacks.Insert(id)
return nil
})
if err != nil {
errChan <- err
}
// orphaned: present in the repo but not in c.packs
@@ -719,42 +721,58 @@ func (c *Checker) ReadData(ctx context.Context, p *restic.Progress, errChan chan
p.Start()
defer p.Done()
worker := func(wg *sync.WaitGroup, in <-chan restic.ID) {
defer wg.Done()
for {
var id restic.ID
var ok bool
g, ctx := errgroup.WithContext(ctx)
ch := make(chan restic.ID)
// start producer for channel ch
g.Go(func() error {
defer close(ch)
return c.repo.List(ctx, restic.DataFile, func(id restic.ID, size int64) error {
select {
case <-ctx.Done():
return
case id, ok = <-in:
if !ok {
return
case ch <- id:
}
return nil
})
})
// run workers
for i := 0; i < defaultParallelism; i++ {
g.Go(func() error {
for {
var id restic.ID
var ok bool
select {
case <-ctx.Done():
return nil
case id, ok = <-ch:
if !ok {
return nil
}
}
err := checkPack(ctx, c.repo, id)
p.Report(restic.Stat{Blobs: 1})
if err == nil {
continue
}
select {
case <-ctx.Done():
return nil
case errChan <- err:
}
}
})
}
err := checkPack(ctx, c.repo, id)
p.Report(restic.Stat{Blobs: 1})
if err == nil {
continue
}
select {
case <-ctx.Done():
return
case errChan <- err:
}
err := g.Wait()
if err != nil {
select {
case <-ctx.Done():
return
case errChan <- err:
}
}
ch := c.repo.List(ctx, restic.DataFile)
var wg sync.WaitGroup
for i := 0; i < defaultParallelism; i++ {
wg.Add(1)
go worker(&wg, ch)
}
wg.Wait()
}