diff --git a/cmd/restic/cmd_check.go b/cmd/restic/cmd_check.go index afc7835f0..f619cf58a 100644 --- a/cmd/restic/cmd_check.go +++ b/cmd/restic/cmd_check.go @@ -257,10 +257,10 @@ func runCheck(ctx context.Context, opts CheckOptions, gopts GlobalOptions, args errorsFound := false for _, hint := range hints { switch hint.(type) { - case *checker.ErrDuplicatePacks: + case *repository.ErrDuplicatePacks: printer.S("%s", hint.Error()) summary.HintRepairIndex = true - case *checker.ErrMixedPack: + case *repository.ErrMixedPack: printer.S("%s", hint.Error()) summary.HintPrune = true default: @@ -295,7 +295,7 @@ func runCheck(ctx context.Context, opts CheckOptions, gopts GlobalOptions, args go chkr.Packs(ctx, errChan) for err := range errChan { - var packErr *checker.PackError + var packErr *repository.PackError if errors.As(err, &packErr) { if packErr.Orphaned { orphanedPacks++ diff --git a/internal/checker/checker.go b/internal/checker/checker.go index 7b57bf518..79dbba76a 100644 --- a/internal/checker/checker.go +++ b/internal/checker/checker.go @@ -1,19 +1,15 @@ package checker import ( - "bufio" "context" "fmt" "runtime" "sync" - "github.com/klauspost/compress/zstd" "github.com/restic/restic/internal/data" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/repository" - "github.com/restic/restic/internal/repository/index" - "github.com/restic/restic/internal/repository/pack" "github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/ui/progress" "golang.org/x/sync/errgroup" @@ -25,7 +21,7 @@ import ( // A Checker only tests for internal errors within the data structures of the // repository (e.g. missing blobs), and needs a valid Repository to work on. type Checker struct { - packs map[restic.ID]int64 + *repository.Checker blobRefs struct { sync.Mutex M restic.BlobSet @@ -40,7 +36,7 @@ type Checker struct { // New returns a new checker which runs on repo. 
func New(repo restic.Repository, trackUnused bool) *Checker { c := &Checker{ - packs: make(map[restic.ID]int64), + Checker: repository.NewChecker(repo), repo: repo, trackUnused: trackUnused, } @@ -50,187 +46,12 @@ func New(repo restic.Repository, trackUnused bool) *Checker { return c } -// ErrDuplicatePacks is returned when a pack is found in more than one index. -type ErrDuplicatePacks struct { - PackID restic.ID - Indexes restic.IDSet -} - -func (e *ErrDuplicatePacks) Error() string { - return fmt.Sprintf("pack %v contained in several indexes: %v", e.PackID, e.Indexes) -} - -// ErrMixedPack is returned when a pack is found that contains both tree and data blobs. -type ErrMixedPack struct { - PackID restic.ID -} - -func (e *ErrMixedPack) Error() string { - return fmt.Sprintf("pack %v contains a mix of tree and data blobs", e.PackID.Str()) -} - func (c *Checker) LoadSnapshots(ctx context.Context) error { var err error c.snapshots, err = restic.MemorizeList(ctx, c.repo, restic.SnapshotFile) return err } -func computePackTypes(ctx context.Context, idx restic.ListBlobser) (map[restic.ID]restic.BlobType, error) { - packs := make(map[restic.ID]restic.BlobType) - err := idx.ListBlobs(ctx, func(pb restic.PackedBlob) { - tpe, exists := packs[pb.PackID] - if exists { - if pb.Type != tpe { - tpe = restic.InvalidBlob - } - } else { - tpe = pb.Type - } - packs[pb.PackID] = tpe - }) - return packs, err -} - -// LoadIndex loads all index files. 
-func (c *Checker) LoadIndex(ctx context.Context, p restic.TerminalCounterFactory) (hints []error, errs []error) { - debug.Log("Start") - - var bar *progress.Counter - if p != nil { - bar = p.NewCounterTerminalOnly("index files loaded") - } - - packToIndex := make(map[restic.ID]restic.IDSet) - masterIndex := index.NewMasterIndex() - err := masterIndex.Load(ctx, c.repo, bar, func(id restic.ID, idx *index.Index, err error) error { - debug.Log("process index %v, err %v", id, err) - err = errors.Wrapf(err, "error loading index %v", id) - - if err != nil { - errs = append(errs, err) - return nil - } - - debug.Log("process blobs") - cnt := 0 - err = idx.Each(ctx, func(blob restic.PackedBlob) { - cnt++ - - if _, ok := packToIndex[blob.PackID]; !ok { - packToIndex[blob.PackID] = restic.NewIDSet() - } - packToIndex[blob.PackID].Insert(id) - }) - - debug.Log("%d blobs processed", cnt) - return err - }) - if err != nil { - // failed to load the index - return hints, append(errs, err) - } - - err = c.repo.SetIndex(masterIndex) - if err != nil { - debug.Log("SetIndex returned error: %v", err) - errs = append(errs, err) - } - - // compute pack size using index entries - c.packs, err = pack.Size(ctx, c.repo, false) - if err != nil { - return hints, append(errs, err) - } - packTypes, err := computePackTypes(ctx, c.repo) - if err != nil { - return hints, append(errs, err) - } - - debug.Log("checking for duplicate packs") - for packID := range c.packs { - debug.Log(" check pack %v: contained in %d indexes", packID, len(packToIndex[packID])) - if len(packToIndex[packID]) > 1 { - hints = append(hints, &ErrDuplicatePacks{ - PackID: packID, - Indexes: packToIndex[packID], - }) - } - if packTypes[packID] == restic.InvalidBlob { - hints = append(hints, &ErrMixedPack{ - PackID: packID, - }) - } - } - - return hints, errs -} - -// PackError describes an error with a specific pack. 
-type PackError struct { - ID restic.ID - Orphaned bool - Truncated bool - Err error -} - -func (e *PackError) Error() string { - return "pack " + e.ID.String() + ": " + e.Err.Error() -} - -// Packs checks that all packs referenced in the index are still available and -// there are no packs that aren't in an index. errChan is closed after all -// packs have been checked. -func (c *Checker) Packs(ctx context.Context, errChan chan<- error) { - defer close(errChan) - debug.Log("checking for %d packs", len(c.packs)) - - debug.Log("listing repository packs") - repoPacks := make(map[restic.ID]int64) - - err := c.repo.List(ctx, restic.PackFile, func(id restic.ID, size int64) error { - repoPacks[id] = size - return nil - }) - - if err != nil { - errChan <- err - } - - for id, size := range c.packs { - reposize, ok := repoPacks[id] - // remove from repoPacks so we can find orphaned packs - delete(repoPacks, id) - - // missing: present in c.packs but not in the repo - if !ok { - select { - case <-ctx.Done(): - return - case errChan <- &PackError{ID: id, Err: errors.New("does not exist")}: - } - continue - } - - // size not matching: present in c.packs and in the repo, but sizes do not match - if size != reposize { - select { - case <-ctx.Done(): - return - case errChan <- &PackError{ID: id, Truncated: true, Err: errors.Errorf("unexpected file size: got %d, expected %d", reposize, size)}: - } - } - } - - // orphaned: present in the repo but not in c.packs - for orphanID := range repoPacks { - select { - case <-ctx.Done(): - return - case errChan <- &PackError{ID: orphanID, Orphaned: true, Err: errors.New("not referenced in any index")}: - } - } -} - // Error is an error that occurred while checking a repository. type Error struct { TreeID restic.ID @@ -433,97 +254,3 @@ func (c *Checker) UnusedBlobs(ctx context.Context) (blobs restic.BlobHandles, er return blobs, err } - -// CountPacks returns the number of packs in the repository. 
-func (c *Checker) CountPacks() uint64 { - return uint64(len(c.packs)) -} - -// GetPacks returns IDSet of packs in the repository -func (c *Checker) GetPacks() map[restic.ID]int64 { - return c.packs -} - -// ReadData loads all data from the repository and checks the integrity. -func (c *Checker) ReadData(ctx context.Context, errChan chan<- error) { - c.ReadPacks(ctx, c.packs, nil, errChan) -} - -const maxStreamBufferSize = 4 * 1024 * 1024 - -// ReadPacks loads data from specified packs and checks the integrity. -func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *progress.Counter, errChan chan<- error) { - defer close(errChan) - - g, ctx := errgroup.WithContext(ctx) - type checkTask struct { - id restic.ID - size int64 - blobs []restic.Blob - } - ch := make(chan checkTask) - - // as packs are streamed the concurrency is limited by IO - workerCount := int(c.repo.Connections()) - // run workers - for i := 0; i < workerCount; i++ { - g.Go(func() error { - bufRd := bufio.NewReaderSize(nil, maxStreamBufferSize) - dec, err := zstd.NewReader(nil) - if err != nil { - panic(dec) - } - defer dec.Close() - for { - var ps checkTask - var ok bool - - select { - case <-ctx.Done(): - return nil - case ps, ok = <-ch: - if !ok { - return nil - } - } - - err := repository.CheckPack(ctx, c.repo.(*repository.Repository), ps.id, ps.blobs, ps.size, bufRd, dec) - p.Add(1) - if err == nil { - continue - } - - select { - case <-ctx.Done(): - return nil - case errChan <- err: - } - } - }) - } - - packSet := restic.NewIDSet() - for pack := range packs { - packSet.Insert(pack) - } - - // push packs to ch - for pbs := range c.repo.ListPacksFromIndex(ctx, packSet) { - size := packs[pbs.PackID] - debug.Log("listed %v", pbs.PackID) - select { - case ch <- checkTask{id: pbs.PackID, size: size, blobs: pbs.Blobs}: - case <-ctx.Done(): - } - } - close(ch) - - err := g.Wait() - if err != nil { - select { - case <-ctx.Done(): - return - case errChan <- err: - } - } -} diff 
--git a/internal/checker/checker_test.go b/internal/checker/checker_test.go index ce1542210..7008ba0f0 100644 --- a/internal/checker/checker_test.go +++ b/internal/checker/checker_test.go @@ -67,7 +67,7 @@ func checkData(chkr *checker.Checker) []error { func assertOnlyMixedPackHints(t *testing.T, hints []error) { for _, err := range hints { - if _, ok := err.(*checker.ErrMixedPack); !ok { + if _, ok := err.(*repository.ErrMixedPack); !ok { t.Fatalf("expected mixed pack hint, got %v", err) } } @@ -110,7 +110,7 @@ func TestMissingPack(t *testing.T) { test.Assert(t, len(errs) == 1, "expected exactly one error, got %v", len(errs)) - if err, ok := errs[0].(*checker.PackError); ok { + if err, ok := errs[0].(*repository.PackError); ok { test.Equals(t, packID, err.ID) } else { t.Errorf("expected error returned by checker.Packs() to be PackError, got %v", err) @@ -138,7 +138,7 @@ func TestUnreferencedPack(t *testing.T) { test.Assert(t, len(errs) == 1, "expected exactly one error, got %v", len(errs)) - if err, ok := errs[0].(*checker.PackError); ok { + if err, ok := errs[0].(*repository.PackError); ok { test.Equals(t, packID, err.ID.String()) } else { t.Errorf("expected error returned by checker.Packs() to be PackError, got %v", err) @@ -269,7 +269,7 @@ func TestDuplicatePacksInIndex(t *testing.T) { found := false for _, hint := range hints { - if _, ok := hint.(*checker.ErrDuplicatePacks); ok { + if _, ok := hint.(*repository.ErrDuplicatePacks); ok { found = true } else { t.Errorf("got unexpected hint: %v", hint) @@ -613,7 +613,7 @@ func loadBenchRepository(t *testing.B) (*checker.Checker, restic.Repository, fun } for _, err := range hints { - if _, ok := err.(*checker.ErrMixedPack); !ok { + if _, ok := err.(*repository.ErrMixedPack); !ok { t.Fatalf("expected mixed pack hint, got %v", err) } } diff --git a/internal/repository/checker.go b/internal/repository/checker.go new file mode 100644 index 000000000..25ef91134 --- /dev/null +++ b/internal/repository/checker.go @@ -0,0 
+1,298 @@ +package repository + +import ( + "bufio" + "context" + "fmt" + + "github.com/klauspost/compress/zstd" + "github.com/restic/restic/internal/debug" + "github.com/restic/restic/internal/errors" + "github.com/restic/restic/internal/repository/index" + "github.com/restic/restic/internal/repository/pack" + "github.com/restic/restic/internal/restic" + "github.com/restic/restic/internal/ui/progress" + "golang.org/x/sync/errgroup" +) + +const maxStreamBufferSize = 4 * 1024 * 1024 + +// ErrDuplicatePacks is returned when a pack is found in more than one index. +type ErrDuplicatePacks struct { + PackID restic.ID + Indexes restic.IDSet +} + +func (e *ErrDuplicatePacks) Error() string { + return fmt.Sprintf("pack %v contained in several indexes: %v", e.PackID, e.Indexes) +} + +// ErrMixedPack is returned when a pack is found that contains both tree and data blobs. +type ErrMixedPack struct { + PackID restic.ID +} + +func (e *ErrMixedPack) Error() string { + return fmt.Sprintf("pack %v contains a mix of tree and data blobs", e.PackID.Str()) +} + +// PackError describes an error with a specific pack. +type PackError struct { + ID restic.ID + Orphaned bool + Truncated bool + Err error +} + +func (e *PackError) Error() string { + return "pack " + e.ID.String() + ": " + e.Err.Error() +} + +// Checker handles index-related operations for repository checking. +type Checker struct { + packs map[restic.ID]int64 + repo restic.Repository +} + +// NewChecker creates a new Checker. 
+func NewChecker(repo restic.Repository) *Checker { + return &Checker{ + packs: make(map[restic.ID]int64), + repo: repo, + } +} +func computePackTypes(ctx context.Context, idx restic.ListBlobser) (map[restic.ID]restic.BlobType, error) { + packs := make(map[restic.ID]restic.BlobType) + err := idx.ListBlobs(ctx, func(pb restic.PackedBlob) { + tpe, exists := packs[pb.PackID] + if exists { + if pb.Type != tpe { + tpe = restic.InvalidBlob + } + } else { + tpe = pb.Type + } + packs[pb.PackID] = tpe + }) + return packs, err +} + +// LoadIndex loads all index files. +func (c *Checker) LoadIndex(ctx context.Context, p restic.TerminalCounterFactory) (hints []error, errs []error) { + debug.Log("Start") + + var bar *progress.Counter + if p != nil { + bar = p.NewCounterTerminalOnly("index files loaded") + } + + packToIndex := make(map[restic.ID]restic.IDSet) + masterIndex := index.NewMasterIndex() + err := masterIndex.Load(ctx, c.repo, bar, func(id restic.ID, idx *index.Index, err error) error { + debug.Log("process index %v, err %v", id, err) + err = errors.Wrapf(err, "error loading index %v", id) + + if err != nil { + errs = append(errs, err) + return nil + } + + debug.Log("process blobs") + cnt := 0 + err = idx.Each(ctx, func(blob restic.PackedBlob) { + cnt++ + + if _, ok := packToIndex[blob.PackID]; !ok { + packToIndex[blob.PackID] = restic.NewIDSet() + } + packToIndex[blob.PackID].Insert(id) + }) + + debug.Log("%d blobs processed", cnt) + return err + }) + if err != nil { + // failed to load the index + return hints, append(errs, err) + } + + err = c.repo.SetIndex(masterIndex) + if err != nil { + debug.Log("SetIndex returned error: %v", err) + errs = append(errs, err) + } + + // compute pack size using index entries + c.packs, err = pack.Size(ctx, c.repo, false) + if err != nil { + return hints, append(errs, err) + } + packTypes, err := computePackTypes(ctx, c.repo) + if err != nil { + return hints, append(errs, err) + } + + debug.Log("checking for duplicate packs") + for 
packID := range c.packs { + debug.Log(" check pack %v: contained in %d indexes", packID, len(packToIndex[packID])) + if len(packToIndex[packID]) > 1 { + hints = append(hints, &ErrDuplicatePacks{ + PackID: packID, + Indexes: packToIndex[packID], + }) + } + if packTypes[packID] == restic.InvalidBlob { + hints = append(hints, &ErrMixedPack{ + PackID: packID, + }) + } + } + + return hints, errs +} + +// Packs checks that all packs referenced in the index are still available and +// there are no packs that aren't in an index. errChan is closed after all +// packs have been checked. +func (c *Checker) Packs(ctx context.Context, errChan chan<- error) { + defer close(errChan) + debug.Log("checking for %d packs", len(c.packs)) + + debug.Log("listing repository packs") + repoPacks := make(map[restic.ID]int64) + + err := c.repo.List(ctx, restic.PackFile, func(id restic.ID, size int64) error { + repoPacks[id] = size + return nil + }) + + if err != nil { + errChan <- err + } + + for id, size := range c.packs { + reposize, ok := repoPacks[id] + // remove from repoPacks so we can find orphaned packs + delete(repoPacks, id) + + // missing: present in c.packs but not in the repo + if !ok { + select { + case <-ctx.Done(): + return + case errChan <- &PackError{ID: id, Err: errors.New("does not exist")}: + } + continue + } + + // size not matching: present in c.packs and in the repo, but sizes do not match + if size != reposize { + select { + case <-ctx.Done(): + return + case errChan <- &PackError{ID: id, Truncated: true, Err: errors.Errorf("unexpected file size: got %d, expected %d", reposize, size)}: + } + } + } + + // orphaned: present in the repo but not in c.packs + for orphanID := range repoPacks { + select { + case <-ctx.Done(): + return + case errChan <- &PackError{ID: orphanID, Orphaned: true, Err: errors.New("not referenced in any index")}: + } + } +} + +// CountPacks returns the number of packs in the repository. 
+func (c *Checker) CountPacks() uint64 { + return uint64(len(c.packs)) +} + +// GetPacks returns IDSet of packs in the repository +func (c *Checker) GetPacks() map[restic.ID]int64 { + return c.packs +} + +// ReadData loads all data from the repository and checks the integrity. +func (c *Checker) ReadData(ctx context.Context, errChan chan<- error) { + c.ReadPacks(ctx, c.packs, nil, errChan) +} + +// ReadPacks loads data from specified packs and checks the integrity. +func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *progress.Counter, errChan chan<- error) { + defer close(errChan) + + g, ctx := errgroup.WithContext(ctx) + type checkTask struct { + id restic.ID + size int64 + blobs []restic.Blob + } + ch := make(chan checkTask) + + // as packs are streamed the concurrency is limited by IO + workerCount := int(c.repo.Connections()) + // run workers + for i := 0; i < workerCount; i++ { + g.Go(func() error { + bufRd := bufio.NewReaderSize(nil, maxStreamBufferSize) + dec, err := zstd.NewReader(nil) + if err != nil { + panic(err) + } + defer dec.Close() + for { + var ps checkTask + var ok bool + + select { + case <-ctx.Done(): + return nil + case ps, ok = <-ch: + if !ok { + return nil + } + } + + err := CheckPack(ctx, c.repo.(*Repository), ps.id, ps.blobs, ps.size, bufRd, dec) + p.Add(1) + if err == nil { + continue + } + + select { + case <-ctx.Done(): + return nil + case errChan <- err: + } + } + }) + } + + packSet := restic.NewIDSet() + for pack := range packs { + packSet.Insert(pack) + } + + // push packs to ch + for pbs := range c.repo.ListPacksFromIndex(ctx, packSet) { + size := packs[pbs.PackID] + debug.Log("listed %v", pbs.PackID) + select { + case ch <- checkTask{id: pbs.PackID, size: size, blobs: pbs.Blobs}: + case <-ctx.Done(): + } + } + close(ch) + + err := g.Wait() + if err != nil { + select { + case <-ctx.Done(): + return + case errChan <- err: + } + } +}