index: Correctly process errors listing all files

This also removes the now unused `list` and `worker` packages.
This commit is contained in:
Alexander Neumann
2018-10-28 11:16:29 +01:00
parent d10754e2b4
commit b096fc7abf
5 changed files with 84 additions and 289 deletions

View File

@@ -5,14 +5,13 @@ import (
"context"
"fmt"
"os"
"sync"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/list"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/pack"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/worker"
"github.com/restic/restic/internal/errors"
"golang.org/x/sync/errgroup"
)
// Pack contains information about the contents of a pack.
@@ -35,38 +34,108 @@ func newIndex() *Index {
}
}
const listPackWorkers = 10
// New creates a new index for repo from scratch. InvalidFiles contains all IDs
// of files that cannot be listed successfully.
func New(ctx context.Context, repo restic.Repository, ignorePacks restic.IDSet, p *restic.Progress) (idx *Index, invalidFiles restic.IDs, err error) {
p.Start()
defer p.Done()
ch := make(chan worker.Job)
go list.AllPacks(ctx, repo, ignorePacks, ch)
type Job struct {
PackID restic.ID
Size int64
}
type Result struct {
Error error
PackID restic.ID
Size int64
Entries []restic.Blob
}
inputCh := make(chan Job)
outputCh := make(chan Result)
wg, ctx := errgroup.WithContext(ctx)
// list the files in the repo, send to inputCh
wg.Go(func() error {
defer close(inputCh)
return repo.List(ctx, restic.DataFile, func(id restic.ID, size int64) error {
if ignorePacks.Has(id) {
return nil
}
job := Job{
PackID: id,
Size: size,
}
select {
case inputCh <- job:
case <-ctx.Done():
}
return nil
})
})
// run the workers listing the files, read from inputCh, send to outputCh
var workers sync.WaitGroup
for i := 0; i < listPackWorkers; i++ {
workers.Add(1)
go func() {
defer workers.Done()
for job := range inputCh {
res := Result{PackID: job.PackID}
res.Entries, res.Size, res.Error = repo.ListPack(ctx, job.PackID, job.Size)
select {
case outputCh <- res:
case <-ctx.Done():
return
}
}
}()
}
// wait until all the workers are done, then close outputCh
wg.Go(func() error {
workers.Wait()
close(outputCh)
return nil
})
idx = newIndex()
for job := range ch {
for res := range outputCh {
p.Report(restic.Stat{Blobs: 1})
j := job.Result.(list.Result)
if job.Error != nil {
cause := errors.Cause(job.Error)
if res.Error != nil {
cause := errors.Cause(res.Error)
if _, ok := cause.(pack.InvalidFileError); ok {
invalidFiles = append(invalidFiles, j.PackID())
invalidFiles = append(invalidFiles, res.PackID)
continue
}
fmt.Fprintf(os.Stderr, "pack file cannot be listed %v: %v\n", j.PackID(), job.Error)
fmt.Fprintf(os.Stderr, "pack file cannot be listed %v: %v\n", res.PackID, res.Error)
continue
}
debug.Log("pack %v contains %d blobs", j.PackID(), len(j.Entries()))
debug.Log("pack %v contains %d blobs", res.PackID, len(res.Entries))
err := idx.AddPack(j.PackID(), j.Size(), j.Entries())
err := idx.AddPack(res.PackID, res.Size, res.Entries)
if err != nil {
return nil, nil, err
}
select {
case <-ctx.Done(): // an error occurred
default:
}
}
err = wg.Wait()
if err != nil {
return nil, nil, err
}
return idx, invalidFiles, nil