Merge pull request #5532 from MichaelEischer/checker-cleanup

Replace Repository.SetIndex with internal helper
This commit is contained in:
Michael Eischer
2025-10-03 20:08:14 +02:00
committed by GitHub
15 changed files with 387 additions and 360 deletions

View File

@@ -257,10 +257,10 @@ func runCheck(ctx context.Context, opts CheckOptions, gopts GlobalOptions, args
errorsFound := false errorsFound := false
for _, hint := range hints { for _, hint := range hints {
switch hint.(type) { switch hint.(type) {
case *checker.ErrDuplicatePacks: case *repository.ErrDuplicatePacks:
printer.S("%s", hint.Error()) printer.S("%s", hint.Error())
summary.HintRepairIndex = true summary.HintRepairIndex = true
case *checker.ErrMixedPack: case *repository.ErrMixedPack:
printer.S("%s", hint.Error()) printer.S("%s", hint.Error())
summary.HintPrune = true summary.HintPrune = true
default: default:
@@ -295,7 +295,7 @@ func runCheck(ctx context.Context, opts CheckOptions, gopts GlobalOptions, args
go chkr.Packs(ctx, errChan) go chkr.Packs(ctx, errChan)
for err := range errChan { for err := range errChan {
var packErr *checker.PackError var packErr *repository.PackError
if errors.As(err, &packErr) { if errors.As(err, &packErr) {
if packErr.Orphaned { if packErr.Orphaned {
orphanedPacks++ orphanedPacks++

View File

@@ -29,7 +29,7 @@ import (
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )
func prepareTempdirRepoSrc(t testing.TB, src TestDir) (string, restic.Repository) { func prepareTempdirRepoSrc(t testing.TB, src TestDir) (string, *repository.Repository) {
tempdir := rtest.TempDir(t) tempdir := rtest.TempDir(t)
repo := repository.TestRepository(t) repo := repository.TestRepository(t)
@@ -1421,7 +1421,7 @@ func TestArchiverSnapshot(t *testing.T) {
} }
TestEnsureSnapshot(t, repo, snapshotID, want) TestEnsureSnapshot(t, repo, snapshotID, want)
checker.TestCheckRepo(t, repo, false) checker.TestCheckRepo(t, repo)
// check that the snapshot contains the targets with absolute paths // check that the snapshot contains the targets with absolute paths
for i, target := range sn.Paths { for i, target := range sn.Paths {
@@ -1641,7 +1641,7 @@ func TestArchiverSnapshotSelect(t *testing.T) {
} }
TestEnsureSnapshot(t, repo, snapshotID, want) TestEnsureSnapshot(t, repo, snapshotID, want)
checker.TestCheckRepo(t, repo, false) checker.TestCheckRepo(t, repo)
}) })
} }
} }
@@ -1866,7 +1866,7 @@ func TestArchiverParent(t *testing.T) {
t.Logf("testfs: %v", testFS) t.Logf("testfs: %v", testFS)
} }
checker.TestCheckRepo(t, repo, false) checker.TestCheckRepo(t, repo)
}) })
} }
} }
@@ -2021,7 +2021,7 @@ func TestArchiverErrorReporting(t *testing.T) {
} }
TestEnsureSnapshot(t, repo, snapshotID, want) TestEnsureSnapshot(t, repo, snapshotID, want)
checker.TestCheckRepo(t, repo, false) checker.TestCheckRepo(t, repo)
}) })
} }
} }
@@ -2384,7 +2384,7 @@ func TestMetadataChanged(t *testing.T) {
// make sure the content matches // make sure the content matches
TestEnsureFileContent(context.Background(), t, repo, "testfile", node3, files["testfile"].(TestFile)) TestEnsureFileContent(context.Background(), t, repo, "testfile", node3, files["testfile"].(TestFile))
checker.TestCheckRepo(t, repo, false) checker.TestCheckRepo(t, repo)
} }
func TestRacyFileTypeSwap(t *testing.T) { func TestRacyFileTypeSwap(t *testing.T) {

View File

@@ -1,19 +1,15 @@
package checker package checker
import ( import (
"bufio"
"context" "context"
"fmt" "fmt"
"runtime" "runtime"
"sync" "sync"
"github.com/klauspost/compress/zstd"
"github.com/restic/restic/internal/data" "github.com/restic/restic/internal/data"
"github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/repository"
"github.com/restic/restic/internal/repository/index"
"github.com/restic/restic/internal/repository/pack"
"github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/ui/progress" "github.com/restic/restic/internal/ui/progress"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
@@ -25,24 +21,27 @@ import (
// A Checker only tests for internal errors within the data structures of the // A Checker only tests for internal errors within the data structures of the
// repository (e.g. missing blobs), and needs a valid Repository to work on. // repository (e.g. missing blobs), and needs a valid Repository to work on.
type Checker struct { type Checker struct {
packs map[restic.ID]int64 *repository.Checker
blobRefs struct { blobRefs struct {
sync.Mutex sync.Mutex
M restic.BlobSet M restic.BlobSet
} }
trackUnused bool trackUnused bool
masterIndex *index.MasterIndex snapshots restic.Lister
snapshots restic.Lister
repo restic.Repository repo restic.Repository
} }
// checkerRepository is the repository type accepted by New: a
// restic.Repository that can additionally hand out a *repository.Checker,
// which this package's Checker embeds for the index and pack level checks.
type checkerRepository interface {
restic.Repository
// Checker returns the repository-level checker to embed.
Checker() *repository.Checker
}
// New returns a new checker which runs on repo. // New returns a new checker which runs on repo.
func New(repo restic.Repository, trackUnused bool) *Checker { func New(repo checkerRepository, trackUnused bool) *Checker {
c := &Checker{ c := &Checker{
packs: make(map[restic.ID]int64), Checker: repo.Checker(),
masterIndex: index.NewMasterIndex(),
repo: repo, repo: repo,
trackUnused: trackUnused, trackUnused: trackUnused,
} }
@@ -52,186 +51,12 @@ func New(repo restic.Repository, trackUnused bool) *Checker {
return c return c
} }
// ErrDuplicatePacks is returned when a pack is found in more than one index.
type ErrDuplicatePacks struct {
PackID restic.ID
Indexes restic.IDSet
}
func (e *ErrDuplicatePacks) Error() string {
return fmt.Sprintf("pack %v contained in several indexes: %v", e.PackID, e.Indexes)
}
// ErrMixedPack is returned when a pack is found that contains both tree and data blobs.
type ErrMixedPack struct {
PackID restic.ID
}
func (e *ErrMixedPack) Error() string {
return fmt.Sprintf("pack %v contains a mix of tree and data blobs", e.PackID.Str())
}
func (c *Checker) LoadSnapshots(ctx context.Context) error { func (c *Checker) LoadSnapshots(ctx context.Context) error {
var err error var err error
c.snapshots, err = restic.MemorizeList(ctx, c.repo, restic.SnapshotFile) c.snapshots, err = restic.MemorizeList(ctx, c.repo, restic.SnapshotFile)
return err return err
} }
func computePackTypes(ctx context.Context, idx restic.ListBlobser) (map[restic.ID]restic.BlobType, error) {
packs := make(map[restic.ID]restic.BlobType)
err := idx.ListBlobs(ctx, func(pb restic.PackedBlob) {
tpe, exists := packs[pb.PackID]
if exists {
if pb.Type != tpe {
tpe = restic.InvalidBlob
}
} else {
tpe = pb.Type
}
packs[pb.PackID] = tpe
})
return packs, err
}
// LoadIndex loads all index files.
func (c *Checker) LoadIndex(ctx context.Context, p restic.TerminalCounterFactory) (hints []error, errs []error) {
debug.Log("Start")
var bar *progress.Counter
if p != nil {
bar = p.NewCounterTerminalOnly("index files loaded")
}
packToIndex := make(map[restic.ID]restic.IDSet)
err := c.masterIndex.Load(ctx, c.repo, bar, func(id restic.ID, idx *index.Index, err error) error {
debug.Log("process index %v, err %v", id, err)
err = errors.Wrapf(err, "error loading index %v", id)
if err != nil {
errs = append(errs, err)
return nil
}
debug.Log("process blobs")
cnt := 0
err = idx.Each(ctx, func(blob restic.PackedBlob) {
cnt++
if _, ok := packToIndex[blob.PackID]; !ok {
packToIndex[blob.PackID] = restic.NewIDSet()
}
packToIndex[blob.PackID].Insert(id)
})
debug.Log("%d blobs processed", cnt)
return err
})
if err != nil {
// failed to load the index
return hints, append(errs, err)
}
err = c.repo.SetIndex(c.masterIndex)
if err != nil {
debug.Log("SetIndex returned error: %v", err)
errs = append(errs, err)
}
// compute pack size using index entries
c.packs, err = pack.Size(ctx, c.repo, false)
if err != nil {
return hints, append(errs, err)
}
packTypes, err := computePackTypes(ctx, c.repo)
if err != nil {
return hints, append(errs, err)
}
debug.Log("checking for duplicate packs")
for packID := range c.packs {
debug.Log(" check pack %v: contained in %d indexes", packID, len(packToIndex[packID]))
if len(packToIndex[packID]) > 1 {
hints = append(hints, &ErrDuplicatePacks{
PackID: packID,
Indexes: packToIndex[packID],
})
}
if packTypes[packID] == restic.InvalidBlob {
hints = append(hints, &ErrMixedPack{
PackID: packID,
})
}
}
return hints, errs
}
// PackError describes an error with a specific pack.
type PackError struct {
ID restic.ID
Orphaned bool
Truncated bool
Err error
}
func (e *PackError) Error() string {
return "pack " + e.ID.String() + ": " + e.Err.Error()
}
// Packs checks that all packs referenced in the index are still available and
// there are no packs that aren't in an index. errChan is closed after all
// packs have been checked.
func (c *Checker) Packs(ctx context.Context, errChan chan<- error) {
defer close(errChan)
debug.Log("checking for %d packs", len(c.packs))
debug.Log("listing repository packs")
repoPacks := make(map[restic.ID]int64)
err := c.repo.List(ctx, restic.PackFile, func(id restic.ID, size int64) error {
repoPacks[id] = size
return nil
})
if err != nil {
errChan <- err
}
for id, size := range c.packs {
reposize, ok := repoPacks[id]
// remove from repoPacks so we can find orphaned packs
delete(repoPacks, id)
// missing: present in c.packs but not in the repo
if !ok {
select {
case <-ctx.Done():
return
case errChan <- &PackError{ID: id, Err: errors.New("does not exist")}:
}
continue
}
// size not matching: present in c.packs and in the repo, but sizes do not match
if size != reposize {
select {
case <-ctx.Done():
return
case errChan <- &PackError{ID: id, Truncated: true, Err: errors.Errorf("unexpected file size: got %d, expected %d", reposize, size)}:
}
}
}
// orphaned: present in the repo but not in c.packs
for orphanID := range repoPacks {
select {
case <-ctx.Done():
return
case errChan <- &PackError{ID: orphanID, Orphaned: true, Err: errors.New("not referenced in any index")}:
}
}
}
// Error is an error that occurred while checking a repository. // Error is an error that occurred while checking a repository.
type Error struct { type Error struct {
TreeID restic.ID TreeID restic.ID
@@ -434,97 +259,3 @@ func (c *Checker) UnusedBlobs(ctx context.Context) (blobs restic.BlobHandles, er
return blobs, err return blobs, err
} }
// CountPacks returns the number of packs in the repository.
func (c *Checker) CountPacks() uint64 {
return uint64(len(c.packs))
}
// GetPacks returns IDSet of packs in the repository
func (c *Checker) GetPacks() map[restic.ID]int64 {
return c.packs
}
// ReadData loads all data from the repository and checks the integrity.
func (c *Checker) ReadData(ctx context.Context, errChan chan<- error) {
c.ReadPacks(ctx, c.packs, nil, errChan)
}
const maxStreamBufferSize = 4 * 1024 * 1024
// ReadPacks loads data from specified packs and checks the integrity.
func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *progress.Counter, errChan chan<- error) {
defer close(errChan)
g, ctx := errgroup.WithContext(ctx)
type checkTask struct {
id restic.ID
size int64
blobs []restic.Blob
}
ch := make(chan checkTask)
// as packs are streamed the concurrency is limited by IO
workerCount := int(c.repo.Connections())
// run workers
for i := 0; i < workerCount; i++ {
g.Go(func() error {
bufRd := bufio.NewReaderSize(nil, maxStreamBufferSize)
dec, err := zstd.NewReader(nil)
if err != nil {
panic(dec)
}
defer dec.Close()
for {
var ps checkTask
var ok bool
select {
case <-ctx.Done():
return nil
case ps, ok = <-ch:
if !ok {
return nil
}
}
err := repository.CheckPack(ctx, c.repo.(*repository.Repository), ps.id, ps.blobs, ps.size, bufRd, dec)
p.Add(1)
if err == nil {
continue
}
select {
case <-ctx.Done():
return nil
case errChan <- err:
}
}
})
}
packSet := restic.NewIDSet()
for pack := range packs {
packSet.Insert(pack)
}
// push packs to ch
for pbs := range c.repo.ListPacksFromIndex(ctx, packSet) {
size := packs[pbs.PackID]
debug.Log("listed %v", pbs.PackID)
select {
case ch <- checkTask{id: pbs.PackID, size: size, blobs: pbs.Blobs}:
case <-ctx.Done():
}
}
close(ch)
err := g.Wait()
if err != nil {
select {
case <-ctx.Done():
return
case errChan <- err:
}
}
}

View File

@@ -67,7 +67,7 @@ func checkData(chkr *checker.Checker) []error {
func assertOnlyMixedPackHints(t *testing.T, hints []error) { func assertOnlyMixedPackHints(t *testing.T, hints []error) {
for _, err := range hints { for _, err := range hints {
if _, ok := err.(*checker.ErrMixedPack); !ok { if _, ok := err.(*repository.ErrMixedPack); !ok {
t.Fatalf("expected mixed pack hint, got %v", err) t.Fatalf("expected mixed pack hint, got %v", err)
} }
} }
@@ -110,7 +110,7 @@ func TestMissingPack(t *testing.T) {
test.Assert(t, len(errs) == 1, test.Assert(t, len(errs) == 1,
"expected exactly one error, got %v", len(errs)) "expected exactly one error, got %v", len(errs))
if err, ok := errs[0].(*checker.PackError); ok { if err, ok := errs[0].(*repository.PackError); ok {
test.Equals(t, packID, err.ID) test.Equals(t, packID, err.ID)
} else { } else {
t.Errorf("expected error returned by checker.Packs() to be PackError, got %v", err) t.Errorf("expected error returned by checker.Packs() to be PackError, got %v", err)
@@ -138,7 +138,7 @@ func TestUnreferencedPack(t *testing.T) {
test.Assert(t, len(errs) == 1, test.Assert(t, len(errs) == 1,
"expected exactly one error, got %v", len(errs)) "expected exactly one error, got %v", len(errs))
if err, ok := errs[0].(*checker.PackError); ok { if err, ok := errs[0].(*repository.PackError); ok {
test.Equals(t, packID, err.ID.String()) test.Equals(t, packID, err.ID.String())
} else { } else {
t.Errorf("expected error returned by checker.Packs() to be PackError, got %v", err) t.Errorf("expected error returned by checker.Packs() to be PackError, got %v", err)
@@ -269,7 +269,7 @@ func TestDuplicatePacksInIndex(t *testing.T) {
found := false found := false
for _, hint := range hints { for _, hint := range hints {
if _, ok := hint.(*checker.ErrDuplicatePacks); ok { if _, ok := hint.(*repository.ErrDuplicatePacks); ok {
found = true found = true
} else { } else {
t.Errorf("got unexpected hint: %v", hint) t.Errorf("got unexpected hint: %v", hint)
@@ -435,13 +435,16 @@ func TestCheckerModifiedData(t *testing.T) {
// loadTreesOnceRepository allows each tree to be loaded only once // loadTreesOnceRepository allows each tree to be loaded only once
type loadTreesOnceRepository struct { type loadTreesOnceRepository struct {
restic.Repository *repository.Repository
loadedTrees restic.IDSet loadedTrees restic.IDSet
mutex sync.Mutex mutex sync.Mutex
DuplicateTree bool DuplicateTree bool
} }
func (r *loadTreesOnceRepository) LoadTree(ctx context.Context, id restic.ID) (*data.Tree, error) { func (r *loadTreesOnceRepository) LoadBlob(ctx context.Context, t restic.BlobType, id restic.ID, buf []byte) ([]byte, error) {
if t != restic.TreeBlob {
return r.Repository.LoadBlob(ctx, t, id, buf)
}
r.mutex.Lock() r.mutex.Lock()
defer r.mutex.Unlock() defer r.mutex.Unlock()
@@ -451,7 +454,7 @@ func (r *loadTreesOnceRepository) LoadTree(ctx context.Context, id restic.ID) (*
return nil, errors.Errorf("trying to load tree with id %v twice", id) return nil, errors.Errorf("trying to load tree with id %v twice", id)
} }
r.loadedTrees.Insert(id) r.loadedTrees.Insert(id)
return data.LoadTree(ctx, r.Repository, id) return r.Repository.LoadBlob(ctx, t, id, buf)
} }
func TestCheckerNoDuplicateTreeDecodes(t *testing.T) { func TestCheckerNoDuplicateTreeDecodes(t *testing.T) {
@@ -471,22 +474,24 @@ func TestCheckerNoDuplicateTreeDecodes(t *testing.T) {
test.OKs(t, checkPacks(chkr)) test.OKs(t, checkPacks(chkr))
test.OKs(t, checkStruct(chkr)) test.OKs(t, checkStruct(chkr))
test.Assert(t, len(checkRepo.loadedTrees) > 0, "intercepting tree loading did not work")
test.Assert(t, !checkRepo.DuplicateTree, "detected duplicate tree loading") test.Assert(t, !checkRepo.DuplicateTree, "detected duplicate tree loading")
} }
// delayRepository delays read of a specific handle. // delayRepository delays read of a specific handle.
type delayRepository struct { type delayRepository struct {
restic.Repository *repository.Repository
DelayTree restic.ID DelayTree restic.ID
UnblockChannel chan struct{} UnblockChannel chan struct{}
Unblocker sync.Once Unblocker sync.Once
Triggered bool
} }
func (r *delayRepository) LoadTree(ctx context.Context, id restic.ID) (*data.Tree, error) { func (r *delayRepository) LoadBlob(ctx context.Context, t restic.BlobType, id restic.ID, buf []byte) ([]byte, error) {
if id == r.DelayTree { if t == restic.TreeBlob && id == r.DelayTree {
<-r.UnblockChannel <-r.UnblockChannel
} }
return data.LoadTree(ctx, r.Repository, id) return r.Repository.LoadBlob(ctx, t, id, buf)
} }
func (r *delayRepository) LookupBlobSize(t restic.BlobType, id restic.ID) (uint, bool) { func (r *delayRepository) LookupBlobSize(t restic.BlobType, id restic.ID) (uint, bool) {
@@ -499,6 +504,7 @@ func (r *delayRepository) LookupBlobSize(t restic.BlobType, id restic.ID) (uint,
func (r *delayRepository) Unblock() { func (r *delayRepository) Unblock() {
r.Unblocker.Do(func() { r.Unblocker.Do(func() {
close(r.UnblockChannel) close(r.UnblockChannel)
r.Triggered = true
}) })
} }
@@ -600,6 +606,7 @@ func TestCheckerBlobTypeConfusion(t *testing.T) {
if !errFound { if !errFound {
t.Fatal("no error found, checker is broken") t.Fatal("no error found, checker is broken")
} }
test.Assert(t, delayRepo.Triggered, "delay repository did not trigger")
} }
func loadBenchRepository(t *testing.B) (*checker.Checker, restic.Repository, func()) { func loadBenchRepository(t *testing.B) (*checker.Checker, restic.Repository, func()) {
@@ -613,7 +620,7 @@ func loadBenchRepository(t *testing.B) (*checker.Checker, restic.Repository, fun
} }
for _, err := range hints { for _, err := range hints {
if _, ok := err.(*checker.ErrMixedPack); !ok { if _, ok := err.(*repository.ErrMixedPack); !ok {
t.Fatalf("expected mixed pack hint, got %v", err) t.Fatalf("expected mixed pack hint, got %v", err)
} }
} }

View File

@@ -3,12 +3,10 @@ package checker
import ( import (
"context" "context"
"testing" "testing"
"github.com/restic/restic/internal/restic"
) )
// TestCheckRepo runs the checker on repo. // TestCheckRepo runs the checker on repo.
func TestCheckRepo(t testing.TB, repo restic.Repository, skipStructure bool) { func TestCheckRepo(t testing.TB, repo checkerRepository) {
chkr := New(repo, true) chkr := New(repo, true)
hints, errs := chkr.LoadIndex(context.TODO(), nil) hints, errs := chkr.LoadIndex(context.TODO(), nil)
@@ -33,23 +31,21 @@ func TestCheckRepo(t testing.TB, repo restic.Repository, skipStructure bool) {
t.Error(err) t.Error(err)
} }
if !skipStructure { // structure
// structure errChan = make(chan error)
errChan = make(chan error) go chkr.Structure(context.TODO(), nil, errChan)
go chkr.Structure(context.TODO(), nil, errChan)
for err := range errChan { for err := range errChan {
t.Error(err) t.Error(err)
} }
// unused blobs // unused blobs
blobs, err := chkr.UnusedBlobs(context.TODO()) blobs, err := chkr.UnusedBlobs(context.TODO())
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
if len(blobs) > 0 { if len(blobs) > 0 {
t.Errorf("unused blobs found: %v", blobs) t.Errorf("unused blobs found: %v", blobs)
}
} }
// read data // read data

View File

@@ -46,7 +46,7 @@ func TestCreateSnapshot(t *testing.T) {
t.Fatalf("snapshot has zero tree ID") t.Fatalf("snapshot has zero tree ID")
} }
checker.TestCheckRepo(t, repo, false) checker.TestCheckRepo(t, repo)
} }
func BenchmarkTestCreateSnapshot(t *testing.B) { func BenchmarkTestCreateSnapshot(t *testing.B) {

View File

@@ -0,0 +1,287 @@
package repository
import (
"bufio"
"context"
"fmt"
"github.com/klauspost/compress/zstd"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/repository/index"
"github.com/restic/restic/internal/repository/pack"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/ui/progress"
"golang.org/x/sync/errgroup"
)
// maxStreamBufferSize is the size in bytes (4 MiB) of the bufio.Reader that
// each ReadPacks worker allocates and passes to CheckPack.
const maxStreamBufferSize = 4 * 1024 * 1024
// ErrDuplicatePacks is returned when a pack is found in more than one index.
// LoadIndex reports it as a hint, not as an error.
type ErrDuplicatePacks struct {
// PackID identifies the pack that is listed more than once.
PackID restic.ID
// Indexes holds the IDs of all index files that contain blobs of the pack.
Indexes restic.IDSet
}
// Error implements the error interface; the message names the pack and the
// set of indexes that list it.
func (e *ErrDuplicatePacks) Error() string {
	const format = "pack %v contained in several indexes: %v"
	return fmt.Sprintf(format, e.PackID, e.Indexes)
}
// ErrMixedPack is returned when a pack is found that contains both tree and data blobs.
// LoadIndex reports it as a hint, not as an error.
type ErrMixedPack struct {
// PackID identifies the pack with mixed blob types.
PackID restic.ID
}
// Error implements the error interface; the pack is identified by the
// string returned from its ID's Str method.
func (e *ErrMixedPack) Error() string {
	packName := e.PackID.Str()
	return fmt.Sprintf("pack %v contains a mix of tree and data blobs", packName)
}
// PackError describes an error with a specific pack.
type PackError struct {
// ID is the pack the error refers to.
ID restic.ID
// Orphaned is set by Packs for a pack file that exists in the repository
// but is not referenced in any index.
Orphaned bool
// Truncated is set by Packs when the file size in the repository differs
// from the size computed from the index. Despite the name, it is set for
// any mismatch, not only for files that are too short.
Truncated bool
// Err is the underlying problem; Error includes its message.
Err error
}
// Error implements the error interface. The message has the form
// "pack <id>: <message of Err>".
func (e *PackError) Error() string {
	if e.Err == nil {
		// Err is an exported field and may be left unset by a caller; report
		// that instead of panicking on a nil interface.
		return "pack " + e.ID.String() + ": unknown error"
	}
	return "pack " + e.ID.String() + ": " + e.Err.Error()
}
// Checker handles index-related operations for repository checking.
// Call LoadIndex first: it fills the pack map that Packs, CountPacks,
// GetPacks and ReadData operate on.
type Checker struct {
// packs maps each pack ID to its size as computed from the index entries.
packs map[restic.ID]int64
// repo is the repository being checked.
repo *Repository
}
// NewChecker returns a Checker bound to repo. Its pack map starts out empty
// and is populated by LoadIndex.
func NewChecker(repo *Repository) *Checker {
	c := new(Checker)
	c.repo = repo
	c.packs = map[restic.ID]int64{}
	return c
}
// computePackTypes walks all blobs reported by idx and determines, per pack,
// the blob type stored in it. A pack whose blobs disagree on their type is
// mapped to restic.InvalidBlob. The map built so far is returned together
// with the error from ListBlobs.
func computePackTypes(ctx context.Context, idx restic.ListBlobser) (map[restic.ID]restic.BlobType, error) {
	typeByPack := make(map[restic.ID]restic.BlobType)

	record := func(pb restic.PackedBlob) {
		seen, known := typeByPack[pb.PackID]
		switch {
		case !known:
			// first blob of this pack decides the expected type
			typeByPack[pb.PackID] = pb.Type
		case seen != pb.Type:
			// conflicting type: mark the pack as mixed
			typeByPack[pb.PackID] = restic.InvalidBlob
		}
	}

	err := idx.ListBlobs(ctx, record)
	return typeByPack, err
}
// LoadIndex loads all index files through the repository and derives the
// checker's pack map from them.
//
// hints contains non-fatal findings (*ErrDuplicatePacks, *ErrMixedPack).
// errs contains one wrapped error per index file that failed to load, plus
// any error that aborted loading, pack size computation or pack type
// computation.
func (c *Checker) LoadIndex(ctx context.Context, p restic.TerminalCounterFactory) (hints []error, errs []error) {
	debug.Log("Start")

	// for every pack, the set of index files that mention it
	indexesByPack := make(map[restic.ID]restic.IDSet)

	// onIndex collects per-index load errors without aborting the load and
	// records the index ID for every pack referenced by a loaded index.
	onIndex := func(id restic.ID, idx *index.Index, err error) error {
		debug.Log("process index %v, err %v", id, err)
		if wrapped := errors.Wrapf(err, "error loading index %v", id); wrapped != nil {
			errs = append(errs, wrapped)
			return nil
		}

		debug.Log("process blobs")
		blobCount := 0
		eachErr := idx.Each(ctx, func(blob restic.PackedBlob) {
			blobCount++
			ids, found := indexesByPack[blob.PackID]
			if !found {
				ids = restic.NewIDSet()
				indexesByPack[blob.PackID] = ids
			}
			ids.Insert(id)
		})
		debug.Log("%d blobs processed", blobCount)
		return eachErr
	}

	// Use the repository's internal loadIndexWithCallback to handle per-index errors
	if loadErr := c.repo.loadIndexWithCallback(ctx, p, onIndex); loadErr != nil {
		// failed to load the index
		return hints, append(errs, loadErr)
	}

	// compute pack size using index entries
	var err error
	c.packs, err = pack.Size(ctx, c.repo, false)
	if err != nil {
		return hints, append(errs, err)
	}

	packTypes, err := computePackTypes(ctx, c.repo)
	if err != nil {
		return hints, append(errs, err)
	}

	debug.Log("checking for duplicate packs")
	for packID := range c.packs {
		containing := indexesByPack[packID]
		debug.Log(" check pack %v: contained in %d indexes", packID, len(containing))

		if len(containing) > 1 {
			hints = append(hints, &ErrDuplicatePacks{PackID: packID, Indexes: containing})
		}
		if packTypes[packID] == restic.InvalidBlob {
			hints = append(hints, &ErrMixedPack{PackID: packID})
		}
	}

	return hints, errs
}
// Packs checks that all packs referenced in the index are still available and
// there are no packs that aren't in an index. errChan is closed after all
// packs have been checked.
//
// Findings are sent on errChan as *PackError values: a pack known from the
// index but missing in the repository, a pack whose size differs from the
// size computed from the index (Truncated set), and a pack present in the
// repository but not referenced by any index (Orphaned set). An error
// returned while listing the repository is forwarded as is. The check is
// abandoned as soon as ctx is cancelled.
func (c *Checker) Packs(ctx context.Context, errChan chan<- error) {
	defer close(errChan)

	// report delivers err to the consumer. It returns false if ctx was
	// cancelled before the value could be sent; the caller must stop then.
	report := func(err error) bool {
		select {
		case <-ctx.Done():
			return false
		case errChan <- err:
			return true
		}
	}

	debug.Log("checking for %d packs", len(c.packs))

	debug.Log("listing repository packs")
	repoPacks := make(map[restic.ID]int64)
	err := c.repo.List(ctx, restic.PackFile, func(id restic.ID, size int64) error {
		repoPacks[id] = size
		return nil
	})
	if err != nil {
		// Guarded like every other send: an unconditional send would block
		// forever if the consumer stopped reading after cancellation.
		if !report(err) {
			return
		}
	}

	for id, size := range c.packs {
		reposize, ok := repoPacks[id]
		// remove from repoPacks so we can find orphaned packs
		delete(repoPacks, id)

		// missing: present in c.packs but not in the repo
		if !ok {
			if !report(&PackError{ID: id, Err: errors.New("does not exist")}) {
				return
			}
			continue
		}

		// size not matching: present in c.packs and in the repo, but sizes do not match
		if size != reposize {
			if !report(&PackError{ID: id, Truncated: true, Err: errors.Errorf("unexpected file size: got %d, expected %d", reposize, size)}) {
				return
			}
		}
	}

	// orphaned: present in the repo but not in c.packs
	for orphanID := range repoPacks {
		if !report(&PackError{ID: orphanID, Orphaned: true, Err: errors.New("not referenced in any index")}) {
			return
		}
	}
}
// CountPacks reports how many packs are recorded in the checker's pack map,
// i.e. the packs known from the index once LoadIndex has run.
func (c *Checker) CountPacks() uint64 {
	n := len(c.packs)
	return uint64(n)
}
// GetPacks returns the checker's pack map, which maps each pack ID to the
// pack size computed from the index by LoadIndex. The internal map itself is
// returned, not a copy.
func (c *Checker) GetPacks() map[restic.ID]int64 {
return c.packs
}
// ReadData loads all data from the repository and checks the integrity.
// It runs ReadPacks over every pack in the checker's pack map without a
// progress counter. Errors are delivered on errChan, which ReadPacks closes
// when it returns.
func (c *Checker) ReadData(ctx context.Context, errChan chan<- error) {
c.ReadPacks(ctx, c.packs, nil, errChan)
}
// ReadPacks loads data from specified packs and checks the integrity.
//
// packs maps each pack ID to check to its expected size. The blob list of
// each pack is taken from the repository index and handed, together with the
// size, to CheckPack. p is advanced by one for every pack processed. Every
// error returned by CheckPack is sent on errChan; errChan is closed when
// ReadPacks returns.
func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *progress.Counter, errChan chan<- error) {
	defer close(errChan)

	g, ctx := errgroup.WithContext(ctx)
	type checkTask struct {
		id    restic.ID
		size  int64
		blobs []restic.Blob
	}
	ch := make(chan checkTask)

	// as packs are streamed the concurrency is limited by IO
	workerCount := int(c.repo.Connections())
	// run workers
	for i := 0; i < workerCount; i++ {
		g.Go(func() error {
			// each worker owns one read buffer and one decoder and passes
			// them to every CheckPack call it makes
			bufRd := bufio.NewReaderSize(nil, maxStreamBufferSize)
			dec, err := zstd.NewReader(nil)
			if err != nil {
				// panic with the error itself; the decoder value carries no
				// information about why construction failed
				panic(err)
			}
			defer dec.Close()

			for {
				var ps checkTask
				var ok bool

				select {
				case <-ctx.Done():
					return nil
				case ps, ok = <-ch:
					if !ok {
						return nil
					}
				}

				checkErr := CheckPack(ctx, c.repo, ps.id, ps.blobs, ps.size, bufRd, dec)
				// NOTE(review): p is nil when called via ReadData; this
				// presumably relies on progress.Counter.Add accepting a nil
				// receiver - confirm.
				p.Add(1)
				if checkErr == nil {
					continue
				}

				select {
				case <-ctx.Done():
					return nil
				case errChan <- checkErr:
				}
			}
		})
	}

	packSet := restic.NewIDSet()
	for pack := range packs {
		packSet.Insert(pack)
	}

	// push packs to ch
	for pbs := range c.repo.ListPacksFromIndex(ctx, packSet) {
		size := packs[pbs.PackID]
		debug.Log("listed %v", pbs.PackID)
		select {
		case ch <- checkTask{id: pbs.PackID, size: size, blobs: pbs.Blobs}:
		case <-ctx.Done():
			// NOTE(review): cancellation does not leave the loop; it keeps
			// receiving until the listing channel is closed - presumably
			// intentional, to drain the producer.
		}
	}
	close(ch)

	err := g.Wait()
	if err != nil {
		select {
		case <-ctx.Done():
			return
		case errChan <- err:
		}
	}
}

View File

@@ -347,7 +347,7 @@ var (
depth = 3 depth = 3
) )
func createFilledRepo(t testing.TB, snapshots int, version uint) (restic.Repository, restic.Unpacked[restic.FileType]) { func createFilledRepo(t testing.TB, snapshots int, version uint) (*repository.Repository, restic.Unpacked[restic.FileType]) {
repo, unpacked, _ := repository.TestRepositoryWithVersion(t, version) repo, unpacked, _ := repository.TestRepositoryWithVersion(t, version)
for i := 0; i < snapshots; i++ { for i := 0; i < snapshots; i++ {
@@ -402,7 +402,7 @@ func testIndexSave(t *testing.T, version uint) {
})) }))
rtest.Equals(t, 0, len(blobs), "saved index is missing blobs") rtest.Equals(t, 0, len(blobs), "saved index is missing blobs")
checker.TestCheckRepo(t, repo, false) checker.TestCheckRepo(t, repo)
}) })
} }
} }
@@ -449,7 +449,7 @@ func testIndexSavePartial(t *testing.T, version uint) {
// remove pack files to make check happy // remove pack files to make check happy
rtest.OK(t, restic.ParallelRemove(context.TODO(), unpacked, newPacks, restic.PackFile, nil, nil)) rtest.OK(t, restic.ParallelRemove(context.TODO(), unpacked, newPacks, restic.PackFile, nil, nil))
checker.TestCheckRepo(t, repo, false) checker.TestCheckRepo(t, repo)
} }
func listPacks(t testing.TB, repo restic.Lister) restic.IDSet { func listPacks(t testing.TB, repo restic.Lister) restic.IDSet {

View File

@@ -8,7 +8,6 @@ import (
"testing" "testing"
"time" "time"
"github.com/restic/restic/internal/checker"
"github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/repository"
"github.com/restic/restic/internal/repository/pack" "github.com/restic/restic/internal/repository/pack"
"github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/restic"
@@ -49,7 +48,7 @@ func testPrune(t *testing.T, opts repository.PruneOptions, errOnUnused bool) {
rtest.OK(t, plan.Execute(context.TODO(), &progress.NoopPrinter{})) rtest.OK(t, plan.Execute(context.TODO(), &progress.NoopPrinter{}))
repo = repository.TestOpenBackend(t, be) repo = repository.TestOpenBackend(t, be)
checker.TestCheckRepo(t, repo, true) repository.TestCheckRepo(t, repo)
if errOnUnused { if errOnUnused {
existing := listBlobs(repo) existing := listBlobs(repo)
@@ -181,7 +180,7 @@ func TestPruneSmall(t *testing.T) {
// repopen repository // repopen repository
repo = repository.TestOpenBackend(t, be) repo = repository.TestOpenBackend(t, be)
checker.TestCheckRepo(t, repo, true) repository.TestCheckRepo(t, repo)
// load all blobs // load all blobs
for blob := range keep { for blob := range keep {

View File

@@ -32,30 +32,18 @@ func RepairIndex(ctx context.Context, repo *Repository, opts RepairIndexOptions,
} else { } else {
printer.P("loading indexes...\n") printer.P("loading indexes...\n")
mi := index.NewMasterIndex() err := repo.loadIndexWithCallback(ctx, nil, func(id restic.ID, _ *index.Index, err error) error {
err := index.ForAllIndexes(ctx, repo, repo, func(id restic.ID, idx *index.Index, err error) error {
if err != nil { if err != nil {
printer.E("removing invalid index %v: %v\n", id, err) printer.E("removing invalid index %v: %v\n", id, err)
obsoleteIndexes = append(obsoleteIndexes, id) obsoleteIndexes = append(obsoleteIndexes, id)
return nil return nil
} }
mi.Insert(idx)
return nil return nil
}) })
if err != nil { if err != nil {
return err return err
} }
err = mi.MergeFinalIndexes()
if err != nil {
return err
}
err = repo.SetIndex(mi)
if err != nil {
return err
}
packSizeFromIndex, err = pack.Size(ctx, repo, false) packSizeFromIndex, err = pack.Size(ctx, repo, false)
if err != nil { if err != nil {
return err return err

View File

@@ -7,7 +7,6 @@ import (
"time" "time"
"github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/checker"
"github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/repository"
"github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/restic"
rtest "github.com/restic/restic/internal/test" rtest "github.com/restic/restic/internal/test"
@@ -36,7 +35,7 @@ func testRebuildIndex(t *testing.T, readAllPacks bool, damage func(t *testing.T,
ReadAllPacks: readAllPacks, ReadAllPacks: readAllPacks,
}, &progress.NoopPrinter{})) }, &progress.NoopPrinter{}))
checker.TestCheckRepo(t, repo, true) repository.TestCheckRepo(t, repo)
} }
func TestRebuildIndex(t *testing.T) { func TestRebuildIndex(t *testing.T) {

View File

@@ -178,6 +178,10 @@ func (r *Repository) SetDryRun() {
r.be = dryrun.New(r.be) r.be = dryrun.New(r.be)
} }
// Checker returns a Checker for this repository. Every call creates a new
// Checker via NewChecker, so its pack map starts out empty.
func (r *Repository) Checker() *Checker {
return NewChecker(r)
}
// LoadUnpacked loads and decrypts the file with the given type and ID. // LoadUnpacked loads and decrypts the file with the given type and ID.
func (r *Repository) LoadUnpacked(ctx context.Context, t restic.FileType, id restic.ID) ([]byte, error) { func (r *Repository) LoadUnpacked(ctx context.Context, t restic.FileType, id restic.ID) ([]byte, error) {
debug.Log("load %v with id %v", t, id) debug.Log("load %v with id %v", t, id)
@@ -628,18 +632,17 @@ func (r *Repository) ListPacksFromIndex(ctx context.Context, packs restic.IDSet)
return r.idx.ListPacks(ctx, packs) return r.idx.ListPacks(ctx, packs)
} }
// SetIndex instructs the repository to use the given index.
func (r *Repository) SetIndex(i restic.MasterIndex) error {
r.idx = i.(*index.MasterIndex)
return r.prepareCache()
}
func (r *Repository) clearIndex() { func (r *Repository) clearIndex() {
r.idx = index.NewMasterIndex() r.idx = index.NewMasterIndex()
} }
// LoadIndex loads all index files from the backend in parallel and stores them // LoadIndex loads all index files from the backend in parallel and stores them
func (r *Repository) LoadIndex(ctx context.Context, p restic.TerminalCounterFactory) error { func (r *Repository) LoadIndex(ctx context.Context, p restic.TerminalCounterFactory) error {
return r.loadIndexWithCallback(ctx, p, nil)
}
// loadIndexWithCallback loads all index files from the backend in parallel, stores them in the in-memory index and passes cb on to the index loader.
func (r *Repository) loadIndexWithCallback(ctx context.Context, p restic.TerminalCounterFactory, cb func(id restic.ID, idx *index.Index, err error) error) error {
debug.Log("Loading index") debug.Log("Loading index")
// reset in-memory index before loading it from the repository // reset in-memory index before loading it from the repository
@@ -650,7 +653,7 @@ func (r *Repository) LoadIndex(ctx context.Context, p restic.TerminalCounterFact
bar = p.NewCounterTerminalOnly("index files loaded") bar = p.NewCounterTerminalOnly("index files loaded")
} }
err := r.idx.Load(ctx, r, bar, nil) err := r.idx.Load(ctx, r, bar, cb)
if err != nil { if err != nil {
return err return err
} }

View File

@@ -16,7 +16,6 @@ import (
"github.com/restic/restic/internal/backend/cache" "github.com/restic/restic/internal/backend/cache"
"github.com/restic/restic/internal/backend/local" "github.com/restic/restic/internal/backend/local"
"github.com/restic/restic/internal/backend/mem" "github.com/restic/restic/internal/backend/mem"
"github.com/restic/restic/internal/checker"
"github.com/restic/restic/internal/crypto" "github.com/restic/restic/internal/crypto"
"github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/repository"
@@ -131,7 +130,7 @@ func testSavePackMerging(t *testing.T, targetPercentage int, expectedPacks int)
})) }))
rtest.Equals(t, expectedPacks, packs, "unexpected number of pack files") rtest.Equals(t, expectedPacks, packs, "unexpected number of pack files")
checker.TestCheckRepo(t, repo, true) repository.TestCheckRepo(t, repo)
} }
func BenchmarkSaveAndEncrypt(t *testing.B) { func BenchmarkSaveAndEncrypt(t *testing.B) {

View File

@@ -162,3 +162,33 @@ func TestNewLock(_ *testing.T, repo *Repository, exclusive bool) (*restic.Lock,
// TODO get rid of this test helper // TODO get rid of this test helper
return restic.NewLock(context.TODO(), &internalRepository{repo}, exclusive) return restic.NewLock(context.TODO(), &internalRepository{repo}, exclusive)
} }
// TestCheckRepo runs the checker on repo and reports every problem through t.
//
// Loading the index must yield neither errors nor hints; either one aborts the
// test immediately via t.Fatalf. Afterwards the pack check and the data read
// check run one after the other, and each error they send is recorded with
// t.Error so that all problems are reported instead of only the first one.
func TestCheckRepo(t testing.TB, repo *Repository) {
	// Attribute failures to the calling test instead of this helper.
	t.Helper()

	ctx := context.TODO()
	chkr := NewChecker(repo)

	hints, errs := chkr.LoadIndex(ctx, nil)
	if len(errs) != 0 {
		t.Fatalf("errors loading index: %v", errs)
	}
	if len(hints) != 0 {
		// Hints are not errors; name them as such so the failure is not misread.
		t.Fatalf("unexpected hints loading index: %v", hints)
	}

	// packs
	// NOTE(review): the range loops below only terminate once the channel is
	// closed; presumably Packs and ReadData close it when done — confirm.
	errChan := make(chan error)
	go chkr.Packs(ctx, errChan)
	for err := range errChan {
		t.Error(err)
	}

	// read data
	errChan = make(chan error)
	go chkr.ReadData(ctx, errChan)
	for err := range errChan {
		t.Error(err)
	}
}

View File

@@ -22,7 +22,6 @@ type Repository interface {
Key() *crypto.Key Key() *crypto.Key
LoadIndex(ctx context.Context, p TerminalCounterFactory) error LoadIndex(ctx context.Context, p TerminalCounterFactory) error
SetIndex(mi MasterIndex) error
LookupBlob(t BlobType, id ID) []PackedBlob LookupBlob(t BlobType, id ID) []PackedBlob
LookupBlobSize(t BlobType, id ID) (size uint, exists bool) LookupBlobSize(t BlobType, id ID) (size uint, exists bool)
@@ -137,17 +136,6 @@ type TerminalCounterFactory interface {
NewCounterTerminalOnly(description string) *progress.Counter NewCounterTerminalOnly(description string) *progress.Counter
} }
// MasterIndex keeps track of the blobs that are stored within files.
type MasterIndex interface {
// Has presumably reports whether the index knows the blob bh — confirm against the implementation.
Has(bh BlobHandle) bool
// Lookup returns the PackedBlob entries for bh; presumably one entry per file that contains the blob — confirm.
Lookup(bh BlobHandle) []PackedBlob
// Each runs fn on all blobs known to the index. When the context is cancelled,
// the index iteration returns immediately with ctx.Err(). This blocks any modification of the index.
Each(ctx context.Context, fn func(PackedBlob)) error
// ListPacks returns a channel of PackBlobs; presumably one item per pack in packs — confirm against the implementation.
ListPacks(ctx context.Context, packs IDSet) <-chan PackBlobs
}
// Lister allows listing files in a backend. // Lister allows listing files in a backend.
type Lister interface { type Lister interface {
List(ctx context.Context, t FileType, fn func(ID, int64) error) error List(ctx context.Context, t FileType, fn func(ID, int64) error) error