mirror of
https://github.com/restic/restic.git
synced 2025-12-10 19:41:46 +00:00
check: refactor pack selection for read data
Drop the `packs` map from the internal state of the checker. Instead, the ReadPacks(...) method now takes a filter callback that selects the packs intended for checking.
This commit is contained in:
@@ -230,6 +230,11 @@ func runCheck(ctx context.Context, opts CheckOptions, gopts GlobalOptions, args
|
||||
printer = newJSONErrorPrinter(term)
|
||||
}
|
||||
|
||||
readDataFilter, err := buildPacksFilter(opts, printer)
|
||||
if err != nil {
|
||||
return summary, err
|
||||
}
|
||||
|
||||
cleanup := prepareCheckCache(opts, &gopts, printer)
|
||||
defer cleanup()
|
||||
|
||||
@@ -370,12 +375,11 @@ func runCheck(ctx context.Context, opts CheckOptions, gopts GlobalOptions, args
|
||||
}
|
||||
}
|
||||
|
||||
doReadData := func(packs map[restic.ID]int64) {
|
||||
if readDataFilter != nil {
|
||||
p := printer.NewCounter("packs")
|
||||
p.SetMax(uint64(len(packs)))
|
||||
errChan := make(chan error)
|
||||
|
||||
go chkr.ReadPacks(ctx, packs, p, errChan)
|
||||
go chkr.ReadPacks(ctx, readDataFilter, p, errChan)
|
||||
|
||||
for err := range errChan {
|
||||
errorsFound = true
|
||||
@@ -388,48 +392,6 @@ func runCheck(ctx context.Context, opts CheckOptions, gopts GlobalOptions, args
|
||||
p.Done()
|
||||
}
|
||||
|
||||
switch {
|
||||
case opts.ReadData:
|
||||
printer.P("read all data\n")
|
||||
doReadData(selectPacksByBucket(chkr.GetPacks(), 1, 1))
|
||||
case opts.ReadDataSubset != "":
|
||||
var packs map[restic.ID]int64
|
||||
dataSubset, err := stringToIntSlice(opts.ReadDataSubset)
|
||||
if err == nil {
|
||||
bucket := dataSubset[0]
|
||||
totalBuckets := dataSubset[1]
|
||||
packs = selectPacksByBucket(chkr.GetPacks(), bucket, totalBuckets)
|
||||
packCount := uint64(len(packs))
|
||||
printer.P("read group #%d of %d data packs (out of total %d packs in %d groups)\n", bucket, packCount, chkr.CountPacks(), totalBuckets)
|
||||
} else if strings.HasSuffix(opts.ReadDataSubset, "%") {
|
||||
percentage, err := parsePercentage(opts.ReadDataSubset)
|
||||
if err == nil {
|
||||
packs = selectRandomPacksByPercentage(chkr.GetPacks(), percentage)
|
||||
printer.P("read %.1f%% of data packs\n", percentage)
|
||||
}
|
||||
} else {
|
||||
repoSize := int64(0)
|
||||
allPacks := chkr.GetPacks()
|
||||
for _, size := range allPacks {
|
||||
repoSize += size
|
||||
}
|
||||
if repoSize == 0 {
|
||||
return summary, errors.Fatal("Cannot read from a repository having size 0")
|
||||
}
|
||||
subsetSize, _ := ui.ParseBytes(opts.ReadDataSubset)
|
||||
if subsetSize > repoSize {
|
||||
subsetSize = repoSize
|
||||
}
|
||||
packs = selectRandomPacksByFileSize(chkr.GetPacks(), subsetSize, repoSize)
|
||||
percentage := float64(subsetSize) / float64(repoSize) * 100.0
|
||||
printer.P("read %d bytes (%.1f%%) of data packs\n", subsetSize, percentage)
|
||||
}
|
||||
if packs == nil {
|
||||
return summary, errors.Fatal("internal error: failed to select packs to check")
|
||||
}
|
||||
doReadData(packs)
|
||||
}
|
||||
|
||||
if len(salvagePacks) > 0 {
|
||||
printer.E("\nThe repository contains damaged pack files. These damaged files must be removed to repair the repository. This can be done using the following commands. Please read the troubleshooting guide at https://restic.readthedocs.io/en/stable/077_troubleshooting.html first.\n\n")
|
||||
for id := range salvagePacks {
|
||||
@@ -453,6 +415,59 @@ func runCheck(ctx context.Context, opts CheckOptions, gopts GlobalOptions, args
|
||||
return summary, nil
|
||||
}
|
||||
|
||||
func buildPacksFilter(opts CheckOptions, printer progress.Printer) (func(packs map[restic.ID]int64) map[restic.ID]int64, error) {
|
||||
switch {
|
||||
case opts.ReadData:
|
||||
return func(packs map[restic.ID]int64) map[restic.ID]int64 {
|
||||
printer.P("read all data\n")
|
||||
return packs
|
||||
}, nil
|
||||
case opts.ReadDataSubset != "":
|
||||
dataSubset, err := stringToIntSlice(opts.ReadDataSubset)
|
||||
if err == nil {
|
||||
bucket := dataSubset[0]
|
||||
totalBuckets := dataSubset[1]
|
||||
return func(packs map[restic.ID]int64) map[restic.ID]int64 {
|
||||
packCount := uint64(len(packs))
|
||||
packs = selectPacksByBucket(packs, bucket, totalBuckets)
|
||||
printer.P("read group #%d of %d data packs (out of total %d packs in %d groups)\n", bucket, len(packs), packCount, totalBuckets)
|
||||
return packs
|
||||
}, nil
|
||||
} else if strings.HasSuffix(opts.ReadDataSubset, "%") {
|
||||
percentage, err := parsePercentage(opts.ReadDataSubset)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return func(packs map[restic.ID]int64) map[restic.ID]int64 {
|
||||
printer.P("read %.1f%% of data packs\n", percentage)
|
||||
return selectRandomPacksByPercentage(packs, percentage)
|
||||
}, nil
|
||||
}
|
||||
|
||||
repoSize := int64(0)
|
||||
return func(packs map[restic.ID]int64) map[restic.ID]int64 {
|
||||
for _, size := range packs {
|
||||
repoSize += size
|
||||
}
|
||||
subsetSize, _ := ui.ParseBytes(opts.ReadDataSubset)
|
||||
if subsetSize > repoSize {
|
||||
subsetSize = repoSize
|
||||
}
|
||||
if repoSize > 0 {
|
||||
packs = selectRandomPacksByFileSize(packs, subsetSize, repoSize)
|
||||
}
|
||||
percentage := float64(subsetSize) / float64(repoSize) * 100.0
|
||||
if repoSize == 0 {
|
||||
percentage = 100
|
||||
}
|
||||
printer.P("read %d bytes (%.1f%%) of data packs\n", subsetSize, percentage)
|
||||
return packs
|
||||
}, nil
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// selectPacksByBucket selects subsets of packs by ranges of buckets.
|
||||
func selectPacksByBucket(allPacks map[restic.ID]int64, bucket, totalBuckets uint) map[restic.ID]int64 {
|
||||
packs := make(map[restic.ID]int64)
|
||||
|
||||
@@ -60,7 +60,9 @@ func checkData(chkr *checker.Checker) []error {
|
||||
return collectErrors(
|
||||
context.TODO(),
|
||||
func(ctx context.Context, errCh chan<- error) {
|
||||
chkr.ReadData(ctx, errCh)
|
||||
chkr.ReadPacks(ctx, func(packs map[restic.ID]int64) map[restic.ID]int64 {
|
||||
return packs
|
||||
}, nil, errCh)
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
@@ -3,6 +3,8 @@ package checker
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/restic/restic/internal/restic"
|
||||
)
|
||||
|
||||
// TestCheckRepo runs the checker on repo.
|
||||
@@ -50,7 +52,9 @@ func TestCheckRepo(t testing.TB, repo checkerRepository) {
|
||||
|
||||
// read data
|
||||
errChan = make(chan error)
|
||||
go chkr.ReadData(context.TODO(), errChan)
|
||||
go chkr.ReadPacks(context.TODO(), func(packs map[restic.ID]int64) map[restic.ID]int64 {
|
||||
return packs
|
||||
}, nil, errChan)
|
||||
|
||||
for err := range errChan {
|
||||
t.Error(err)
|
||||
|
||||
@@ -50,15 +50,13 @@ func (e *PackError) Error() string {
|
||||
|
||||
// Checker handles index-related operations for repository checking.
|
||||
type Checker struct {
|
||||
packs map[restic.ID]int64
|
||||
repo *Repository
|
||||
repo *Repository
|
||||
}
|
||||
|
||||
// NewChecker creates a new Checker.
|
||||
func NewChecker(repo *Repository) *Checker {
|
||||
return &Checker{
|
||||
packs: make(map[restic.ID]int64),
|
||||
repo: repo,
|
||||
repo: repo,
|
||||
}
|
||||
}
|
||||
func computePackTypes(ctx context.Context, idx restic.ListBlobser) (map[restic.ID]restic.BlobType, error) {
|
||||
@@ -111,18 +109,13 @@ func (c *Checker) LoadIndex(ctx context.Context, p restic.TerminalCounterFactory
|
||||
return hints, append(errs, err)
|
||||
}
|
||||
|
||||
// compute pack size using index entries
|
||||
c.packs, err = pack.Size(ctx, c.repo, false)
|
||||
if err != nil {
|
||||
return hints, append(errs, err)
|
||||
}
|
||||
packTypes, err := computePackTypes(ctx, c.repo)
|
||||
if err != nil {
|
||||
return hints, append(errs, err)
|
||||
}
|
||||
|
||||
debug.Log("checking for duplicate packs")
|
||||
for packID := range c.packs {
|
||||
for packID := range packTypes {
|
||||
debug.Log(" check pack %v: contained in %d indexes", packID, len(packToIndex[packID]))
|
||||
if len(packToIndex[packID]) > 1 {
|
||||
hints = append(hints, &ErrDuplicatePacks{
|
||||
@@ -145,12 +138,20 @@ func (c *Checker) LoadIndex(ctx context.Context, p restic.TerminalCounterFactory
|
||||
// packs have been checked.
|
||||
func (c *Checker) Packs(ctx context.Context, errChan chan<- error) {
|
||||
defer close(errChan)
|
||||
debug.Log("checking for %d packs", len(c.packs))
|
||||
|
||||
// compute pack size using index entries
|
||||
packs, err := pack.Size(ctx, c.repo, false)
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
|
||||
debug.Log("checking for %d packs", len(packs))
|
||||
|
||||
debug.Log("listing repository packs")
|
||||
repoPacks := make(map[restic.ID]int64)
|
||||
|
||||
err := c.repo.List(ctx, restic.PackFile, func(id restic.ID, size int64) error {
|
||||
err = c.repo.List(ctx, restic.PackFile, func(id restic.ID, size int64) error {
|
||||
repoPacks[id] = size
|
||||
return nil
|
||||
})
|
||||
@@ -159,7 +160,7 @@ func (c *Checker) Packs(ctx context.Context, errChan chan<- error) {
|
||||
errChan <- err
|
||||
}
|
||||
|
||||
for id, size := range c.packs {
|
||||
for id, size := range packs {
|
||||
reposize, ok := repoPacks[id]
|
||||
// remove from repoPacks so we can find orphaned packs
|
||||
delete(repoPacks, id)
|
||||
@@ -194,25 +195,19 @@ func (c *Checker) Packs(ctx context.Context, errChan chan<- error) {
|
||||
}
|
||||
}
|
||||
|
||||
// CountPacks returns the number of packs in the repository.
|
||||
func (c *Checker) CountPacks() uint64 {
|
||||
return uint64(len(c.packs))
|
||||
}
|
||||
|
||||
// GetPacks returns IDSet of packs in the repository
|
||||
func (c *Checker) GetPacks() map[restic.ID]int64 {
|
||||
return c.packs
|
||||
}
|
||||
|
||||
// ReadData loads all data from the repository and checks the integrity.
|
||||
func (c *Checker) ReadData(ctx context.Context, errChan chan<- error) {
|
||||
c.ReadPacks(ctx, c.packs, nil, errChan)
|
||||
}
|
||||
|
||||
// ReadPacks loads data from specified packs and checks the integrity.
|
||||
func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *progress.Counter, errChan chan<- error) {
|
||||
func (c *Checker) ReadPacks(ctx context.Context, filter func(packs map[restic.ID]int64) map[restic.ID]int64, p *progress.Counter, errChan chan<- error) {
|
||||
defer close(errChan)
|
||||
|
||||
// compute pack size using index entries
|
||||
packs, err := pack.Size(ctx, c.repo, false)
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
packs = filter(packs)
|
||||
p.SetMax(uint64(len(packs)))
|
||||
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
type checkTask struct {
|
||||
id restic.ID
|
||||
@@ -276,7 +271,7 @@ func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *p
|
||||
}
|
||||
close(ch)
|
||||
|
||||
err := g.Wait()
|
||||
err = g.Wait()
|
||||
if err != nil {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
||||
@@ -186,7 +186,9 @@ func TestCheckRepo(t testing.TB, repo *Repository) {
|
||||
|
||||
// read data
|
||||
errChan = make(chan error)
|
||||
go chkr.ReadData(context.TODO(), errChan)
|
||||
go chkr.ReadPacks(context.TODO(), func(packs map[restic.ID]int64) map[restic.ID]int64 {
|
||||
return packs
|
||||
}, nil, errChan)
|
||||
|
||||
for err := range errChan {
|
||||
t.Error(err)
|
||||
|
||||
Reference in New Issue
Block a user