Replace FilesInParallel with an errgroup.Group

This commit is contained in:
Alexander Neumann
2019-03-24 21:27:28 +01:00
parent 75906edef5
commit e046428c94
5 changed files with 227 additions and 300 deletions

View File

@@ -74,82 +74,107 @@ func (err ErrOldIndexFormat) Error() string {
// LoadIndex loads all index files.
func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) {
debug.Log("Start")
type indexRes struct {
Index *repository.Index
err error
ID string
// track spawned goroutines using wg, create a new context which is
// cancelled as soon as an error occurs.
wg, ctx := errgroup.WithContext(ctx)
type FileInfo struct {
restic.ID
Size int64
}
indexCh := make(chan indexRes)
type Result struct {
*repository.Index
restic.ID
Err error
}
worker := func(ctx context.Context, id restic.ID) error {
debug.Log("worker got index %v", id)
idx, err := repository.LoadIndexWithDecoder(ctx, c.repo, id, repository.DecodeIndex)
if errors.Cause(err) == repository.ErrOldIndexFormat {
debug.Log("index %v has old format", id)
hints = append(hints, ErrOldIndexFormat{id})
ch := make(chan FileInfo)
resultCh := make(chan Result)
idx, err = repository.LoadIndexWithDecoder(ctx, c.repo, id, repository.DecodeOldIndex)
// send list of index files through ch, which is closed afterwards
wg.Go(func() error {
defer close(ch)
return c.repo.List(ctx, restic.IndexFile, func(id restic.ID, size int64) error {
select {
case <-ctx.Done():
return nil
case ch <- FileInfo{id, size}:
}
return nil
})
})
// a worker receives an index ID from ch, loads the index, and sends it to indexCh
worker := func() error {
for fi := range ch {
debug.Log("worker got file %v", fi.ID.Str())
idx, err := repository.LoadIndexWithDecoder(ctx, c.repo, fi.ID, repository.DecodeIndex)
if errors.Cause(err) == repository.ErrOldIndexFormat {
debug.Log("index %v has old format", fi.ID.Str())
hints = append(hints, ErrOldIndexFormat{fi.ID})
idx, err = repository.LoadIndexWithDecoder(ctx, c.repo, fi.ID, repository.DecodeOldIndex)
}
err = errors.Wrapf(err, "error loading index %v", fi.ID.Str())
select {
case resultCh <- Result{idx, fi.ID, err}:
case <-ctx.Done():
}
}
err = errors.Wrapf(err, "error loading index %v", id.Str())
select {
case indexCh <- indexRes{Index: idx, ID: id.String(), err: err}:
case <-ctx.Done():
}
return nil
}
go func() {
defer close(indexCh)
debug.Log("start loading indexes in parallel")
err := repository.FilesInParallel(ctx, c.repo.Backend(), restic.IndexFile, defaultParallelism,
repository.ParallelWorkFuncParseID(worker))
debug.Log("loading indexes finished, error: %v", err)
if err != nil {
panic(err)
}
}()
// final closes indexCh after all workers have terminated
final := func() error {
close(resultCh)
return nil
}
done := make(chan struct{})
defer close(done)
// run workers on ch
wg.Go(func() error {
return repository.RunWorkers(ctx, defaultParallelism, worker, final)
})
// receive decoded indexes
packToIndex := make(map[restic.ID]restic.IDSet)
wg.Go(func() error {
for res := range resultCh {
debug.Log("process index %v, err %v", res.ID, res.Err)
for res := range indexCh {
debug.Log("process index %v, err %v", res.ID, res.err)
if res.err != nil {
errs = append(errs, res.err)
continue
}
idxID, err := restic.ParseID(res.ID)
if err != nil {
errs = append(errs, errors.Errorf("unable to parse as index ID: %v", res.ID))
continue
}
c.indexes[idxID] = res.Index
c.masterIndex.Insert(res.Index)
debug.Log("process blobs")
cnt := 0
for blob := range res.Index.Each(ctx) {
c.packs.Insert(blob.PackID)
c.blobs.Insert(blob.ID)
c.blobRefs.M[blob.ID] = 0
cnt++
if _, ok := packToIndex[blob.PackID]; !ok {
packToIndex[blob.PackID] = restic.NewIDSet()
if res.Err != nil {
errs = append(errs, res.Err)
continue
}
packToIndex[blob.PackID].Insert(idxID)
}
debug.Log("%d blobs processed", cnt)
c.indexes[res.ID] = res.Index
c.masterIndex.Insert(res.Index)
debug.Log("process blobs")
cnt := 0
for blob := range res.Index.Each(ctx) {
c.packs.Insert(blob.PackID)
c.blobs.Insert(blob.ID)
c.blobRefs.M[blob.ID] = 0
cnt++
if _, ok := packToIndex[blob.PackID]; !ok {
packToIndex[blob.PackID] = restic.NewIDSet()
}
packToIndex[blob.PackID].Insert(res.ID)
}
debug.Log("%d blobs processed", cnt)
}
return nil
})
err := wg.Wait()
if err != nil {
errs = append(errs, err)
}
debug.Log("checking for duplicate packs")
@@ -163,7 +188,7 @@ func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) {
}
}
err := c.repo.SetIndex(c.masterIndex)
err = c.repo.SetIndex(c.masterIndex)
if err != nil {
debug.Log("SetIndex returned error: %v", err)
errs = append(errs, err)
@@ -281,31 +306,52 @@ func loadSnapshotTreeIDs(ctx context.Context, repo restic.Repository) (restic.ID
sync.Mutex
}
snapshotWorker := func(ctx context.Context, strID string) error {
id, err := restic.ParseID(strID)
if err != nil {
return err
}
// track spawned goroutines using wg, create a new context which is
// cancelled as soon as an error occurs.
wg, ctx := errgroup.WithContext(ctx)
debug.Log("load snapshot %v", id)
ch := make(chan restic.ID)
treeID, err := loadTreeFromSnapshot(ctx, repo, id)
if err != nil {
errs.Lock()
errs.errs = append(errs.errs, err)
errs.Unlock()
// send list of index files through ch, which is closed afterwards
wg.Go(func() error {
defer close(ch)
return repo.List(ctx, restic.SnapshotFile, func(id restic.ID, size int64) error {
select {
case <-ctx.Done():
return nil
case ch <- id:
}
return nil
})
})
// a worker receives an index ID from ch, loads the snapshot and the tree,
// and adds the result to errs and trees.
worker := func() error {
for id := range ch {
debug.Log("load snapshot %v", id)
treeID, err := loadTreeFromSnapshot(ctx, repo, id)
if err != nil {
errs.Lock()
errs.errs = append(errs.errs, err)
errs.Unlock()
continue
}
debug.Log("snapshot %v has tree %v", id, treeID)
trees.Lock()
trees.IDs = append(trees.IDs, treeID)
trees.Unlock()
}
debug.Log("snapshot %v has tree %v", id, treeID)
trees.Lock()
trees.IDs = append(trees.IDs, treeID)
trees.Unlock()
return nil
}
err := repository.FilesInParallel(ctx, repo.Backend(), restic.SnapshotFile, defaultParallelism, snapshotWorker)
for i := 0; i < defaultParallelism; i++ {
wg.Go(worker)
}
err := wg.Wait()
if err != nil {
errs.errs = append(errs.errs, err)
}