mirror of
https://github.com/restic/restic.git
synced 2025-10-09 07:51:51 +00:00
backend: Split RetryBackend into own package
The RetryBackend tests depend on the mock backend. When the Backend interface is eventually split from the restic package, this will lead to a dependency cycle between backend and backend/mock. Thus split the RetryBackend into a separate package to avoid this problem.
This commit is contained in:
200
internal/backend/retry/backend_retry.go
Normal file
200
internal/backend/retry/backend_retry.go
Normal file
@@ -0,0 +1,200 @@
|
||||
package retry
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
"github.com/restic/restic/internal/debug"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
)
|
||||
|
||||
// Backend retries operations on the backend in case of an error with a
|
||||
// backoff.
|
||||
type Backend struct {
|
||||
restic.Backend
|
||||
MaxTries int
|
||||
Report func(string, error, time.Duration)
|
||||
Success func(string, int)
|
||||
}
|
||||
|
||||
// statically ensure that RetryBackend implements restic.Backend.
|
||||
var _ restic.Backend = &Backend{}
|
||||
|
||||
// New wraps be with a backend that retries operations after a
|
||||
// backoff. report is called with a description and the error, if one occurred.
|
||||
// success is called with the number of retries before a successful operation
|
||||
// (it is not called if it succeeded on the first try)
|
||||
func New(be restic.Backend, maxTries int, report func(string, error, time.Duration), success func(string, int)) *Backend {
|
||||
return &Backend{
|
||||
Backend: be,
|
||||
MaxTries: maxTries,
|
||||
Report: report,
|
||||
Success: success,
|
||||
}
|
||||
}
|
||||
|
||||
// retryNotifyErrorWithSuccess is an extension of backoff.RetryNotify with notification of success after an error.
|
||||
// success is NOT notified on the first run of operation (only after an error).
|
||||
func retryNotifyErrorWithSuccess(operation backoff.Operation, b backoff.BackOff, notify backoff.Notify, success func(retries int)) error {
|
||||
if success == nil {
|
||||
return backoff.RetryNotify(operation, b, notify)
|
||||
}
|
||||
retries := 0
|
||||
operationWrapper := func() error {
|
||||
err := operation()
|
||||
if err != nil {
|
||||
retries++
|
||||
} else if retries > 0 {
|
||||
success(retries)
|
||||
}
|
||||
return err
|
||||
}
|
||||
return backoff.RetryNotify(operationWrapper, b, notify)
|
||||
}
|
||||
|
||||
var fastRetries = false
|
||||
|
||||
func (be *Backend) retry(ctx context.Context, msg string, f func() error) error {
|
||||
// Don't do anything when called with an already cancelled context. There would be
|
||||
// no retries in that case either, so be consistent and abort always.
|
||||
// This enforces a strict contract for backend methods: Using a cancelled context
|
||||
// will prevent any backup repository modifications. This simplifies ensuring that
|
||||
// a backup repository is not modified any further after a context was cancelled.
|
||||
// The 'local' backend for example does not provide this guarantee on its own.
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
bo := backoff.NewExponentialBackOff()
|
||||
if fastRetries {
|
||||
// speed up integration tests
|
||||
bo.InitialInterval = 1 * time.Millisecond
|
||||
}
|
||||
|
||||
err := retryNotifyErrorWithSuccess(f,
|
||||
backoff.WithContext(backoff.WithMaxRetries(bo, uint64(be.MaxTries)), ctx),
|
||||
func(err error, d time.Duration) {
|
||||
if be.Report != nil {
|
||||
be.Report(msg, err, d)
|
||||
}
|
||||
},
|
||||
func(retries int) {
|
||||
if be.Success != nil {
|
||||
be.Success(msg, retries)
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// Save stores the data in the backend under the given handle.
|
||||
func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
|
||||
return be.retry(ctx, fmt.Sprintf("Save(%v)", h), func() error {
|
||||
err := rd.Rewind()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = be.Backend.Save(ctx, h, rd)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if be.Backend.HasAtomicReplace() {
|
||||
debug.Log("Save(%v) failed with error: %v", h, err)
|
||||
// there is no need to remove files from backends which can atomically replace files
|
||||
// in fact if something goes wrong at the backend side the delete operation might delete the wrong instance of the file
|
||||
} else {
|
||||
debug.Log("Save(%v) failed with error, removing file: %v", h, err)
|
||||
rerr := be.Backend.Remove(ctx, h)
|
||||
if rerr != nil {
|
||||
debug.Log("Remove(%v) returned error: %v", h, err)
|
||||
}
|
||||
}
|
||||
|
||||
// return original error
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
// Load returns a reader that yields the contents of the file at h at the
|
||||
// given offset. If length is larger than zero, only a portion of the file
|
||||
// is returned. rd must be closed after use. If an error is returned, the
|
||||
// ReadCloser must be nil.
|
||||
func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, consumer func(rd io.Reader) error) (err error) {
|
||||
return be.retry(ctx, fmt.Sprintf("Load(%v, %v, %v)", h, length, offset),
|
||||
func() error {
|
||||
return be.Backend.Load(ctx, h, length, offset, consumer)
|
||||
})
|
||||
}
|
||||
|
||||
// Stat returns information about the File identified by h.
|
||||
func (be *Backend) Stat(ctx context.Context, h restic.Handle) (fi restic.FileInfo, err error) {
|
||||
err = be.retry(ctx, fmt.Sprintf("Stat(%v)", h),
|
||||
func() error {
|
||||
var innerError error
|
||||
fi, innerError = be.Backend.Stat(ctx, h)
|
||||
|
||||
return innerError
|
||||
})
|
||||
return fi, err
|
||||
}
|
||||
|
||||
// Remove removes a File with type t and name.
|
||||
func (be *Backend) Remove(ctx context.Context, h restic.Handle) (err error) {
|
||||
return be.retry(ctx, fmt.Sprintf("Remove(%v)", h), func() error {
|
||||
return be.Backend.Remove(ctx, h)
|
||||
})
|
||||
}
|
||||
|
||||
// Test a boolean value whether a File with the name and type exists.
|
||||
func (be *Backend) Test(ctx context.Context, h restic.Handle) (exists bool, err error) {
|
||||
err = be.retry(ctx, fmt.Sprintf("Test(%v)", h), func() error {
|
||||
var innerError error
|
||||
exists, innerError = be.Backend.Test(ctx, h)
|
||||
|
||||
return innerError
|
||||
})
|
||||
return exists, err
|
||||
}
|
||||
|
||||
// List runs fn for each file in the backend which has the type t. When an
|
||||
// error is returned by the underlying backend, the request is retried. When fn
|
||||
// returns an error, the operation is aborted and the error is returned to the
|
||||
// caller.
|
||||
func (be *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error {
|
||||
// create a new context that we can cancel when fn returns an error, so
|
||||
// that listing is aborted
|
||||
listCtx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
listed := make(map[string]struct{}) // remember for which files we already ran fn
|
||||
var innerErr error // remember when fn returned an error, so we can return that to the caller
|
||||
|
||||
err := be.retry(listCtx, fmt.Sprintf("List(%v)", t), func() error {
|
||||
return be.Backend.List(ctx, t, func(fi restic.FileInfo) error {
|
||||
if _, ok := listed[fi.Name]; ok {
|
||||
return nil
|
||||
}
|
||||
listed[fi.Name] = struct{}{}
|
||||
|
||||
innerErr = fn(fi)
|
||||
if innerErr != nil {
|
||||
// if fn returned an error, listing is aborted, so we cancel the context
|
||||
cancel()
|
||||
}
|
||||
return innerErr
|
||||
})
|
||||
})
|
||||
|
||||
// the error fn returned takes precedence
|
||||
if innerErr != nil {
|
||||
return innerErr
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
365
internal/backend/retry/backend_retry_test.go
Normal file
365
internal/backend/retry/backend_retry_test.go
Normal file
@@ -0,0 +1,365 @@
|
||||
package retry
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
"github.com/restic/restic/internal/backend/mock"
|
||||
"github.com/restic/restic/internal/errors"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
"github.com/restic/restic/internal/test"
|
||||
)
|
||||
|
||||
func TestBackendSaveRetry(t *testing.T) {
|
||||
buf := bytes.NewBuffer(nil)
|
||||
errcount := 0
|
||||
be := &mock.Backend{
|
||||
SaveFn: func(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
|
||||
if errcount == 0 {
|
||||
errcount++
|
||||
_, err := io.CopyN(ioutil.Discard, rd, 120)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return errors.New("injected error")
|
||||
}
|
||||
|
||||
_, err := io.Copy(buf, rd)
|
||||
return err
|
||||
},
|
||||
}
|
||||
|
||||
TestFastRetries(t)
|
||||
retryBackend := New(be, 10, nil, nil)
|
||||
|
||||
data := test.Random(23, 5*1024*1024+11241)
|
||||
err := retryBackend.Save(context.TODO(), restic.Handle{}, restic.NewByteReader(data, be.Hasher()))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if len(data) != buf.Len() {
|
||||
t.Errorf("wrong number of bytes written: want %d, got %d", len(data), buf.Len())
|
||||
}
|
||||
|
||||
if !bytes.Equal(data, buf.Bytes()) {
|
||||
t.Errorf("wrong data written to backend")
|
||||
}
|
||||
}
|
||||
|
||||
func TestBackendSaveRetryAtomic(t *testing.T) {
|
||||
errcount := 0
|
||||
calledRemove := false
|
||||
be := &mock.Backend{
|
||||
SaveFn: func(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
|
||||
if errcount == 0 {
|
||||
errcount++
|
||||
return errors.New("injected error")
|
||||
}
|
||||
return nil
|
||||
},
|
||||
RemoveFn: func(ctx context.Context, h restic.Handle) error {
|
||||
calledRemove = true
|
||||
return nil
|
||||
},
|
||||
HasAtomicReplaceFn: func() bool { return true },
|
||||
}
|
||||
|
||||
TestFastRetries(t)
|
||||
retryBackend := New(be, 10, nil, nil)
|
||||
|
||||
data := test.Random(23, 5*1024*1024+11241)
|
||||
err := retryBackend.Save(context.TODO(), restic.Handle{}, restic.NewByteReader(data, be.Hasher()))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if calledRemove {
|
||||
t.Fatal("remove must not be called")
|
||||
}
|
||||
}
|
||||
|
||||
func TestBackendListRetry(t *testing.T) {
|
||||
const (
|
||||
ID1 = "id1"
|
||||
ID2 = "id2"
|
||||
)
|
||||
|
||||
retry := 0
|
||||
be := &mock.Backend{
|
||||
ListFn: func(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error {
|
||||
// fail during first retry, succeed during second
|
||||
retry++
|
||||
if retry == 1 {
|
||||
_ = fn(restic.FileInfo{Name: ID1})
|
||||
return errors.New("test list error")
|
||||
}
|
||||
_ = fn(restic.FileInfo{Name: ID1})
|
||||
_ = fn(restic.FileInfo{Name: ID2})
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
TestFastRetries(t)
|
||||
retryBackend := New(be, 10, nil, nil)
|
||||
|
||||
var listed []string
|
||||
err := retryBackend.List(context.TODO(), restic.PackFile, func(fi restic.FileInfo) error {
|
||||
listed = append(listed, fi.Name)
|
||||
return nil
|
||||
})
|
||||
test.OK(t, err) // assert overall success
|
||||
test.Equals(t, 2, retry) // assert retried once
|
||||
test.Equals(t, []string{ID1, ID2}, listed) // assert no duplicate files
|
||||
}
|
||||
|
||||
func TestBackendListRetryErrorFn(t *testing.T) {
|
||||
var names = []string{"id1", "id2", "foo", "bar"}
|
||||
|
||||
be := &mock.Backend{
|
||||
ListFn: func(ctx context.Context, tpe restic.FileType, fn func(restic.FileInfo) error) error {
|
||||
t.Logf("List called for %v", tpe)
|
||||
for _, name := range names {
|
||||
err := fn(restic.FileInfo{Name: name})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
TestFastRetries(t)
|
||||
retryBackend := New(be, 10, nil, nil)
|
||||
|
||||
var ErrTest = errors.New("test error")
|
||||
|
||||
var listed []string
|
||||
run := 0
|
||||
err := retryBackend.List(context.TODO(), restic.PackFile, func(fi restic.FileInfo) error {
|
||||
t.Logf("fn called for %v", fi.Name)
|
||||
run++
|
||||
// return an error for the third item in the list
|
||||
if run == 3 {
|
||||
t.Log("returning an error")
|
||||
return ErrTest
|
||||
}
|
||||
listed = append(listed, fi.Name)
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != ErrTest {
|
||||
t.Fatalf("wrong error returned, want %v, got %v", ErrTest, err)
|
||||
}
|
||||
|
||||
// processing should stop after the error was returned, so run should be 3
|
||||
if run != 3 {
|
||||
t.Fatalf("function was called %d times, wanted %v", run, 3)
|
||||
}
|
||||
|
||||
test.Equals(t, []string{"id1", "id2"}, listed)
|
||||
}
|
||||
|
||||
func TestBackendListRetryErrorBackend(t *testing.T) {
|
||||
var names = []string{"id1", "id2", "foo", "bar"}
|
||||
|
||||
var ErrBackendTest = errors.New("test error")
|
||||
|
||||
retries := 0
|
||||
be := &mock.Backend{
|
||||
ListFn: func(ctx context.Context, tpe restic.FileType, fn func(restic.FileInfo) error) error {
|
||||
t.Logf("List called for %v, retries %v", tpe, retries)
|
||||
retries++
|
||||
for i, name := range names {
|
||||
if i == 2 {
|
||||
return ErrBackendTest
|
||||
}
|
||||
|
||||
err := fn(restic.FileInfo{Name: name})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
TestFastRetries(t)
|
||||
const maxRetries = 2
|
||||
retryBackend := New(be, maxRetries, nil, nil)
|
||||
|
||||
var listed []string
|
||||
err := retryBackend.List(context.TODO(), restic.PackFile, func(fi restic.FileInfo) error {
|
||||
t.Logf("fn called for %v", fi.Name)
|
||||
listed = append(listed, fi.Name)
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != ErrBackendTest {
|
||||
t.Fatalf("wrong error returned, want %v, got %v", ErrBackendTest, err)
|
||||
}
|
||||
|
||||
if retries != maxRetries+1 {
|
||||
t.Fatalf("List was called %d times, wanted %v", retries, maxRetries+1)
|
||||
}
|
||||
|
||||
test.Equals(t, names[:2], listed)
|
||||
}
|
||||
|
||||
// failingReader returns an error after reading limit number of bytes
|
||||
type failingReader struct {
|
||||
data []byte
|
||||
pos int
|
||||
limit int
|
||||
}
|
||||
|
||||
func (r failingReader) Read(p []byte) (n int, err error) {
|
||||
i := 0
|
||||
for ; i < len(p) && i+r.pos < r.limit; i++ {
|
||||
p[i] = r.data[r.pos+i]
|
||||
}
|
||||
r.pos += i
|
||||
if r.pos >= r.limit {
|
||||
return i, errors.Errorf("reader reached limit of %d", r.limit)
|
||||
}
|
||||
return i, nil
|
||||
}
|
||||
func (r failingReader) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// closingReader adapts io.Reader to io.ReadCloser interface
|
||||
type closingReader struct {
|
||||
rd io.Reader
|
||||
}
|
||||
|
||||
func (r closingReader) Read(p []byte) (n int, err error) {
|
||||
return r.rd.Read(p)
|
||||
}
|
||||
func (r closingReader) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestBackendLoadRetry(t *testing.T) {
|
||||
data := test.Random(23, 1024)
|
||||
limit := 100
|
||||
attempt := 0
|
||||
|
||||
be := mock.NewBackend()
|
||||
be.OpenReaderFn = func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
|
||||
// returns failing reader on first invocation, good reader on subsequent invocations
|
||||
attempt++
|
||||
if attempt > 1 {
|
||||
return closingReader{rd: bytes.NewReader(data)}, nil
|
||||
}
|
||||
return failingReader{data: data, limit: limit}, nil
|
||||
}
|
||||
|
||||
TestFastRetries(t)
|
||||
retryBackend := New(be, 10, nil, nil)
|
||||
|
||||
var buf []byte
|
||||
err := retryBackend.Load(context.TODO(), restic.Handle{}, 0, 0, func(rd io.Reader) (err error) {
|
||||
buf, err = ioutil.ReadAll(rd)
|
||||
return err
|
||||
})
|
||||
test.OK(t, err)
|
||||
test.Equals(t, data, buf)
|
||||
test.Equals(t, 2, attempt)
|
||||
}
|
||||
|
||||
func assertIsCanceled(t *testing.T, err error) {
|
||||
test.Assert(t, err == context.Canceled, "got unexpected err %v", err)
|
||||
}
|
||||
|
||||
func TestBackendCanceledContext(t *testing.T) {
|
||||
// unimplemented mock backend functions return an error by default
|
||||
// check that we received the expected context canceled error instead
|
||||
TestFastRetries(t)
|
||||
retryBackend := New(mock.NewBackend(), 2, nil, nil)
|
||||
h := restic.Handle{Type: restic.PackFile, Name: restic.NewRandomID().String()}
|
||||
|
||||
// create an already canceled context
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancel()
|
||||
|
||||
_, err := retryBackend.Test(ctx, h)
|
||||
assertIsCanceled(t, err)
|
||||
_, err = retryBackend.Stat(ctx, h)
|
||||
assertIsCanceled(t, err)
|
||||
|
||||
err = retryBackend.Save(ctx, h, restic.NewByteReader([]byte{}, nil))
|
||||
assertIsCanceled(t, err)
|
||||
err = retryBackend.Remove(ctx, h)
|
||||
assertIsCanceled(t, err)
|
||||
err = retryBackend.Load(ctx, restic.Handle{}, 0, 0, func(rd io.Reader) (err error) {
|
||||
return nil
|
||||
})
|
||||
assertIsCanceled(t, err)
|
||||
err = retryBackend.List(ctx, restic.PackFile, func(restic.FileInfo) error {
|
||||
return nil
|
||||
})
|
||||
assertIsCanceled(t, err)
|
||||
|
||||
// don't test "Delete" as it is not used by normal code
|
||||
}
|
||||
|
||||
func TestNotifyWithSuccessIsNotCalled(t *testing.T) {
|
||||
operation := func() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
notify := func(error, time.Duration) {
|
||||
t.Fatal("Notify should not have been called")
|
||||
}
|
||||
|
||||
success := func(retries int) {
|
||||
t.Fatal("Success should not have been called")
|
||||
}
|
||||
|
||||
err := retryNotifyErrorWithSuccess(operation, &backoff.ZeroBackOff{}, notify, success)
|
||||
if err != nil {
|
||||
t.Fatal("retry should not have returned an error")
|
||||
}
|
||||
}
|
||||
|
||||
func TestNotifyWithSuccessIsCalled(t *testing.T) {
|
||||
operationCalled := 0
|
||||
operation := func() error {
|
||||
operationCalled++
|
||||
if operationCalled <= 2 {
|
||||
return errors.New("expected error in test")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
notifyCalled := 0
|
||||
notify := func(error, time.Duration) {
|
||||
notifyCalled++
|
||||
}
|
||||
|
||||
successCalled := 0
|
||||
success := func(retries int) {
|
||||
successCalled++
|
||||
}
|
||||
|
||||
err := retryNotifyErrorWithSuccess(operation, &backoff.ZeroBackOff{}, notify, success)
|
||||
if err != nil {
|
||||
t.Fatal("retry should not have returned an error")
|
||||
}
|
||||
|
||||
if notifyCalled != 2 {
|
||||
t.Fatalf("Notify should have been called 2 times, but was called %d times instead", notifyCalled)
|
||||
}
|
||||
|
||||
if successCalled != 1 {
|
||||
t.Fatalf("Success should have been called only once, but was called %d times instead", successCalled)
|
||||
}
|
||||
}
|
8
internal/backend/retry/testing.go
Normal file
8
internal/backend/retry/testing.go
Normal file
@@ -0,0 +1,8 @@
|
||||
package retry
|
||||
|
||||
import "testing"
|
||||
|
||||
// TestFastRetries reduces the initial retry delay to 1 millisecond
|
||||
func TestFastRetries(t testing.TB) {
|
||||
fastRetries = true
|
||||
}
|
Reference in New Issue
Block a user