Merge pull request #4626 from MichaelEischer/reliable-large-restores

Improve reliability of large restores
This commit is contained in:
Michael Eischer
2024-01-09 18:23:09 +01:00
committed by GitHub
3 changed files with 150 additions and 77 deletions

View File

@@ -197,19 +197,20 @@ func (r *fileRestorer) restoreFiles(ctx context.Context) error {
return wg.Wait()
}
func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error {
type blobToFileOffsetsMapping map[restic.ID]struct {
files map[*fileInfo][]int64 // file -> offsets (plural!) of the blob in the file
blob restic.Blob
}
func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error {
// calculate blob->[]files->[]offsets mappings
blobs := make(map[restic.ID]struct {
files map[*fileInfo][]int64 // file -> offsets (plural!) of the blob in the file
})
var blobList []restic.Blob
blobs := make(blobToFileOffsetsMapping)
for file := range pack.files {
addBlob := func(blob restic.Blob, fileOffset int64) {
blobInfo, ok := blobs[blob.ID]
if !ok {
blobInfo.files = make(map[*fileInfo][]int64)
blobList = append(blobList, blob)
blobInfo.blob = blob
blobs[blob.ID] = blobInfo
}
blobInfo.files[file] = append(blobInfo.files[file], fileOffset)
@@ -239,80 +240,120 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error {
}
}
sanitizeError := func(file *fileInfo, err error) error {
if err != nil {
err = r.Error(file.location, err)
}
return err
}
// track already processed blobs for precise error reporting
processedBlobs := restic.NewBlobSet()
err := repository.StreamPack(ctx, r.packLoader, r.key, pack.id, blobList, func(h restic.BlobHandle, blobData []byte, err error) error {
processedBlobs.Insert(h)
blob := blobs[h.ID]
if err != nil {
for file := range blob.files {
if errFile := sanitizeError(file, err); errFile != nil {
return errFile
for _, entry := range blobs {
occurrences := 0
for _, offsets := range entry.files {
occurrences += len(offsets)
}
// With a maximum blob size of 8MB, the normal blob streaming has to write
// at most 800MB for a single blob. This should be short enough to avoid
// network connection timeouts. Based on a quick test, a limit of 100 only
// selects a very small number of blobs (the number of references per blob
// - aka. `count` - seem to follow a expontential distribution)
if occurrences > 100 {
// process frequently referenced blobs first as these can take a long time to write
// which can cause backend connections to time out
delete(blobs, entry.blob.ID)
partialBlobs := blobToFileOffsetsMapping{entry.blob.ID: entry}
err := r.downloadBlobs(ctx, pack.id, partialBlobs, processedBlobs)
if err := r.reportError(blobs, processedBlobs, err); err != nil {
return err
}
}
}
if len(blobs) == 0 {
return nil
}
err := r.downloadBlobs(ctx, pack.id, blobs, processedBlobs)
return r.reportError(blobs, processedBlobs, err)
}
func (r *fileRestorer) sanitizeError(file *fileInfo, err error) error {
if err != nil {
err = r.Error(file.location, err)
}
return err
}
func (r *fileRestorer) reportError(blobs blobToFileOffsetsMapping, processedBlobs restic.BlobSet, err error) error {
if err == nil {
return nil
}
// only report error for not yet processed blobs
affectedFiles := make(map[*fileInfo]struct{})
for _, entry := range blobs {
if processedBlobs.Has(entry.blob.BlobHandle) {
continue
}
for file := range entry.files {
affectedFiles[file] = struct{}{}
}
}
for file := range affectedFiles {
if errFile := r.sanitizeError(file, err); errFile != nil {
return errFile
}
}
return nil
}
func (r *fileRestorer) downloadBlobs(ctx context.Context, packID restic.ID,
blobs blobToFileOffsetsMapping, processedBlobs restic.BlobSet) error {
blobList := make([]restic.Blob, 0, len(blobs))
for _, entry := range blobs {
blobList = append(blobList, entry.blob)
}
return repository.StreamPack(ctx, r.packLoader, r.key, packID, blobList,
func(h restic.BlobHandle, blobData []byte, err error) error {
processedBlobs.Insert(h)
blob := blobs[h.ID]
if err != nil {
for file := range blob.files {
if errFile := r.sanitizeError(file, err); errFile != nil {
return errFile
}
}
return nil
}
for file, offsets := range blob.files {
for _, offset := range offsets {
writeToFile := func() error {
// this looks overly complicated and needs explanation
// two competing requirements:
// - must create the file once and only once
// - should allow concurrent writes to the file
// so write the first blob while holding file lock
// write other blobs after releasing the lock
createSize := int64(-1)
file.lock.Lock()
if file.inProgress {
file.lock.Unlock()
} else {
defer file.lock.Unlock()
file.inProgress = true
createSize = file.size
}
writeErr := r.filesWriter.writeToFile(r.targetPath(file.location), blobData, offset, createSize, file.sparse)
if r.progress != nil {
r.progress.AddProgress(file.location, uint64(len(blobData)), uint64(file.size))
}
return writeErr
}
err := r.sanitizeError(file, writeToFile())
if err != nil {
return err
}
}
}
return nil
}
for file, offsets := range blob.files {
for _, offset := range offsets {
writeToFile := func() error {
// this looks overly complicated and needs explanation
// two competing requirements:
// - must create the file once and only once
// - should allow concurrent writes to the file
// so write the first blob while holding file lock
// write other blobs after releasing the lock
createSize := int64(-1)
file.lock.Lock()
if file.inProgress {
file.lock.Unlock()
} else {
defer file.lock.Unlock()
file.inProgress = true
createSize = file.size
}
writeErr := r.filesWriter.writeToFile(r.targetPath(file.location), blobData, offset, createSize, file.sparse)
if r.progress != nil {
r.progress.AddProgress(file.location, uint64(len(blobData)), uint64(file.size))
}
return writeErr
}
err := sanitizeError(file, writeToFile())
if err != nil {
return err
}
}
}
return nil
})
if err != nil {
// only report error for not yet processed blobs
affectedFiles := make(map[*fileInfo]struct{})
for _, blob := range blobList {
if processedBlobs.Has(blob.BlobHandle) {
continue
}
blob := blobs[blob.ID]
for file := range blob.files {
affectedFiles[file] = struct{}{}
}
}
for file := range affectedFiles {
if errFile := sanitizeError(file, err); errFile != nil {
return errFile
}
}
}
return nil
})
}

View File

@@ -248,6 +248,27 @@ func TestFileRestorerPackSkip(t *testing.T) {
}
}
func TestFileRestorerFrequentBlob(t *testing.T) {
tempdir := rtest.TempDir(t)
for _, sparse := range []bool{false, true} {
blobs := []TestBlob{
{"data1-1", "pack1-1"},
}
for i := 0; i < 10000; i++ {
blobs = append(blobs, TestBlob{"a", "pack1-1"})
}
blobs = append(blobs, TestBlob{"end", "pack1-1"})
restoreAndVerify(t, tempdir, []TestFile{
{
name: "file1",
blobs: blobs,
},
}, nil, sparse)
}
}
func TestErrorRestoreFiles(t *testing.T) {
tempdir := rtest.TempDir(t)
content := []TestFile{