mirror of
				https://github.com/restic/restic.git
				synced 2025-10-31 03:02:05 +00:00 
			
		
		
		
	Merge pull request #3006 from aawsome/new-rebuild-index
Reimplement rebuild-index and remove /internal/index
This commit is contained in:
		
							
								
								
									
										9
									
								
								changelog/unreleased/pull-3006
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										9
									
								
								changelog/unreleased/pull-3006
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,9 @@ | ||||
| Enhancement: Speed up rebuild-index | ||||
|  | ||||
| We've optimized the command rebuild-index. Now, existing index entries are used | ||||
| to minimize the number of pack files that must be read. This speeds up the index | ||||
| rebuild a lot. | ||||
| In addition, the option --read-all-packs was added, which restores the previous behavior of reading all pack files. | ||||
|  | ||||
| https://github.com/restic/restic/issues/2547 | ||||
| https://github.com/restic/restic/pull/3006 | ||||
| @@ -498,7 +498,7 @@ func prune(opts PruneOptions, gopts GlobalOptions, repo restic.Repository, usedB | ||||
| 	if len(removePacks) != 0 { | ||||
| 		totalpacks := int(stats.packs.used+stats.packs.partlyUsed+stats.packs.unused) - | ||||
| 			len(removePacks) + packsAddedByRepack | ||||
| 		err = rebuildIndexFiles(gopts, repo, removePacks, uint64(totalpacks)) | ||||
| 		err = rebuildIndexFiles(gopts, repo, removePacks, nil, uint64(totalpacks)) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| @@ -511,12 +511,12 @@ func prune(opts PruneOptions, gopts GlobalOptions, repo restic.Repository, usedB | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func rebuildIndexFiles(gopts GlobalOptions, repo restic.Repository, removePacks restic.IDSet, packcount uint64) error { | ||||
| func rebuildIndexFiles(gopts GlobalOptions, repo restic.Repository, removePacks restic.IDSet, extraObsolete restic.IDs, packcount uint64) error { | ||||
| 	Verbosef("rebuilding index\n") | ||||
|  | ||||
| 	bar := newProgressMax(!gopts.Quiet, packcount, "packs processed") | ||||
| 	obsoleteIndexes, err := (repo.Index()).(*repository.MasterIndex). | ||||
| 		Save(gopts.ctx, repo, removePacks, bar) | ||||
| 		Save(gopts.ctx, repo, removePacks, extraObsolete, bar) | ||||
| 	bar.Done() | ||||
| 	if err != nil { | ||||
| 		return err | ||||
|   | ||||
| @@ -1,10 +1,8 @@ | ||||
| package main | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
|  | ||||
| 	"github.com/restic/restic/internal/errors" | ||||
| 	"github.com/restic/restic/internal/index" | ||||
| 	"github.com/restic/restic/internal/pack" | ||||
| 	"github.com/restic/restic/internal/repository" | ||||
| 	"github.com/restic/restic/internal/restic" | ||||
|  | ||||
| 	"github.com/spf13/cobra" | ||||
| @@ -12,7 +10,7 @@ import ( | ||||
|  | ||||
| var cmdRebuildIndex = &cobra.Command{ | ||||
| 	Use:   "rebuild-index [flags]", | ||||
| 	Short: "Build a new index file", | ||||
| 	Short: "Build a new index", | ||||
| 	Long: ` | ||||
| The "rebuild-index" command creates a new index based on the pack files in the | ||||
| repository. | ||||
| @@ -24,15 +22,25 @@ Exit status is 0 if the command was successful, and non-zero if there was any er | ||||
| `, | ||||
| 	DisableAutoGenTag: true, | ||||
| 	RunE: func(cmd *cobra.Command, args []string) error { | ||||
| 		return runRebuildIndex(globalOptions) | ||||
| 		return runRebuildIndex(rebuildIndexOptions, globalOptions) | ||||
| 	}, | ||||
| } | ||||
|  | ||||
| func init() { | ||||
| 	cmdRoot.AddCommand(cmdRebuildIndex) | ||||
| // RebuildIndexOptions collects all options for the rebuild-index command. | ||||
| type RebuildIndexOptions struct { | ||||
| 	ReadAllPacks bool | ||||
| } | ||||
|  | ||||
| func runRebuildIndex(gopts GlobalOptions) error { | ||||
| var rebuildIndexOptions RebuildIndexOptions | ||||
|  | ||||
| func init() { | ||||
| 	cmdRoot.AddCommand(cmdRebuildIndex) | ||||
| 	f := cmdRebuildIndex.Flags() | ||||
| 	f.BoolVar(&rebuildIndexOptions.ReadAllPacks, "read-all-packs", false, "read all pack files to generate new index from scratch") | ||||
|  | ||||
| } | ||||
|  | ||||
| func runRebuildIndex(opts RebuildIndexOptions, gopts GlobalOptions) error { | ||||
| 	repo, err := OpenRepository(gopts) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| @@ -44,59 +52,97 @@ func runRebuildIndex(gopts GlobalOptions) error { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	ctx, cancel := context.WithCancel(gopts.ctx) | ||||
| 	defer cancel() | ||||
| 	return rebuildIndex(ctx, repo, restic.NewIDSet()) | ||||
| 	return rebuildIndex(opts, gopts, repo, restic.NewIDSet()) | ||||
| } | ||||
|  | ||||
| func rebuildIndex(ctx context.Context, repo restic.Repository, ignorePacks restic.IDSet) error { | ||||
| 	Verbosef("counting files in repo\n") | ||||
| func rebuildIndex(opts RebuildIndexOptions, gopts GlobalOptions, repo *repository.Repository, ignorePacks restic.IDSet) error { | ||||
| 	ctx := gopts.ctx | ||||
|  | ||||
| 	var packs uint64 | ||||
| 	err := repo.List(ctx, restic.PackFile, func(restic.ID, int64) error { | ||||
| 		packs++ | ||||
| 	var obsoleteIndexes restic.IDs | ||||
| 	packSizeFromList := make(map[restic.ID]int64) | ||||
| 	removePacks := restic.NewIDSet() | ||||
| 	totalPacks := 0 | ||||
|  | ||||
| 	if opts.ReadAllPacks { | ||||
| 		// get old index files | ||||
| 		err := repo.List(ctx, restic.IndexFile, func(id restic.ID, size int64) error { | ||||
| 			obsoleteIndexes = append(obsoleteIndexes, id) | ||||
| 			return nil | ||||
| 		}) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 	bar := newProgressMax(!globalOptions.Quiet, packs-uint64(len(ignorePacks)), "packs") | ||||
| 	idx, invalidFiles, err := index.New(ctx, repo, ignorePacks, bar) | ||||
| 	bar.Done() | ||||
| 		Verbosef("finding pack files in repo...\n") | ||||
| 		err = repo.List(ctx, restic.PackFile, func(id restic.ID, size int64) error { | ||||
| 			packSizeFromList[id] = size | ||||
| 			removePacks.Insert(id) | ||||
| 			totalPacks++ | ||||
| 			return nil | ||||
| 		}) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 	} else { | ||||
| 		Verbosef("loading indexes...\n") | ||||
| 		err := repo.LoadIndex(gopts.ctx) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		Verbosef("getting pack files to read...\n") | ||||
|  | ||||
| 		// Compute size of each pack from index entries | ||||
| 		packSizeFromIndex := make(map[restic.ID]int64) | ||||
| 		for blob := range repo.Index().Each(ctx) { | ||||
| 			size, ok := packSizeFromIndex[blob.PackID] | ||||
| 			if !ok { | ||||
| 				size = pack.HeaderSize | ||||
| 			} | ||||
| 			// update packSizeFromIndex | ||||
| 			packSizeFromIndex[blob.PackID] = size + int64(pack.PackedSizeOfBlob(blob.Length)) | ||||
| 		} | ||||
|  | ||||
| 		err = repo.List(ctx, restic.PackFile, func(id restic.ID, packSize int64) error { | ||||
| 			size, ok := packSizeFromIndex[id] | ||||
| 			if !ok || size != packSize { | ||||
| 				// Pack was not referenced in index or size does not match | ||||
| 				packSizeFromList[id] = size | ||||
| 				removePacks.Insert(id) | ||||
| 			} | ||||
| 			totalPacks++ | ||||
| 			delete(packSizeFromIndex, id) | ||||
| 			return nil | ||||
| 		}) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		for id := range packSizeFromIndex { | ||||
| 			// forget pack files that are referenced in the index but do not exist | ||||
| 			// when rebuilding the index | ||||
| 			removePacks.Insert(id) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if len(packSizeFromList) > 0 { | ||||
| 		Verbosef("reading pack files\n") | ||||
| 		bar := newProgressMax(!globalOptions.Quiet, uint64(len(packSizeFromList)), "packs") | ||||
| 		invalidFiles, err := repo.CreateIndexFromPacks(ctx, packSizeFromList, bar) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 	if globalOptions.verbosity >= 2 { | ||||
| 		for _, id := range invalidFiles { | ||||
| 			Printf("skipped incomplete pack file: %v\n", id) | ||||
| 			Verboseff("skipped incomplete pack file: %v\n", id) | ||||
| 			totalPacks-- | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	Verbosef("finding old index files\n") | ||||
|  | ||||
| 	var supersedes restic.IDs | ||||
| 	err = repo.List(ctx, restic.IndexFile, func(id restic.ID, size int64) error { | ||||
| 		supersedes = append(supersedes, id) | ||||
| 		return nil | ||||
| 	}) | ||||
| 	err := rebuildIndexFiles(gopts, repo, removePacks, obsoleteIndexes, uint64(totalPacks)) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	ids, err := idx.Save(ctx, repo, supersedes) | ||||
| 	if err != nil { | ||||
| 		return errors.Fatalf("unable to save index, last error was: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	Verbosef("saved new indexes as %v\n", ids) | ||||
|  | ||||
| 	Verbosef("remove %d old index files\n", len(supersedes)) | ||||
| 	err = DeleteFilesChecked(globalOptions, repo, restic.NewIDSet(supersedes...), restic.IndexFile) | ||||
| 	if err != nil { | ||||
| 		return errors.Fatalf("unable to remove an old index: %v\n", err) | ||||
| 	} | ||||
| 	Verbosef("done\n") | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|   | ||||
| @@ -175,7 +175,7 @@ func testRunRebuildIndex(t testing.TB, gopts GlobalOptions) { | ||||
| 		globalOptions.stdout = os.Stdout | ||||
| 	}() | ||||
|  | ||||
| 	rtest.OK(t, runRebuildIndex(gopts)) | ||||
| 	rtest.OK(t, runRebuildIndex(RebuildIndexOptions{}, gopts)) | ||||
| } | ||||
|  | ||||
| func testRunLs(t testing.TB, gopts GlobalOptions, snapshotID string) []string { | ||||
| @@ -1351,7 +1351,7 @@ func TestRebuildIndexFailsOnAppendOnly(t *testing.T) { | ||||
| 	env.gopts.backendTestHook = func(r restic.Backend) (restic.Backend, error) { | ||||
| 		return &appendOnlyBackend{r}, nil | ||||
| 	} | ||||
| 	err := runRebuildIndex(env.gopts) | ||||
| 	err := runRebuildIndex(RebuildIndexOptions{}, env.gopts) | ||||
| 	if err == nil { | ||||
| 		t.Error("expected rebuildIndex to fail") | ||||
| 	} | ||||
| @@ -1583,7 +1583,7 @@ func (be *listOnceBackend) List(ctx context.Context, t restic.FileType, fn func( | ||||
| 	return be.Backend.List(ctx, t, fn) | ||||
| } | ||||
|  | ||||
| func TestPruneListOnce(t *testing.T) { | ||||
| func TestListOnce(t *testing.T) { | ||||
| 	env, cleanup := withTestEnvironment(t) | ||||
| 	defer cleanup() | ||||
|  | ||||
| @@ -1613,6 +1613,9 @@ func TestPruneListOnce(t *testing.T) { | ||||
| 	testRunForget(t, env.gopts, firstSnapshot[0].String()) | ||||
| 	testRunPrune(t, env.gopts, pruneOpts) | ||||
| 	rtest.OK(t, runCheck(checkOpts, env.gopts, nil)) | ||||
|  | ||||
| 	rtest.OK(t, runRebuildIndex(RebuildIndexOptions{}, env.gopts)) | ||||
| 	rtest.OK(t, runRebuildIndex(RebuildIndexOptions{ReadAllPacks: true}, env.gopts)) | ||||
| } | ||||
|  | ||||
| func TestHardLink(t *testing.T) { | ||||
|   | ||||
| @@ -1,373 +0,0 @@ | ||||
| // Package index contains various data structures for indexing content in a repository or backend. | ||||
| package index | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"os" | ||||
| 	"sync" | ||||
|  | ||||
| 	"github.com/restic/restic/internal/debug" | ||||
| 	"github.com/restic/restic/internal/errors" | ||||
| 	"github.com/restic/restic/internal/pack" | ||||
| 	"github.com/restic/restic/internal/restic" | ||||
| 	"github.com/restic/restic/internal/ui/progress" | ||||
|  | ||||
| 	"golang.org/x/sync/errgroup" | ||||
| ) | ||||
|  | ||||
| // Pack contains information about the contents of a pack. | ||||
| type Pack struct { | ||||
| 	ID      restic.ID | ||||
| 	Size    int64 | ||||
| 	Entries []restic.Blob | ||||
| } | ||||
|  | ||||
| // Index contains information about blobs and packs stored in a repo. | ||||
| type Index struct { | ||||
| 	Packs    map[restic.ID]Pack | ||||
| 	IndexIDs restic.IDSet | ||||
| } | ||||
|  | ||||
| func newIndex() *Index { | ||||
| 	return &Index{ | ||||
| 		Packs:    make(map[restic.ID]Pack), | ||||
| 		IndexIDs: restic.NewIDSet(), | ||||
| 	} | ||||
| } | ||||
|  | ||||
| const listPackWorkers = 10 | ||||
|  | ||||
| // Lister lists files and their contents | ||||
| type Lister interface { | ||||
| 	// List runs fn for all files of type t in the repo. | ||||
| 	List(ctx context.Context, t restic.FileType, fn func(restic.ID, int64) error) error | ||||
|  | ||||
| 	// ListPack returns the list of blobs saved in the pack id and the length | ||||
| 	// of the file as stored in the backend. | ||||
| 	ListPack(ctx context.Context, id restic.ID, size int64) ([]restic.Blob, int64, error) | ||||
| } | ||||
|  | ||||
| // New creates a new index for repo from scratch. InvalidFiles contains all IDs | ||||
| // of files  that cannot be listed successfully. | ||||
| func New(ctx context.Context, repo Lister, ignorePacks restic.IDSet, p *progress.Counter) (idx *Index, invalidFiles restic.IDs, err error) { | ||||
| 	type Job struct { | ||||
| 		PackID restic.ID | ||||
| 		Size   int64 | ||||
| 	} | ||||
|  | ||||
| 	type Result struct { | ||||
| 		Error   error | ||||
| 		PackID  restic.ID | ||||
| 		Size    int64 | ||||
| 		Entries []restic.Blob | ||||
| 	} | ||||
|  | ||||
| 	inputCh := make(chan Job) | ||||
| 	outputCh := make(chan Result) | ||||
| 	wg, ctx := errgroup.WithContext(ctx) | ||||
|  | ||||
| 	// list the files in the repo, send to inputCh | ||||
| 	wg.Go(func() error { | ||||
| 		defer close(inputCh) | ||||
| 		return repo.List(ctx, restic.PackFile, func(id restic.ID, size int64) error { | ||||
| 			if ignorePacks.Has(id) { | ||||
| 				return nil | ||||
| 			} | ||||
|  | ||||
| 			job := Job{ | ||||
| 				PackID: id, | ||||
| 				Size:   size, | ||||
| 			} | ||||
|  | ||||
| 			select { | ||||
| 			case inputCh <- job: | ||||
| 			case <-ctx.Done(): | ||||
| 			} | ||||
| 			return nil | ||||
| 		}) | ||||
| 	}) | ||||
|  | ||||
| 	// run the workers listing the files, read from inputCh, send to outputCh | ||||
| 	var workers sync.WaitGroup | ||||
| 	for i := 0; i < listPackWorkers; i++ { | ||||
| 		workers.Add(1) | ||||
| 		go func() { | ||||
| 			defer workers.Done() | ||||
| 			for job := range inputCh { | ||||
| 				res := Result{PackID: job.PackID} | ||||
| 				res.Entries, res.Size, res.Error = repo.ListPack(ctx, job.PackID, job.Size) | ||||
|  | ||||
| 				select { | ||||
| 				case outputCh <- res: | ||||
| 				case <-ctx.Done(): | ||||
| 					return | ||||
| 				} | ||||
| 			} | ||||
| 		}() | ||||
| 	} | ||||
|  | ||||
| 	// wait until all the workers are done, then close outputCh | ||||
| 	wg.Go(func() error { | ||||
| 		workers.Wait() | ||||
| 		close(outputCh) | ||||
| 		return nil | ||||
| 	}) | ||||
|  | ||||
| 	idx = newIndex() | ||||
|  | ||||
| 	for res := range outputCh { | ||||
| 		p.Add(1) | ||||
| 		if res.Error != nil { | ||||
| 			cause := errors.Cause(res.Error) | ||||
| 			if _, ok := cause.(pack.InvalidFileError); ok { | ||||
| 				invalidFiles = append(invalidFiles, res.PackID) | ||||
| 				continue | ||||
| 			} | ||||
|  | ||||
| 			fmt.Fprintf(os.Stderr, "pack file cannot be listed %v: %v\n", res.PackID, res.Error) | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		debug.Log("pack %v contains %d blobs", res.PackID, len(res.Entries)) | ||||
|  | ||||
| 		err := idx.AddPack(res.PackID, res.Size, res.Entries) | ||||
| 		if err != nil { | ||||
| 			return nil, nil, err | ||||
| 		} | ||||
|  | ||||
| 		select { | ||||
| 		case <-ctx.Done(): // an error occurred | ||||
| 		default: | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	err = wg.Wait() | ||||
| 	if err != nil { | ||||
| 		return nil, nil, err | ||||
| 	} | ||||
|  | ||||
| 	return idx, invalidFiles, nil | ||||
| } | ||||
|  | ||||
| type packJSON struct { | ||||
| 	ID    restic.ID  `json:"id"` | ||||
| 	Blobs []blobJSON `json:"blobs"` | ||||
| } | ||||
|  | ||||
| type blobJSON struct { | ||||
| 	ID     restic.ID       `json:"id"` | ||||
| 	Type   restic.BlobType `json:"type"` | ||||
| 	Offset uint            `json:"offset"` | ||||
| 	Length uint            `json:"length"` | ||||
| } | ||||
|  | ||||
| type indexJSON struct { | ||||
| 	Supersedes restic.IDs `json:"supersedes,omitempty"` | ||||
| 	Packs      []packJSON `json:"packs"` | ||||
| } | ||||
|  | ||||
| // ListLoader allows listing files and their content, in addition to loading and unmarshaling JSON files. | ||||
| type ListLoader interface { | ||||
| 	Lister | ||||
| 	LoadJSONUnpacked(context.Context, restic.FileType, restic.ID, interface{}) error | ||||
| } | ||||
|  | ||||
| func loadIndexJSON(ctx context.Context, repo ListLoader, id restic.ID) (*indexJSON, error) { | ||||
| 	debug.Log("process index %v\n", id) | ||||
|  | ||||
| 	var idx indexJSON | ||||
| 	err := repo.LoadJSONUnpacked(ctx, restic.IndexFile, id, &idx) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	return &idx, nil | ||||
| } | ||||
|  | ||||
| // Load creates an index by loading all index files from the repo. | ||||
| func Load(ctx context.Context, repo ListLoader, p *progress.Counter) (*Index, error) { | ||||
| 	debug.Log("loading indexes") | ||||
|  | ||||
| 	supersedes := make(map[restic.ID]restic.IDSet) | ||||
| 	results := make(map[restic.ID]map[restic.ID]Pack) | ||||
|  | ||||
| 	index := newIndex() | ||||
|  | ||||
| 	err := repo.List(ctx, restic.IndexFile, func(id restic.ID, size int64) error { | ||||
| 		p.Add(1) | ||||
|  | ||||
| 		debug.Log("Load index %v", id) | ||||
| 		idx, err := loadIndexJSON(ctx, repo, id) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		res := make(map[restic.ID]Pack) | ||||
| 		supersedes[id] = restic.NewIDSet() | ||||
| 		for _, sid := range idx.Supersedes { | ||||
| 			debug.Log("  index %v supersedes %v", id, sid) | ||||
| 			supersedes[id].Insert(sid) | ||||
| 		} | ||||
|  | ||||
| 		for _, jpack := range idx.Packs { | ||||
| 			entries := make([]restic.Blob, 0, len(jpack.Blobs)) | ||||
| 			for _, blob := range jpack.Blobs { | ||||
| 				entry := restic.Blob{ | ||||
| 					ID:     blob.ID, | ||||
| 					Type:   blob.Type, | ||||
| 					Offset: blob.Offset, | ||||
| 					Length: blob.Length, | ||||
| 				} | ||||
| 				entries = append(entries, entry) | ||||
| 			} | ||||
|  | ||||
| 			if err = index.AddPack(jpack.ID, 0, entries); err != nil { | ||||
| 				return err | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		results[id] = res | ||||
| 		index.IndexIDs.Insert(id) | ||||
|  | ||||
| 		return nil | ||||
| 	}) | ||||
|  | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	for superID, list := range supersedes { | ||||
| 		for indexID := range list { | ||||
| 			if _, ok := results[indexID]; !ok { | ||||
| 				continue | ||||
| 			} | ||||
| 			debug.Log("  removing index %v, superseded by %v", indexID, superID) | ||||
| 			fmt.Fprintf(os.Stderr, "index %v can be removed, superseded by index %v\n", indexID.Str(), superID.Str()) | ||||
| 			delete(results, indexID) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return index, nil | ||||
| } | ||||
|  | ||||
| // AddPack adds a pack to the index. If this pack is already in the index, an | ||||
| // error is returned. | ||||
| func (idx *Index) AddPack(id restic.ID, size int64, entries []restic.Blob) error { | ||||
| 	if _, ok := idx.Packs[id]; ok { | ||||
| 		return errors.Errorf("pack %v already present in the index", id.Str()) | ||||
| 	} | ||||
|  | ||||
| 	idx.Packs[id] = Pack{ID: id, Size: size, Entries: entries} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // RemovePack deletes a pack from the index. | ||||
| func (idx *Index) RemovePack(id restic.ID) error { | ||||
| 	if _, ok := idx.Packs[id]; !ok { | ||||
| 		return errors.Errorf("pack %v not found in the index", id.Str()) | ||||
| 	} | ||||
|  | ||||
| 	delete(idx.Packs, id) | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // DuplicateBlobs returns a list of blobs that are stored more than once in the | ||||
| // repo. | ||||
| func (idx *Index) DuplicateBlobs() (dups restic.BlobSet) { | ||||
| 	dups = restic.NewBlobSet() | ||||
| 	seen := restic.NewBlobSet() | ||||
|  | ||||
| 	for _, p := range idx.Packs { | ||||
| 		for _, entry := range p.Entries { | ||||
| 			h := restic.BlobHandle{ID: entry.ID, Type: entry.Type} | ||||
| 			if seen.Has(h) { | ||||
| 				dups.Insert(h) | ||||
| 			} | ||||
| 			seen.Insert(h) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return dups | ||||
| } | ||||
|  | ||||
| // PacksForBlobs returns the set of packs in which the blobs are contained. | ||||
| func (idx *Index) PacksForBlobs(blobs restic.BlobSet) (packs restic.IDSet) { | ||||
| 	packs = restic.NewIDSet() | ||||
|  | ||||
| 	for id, p := range idx.Packs { | ||||
| 		for _, entry := range p.Entries { | ||||
| 			if blobs.Has(restic.BlobHandle{ID: entry.ID, Type: entry.Type}) { | ||||
| 				packs.Insert(id) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return packs | ||||
| } | ||||
|  | ||||
| const maxEntries = 3000 | ||||
|  | ||||
| // Saver saves structures as JSON. | ||||
| type Saver interface { | ||||
| 	SaveJSONUnpacked(ctx context.Context, t restic.FileType, item interface{}) (restic.ID, error) | ||||
| } | ||||
|  | ||||
| // Save writes the complete index to the repo. | ||||
| func (idx *Index) Save(ctx context.Context, repo Saver, supersedes restic.IDs) (restic.IDs, error) { | ||||
| 	debug.Log("pack files: %d\n", len(idx.Packs)) | ||||
|  | ||||
| 	var indexIDs []restic.ID | ||||
|  | ||||
| 	packs := 0 | ||||
| 	jsonIDX := &indexJSON{ | ||||
| 		Supersedes: supersedes, | ||||
| 		Packs:      make([]packJSON, 0, maxEntries), | ||||
| 	} | ||||
|  | ||||
| 	for packID, pack := range idx.Packs { | ||||
| 		debug.Log("%04d add pack %v with %d entries", packs, packID, len(pack.Entries)) | ||||
| 		b := make([]blobJSON, 0, len(pack.Entries)) | ||||
| 		for _, blob := range pack.Entries { | ||||
| 			b = append(b, blobJSON{ | ||||
| 				ID:     blob.ID, | ||||
| 				Type:   blob.Type, | ||||
| 				Offset: blob.Offset, | ||||
| 				Length: blob.Length, | ||||
| 			}) | ||||
| 		} | ||||
|  | ||||
| 		p := packJSON{ | ||||
| 			ID:    packID, | ||||
| 			Blobs: b, | ||||
| 		} | ||||
|  | ||||
| 		jsonIDX.Packs = append(jsonIDX.Packs, p) | ||||
|  | ||||
| 		packs++ | ||||
| 		if packs == maxEntries { | ||||
| 			id, err := repo.SaveJSONUnpacked(ctx, restic.IndexFile, jsonIDX) | ||||
| 			if err != nil { | ||||
| 				return nil, err | ||||
| 			} | ||||
| 			debug.Log("saved new index as %v", id) | ||||
|  | ||||
| 			indexIDs = append(indexIDs, id) | ||||
| 			packs = 0 | ||||
| 			jsonIDX.Packs = jsonIDX.Packs[:0] | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if packs > 0 { | ||||
| 		id, err := repo.SaveJSONUnpacked(ctx, restic.IndexFile, jsonIDX) | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 		debug.Log("saved new index as %v", id) | ||||
| 		indexIDs = append(indexIDs, id) | ||||
| 	} | ||||
|  | ||||
| 	return indexIDs, nil | ||||
| } | ||||
| @@ -1,497 +0,0 @@ | ||||
| package index | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"sync" | ||||
| 	"testing" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/restic/restic/internal/checker" | ||||
| 	"github.com/restic/restic/internal/errors" | ||||
| 	"github.com/restic/restic/internal/repository" | ||||
| 	"github.com/restic/restic/internal/restic" | ||||
| 	"github.com/restic/restic/internal/test" | ||||
| ) | ||||
|  | ||||
| var ( | ||||
| 	snapshotTime = time.Unix(1470492820, 207401672) | ||||
| 	depth        = 3 | ||||
| ) | ||||
|  | ||||
| func createFilledRepo(t testing.TB, snapshots int, dup float32) (restic.Repository, func()) { | ||||
| 	repo, cleanup := repository.TestRepository(t) | ||||
|  | ||||
| 	for i := 0; i < 3; i++ { | ||||
| 		restic.TestCreateSnapshot(t, repo, snapshotTime.Add(time.Duration(i)*time.Second), depth, dup) | ||||
| 	} | ||||
|  | ||||
| 	return repo, cleanup | ||||
| } | ||||
|  | ||||
| func validateIndex(t testing.TB, repo restic.Repository, idx *Index) { | ||||
| 	err := repo.List(context.TODO(), restic.PackFile, func(id restic.ID, size int64) error { | ||||
| 		p, ok := idx.Packs[id] | ||||
| 		if !ok { | ||||
| 			t.Errorf("pack %v missing from index", id.Str()) | ||||
| 		} | ||||
|  | ||||
| 		if !p.ID.Equal(id) { | ||||
| 			t.Errorf("pack %v has invalid ID: want %v, got %v", id.Str(), id, p.ID) | ||||
| 		} | ||||
| 		return nil | ||||
| 	}) | ||||
|  | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestIndexNew(t *testing.T) { | ||||
| 	repo, cleanup := createFilledRepo(t, 3, 0) | ||||
| 	defer cleanup() | ||||
|  | ||||
| 	idx, invalid, err := New(context.TODO(), repo, restic.NewIDSet(), nil) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("New() returned error %v", err) | ||||
| 	} | ||||
|  | ||||
| 	if idx == nil { | ||||
| 		t.Fatalf("New() returned nil index") | ||||
| 	} | ||||
|  | ||||
| 	if len(invalid) > 0 { | ||||
| 		t.Fatalf("New() returned invalid files: %v", invalid) | ||||
| 	} | ||||
|  | ||||
| 	validateIndex(t, repo, idx) | ||||
| } | ||||
|  | ||||
| type ErrorRepo struct { | ||||
| 	restic.Repository | ||||
| 	MaxListFiles int | ||||
|  | ||||
| 	MaxPacks      int | ||||
| 	MaxPacksMutex sync.Mutex | ||||
| } | ||||
|  | ||||
| // List returns an error after repo.MaxListFiles files. | ||||
| func (repo *ErrorRepo) List(ctx context.Context, t restic.FileType, fn func(restic.ID, int64) error) error { | ||||
| 	if repo.MaxListFiles == 0 { | ||||
| 		return errors.New("test error, max is zero") | ||||
| 	} | ||||
|  | ||||
| 	max := repo.MaxListFiles | ||||
| 	return repo.Repository.List(ctx, t, func(id restic.ID, size int64) error { | ||||
| 		if max == 0 { | ||||
| 			return errors.New("test error, max reached zero") | ||||
| 		} | ||||
|  | ||||
| 		max-- | ||||
| 		return fn(id, size) | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| // ListPack returns an error after repo.MaxPacks files. | ||||
| func (repo *ErrorRepo) ListPack(ctx context.Context, id restic.ID, size int64) ([]restic.Blob, int64, error) { | ||||
| 	repo.MaxPacksMutex.Lock() | ||||
| 	max := repo.MaxPacks | ||||
| 	if max > 0 { | ||||
| 		repo.MaxPacks-- | ||||
| 	} | ||||
| 	repo.MaxPacksMutex.Unlock() | ||||
|  | ||||
| 	if max == 0 { | ||||
| 		return nil, 0, errors.New("test list pack error") | ||||
| 	} | ||||
|  | ||||
| 	return repo.Repository.ListPack(ctx, id, size) | ||||
| } | ||||
|  | ||||
| func TestIndexNewListErrors(t *testing.T) { | ||||
| 	repo, cleanup := createFilledRepo(t, 3, 0) | ||||
| 	defer cleanup() | ||||
|  | ||||
| 	for _, max := range []int{0, 3, 5} { | ||||
| 		errRepo := &ErrorRepo{ | ||||
| 			Repository:   repo, | ||||
| 			MaxListFiles: max, | ||||
| 		} | ||||
| 		idx, invalid, err := New(context.TODO(), errRepo, restic.NewIDSet(), nil) | ||||
| 		if err == nil { | ||||
| 			t.Errorf("expected error not found, got nil") | ||||
| 		} | ||||
|  | ||||
| 		if idx != nil { | ||||
| 			t.Errorf("expected nil index, got %v", idx) | ||||
| 		} | ||||
|  | ||||
| 		if len(invalid) != 0 { | ||||
| 			t.Errorf("expected empty invalid list, got %v", invalid) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestIndexNewPackErrors(t *testing.T) { | ||||
| 	repo, cleanup := createFilledRepo(t, 3, 0) | ||||
| 	defer cleanup() | ||||
|  | ||||
| 	for _, max := range []int{0, 3, 5} { | ||||
| 		errRepo := &ErrorRepo{ | ||||
| 			Repository: repo, | ||||
| 			MaxPacks:   max, | ||||
| 		} | ||||
| 		idx, invalid, err := New(context.TODO(), errRepo, restic.NewIDSet(), nil) | ||||
| 		if err == nil { | ||||
| 			t.Errorf("expected error not found, got nil") | ||||
| 		} | ||||
|  | ||||
| 		if idx != nil { | ||||
| 			t.Errorf("expected nil index, got %v", idx) | ||||
| 		} | ||||
|  | ||||
| 		if len(invalid) != 0 { | ||||
| 			t.Errorf("expected empty invalid list, got %v", invalid) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestIndexLoad(t *testing.T) { | ||||
| 	repo, cleanup := createFilledRepo(t, 3, 0) | ||||
| 	defer cleanup() | ||||
|  | ||||
| 	loadIdx, err := Load(context.TODO(), repo, nil) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("Load() returned error %v", err) | ||||
| 	} | ||||
|  | ||||
| 	if loadIdx == nil { | ||||
| 		t.Fatalf("Load() returned nil index") | ||||
| 	} | ||||
|  | ||||
| 	validateIndex(t, repo, loadIdx) | ||||
|  | ||||
| 	newIdx, _, err := New(context.TODO(), repo, restic.NewIDSet(), nil) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("New() returned error %v", err) | ||||
| 	} | ||||
|  | ||||
| 	if len(loadIdx.Packs) != len(newIdx.Packs) { | ||||
| 		t.Errorf("number of packs does not match: want %v, got %v", | ||||
| 			len(loadIdx.Packs), len(newIdx.Packs)) | ||||
| 	} | ||||
|  | ||||
| 	validateIndex(t, repo, newIdx) | ||||
|  | ||||
| 	for packID, packNew := range newIdx.Packs { | ||||
| 		packLoad, ok := loadIdx.Packs[packID] | ||||
|  | ||||
| 		if !ok { | ||||
| 			t.Errorf("loaded index does not list pack %v", packID.Str()) | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		if len(packNew.Entries) != len(packLoad.Entries) { | ||||
| 			t.Errorf("  number of entries in pack %v does not match: %d != %d\n  %v\n  %v", | ||||
| 				packID.Str(), len(packNew.Entries), len(packLoad.Entries), | ||||
| 				packNew.Entries, packLoad.Entries) | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		for _, entryNew := range packNew.Entries { | ||||
| 			found := false | ||||
| 			for _, entryLoad := range packLoad.Entries { | ||||
| 				if !entryLoad.ID.Equal(entryNew.ID) { | ||||
| 					continue | ||||
| 				} | ||||
|  | ||||
| 				if entryLoad.Type != entryNew.Type { | ||||
| 					continue | ||||
| 				} | ||||
|  | ||||
| 				if entryLoad.Offset != entryNew.Offset { | ||||
| 					continue | ||||
| 				} | ||||
|  | ||||
| 				if entryLoad.Length != entryNew.Length { | ||||
| 					continue | ||||
| 				} | ||||
|  | ||||
| 				found = true | ||||
| 				break | ||||
| 			} | ||||
|  | ||||
| 			if !found { | ||||
| 				t.Errorf("blob not found in loaded index: %v", entryNew) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func BenchmarkIndexNew(b *testing.B) { | ||||
| 	repo, cleanup := createFilledRepo(b, 3, 0) | ||||
| 	defer cleanup() | ||||
|  | ||||
| 	b.ResetTimer() | ||||
|  | ||||
| 	for i := 0; i < b.N; i++ { | ||||
| 		idx, _, err := New(context.TODO(), repo, restic.NewIDSet(), nil) | ||||
|  | ||||
| 		if err != nil { | ||||
| 			b.Fatalf("New() returned error %v", err) | ||||
| 		} | ||||
|  | ||||
| 		if idx == nil { | ||||
| 			b.Fatalf("New() returned nil index") | ||||
| 		} | ||||
| 		b.Logf("idx %v packs", len(idx.Packs)) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func BenchmarkIndexSave(b *testing.B) { | ||||
| 	repo, cleanup := repository.TestRepository(b) | ||||
| 	defer cleanup() | ||||
|  | ||||
| 	idx, _, err := New(context.TODO(), repo, restic.NewIDSet(), nil) | ||||
| 	test.OK(b, err) | ||||
|  | ||||
| 	for i := 0; i < 8000; i++ { | ||||
| 		entries := make([]restic.Blob, 0, 200) | ||||
| 		for j := 0; j < cap(entries); j++ { | ||||
| 			entries = append(entries, restic.Blob{ | ||||
| 				ID:     restic.NewRandomID(), | ||||
| 				Length: 1000, | ||||
| 				Offset: 5, | ||||
| 				Type:   restic.DataBlob, | ||||
| 			}) | ||||
| 		} | ||||
|  | ||||
| 		idx.AddPack(restic.NewRandomID(), 10000, entries) | ||||
| 	} | ||||
|  | ||||
| 	b.ResetTimer() | ||||
|  | ||||
| 	for i := 0; i < b.N; i++ { | ||||
| 		ids, err := idx.Save(context.TODO(), repo, nil) | ||||
| 		if err != nil { | ||||
| 			b.Fatalf("New() returned error %v", err) | ||||
| 		} | ||||
|  | ||||
| 		b.Logf("saved as %v", ids) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestIndexDuplicateBlobs(t *testing.T) { | ||||
| 	repo, cleanup := createFilledRepo(t, 3, 0.05) | ||||
| 	defer cleanup() | ||||
|  | ||||
| 	idx, _, err := New(context.TODO(), repo, restic.NewIDSet(), nil) | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
|  | ||||
| 	dups := idx.DuplicateBlobs() | ||||
| 	if len(dups) == 0 { | ||||
| 		t.Errorf("no duplicate blobs found") | ||||
| 	} | ||||
| 	t.Logf("%d packs, %d duplicate blobs", len(idx.Packs), len(dups)) | ||||
|  | ||||
| 	packs := idx.PacksForBlobs(dups) | ||||
| 	if len(packs) == 0 { | ||||
| 		t.Errorf("no packs with duplicate blobs found") | ||||
| 	} | ||||
| 	t.Logf("%d packs with duplicate blobs", len(packs)) | ||||
| } | ||||
|  | ||||
| func loadIndex(t testing.TB, repo restic.Repository) *Index { | ||||
| 	idx, err := Load(context.TODO(), repo, nil) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("Load() returned error %v", err) | ||||
| 	} | ||||
|  | ||||
| 	return idx | ||||
| } | ||||
|  | ||||
| func TestIndexSave(t *testing.T) { | ||||
| 	repo, cleanup := createFilledRepo(t, 3, 0) | ||||
| 	defer cleanup() | ||||
|  | ||||
| 	idx := loadIndex(t, repo) | ||||
|  | ||||
| 	ids, err := idx.Save(context.TODO(), repo, idx.IndexIDs.List()) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("unable to save new index: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	t.Logf("new index saved as %v", ids) | ||||
|  | ||||
| 	for id := range idx.IndexIDs { | ||||
| 		t.Logf("remove index %v", id.Str()) | ||||
| 		h := restic.Handle{Type: restic.IndexFile, Name: id.String()} | ||||
| 		err = repo.Backend().Remove(context.TODO(), h) | ||||
| 		if err != nil { | ||||
| 			t.Errorf("error removing index %v: %v", id, err) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	idx2 := loadIndex(t, repo) | ||||
| 	t.Logf("load new index with %d packs", len(idx2.Packs)) | ||||
|  | ||||
| 	checker := checker.New(repo) | ||||
| 	hints, errs := checker.LoadIndex(context.TODO()) | ||||
| 	for _, h := range hints { | ||||
| 		t.Logf("hint: %v\n", h) | ||||
| 	} | ||||
|  | ||||
| 	for _, err := range errs { | ||||
| 		t.Errorf("checker found error: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	ctx, cancel := context.WithCancel(context.TODO()) | ||||
| 	defer cancel() | ||||
|  | ||||
| 	errCh := make(chan error) | ||||
| 	go checker.Structure(ctx, errCh) | ||||
| 	i := 0 | ||||
| 	for err := range errCh { | ||||
| 		t.Errorf("checker returned error: %v", err) | ||||
| 		i++ | ||||
| 		if i == 10 { | ||||
| 			t.Errorf("more than 10 errors returned, skipping the rest") | ||||
| 			cancel() | ||||
| 			break | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // location describes the location of a blob in a pack. | ||||
| type location struct { | ||||
| 	PackID restic.ID | ||||
| 	restic.Blob | ||||
| } | ||||
|  | ||||
| // findBlob returns a list of packs and positions the blob can be found in. | ||||
| func (idx *Index) findBlob(h restic.BlobHandle) (result []location) { | ||||
| 	for id, p := range idx.Packs { | ||||
| 		for _, entry := range p.Entries { | ||||
| 			if entry.ID.Equal(h.ID) && entry.Type == h.Type { | ||||
| 				result = append(result, location{ | ||||
| 					PackID: id, | ||||
| 					Blob:   entry, | ||||
| 				}) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return result | ||||
| } | ||||
|  | ||||
| func TestIndexAddRemovePack(t *testing.T) { | ||||
| 	repo, cleanup := createFilledRepo(t, 3, 0) | ||||
| 	defer cleanup() | ||||
|  | ||||
| 	idx, err := Load(context.TODO(), repo, nil) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("Load() returned error %v", err) | ||||
| 	} | ||||
|  | ||||
| 	var packID restic.ID | ||||
| 	err = repo.List(context.TODO(), restic.PackFile, func(id restic.ID, size int64) error { | ||||
| 		packID = id | ||||
| 		return nil | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
|  | ||||
| 	t.Logf("selected pack %v", packID.Str()) | ||||
|  | ||||
| 	blobs := idx.Packs[packID].Entries | ||||
|  | ||||
| 	idx.RemovePack(packID) | ||||
|  | ||||
| 	if _, ok := idx.Packs[packID]; ok { | ||||
| 		t.Errorf("removed pack %v found in index.Packs", packID.Str()) | ||||
| 	} | ||||
|  | ||||
| 	for _, blob := range blobs { | ||||
| 		h := restic.BlobHandle{ID: blob.ID, Type: blob.Type} | ||||
| 		locs := idx.findBlob(h) | ||||
| 		if len(locs) != 0 { | ||||
| 			t.Errorf("removed blob %v found in index", h) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // example index serialization from doc/Design.rst | ||||
| var docExample = []byte(` | ||||
| { | ||||
|   "supersedes": [ | ||||
| 	"ed54ae36197f4745ebc4b54d10e0f623eaaaedd03013eb7ae90df881b7781452" | ||||
|   ], | ||||
|   "packs": [ | ||||
| 	{ | ||||
| 	  "id": "73d04e6125cf3c28a299cc2f3cca3b78ceac396e4fcf9575e34536b26782413c", | ||||
| 	  "blobs": [ | ||||
| 		{ | ||||
| 		  "id": "3ec79977ef0cf5de7b08cd12b874cd0f62bbaf7f07f3497a5b1bbcc8cb39b1ce", | ||||
| 		  "type": "data", | ||||
| 		  "offset": 0, | ||||
| 		  "length": 25 | ||||
| 		},{ | ||||
| 		  "id": "9ccb846e60d90d4eb915848add7aa7ea1e4bbabfc60e573db9f7bfb2789afbae", | ||||
| 		  "type": "tree", | ||||
| 		  "offset": 38, | ||||
| 		  "length": 100 | ||||
| 		}, | ||||
| 		{ | ||||
| 		  "id": "d3dc577b4ffd38cc4b32122cabf8655a0223ed22edfd93b353dc0c3f2b0fdf66", | ||||
| 		  "type": "data", | ||||
| 		  "offset": 150, | ||||
| 		  "length": 123 | ||||
| 		} | ||||
| 	  ] | ||||
| 	} | ||||
|   ] | ||||
| } | ||||
| `) | ||||
|  | ||||
| func TestIndexLoadDocReference(t *testing.T) { | ||||
| 	repo, cleanup := repository.TestRepository(t) | ||||
| 	defer cleanup() | ||||
|  | ||||
| 	id, err := repo.SaveUnpacked(context.TODO(), restic.IndexFile, docExample) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("SaveUnpacked() returned error %v", err) | ||||
| 	} | ||||
|  | ||||
| 	t.Logf("index saved as %v", id.Str()) | ||||
|  | ||||
| 	idx := loadIndex(t, repo) | ||||
|  | ||||
| 	blobID := restic.TestParseID("d3dc577b4ffd38cc4b32122cabf8655a0223ed22edfd93b353dc0c3f2b0fdf66") | ||||
| 	locs := idx.findBlob(restic.BlobHandle{ID: blobID, Type: restic.DataBlob}) | ||||
| 	if len(locs) == 0 { | ||||
| 		t.Error("blob not found in index") | ||||
| 	} | ||||
|  | ||||
| 	if len(locs) != 1 { | ||||
| 		t.Errorf("blob found %d times, expected just one", len(locs)) | ||||
| 	} | ||||
|  | ||||
| 	l := locs[0] | ||||
| 	if !l.ID.Equal(blobID) { | ||||
| 		t.Errorf("blob IDs are not equal: %v != %v", l.ID, blobID) | ||||
| 	} | ||||
|  | ||||
| 	if l.Type != restic.DataBlob { | ||||
| 		t.Errorf("want type %v, got %v", restic.DataBlob, l.Type) | ||||
| 	} | ||||
|  | ||||
| 	if l.Offset != 150 { | ||||
| 		t.Errorf("wrong offset, want %d, got %v", 150, l.Offset) | ||||
| 	} | ||||
|  | ||||
| 	if l.Length != 123 { | ||||
| 		t.Errorf("wrong length, want %d, got %v", 123, l.Length) | ||||
| 	} | ||||
| } | ||||
| @@ -281,7 +281,10 @@ type EachByPackResult struct { | ||||
| } | ||||
|  | ||||
| // EachByPack returns a channel that yields all blobs known to the index | ||||
| // grouped by packID but ignoring blobs with a packID in packBlacklist. | ||||
| // grouped by packID but ignoring blobs with a packID in packBlacklist for | ||||
| // finalized indexes. | ||||
| // This filtering is used when rebuilding the index where we need to ignore packs | ||||
| // from the finalized index which have been re-read into a non-finalized index. | ||||
| // When the context is cancelled, the background goroutine | ||||
| // terminates. This blocks any modification of the index. | ||||
| func (idx *Index) EachByPack(ctx context.Context, packBlacklist restic.IDSet) <-chan EachByPackResult { | ||||
| @@ -300,7 +303,7 @@ func (idx *Index) EachByPack(ctx context.Context, packBlacklist restic.IDSet) <- | ||||
| 			m := &idx.byType[typ] | ||||
| 			m.foreach(func(e *indexEntry) bool { | ||||
| 				packID := idx.packs[e.packIndex] | ||||
| 				if !packBlacklist.Has(packID) { | ||||
| 				if !idx.final || !packBlacklist.Has(packID) { | ||||
| 					byPack[packID] = append(byPack[packID], e) | ||||
| 				} | ||||
| 				return true | ||||
|   | ||||
| @@ -7,6 +7,7 @@ import ( | ||||
| 	"github.com/restic/restic/internal/debug" | ||||
| 	"github.com/restic/restic/internal/restic" | ||||
| 	"github.com/restic/restic/internal/ui/progress" | ||||
| 	"golang.org/x/sync/errgroup" | ||||
| ) | ||||
|  | ||||
| // MasterIndex is a collection of indexes and IDs of chunks that are in the process of being saved. | ||||
| @@ -261,12 +262,14 @@ func (mi *MasterIndex) MergeFinalIndexes() { | ||||
| 	mi.idx = newIdx | ||||
| } | ||||
|  | ||||
| const saveIndexParallelism = 4 | ||||
|  | ||||
| // Save saves all known indexes to index files, leaving out any | ||||
| // packs whose ID is contained in packBlacklist. The new index contains the IDs | ||||
| // of all known indexes in the "supersedes" field. The IDs are also returned in | ||||
| // the IDSet obsolete | ||||
| // packs whose ID is contained in packBlacklist from finalized indexes. | ||||
| // The new index contains the IDs of all known indexes in the "supersedes" | ||||
| // field. The IDs are also returned in the IDSet obsolete. | ||||
| // After calling this function, you should remove the obsolete index files. | ||||
| func (mi *MasterIndex) Save(ctx context.Context, repo restic.Repository, packBlacklist restic.IDSet, p *progress.Counter) (obsolete restic.IDSet, err error) { | ||||
| func (mi *MasterIndex) Save(ctx context.Context, repo restic.Repository, packBlacklist restic.IDSet, extraObsolete restic.IDs, p *progress.Counter) (obsolete restic.IDSet, err error) { | ||||
| 	mi.idxMutex.Lock() | ||||
| 	defer mi.idxMutex.Unlock() | ||||
|  | ||||
| @@ -275,28 +278,27 @@ func (mi *MasterIndex) Save(ctx context.Context, repo restic.Repository, packBla | ||||
| 	newIndex := NewIndex() | ||||
| 	obsolete = restic.NewIDSet() | ||||
|  | ||||
| 	finalize := func() error { | ||||
| 		newIndex.Finalize() | ||||
| 		if _, err := SaveIndex(ctx, repo, newIndex); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		newIndex = NewIndex() | ||||
| 		return nil | ||||
| 	} | ||||
| 	// track spawned goroutines using wg, create a new context which is | ||||
| 	// cancelled as soon as an error occurs. | ||||
| 	wg, ctx := errgroup.WithContext(ctx) | ||||
|  | ||||
| 	ch := make(chan *Index) | ||||
|  | ||||
| 	wg.Go(func() error { | ||||
| 		defer close(ch) | ||||
| 		for i, idx := range mi.idx { | ||||
| 			if idx.Final() { | ||||
| 				ids, err := idx.IDs() | ||||
| 				if err != nil { | ||||
| 					debug.Log("index %d does not have an ID: %v", err) | ||||
| 				return nil, err | ||||
| 					return err | ||||
| 				} | ||||
|  | ||||
| 				debug.Log("adding index ids %v to supersedes field", ids) | ||||
|  | ||||
| 				err = newIndex.AddToSupersedes(ids...) | ||||
| 				if err != nil { | ||||
| 				return nil, err | ||||
| 					return err | ||||
| 				} | ||||
| 				obsolete.Merge(restic.NewIDSet(ids...)) | ||||
| 			} else { | ||||
| @@ -309,15 +311,46 @@ func (mi *MasterIndex) Save(ctx context.Context, repo restic.Repository, packBla | ||||
| 				newIndex.StorePack(pbs.packID, pbs.blobs) | ||||
| 				p.Add(1) | ||||
| 				if IndexFull(newIndex) { | ||||
| 				if err := finalize(); err != nil { | ||||
| 					return nil, err | ||||
| 					select { | ||||
| 					case ch <- newIndex: | ||||
| 					case <-ctx.Done(): | ||||
| 						return nil | ||||
| 					} | ||||
| 					newIndex = NewIndex() | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 	if err := finalize(); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	return | ||||
| 		err = newIndex.AddToSupersedes(extraObsolete...) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		obsolete.Merge(restic.NewIDSet(extraObsolete...)) | ||||
|  | ||||
| 		select { | ||||
| 		case ch <- newIndex: | ||||
| 		case <-ctx.Done(): | ||||
| 		} | ||||
| 		return nil | ||||
| 	}) | ||||
|  | ||||
| 	// a worker receives an index from ch, and saves the index | ||||
| 	worker := func() error { | ||||
| 		for idx := range ch { | ||||
| 			idx.Finalize() | ||||
| 			if _, err := SaveIndex(ctx, repo, idx); err != nil { | ||||
| 				return err | ||||
| 			} | ||||
| 		} | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	// run workers on ch | ||||
| 	wg.Go(func() error { | ||||
| 		return RunWorkers(saveIndexParallelism, worker) | ||||
| 	}) | ||||
|  | ||||
| 	err = wg.Wait() | ||||
|  | ||||
| 	return obsolete, err | ||||
| } | ||||
|   | ||||
| @@ -346,7 +346,7 @@ func TestIndexSave(t *testing.T) { | ||||
|  | ||||
| 	repo.LoadIndex(context.TODO()) | ||||
|  | ||||
| 	obsoletes, err := repo.Index().(*repository.MasterIndex).Save(context.TODO(), repo, nil, nil) | ||||
| 	obsoletes, err := repo.Index().(*repository.MasterIndex).Save(context.TODO(), repo, nil, nil, nil) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("unable to save new index: %v", err) | ||||
| 	} | ||||
|   | ||||
| @@ -6,7 +6,6 @@ import ( | ||||
| 	"testing" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/restic/restic/internal/index" | ||||
| 	"github.com/restic/restic/internal/repository" | ||||
| 	"github.com/restic/restic/internal/restic" | ||||
| ) | ||||
| @@ -163,7 +162,21 @@ func saveIndex(t *testing.T, repo restic.Repository) { | ||||
| } | ||||
|  | ||||
| func rebuildIndex(t *testing.T, repo restic.Repository) { | ||||
| 	idx, _, err := index.New(context.TODO(), repo, restic.NewIDSet(), nil) | ||||
| 	err := repo.SetIndex(repository.NewMasterIndex()) | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
|  | ||||
| 	packs := make(map[restic.ID]int64) | ||||
| 	err = repo.List(context.TODO(), restic.PackFile, func(id restic.ID, size int64) error { | ||||
| 		packs[id] = size | ||||
| 		return nil | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
|  | ||||
| 	_, err = repo.(*repository.Repository).CreateIndexFromPacks(context.TODO(), packs, nil) | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| @@ -179,7 +192,9 @@ func rebuildIndex(t *testing.T, repo restic.Repository) { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
|  | ||||
| 	_, err = idx.Save(context.TODO(), repo, nil) | ||||
| 	_, err = (repo.Index()).(*repository.MasterIndex). | ||||
| 		Save(context.TODO(), repo, restic.NewIDSet(), nil, nil) | ||||
|  | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
|   | ||||
| @@ -7,6 +7,7 @@ import ( | ||||
| 	"fmt" | ||||
| 	"io" | ||||
| 	"os" | ||||
| 	"sync" | ||||
|  | ||||
| 	"github.com/restic/chunker" | ||||
| 	"github.com/restic/restic/internal/cache" | ||||
| @@ -17,6 +18,7 @@ import ( | ||||
| 	"github.com/restic/restic/internal/hashing" | ||||
| 	"github.com/restic/restic/internal/pack" | ||||
| 	"github.com/restic/restic/internal/restic" | ||||
| 	"github.com/restic/restic/internal/ui/progress" | ||||
|  | ||||
| 	"github.com/minio/sha256-simd" | ||||
| 	"golang.org/x/sync/errgroup" | ||||
| @@ -515,6 +517,73 @@ func (r *Repository) LoadIndex(ctx context.Context) error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| const listPackParallelism = 10 | ||||
|  | ||||
| // CreateIndexFromPacks creates a new index by reading all given pack files (with sizes). | ||||
| // The index is added to the MasterIndex but not marked as finalized. | ||||
| // It returns the IDs of the pack files which could not be read. | ||||
| func (r *Repository) CreateIndexFromPacks(ctx context.Context, packsize map[restic.ID]int64, p *progress.Counter) (invalid restic.IDs, err error) { | ||||
| 	var m sync.Mutex | ||||
|  | ||||
| 	debug.Log("Loading index from pack files") | ||||
|  | ||||
| 	// track spawned goroutines using wg, create a new context which is | ||||
| 	// cancelled as soon as an error occurs. | ||||
| 	wg, ctx := errgroup.WithContext(ctx) | ||||
|  | ||||
| 	type FileInfo struct { | ||||
| 		restic.ID | ||||
| 		Size int64 | ||||
| 	} | ||||
| 	ch := make(chan FileInfo) | ||||
|  | ||||
| 	// send list of pack files through ch, which is closed afterwards | ||||
| 	wg.Go(func() error { | ||||
| 		defer close(ch) | ||||
| 		for id, size := range packsize { | ||||
| 			select { | ||||
| 			case <-ctx.Done(): | ||||
| 				return nil | ||||
| 			case ch <- FileInfo{id, size}: | ||||
| 			} | ||||
| 		} | ||||
| 		return nil | ||||
| 	}) | ||||
|  | ||||
| 	idx := NewIndex() | ||||
| 	// a worker receives a pack ID from ch, reads the pack contents, and adds them to idx | ||||
| 	worker := func() error { | ||||
| 		for fi := range ch { | ||||
| 			entries, _, err := r.ListPack(ctx, fi.ID, fi.Size) | ||||
| 			if err != nil { | ||||
| 				debug.Log("unable to list pack file %v", fi.ID.Str()) | ||||
| 				m.Lock() | ||||
| 				invalid = append(invalid, fi.ID) | ||||
| 				m.Unlock() | ||||
| 			} | ||||
| 			idx.StorePack(fi.ID, entries) | ||||
| 			p.Add(1) | ||||
| 		} | ||||
|  | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	// run workers on ch | ||||
| 	wg.Go(func() error { | ||||
| 		return RunWorkers(listPackParallelism, worker) | ||||
| 	}) | ||||
|  | ||||
| 	err = wg.Wait() | ||||
| 	if err != nil { | ||||
| 		return invalid, errors.Fatal(err.Error()) | ||||
| 	} | ||||
|  | ||||
| 	// Add idx to MasterIndex | ||||
| 	r.idx.Insert(idx) | ||||
|  | ||||
| 	return invalid, nil | ||||
| } | ||||
|  | ||||
| // PrepareCache initializes the local cache. indexIDs is the list of IDs of | ||||
| // index files still present in the repo. | ||||
| func (r *Repository) PrepareCache(indexIDs restic.IDSet) error { | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Alexander Neumann
					Alexander Neumann