mirror of
https://github.com/restic/restic.git
synced 2025-12-11 18:47:50 +00:00
check: split index/pack check into repository package
This commit is contained in:
@@ -257,10 +257,10 @@ func runCheck(ctx context.Context, opts CheckOptions, gopts GlobalOptions, args
|
|||||||
errorsFound := false
|
errorsFound := false
|
||||||
for _, hint := range hints {
|
for _, hint := range hints {
|
||||||
switch hint.(type) {
|
switch hint.(type) {
|
||||||
case *checker.ErrDuplicatePacks:
|
case *repository.ErrDuplicatePacks:
|
||||||
printer.S("%s", hint.Error())
|
printer.S("%s", hint.Error())
|
||||||
summary.HintRepairIndex = true
|
summary.HintRepairIndex = true
|
||||||
case *checker.ErrMixedPack:
|
case *repository.ErrMixedPack:
|
||||||
printer.S("%s", hint.Error())
|
printer.S("%s", hint.Error())
|
||||||
summary.HintPrune = true
|
summary.HintPrune = true
|
||||||
default:
|
default:
|
||||||
@@ -295,7 +295,7 @@ func runCheck(ctx context.Context, opts CheckOptions, gopts GlobalOptions, args
|
|||||||
go chkr.Packs(ctx, errChan)
|
go chkr.Packs(ctx, errChan)
|
||||||
|
|
||||||
for err := range errChan {
|
for err := range errChan {
|
||||||
var packErr *checker.PackError
|
var packErr *repository.PackError
|
||||||
if errors.As(err, &packErr) {
|
if errors.As(err, &packErr) {
|
||||||
if packErr.Orphaned {
|
if packErr.Orphaned {
|
||||||
orphanedPacks++
|
orphanedPacks++
|
||||||
|
|||||||
@@ -1,19 +1,15 @@
|
|||||||
package checker
|
package checker
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"runtime"
|
"runtime"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/klauspost/compress/zstd"
|
|
||||||
"github.com/restic/restic/internal/data"
|
"github.com/restic/restic/internal/data"
|
||||||
"github.com/restic/restic/internal/debug"
|
"github.com/restic/restic/internal/debug"
|
||||||
"github.com/restic/restic/internal/errors"
|
"github.com/restic/restic/internal/errors"
|
||||||
"github.com/restic/restic/internal/repository"
|
"github.com/restic/restic/internal/repository"
|
||||||
"github.com/restic/restic/internal/repository/index"
|
|
||||||
"github.com/restic/restic/internal/repository/pack"
|
|
||||||
"github.com/restic/restic/internal/restic"
|
"github.com/restic/restic/internal/restic"
|
||||||
"github.com/restic/restic/internal/ui/progress"
|
"github.com/restic/restic/internal/ui/progress"
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
@@ -25,7 +21,7 @@ import (
|
|||||||
// A Checker only tests for internal errors within the data structures of the
|
// A Checker only tests for internal errors within the data structures of the
|
||||||
// repository (e.g. missing blobs), and needs a valid Repository to work on.
|
// repository (e.g. missing blobs), and needs a valid Repository to work on.
|
||||||
type Checker struct {
|
type Checker struct {
|
||||||
packs map[restic.ID]int64
|
*repository.Checker
|
||||||
blobRefs struct {
|
blobRefs struct {
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
M restic.BlobSet
|
M restic.BlobSet
|
||||||
@@ -40,7 +36,7 @@ type Checker struct {
|
|||||||
// New returns a new checker which runs on repo.
|
// New returns a new checker which runs on repo.
|
||||||
func New(repo restic.Repository, trackUnused bool) *Checker {
|
func New(repo restic.Repository, trackUnused bool) *Checker {
|
||||||
c := &Checker{
|
c := &Checker{
|
||||||
packs: make(map[restic.ID]int64),
|
Checker: repository.NewChecker(repo),
|
||||||
repo: repo,
|
repo: repo,
|
||||||
trackUnused: trackUnused,
|
trackUnused: trackUnused,
|
||||||
}
|
}
|
||||||
@@ -50,187 +46,12 @@ func New(repo restic.Repository, trackUnused bool) *Checker {
|
|||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
// ErrDuplicatePacks is returned when a pack is found in more than one index.
|
|
||||||
type ErrDuplicatePacks struct {
|
|
||||||
PackID restic.ID
|
|
||||||
Indexes restic.IDSet
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *ErrDuplicatePacks) Error() string {
|
|
||||||
return fmt.Sprintf("pack %v contained in several indexes: %v", e.PackID, e.Indexes)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ErrMixedPack is returned when a pack is found that contains both tree and data blobs.
|
|
||||||
type ErrMixedPack struct {
|
|
||||||
PackID restic.ID
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *ErrMixedPack) Error() string {
|
|
||||||
return fmt.Sprintf("pack %v contains a mix of tree and data blobs", e.PackID.Str())
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Checker) LoadSnapshots(ctx context.Context) error {
|
func (c *Checker) LoadSnapshots(ctx context.Context) error {
|
||||||
var err error
|
var err error
|
||||||
c.snapshots, err = restic.MemorizeList(ctx, c.repo, restic.SnapshotFile)
|
c.snapshots, err = restic.MemorizeList(ctx, c.repo, restic.SnapshotFile)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func computePackTypes(ctx context.Context, idx restic.ListBlobser) (map[restic.ID]restic.BlobType, error) {
|
|
||||||
packs := make(map[restic.ID]restic.BlobType)
|
|
||||||
err := idx.ListBlobs(ctx, func(pb restic.PackedBlob) {
|
|
||||||
tpe, exists := packs[pb.PackID]
|
|
||||||
if exists {
|
|
||||||
if pb.Type != tpe {
|
|
||||||
tpe = restic.InvalidBlob
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
tpe = pb.Type
|
|
||||||
}
|
|
||||||
packs[pb.PackID] = tpe
|
|
||||||
})
|
|
||||||
return packs, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// LoadIndex loads all index files.
|
|
||||||
func (c *Checker) LoadIndex(ctx context.Context, p restic.TerminalCounterFactory) (hints []error, errs []error) {
|
|
||||||
debug.Log("Start")
|
|
||||||
|
|
||||||
var bar *progress.Counter
|
|
||||||
if p != nil {
|
|
||||||
bar = p.NewCounterTerminalOnly("index files loaded")
|
|
||||||
}
|
|
||||||
|
|
||||||
packToIndex := make(map[restic.ID]restic.IDSet)
|
|
||||||
masterIndex := index.NewMasterIndex()
|
|
||||||
err := masterIndex.Load(ctx, c.repo, bar, func(id restic.ID, idx *index.Index, err error) error {
|
|
||||||
debug.Log("process index %v, err %v", id, err)
|
|
||||||
err = errors.Wrapf(err, "error loading index %v", id)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
errs = append(errs, err)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
debug.Log("process blobs")
|
|
||||||
cnt := 0
|
|
||||||
err = idx.Each(ctx, func(blob restic.PackedBlob) {
|
|
||||||
cnt++
|
|
||||||
|
|
||||||
if _, ok := packToIndex[blob.PackID]; !ok {
|
|
||||||
packToIndex[blob.PackID] = restic.NewIDSet()
|
|
||||||
}
|
|
||||||
packToIndex[blob.PackID].Insert(id)
|
|
||||||
})
|
|
||||||
|
|
||||||
debug.Log("%d blobs processed", cnt)
|
|
||||||
return err
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
// failed to load the index
|
|
||||||
return hints, append(errs, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = c.repo.SetIndex(masterIndex)
|
|
||||||
if err != nil {
|
|
||||||
debug.Log("SetIndex returned error: %v", err)
|
|
||||||
errs = append(errs, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// compute pack size using index entries
|
|
||||||
c.packs, err = pack.Size(ctx, c.repo, false)
|
|
||||||
if err != nil {
|
|
||||||
return hints, append(errs, err)
|
|
||||||
}
|
|
||||||
packTypes, err := computePackTypes(ctx, c.repo)
|
|
||||||
if err != nil {
|
|
||||||
return hints, append(errs, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
debug.Log("checking for duplicate packs")
|
|
||||||
for packID := range c.packs {
|
|
||||||
debug.Log(" check pack %v: contained in %d indexes", packID, len(packToIndex[packID]))
|
|
||||||
if len(packToIndex[packID]) > 1 {
|
|
||||||
hints = append(hints, &ErrDuplicatePacks{
|
|
||||||
PackID: packID,
|
|
||||||
Indexes: packToIndex[packID],
|
|
||||||
})
|
|
||||||
}
|
|
||||||
if packTypes[packID] == restic.InvalidBlob {
|
|
||||||
hints = append(hints, &ErrMixedPack{
|
|
||||||
PackID: packID,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return hints, errs
|
|
||||||
}
|
|
||||||
|
|
||||||
// PackError describes an error with a specific pack.
|
|
||||||
type PackError struct {
|
|
||||||
ID restic.ID
|
|
||||||
Orphaned bool
|
|
||||||
Truncated bool
|
|
||||||
Err error
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *PackError) Error() string {
|
|
||||||
return "pack " + e.ID.String() + ": " + e.Err.Error()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Packs checks that all packs referenced in the index are still available and
|
|
||||||
// there are no packs that aren't in an index. errChan is closed after all
|
|
||||||
// packs have been checked.
|
|
||||||
func (c *Checker) Packs(ctx context.Context, errChan chan<- error) {
|
|
||||||
defer close(errChan)
|
|
||||||
debug.Log("checking for %d packs", len(c.packs))
|
|
||||||
|
|
||||||
debug.Log("listing repository packs")
|
|
||||||
repoPacks := make(map[restic.ID]int64)
|
|
||||||
|
|
||||||
err := c.repo.List(ctx, restic.PackFile, func(id restic.ID, size int64) error {
|
|
||||||
repoPacks[id] = size
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
errChan <- err
|
|
||||||
}
|
|
||||||
|
|
||||||
for id, size := range c.packs {
|
|
||||||
reposize, ok := repoPacks[id]
|
|
||||||
// remove from repoPacks so we can find orphaned packs
|
|
||||||
delete(repoPacks, id)
|
|
||||||
|
|
||||||
// missing: present in c.packs but not in the repo
|
|
||||||
if !ok {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
case errChan <- &PackError{ID: id, Err: errors.New("does not exist")}:
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// size not matching: present in c.packs and in the repo, but sizes do not match
|
|
||||||
if size != reposize {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
case errChan <- &PackError{ID: id, Truncated: true, Err: errors.Errorf("unexpected file size: got %d, expected %d", reposize, size)}:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// orphaned: present in the repo but not in c.packs
|
|
||||||
for orphanID := range repoPacks {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
case errChan <- &PackError{ID: orphanID, Orphaned: true, Err: errors.New("not referenced in any index")}:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Error is an error that occurred while checking a repository.
|
// Error is an error that occurred while checking a repository.
|
||||||
type Error struct {
|
type Error struct {
|
||||||
TreeID restic.ID
|
TreeID restic.ID
|
||||||
@@ -433,97 +254,3 @@ func (c *Checker) UnusedBlobs(ctx context.Context) (blobs restic.BlobHandles, er
|
|||||||
|
|
||||||
return blobs, err
|
return blobs, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// CountPacks returns the number of packs in the repository.
|
|
||||||
func (c *Checker) CountPacks() uint64 {
|
|
||||||
return uint64(len(c.packs))
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetPacks returns IDSet of packs in the repository
|
|
||||||
func (c *Checker) GetPacks() map[restic.ID]int64 {
|
|
||||||
return c.packs
|
|
||||||
}
|
|
||||||
|
|
||||||
// ReadData loads all data from the repository and checks the integrity.
|
|
||||||
func (c *Checker) ReadData(ctx context.Context, errChan chan<- error) {
|
|
||||||
c.ReadPacks(ctx, c.packs, nil, errChan)
|
|
||||||
}
|
|
||||||
|
|
||||||
const maxStreamBufferSize = 4 * 1024 * 1024
|
|
||||||
|
|
||||||
// ReadPacks loads data from specified packs and checks the integrity.
|
|
||||||
func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *progress.Counter, errChan chan<- error) {
|
|
||||||
defer close(errChan)
|
|
||||||
|
|
||||||
g, ctx := errgroup.WithContext(ctx)
|
|
||||||
type checkTask struct {
|
|
||||||
id restic.ID
|
|
||||||
size int64
|
|
||||||
blobs []restic.Blob
|
|
||||||
}
|
|
||||||
ch := make(chan checkTask)
|
|
||||||
|
|
||||||
// as packs are streamed the concurrency is limited by IO
|
|
||||||
workerCount := int(c.repo.Connections())
|
|
||||||
// run workers
|
|
||||||
for i := 0; i < workerCount; i++ {
|
|
||||||
g.Go(func() error {
|
|
||||||
bufRd := bufio.NewReaderSize(nil, maxStreamBufferSize)
|
|
||||||
dec, err := zstd.NewReader(nil)
|
|
||||||
if err != nil {
|
|
||||||
panic(dec)
|
|
||||||
}
|
|
||||||
defer dec.Close()
|
|
||||||
for {
|
|
||||||
var ps checkTask
|
|
||||||
var ok bool
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return nil
|
|
||||||
case ps, ok = <-ch:
|
|
||||||
if !ok {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
err := repository.CheckPack(ctx, c.repo.(*repository.Repository), ps.id, ps.blobs, ps.size, bufRd, dec)
|
|
||||||
p.Add(1)
|
|
||||||
if err == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return nil
|
|
||||||
case errChan <- err:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
packSet := restic.NewIDSet()
|
|
||||||
for pack := range packs {
|
|
||||||
packSet.Insert(pack)
|
|
||||||
}
|
|
||||||
|
|
||||||
// push packs to ch
|
|
||||||
for pbs := range c.repo.ListPacksFromIndex(ctx, packSet) {
|
|
||||||
size := packs[pbs.PackID]
|
|
||||||
debug.Log("listed %v", pbs.PackID)
|
|
||||||
select {
|
|
||||||
case ch <- checkTask{id: pbs.PackID, size: size, blobs: pbs.Blobs}:
|
|
||||||
case <-ctx.Done():
|
|
||||||
}
|
|
||||||
}
|
|
||||||
close(ch)
|
|
||||||
|
|
||||||
err := g.Wait()
|
|
||||||
if err != nil {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
case errChan <- err:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -67,7 +67,7 @@ func checkData(chkr *checker.Checker) []error {
|
|||||||
|
|
||||||
func assertOnlyMixedPackHints(t *testing.T, hints []error) {
|
func assertOnlyMixedPackHints(t *testing.T, hints []error) {
|
||||||
for _, err := range hints {
|
for _, err := range hints {
|
||||||
if _, ok := err.(*checker.ErrMixedPack); !ok {
|
if _, ok := err.(*repository.ErrMixedPack); !ok {
|
||||||
t.Fatalf("expected mixed pack hint, got %v", err)
|
t.Fatalf("expected mixed pack hint, got %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -110,7 +110,7 @@ func TestMissingPack(t *testing.T) {
|
|||||||
test.Assert(t, len(errs) == 1,
|
test.Assert(t, len(errs) == 1,
|
||||||
"expected exactly one error, got %v", len(errs))
|
"expected exactly one error, got %v", len(errs))
|
||||||
|
|
||||||
if err, ok := errs[0].(*checker.PackError); ok {
|
if err, ok := errs[0].(*repository.PackError); ok {
|
||||||
test.Equals(t, packID, err.ID)
|
test.Equals(t, packID, err.ID)
|
||||||
} else {
|
} else {
|
||||||
t.Errorf("expected error returned by checker.Packs() to be PackError, got %v", err)
|
t.Errorf("expected error returned by checker.Packs() to be PackError, got %v", err)
|
||||||
@@ -138,7 +138,7 @@ func TestUnreferencedPack(t *testing.T) {
|
|||||||
test.Assert(t, len(errs) == 1,
|
test.Assert(t, len(errs) == 1,
|
||||||
"expected exactly one error, got %v", len(errs))
|
"expected exactly one error, got %v", len(errs))
|
||||||
|
|
||||||
if err, ok := errs[0].(*checker.PackError); ok {
|
if err, ok := errs[0].(*repository.PackError); ok {
|
||||||
test.Equals(t, packID, err.ID.String())
|
test.Equals(t, packID, err.ID.String())
|
||||||
} else {
|
} else {
|
||||||
t.Errorf("expected error returned by checker.Packs() to be PackError, got %v", err)
|
t.Errorf("expected error returned by checker.Packs() to be PackError, got %v", err)
|
||||||
@@ -269,7 +269,7 @@ func TestDuplicatePacksInIndex(t *testing.T) {
|
|||||||
|
|
||||||
found := false
|
found := false
|
||||||
for _, hint := range hints {
|
for _, hint := range hints {
|
||||||
if _, ok := hint.(*checker.ErrDuplicatePacks); ok {
|
if _, ok := hint.(*repository.ErrDuplicatePacks); ok {
|
||||||
found = true
|
found = true
|
||||||
} else {
|
} else {
|
||||||
t.Errorf("got unexpected hint: %v", hint)
|
t.Errorf("got unexpected hint: %v", hint)
|
||||||
@@ -613,7 +613,7 @@ func loadBenchRepository(t *testing.B) (*checker.Checker, restic.Repository, fun
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, err := range hints {
|
for _, err := range hints {
|
||||||
if _, ok := err.(*checker.ErrMixedPack); !ok {
|
if _, ok := err.(*repository.ErrMixedPack); !ok {
|
||||||
t.Fatalf("expected mixed pack hint, got %v", err)
|
t.Fatalf("expected mixed pack hint, got %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
298
internal/repository/checker.go
Normal file
298
internal/repository/checker.go
Normal file
@@ -0,0 +1,298 @@
|
|||||||
|
package repository
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/klauspost/compress/zstd"
|
||||||
|
"github.com/restic/restic/internal/debug"
|
||||||
|
"github.com/restic/restic/internal/errors"
|
||||||
|
"github.com/restic/restic/internal/repository/index"
|
||||||
|
"github.com/restic/restic/internal/repository/pack"
|
||||||
|
"github.com/restic/restic/internal/restic"
|
||||||
|
"github.com/restic/restic/internal/ui/progress"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
|
)
|
||||||
|
|
||||||
|
// maxStreamBufferSize is the size in bytes (4 MiB) of the buffered reader that
// each ReadPacks worker allocates.
const maxStreamBufferSize = 4 * 1024 * 1024
|
||||||
|
|
||||||
|
// ErrDuplicatePacks is returned when a pack is found in more than one index.
|
||||||
|
type ErrDuplicatePacks struct {
|
||||||
|
PackID restic.ID
|
||||||
|
Indexes restic.IDSet
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ErrDuplicatePacks) Error() string {
|
||||||
|
return fmt.Sprintf("pack %v contained in several indexes: %v", e.PackID, e.Indexes)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ErrMixedPack is returned when a pack is found that contains both tree and data blobs.
|
||||||
|
type ErrMixedPack struct {
|
||||||
|
PackID restic.ID
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ErrMixedPack) Error() string {
|
||||||
|
return fmt.Sprintf("pack %v contains a mix of tree and data blobs", e.PackID.Str())
|
||||||
|
}
|
||||||
|
|
||||||
|
// PackError describes an error with a specific pack.
|
||||||
|
type PackError struct {
|
||||||
|
ID restic.ID
|
||||||
|
Orphaned bool
|
||||||
|
Truncated bool
|
||||||
|
Err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *PackError) Error() string {
|
||||||
|
return "pack " + e.ID.String() + ": " + e.Err.Error()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Checker handles index-related operations for repository checking.
|
||||||
|
type Checker struct {
|
||||||
|
packs map[restic.ID]int64
|
||||||
|
repo restic.Repository
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewChecker creates a new Checker.
|
||||||
|
func NewChecker(repo restic.Repository) *Checker {
|
||||||
|
return &Checker{
|
||||||
|
packs: make(map[restic.ID]int64),
|
||||||
|
repo: repo,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
func computePackTypes(ctx context.Context, idx restic.ListBlobser) (map[restic.ID]restic.BlobType, error) {
|
||||||
|
packs := make(map[restic.ID]restic.BlobType)
|
||||||
|
err := idx.ListBlobs(ctx, func(pb restic.PackedBlob) {
|
||||||
|
tpe, exists := packs[pb.PackID]
|
||||||
|
if exists {
|
||||||
|
if pb.Type != tpe {
|
||||||
|
tpe = restic.InvalidBlob
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
tpe = pb.Type
|
||||||
|
}
|
||||||
|
packs[pb.PackID] = tpe
|
||||||
|
})
|
||||||
|
return packs, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// LoadIndex loads all index files.
|
||||||
|
func (c *Checker) LoadIndex(ctx context.Context, p restic.TerminalCounterFactory) (hints []error, errs []error) {
|
||||||
|
debug.Log("Start")
|
||||||
|
|
||||||
|
var bar *progress.Counter
|
||||||
|
if p != nil {
|
||||||
|
bar = p.NewCounterTerminalOnly("index files loaded")
|
||||||
|
}
|
||||||
|
|
||||||
|
packToIndex := make(map[restic.ID]restic.IDSet)
|
||||||
|
masterIndex := index.NewMasterIndex()
|
||||||
|
err := masterIndex.Load(ctx, c.repo, bar, func(id restic.ID, idx *index.Index, err error) error {
|
||||||
|
debug.Log("process index %v, err %v", id, err)
|
||||||
|
err = errors.Wrapf(err, "error loading index %v", id)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
errs = append(errs, err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
debug.Log("process blobs")
|
||||||
|
cnt := 0
|
||||||
|
err = idx.Each(ctx, func(blob restic.PackedBlob) {
|
||||||
|
cnt++
|
||||||
|
|
||||||
|
if _, ok := packToIndex[blob.PackID]; !ok {
|
||||||
|
packToIndex[blob.PackID] = restic.NewIDSet()
|
||||||
|
}
|
||||||
|
packToIndex[blob.PackID].Insert(id)
|
||||||
|
})
|
||||||
|
|
||||||
|
debug.Log("%d blobs processed", cnt)
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
// failed to load the index
|
||||||
|
return hints, append(errs, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = c.repo.SetIndex(masterIndex)
|
||||||
|
if err != nil {
|
||||||
|
debug.Log("SetIndex returned error: %v", err)
|
||||||
|
errs = append(errs, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// compute pack size using index entries
|
||||||
|
c.packs, err = pack.Size(ctx, c.repo, false)
|
||||||
|
if err != nil {
|
||||||
|
return hints, append(errs, err)
|
||||||
|
}
|
||||||
|
packTypes, err := computePackTypes(ctx, c.repo)
|
||||||
|
if err != nil {
|
||||||
|
return hints, append(errs, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
debug.Log("checking for duplicate packs")
|
||||||
|
for packID := range c.packs {
|
||||||
|
debug.Log(" check pack %v: contained in %d indexes", packID, len(packToIndex[packID]))
|
||||||
|
if len(packToIndex[packID]) > 1 {
|
||||||
|
hints = append(hints, &ErrDuplicatePacks{
|
||||||
|
PackID: packID,
|
||||||
|
Indexes: packToIndex[packID],
|
||||||
|
})
|
||||||
|
}
|
||||||
|
if packTypes[packID] == restic.InvalidBlob {
|
||||||
|
hints = append(hints, &ErrMixedPack{
|
||||||
|
PackID: packID,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return hints, errs
|
||||||
|
}
|
||||||
|
|
||||||
|
// Packs checks that all packs referenced in the index are still available and
|
||||||
|
// there are no packs that aren't in an index. errChan is closed after all
|
||||||
|
// packs have been checked.
|
||||||
|
func (c *Checker) Packs(ctx context.Context, errChan chan<- error) {
|
||||||
|
defer close(errChan)
|
||||||
|
debug.Log("checking for %d packs", len(c.packs))
|
||||||
|
|
||||||
|
debug.Log("listing repository packs")
|
||||||
|
repoPacks := make(map[restic.ID]int64)
|
||||||
|
|
||||||
|
err := c.repo.List(ctx, restic.PackFile, func(id restic.ID, size int64) error {
|
||||||
|
repoPacks[id] = size
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
errChan <- err
|
||||||
|
}
|
||||||
|
|
||||||
|
for id, size := range c.packs {
|
||||||
|
reposize, ok := repoPacks[id]
|
||||||
|
// remove from repoPacks so we can find orphaned packs
|
||||||
|
delete(repoPacks, id)
|
||||||
|
|
||||||
|
// missing: present in c.packs but not in the repo
|
||||||
|
if !ok {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case errChan <- &PackError{ID: id, Err: errors.New("does not exist")}:
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// size not matching: present in c.packs and in the repo, but sizes do not match
|
||||||
|
if size != reposize {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case errChan <- &PackError{ID: id, Truncated: true, Err: errors.Errorf("unexpected file size: got %d, expected %d", reposize, size)}:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// orphaned: present in the repo but not in c.packs
|
||||||
|
for orphanID := range repoPacks {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case errChan <- &PackError{ID: orphanID, Orphaned: true, Err: errors.New("not referenced in any index")}:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// CountPacks returns the number of packs in the repository.
|
||||||
|
func (c *Checker) CountPacks() uint64 {
|
||||||
|
return uint64(len(c.packs))
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetPacks returns IDSet of packs in the repository
|
||||||
|
func (c *Checker) GetPacks() map[restic.ID]int64 {
|
||||||
|
return c.packs
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReadData loads all data from the repository and checks the integrity.
|
||||||
|
func (c *Checker) ReadData(ctx context.Context, errChan chan<- error) {
|
||||||
|
c.ReadPacks(ctx, c.packs, nil, errChan)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReadPacks loads data from specified packs and checks the integrity.
|
||||||
|
func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *progress.Counter, errChan chan<- error) {
|
||||||
|
defer close(errChan)
|
||||||
|
|
||||||
|
g, ctx := errgroup.WithContext(ctx)
|
||||||
|
type checkTask struct {
|
||||||
|
id restic.ID
|
||||||
|
size int64
|
||||||
|
blobs []restic.Blob
|
||||||
|
}
|
||||||
|
ch := make(chan checkTask)
|
||||||
|
|
||||||
|
// as packs are streamed the concurrency is limited by IO
|
||||||
|
workerCount := int(c.repo.Connections())
|
||||||
|
// run workers
|
||||||
|
for i := 0; i < workerCount; i++ {
|
||||||
|
g.Go(func() error {
|
||||||
|
bufRd := bufio.NewReaderSize(nil, maxStreamBufferSize)
|
||||||
|
dec, err := zstd.NewReader(nil)
|
||||||
|
if err != nil {
|
||||||
|
panic(dec)
|
||||||
|
}
|
||||||
|
defer dec.Close()
|
||||||
|
for {
|
||||||
|
var ps checkTask
|
||||||
|
var ok bool
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil
|
||||||
|
case ps, ok = <-ch:
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
err := CheckPack(ctx, c.repo.(*Repository), ps.id, ps.blobs, ps.size, bufRd, dec)
|
||||||
|
p.Add(1)
|
||||||
|
if err == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil
|
||||||
|
case errChan <- err:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
packSet := restic.NewIDSet()
|
||||||
|
for pack := range packs {
|
||||||
|
packSet.Insert(pack)
|
||||||
|
}
|
||||||
|
|
||||||
|
// push packs to ch
|
||||||
|
for pbs := range c.repo.ListPacksFromIndex(ctx, packSet) {
|
||||||
|
size := packs[pbs.PackID]
|
||||||
|
debug.Log("listed %v", pbs.PackID)
|
||||||
|
select {
|
||||||
|
case ch <- checkTask{id: pbs.PackID, size: size, blobs: pbs.Blobs}:
|
||||||
|
case <-ctx.Done():
|
||||||
|
}
|
||||||
|
}
|
||||||
|
close(ch)
|
||||||
|
|
||||||
|
err := g.Wait()
|
||||||
|
if err != nil {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case errChan <- err:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user