mirror of
https://github.com/restic/restic.git
synced 2025-08-12 11:57:56 +00:00
Fix non-intuitive repository behavior
- The SaveBlob method now checks for duplicates. - Moves handling of pending blobs to MasterIndex. -> also cleans up pending index entries when they are saved in the index -> when using SaveBlob no need to care about index any longer - Always check for full index and save it when storing packs. -> removes the need of an index uploader -> also removes the verbose "uploaded intermediate index" messages - The Flush method now also saves the index - Fix race condition when checking and saving full/non-finalized indexes
This commit is contained in:
@@ -783,11 +783,6 @@ func (arch *Archiver) Snapshot(ctx context.Context, targets []string, opts Snaps
|
||||
return nil, restic.ID{}, err
|
||||
}
|
||||
|
||||
err = arch.Repo.SaveIndex(ctx)
|
||||
if err != nil {
|
||||
return nil, restic.ID{}, err
|
||||
}
|
||||
|
||||
sn, err := restic.NewSnapshot(targets, opts.Tags, opts.Hostname, opts.Time)
|
||||
if err != nil {
|
||||
return nil, restic.ID{}, err
|
||||
|
@@ -96,11 +96,6 @@ func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = repo.SaveIndex(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if !startCallback {
|
||||
t.Errorf("start callback did not happen")
|
||||
}
|
||||
@@ -418,13 +413,16 @@ type blobCountingRepo struct {
|
||||
saved map[restic.BlobHandle]uint
|
||||
}
|
||||
|
||||
func (repo *blobCountingRepo) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID) (restic.ID, error) {
|
||||
id, err := repo.Repository.SaveBlob(ctx, t, buf, id)
|
||||
func (repo *blobCountingRepo) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID, storeDuplicate bool) (restic.ID, bool, error) {
|
||||
id, exists, err := repo.Repository.SaveBlob(ctx, t, buf, id, false)
|
||||
if exists {
|
||||
return id, exists, err
|
||||
}
|
||||
h := restic.BlobHandle{ID: id, Type: t}
|
||||
repo.m.Lock()
|
||||
repo.saved[h]++
|
||||
repo.m.Unlock()
|
||||
return id, err
|
||||
return id, exists, err
|
||||
}
|
||||
|
||||
func (repo *blobCountingRepo) SaveTree(ctx context.Context, t *restic.Tree) (restic.ID, error) {
|
||||
@@ -853,11 +851,6 @@ func TestArchiverSaveDir(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = repo.SaveIndex(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
want := test.want
|
||||
if want == nil {
|
||||
want = test.src
|
||||
@@ -946,11 +939,6 @@ func TestArchiverSaveDirIncremental(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = repo.SaveIndex(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
for h, n := range repo.saved {
|
||||
if n > 1 {
|
||||
t.Errorf("iteration %v: blob %v saved more than once (%d times)", i, h, n)
|
||||
@@ -1085,11 +1073,6 @@ func TestArchiverSaveTree(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = repo.SaveIndex(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
want := test.want
|
||||
if want == nil {
|
||||
want = test.src
|
||||
@@ -1841,13 +1824,13 @@ type failSaveRepo struct {
|
||||
err error
|
||||
}
|
||||
|
||||
func (f *failSaveRepo) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID) (restic.ID, error) {
|
||||
func (f *failSaveRepo) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID, storeDuplicate bool) (restic.ID, bool, error) {
|
||||
val := atomic.AddInt32(&f.cnt, 1)
|
||||
if val >= f.failAfter {
|
||||
return restic.ID{}, f.err
|
||||
return restic.ID{}, false, f.err
|
||||
}
|
||||
|
||||
return f.Repository.SaveBlob(ctx, t, buf, id)
|
||||
return f.Repository.SaveBlob(ctx, t, buf, id, storeDuplicate)
|
||||
}
|
||||
|
||||
func TestArchiverAbortEarlyOnError(t *testing.T) {
|
||||
|
@@ -2,7 +2,6 @@ package archiver
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/restic/restic/internal/debug"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
@@ -11,18 +10,14 @@ import (
|
||||
|
||||
// Saver allows saving a blob.
|
||||
type Saver interface {
|
||||
SaveBlob(ctx context.Context, t restic.BlobType, data []byte, id restic.ID) (restic.ID, error)
|
||||
SaveBlob(ctx context.Context, t restic.BlobType, data []byte, id restic.ID, storeDuplicate bool) (restic.ID, bool, error)
|
||||
Index() restic.Index
|
||||
}
|
||||
|
||||
// BlobSaver concurrently saves incoming blobs to the repo.
|
||||
type BlobSaver struct {
|
||||
repo Saver
|
||||
|
||||
m sync.Mutex
|
||||
knownBlobs restic.BlobSet
|
||||
|
||||
ch chan<- saveBlobJob
|
||||
ch chan<- saveBlobJob
|
||||
}
|
||||
|
||||
// NewBlobSaver returns a new blob. A worker pool is started, it is stopped
|
||||
@@ -30,9 +25,8 @@ type BlobSaver struct {
|
||||
func NewBlobSaver(ctx context.Context, t *tomb.Tomb, repo Saver, workers uint) *BlobSaver {
|
||||
ch := make(chan saveBlobJob)
|
||||
s := &BlobSaver{
|
||||
repo: repo,
|
||||
knownBlobs: restic.NewBlobSet(),
|
||||
ch: ch,
|
||||
repo: repo,
|
||||
ch: ch,
|
||||
}
|
||||
|
||||
for i := uint(0); i < workers; i++ {
|
||||
@@ -106,45 +100,15 @@ type saveBlobResponse struct {
|
||||
}
|
||||
|
||||
func (s *BlobSaver) saveBlob(ctx context.Context, t restic.BlobType, buf []byte) (saveBlobResponse, error) {
|
||||
id := restic.Hash(buf)
|
||||
h := restic.BlobHandle{ID: id, Type: t}
|
||||
id, known, err := s.repo.SaveBlob(ctx, t, buf, restic.ID{}, false)
|
||||
|
||||
// check if another goroutine has already saved this blob
|
||||
known := false
|
||||
s.m.Lock()
|
||||
if s.knownBlobs.Has(h) {
|
||||
known = true
|
||||
} else {
|
||||
s.knownBlobs.Insert(h)
|
||||
known = false
|
||||
}
|
||||
s.m.Unlock()
|
||||
|
||||
// blob is already known, nothing to do
|
||||
if known {
|
||||
return saveBlobResponse{
|
||||
id: id,
|
||||
known: true,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// check if the repo knows this blob
|
||||
if s.repo.Index().Has(id, t) {
|
||||
return saveBlobResponse{
|
||||
id: id,
|
||||
known: true,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// otherwise we're responsible for saving it
|
||||
_, err := s.repo.SaveBlob(ctx, t, buf, id)
|
||||
if err != nil {
|
||||
return saveBlobResponse{}, err
|
||||
}
|
||||
|
||||
return saveBlobResponse{
|
||||
id: id,
|
||||
known: false,
|
||||
known: known,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@@ -21,13 +21,13 @@ type saveFail struct {
|
||||
failAt int32
|
||||
}
|
||||
|
||||
func (b *saveFail) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID) (restic.ID, error) {
|
||||
func (b *saveFail) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID, storeDuplicates bool) (restic.ID, bool, error) {
|
||||
val := atomic.AddInt32(&b.cnt, 1)
|
||||
if val == b.failAt {
|
||||
return restic.ID{}, errTest
|
||||
return restic.ID{}, false, errTest
|
||||
}
|
||||
|
||||
return id, nil
|
||||
return id, false, nil
|
||||
}
|
||||
|
||||
func (b *saveFail) Index() restic.Index {
|
||||
|
@@ -1,53 +0,0 @@
|
||||
package archiver
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/restic/restic/internal/debug"
|
||||
"github.com/restic/restic/internal/repository"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
)
|
||||
|
||||
// IndexUploader polls the repo for full indexes and uploads them.
|
||||
type IndexUploader struct {
|
||||
restic.Repository
|
||||
|
||||
// Start is called when an index is to be uploaded.
|
||||
Start func()
|
||||
|
||||
// Complete is called when uploading an index has finished.
|
||||
Complete func(id restic.ID)
|
||||
}
|
||||
|
||||
// Upload periodically uploads full indexes to the repo. When shutdown is
|
||||
// cancelled, the last index upload will finish and then Upload returns.
|
||||
func (u IndexUploader) Upload(ctx, shutdown context.Context, interval time.Duration) error {
|
||||
ticker := time.NewTicker(interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case <-shutdown.Done():
|
||||
return nil
|
||||
case <-ticker.C:
|
||||
full := u.Repository.Index().(*repository.MasterIndex).FullIndexes()
|
||||
for _, idx := range full {
|
||||
if u.Start != nil {
|
||||
u.Start()
|
||||
}
|
||||
|
||||
id, err := repository.SaveIndex(ctx, u.Repository, idx)
|
||||
if err != nil {
|
||||
debug.Log("save indexes returned an error: %v", err)
|
||||
return err
|
||||
}
|
||||
if u.Complete != nil {
|
||||
u.Complete(id)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user