mirror of
https://github.com/restic/restic.git
synced 2025-12-11 18:47:50 +00:00
backends: pass error logger to backends
This commit is contained in:
@@ -414,7 +414,7 @@ func OpenRepository(ctx context.Context, opts GlobalOptions, printer progress.Pr
|
||||
}
|
||||
|
||||
// start using the cache
|
||||
s.UseCache(c)
|
||||
s.UseCache(c, printer.E)
|
||||
|
||||
oldCacheDirs, err := cache.Old(c.Base)
|
||||
if err != nil {
|
||||
@@ -492,9 +492,9 @@ func innerOpen(ctx context.Context, s string, gopts GlobalOptions, opts options.
|
||||
|
||||
var be backend.Backend
|
||||
if create {
|
||||
be, err = factory.Create(ctx, cfg, rt, lim)
|
||||
be, err = factory.Create(ctx, cfg, rt, lim, printer.E)
|
||||
} else {
|
||||
be, err = factory.Open(ctx, cfg, rt, lim)
|
||||
be, err = factory.Open(ctx, cfg, rt, lim, printer.E)
|
||||
}
|
||||
|
||||
if errors.Is(err, backend.ErrNoRepository) {
|
||||
|
||||
@@ -159,13 +159,13 @@ func supportedAccessTiers() []blob.AccessTier {
|
||||
}
|
||||
|
||||
// Open opens the Azure backend at specified container.
|
||||
func Open(_ context.Context, cfg Config, rt http.RoundTripper) (*Backend, error) {
|
||||
func Open(_ context.Context, cfg Config, rt http.RoundTripper, _ func(string, ...interface{})) (*Backend, error) {
|
||||
return open(cfg, rt)
|
||||
}
|
||||
|
||||
// Create opens the Azure backend at specified container and creates the container if
|
||||
// it does not exist yet.
|
||||
func Create(ctx context.Context, cfg Config, rt http.RoundTripper) (*Backend, error) {
|
||||
func Create(ctx context.Context, cfg Config, rt http.RoundTripper, _ func(string, ...interface{})) (*Backend, error) {
|
||||
be, err := open(cfg, rt)
|
||||
|
||||
if err != nil {
|
||||
|
||||
@@ -116,7 +116,7 @@ func TestBackendAzureAccountToken(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_, err = azure.Create(ctx, *cfg, tr)
|
||||
_, err = azure.Create(ctx, *cfg, tr, t.Logf)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -159,7 +159,7 @@ func TestBackendAzureContainerToken(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_, err = azure.Create(ctx, *cfg, tr)
|
||||
_, err = azure.Create(ctx, *cfg, tr, t.Logf)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -193,7 +193,7 @@ func TestUploadLargeFile(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
be, err := azure.Create(ctx, *cfg, tr)
|
||||
be, err := azure.Create(ctx, *cfg, tr, t.Logf)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -88,7 +88,7 @@ func newClient(ctx context.Context, cfg Config, rt http.RoundTripper) (*b2.Clien
|
||||
}
|
||||
|
||||
// Open opens a connection to the B2 service.
|
||||
func Open(ctx context.Context, cfg Config, rt http.RoundTripper) (backend.Backend, error) {
|
||||
func Open(ctx context.Context, cfg Config, rt http.RoundTripper, _ func(string, ...interface{})) (backend.Backend, error) {
|
||||
debug.Log("cfg %#v", cfg)
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
@@ -120,7 +120,7 @@ func Open(ctx context.Context, cfg Config, rt http.RoundTripper) (backend.Backen
|
||||
|
||||
// Create opens a connection to the B2 service. If the bucket does not exist yet,
|
||||
// it is created.
|
||||
func Create(ctx context.Context, cfg Config, rt http.RoundTripper) (backend.Backend, error) {
|
||||
func Create(ctx context.Context, cfg Config, rt http.RoundTripper, _ func(string, ...interface{})) (backend.Backend, error) {
|
||||
debug.Log("cfg %#v", cfg)
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
8
internal/backend/cache/backend.go
vendored
8
internal/backend/cache/backend.go
vendored
@@ -2,9 +2,7 @@ package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
"github.com/restic/restic/internal/backend"
|
||||
@@ -22,16 +20,18 @@ type Backend struct {
|
||||
// is finished.
|
||||
inProgressMutex sync.Mutex
|
||||
inProgress map[backend.Handle]chan struct{}
|
||||
errorLog func(string, ...interface{})
|
||||
}
|
||||
|
||||
// ensure Backend implements backend.Backend
|
||||
var _ backend.Backend = &Backend{}
|
||||
|
||||
func newBackend(be backend.Backend, c *Cache) *Backend {
|
||||
func newBackend(be backend.Backend, c *Cache, errorLog func(string, ...interface{})) *Backend {
|
||||
return &Backend{
|
||||
Backend: be,
|
||||
Cache: c,
|
||||
inProgress: make(map[backend.Handle]chan struct{}),
|
||||
errorLog: errorLog,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -253,7 +253,7 @@ func (b *Backend) List(ctx context.Context, t backend.FileType, fn func(f backen
|
||||
// clear the cache for files that are not in the repo anymore, ignore errors
|
||||
err = b.Cache.Clear(t, ids)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "error clearing %s files in cache: %v\n", t.String(), err)
|
||||
b.errorLog("error clearing %s files in cache: %v\n", t.String(), err)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
12
internal/backend/cache/backend_test.go
vendored
12
internal/backend/cache/backend_test.go
vendored
@@ -67,7 +67,7 @@ func list(t testing.TB, be backend.Backend, fn func(backend.FileInfo) error) {
|
||||
func TestBackend(t *testing.T) {
|
||||
be := mem.New()
|
||||
c := TestNewCache(t)
|
||||
wbe := c.Wrap(be)
|
||||
wbe := c.Wrap(be, t.Logf)
|
||||
|
||||
h, data := randomData(5234142)
|
||||
|
||||
@@ -135,7 +135,7 @@ func (l *loadCountingBackend) Load(ctx context.Context, h backend.Handle, length
|
||||
func TestOutOfBoundsAccess(t *testing.T) {
|
||||
be := &loadCountingBackend{Backend: mem.New()}
|
||||
c := TestNewCache(t)
|
||||
wbe := c.Wrap(be)
|
||||
wbe := c.Wrap(be, t.Logf)
|
||||
|
||||
h, data := randomData(50)
|
||||
save(t, be, h, data)
|
||||
@@ -164,7 +164,7 @@ func TestOutOfBoundsAccess(t *testing.T) {
|
||||
func TestForget(t *testing.T) {
|
||||
be := &loadCountingBackend{Backend: mem.New()}
|
||||
c := TestNewCache(t)
|
||||
wbe := c.Wrap(be)
|
||||
wbe := c.Wrap(be, t.Logf)
|
||||
|
||||
h, data := randomData(50)
|
||||
save(t, be, h, data)
|
||||
@@ -236,7 +236,7 @@ func TestErrorBackend(t *testing.T) {
|
||||
time.Sleep(time.Millisecond)
|
||||
}
|
||||
|
||||
wrappedBE := c.Wrap(errBackend)
|
||||
wrappedBE := c.Wrap(errBackend, t.Logf)
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < 5; i++ {
|
||||
wg.Add(1)
|
||||
@@ -249,7 +249,7 @@ func TestErrorBackend(t *testing.T) {
|
||||
func TestAutomaticCacheClear(t *testing.T) {
|
||||
be := mem.New()
|
||||
c := TestNewCache(t)
|
||||
wbe := c.Wrap(be)
|
||||
wbe := c.Wrap(be, t.Logf)
|
||||
|
||||
// add two handles h1 and h2
|
||||
h1, data := randomData(2000)
|
||||
@@ -308,7 +308,7 @@ func TestAutomaticCacheClearInvalidFilename(t *testing.T) {
|
||||
}
|
||||
save(t, be, h, data)
|
||||
|
||||
wbe := c.Wrap(be)
|
||||
wbe := c.Wrap(be, t.Logf)
|
||||
|
||||
// list all files in the backend
|
||||
list(t, wbe, func(_ backend.FileInfo) error { return nil })
|
||||
|
||||
4
internal/backend/cache/cache.go
vendored
4
internal/backend/cache/cache.go
vendored
@@ -237,8 +237,8 @@ func IsOld(t time.Time, maxAge time.Duration) bool {
|
||||
}
|
||||
|
||||
// Wrap returns a backend with a cache.
|
||||
func (c *Cache) Wrap(be backend.Backend) backend.Backend {
|
||||
return newBackend(be, c)
|
||||
func (c *Cache) Wrap(be backend.Backend, errorLog func(string, ...interface{})) backend.Backend {
|
||||
return newBackend(be, c, errorLog)
|
||||
}
|
||||
|
||||
// BaseDir returns the base directory.
|
||||
|
||||
@@ -120,7 +120,7 @@ func open(cfg Config, rt http.RoundTripper) (*Backend, error) {
|
||||
}
|
||||
|
||||
// Open opens the gs backend at the specified bucket.
|
||||
func Open(_ context.Context, cfg Config, rt http.RoundTripper) (backend.Backend, error) {
|
||||
func Open(_ context.Context, cfg Config, rt http.RoundTripper, _ func(string, ...interface{})) (backend.Backend, error) {
|
||||
return open(cfg, rt)
|
||||
}
|
||||
|
||||
@@ -129,7 +129,7 @@ func Open(_ context.Context, cfg Config, rt http.RoundTripper) (backend.Backend,
|
||||
//
|
||||
// The service account must have the "storage.buckets.create" permission to
|
||||
// create a bucket the does not yet exist.
|
||||
func Create(ctx context.Context, cfg Config, rt http.RoundTripper) (backend.Backend, error) {
|
||||
func Create(ctx context.Context, cfg Config, rt http.RoundTripper, _ func(string, ...interface{})) (backend.Backend, error) {
|
||||
be, err := open(cfg, rt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -7,10 +7,10 @@ import (
|
||||
"github.com/restic/restic/internal/backend"
|
||||
)
|
||||
|
||||
func WrapBackendConstructor[B backend.Backend, C any](constructor func(ctx context.Context, cfg C) (B, error)) func(ctx context.Context, cfg C, lim Limiter) (backend.Backend, error) {
|
||||
return func(ctx context.Context, cfg C, lim Limiter) (backend.Backend, error) {
|
||||
func WrapBackendConstructor[B backend.Backend, C any](constructor func(ctx context.Context, cfg C, errorLog func(string, ...interface{})) (B, error)) func(ctx context.Context, cfg C, lim Limiter, errorLog func(string, ...interface{})) (backend.Backend, error) {
|
||||
return func(ctx context.Context, cfg C, lim Limiter, errorLog func(string, ...interface{})) (backend.Backend, error) {
|
||||
var be backend.Backend
|
||||
be, err := constructor(ctx, cfg)
|
||||
be, err := constructor(ctx, cfg, errorLog)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -32,7 +32,7 @@ func TestLayout(t *testing.T) {
|
||||
be, err := Open(context.TODO(), Config{
|
||||
Path: repo,
|
||||
Connections: 2,
|
||||
})
|
||||
}, t.Logf)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -52,14 +52,14 @@ func open(cfg Config) (*Local, error) {
|
||||
}
|
||||
|
||||
// Open opens the local backend as specified by config.
|
||||
func Open(_ context.Context, cfg Config) (*Local, error) {
|
||||
func Open(_ context.Context, cfg Config, _ func(string, ...interface{})) (*Local, error) {
|
||||
debug.Log("open local backend at %v", cfg.Path)
|
||||
return open(cfg)
|
||||
}
|
||||
|
||||
// Create creates all the necessary files and directories for a new local
|
||||
// backend at dir. Afterwards a new config blob should be created.
|
||||
func Create(_ context.Context, cfg Config) (*Local, error) {
|
||||
func Create(_ context.Context, cfg Config, _ func(string, ...interface{})) (*Local, error) {
|
||||
debug.Log("create local backend at %v", cfg.Path)
|
||||
|
||||
be, err := open(cfg)
|
||||
|
||||
@@ -26,7 +26,7 @@ func TestNoSpacePermanent(t *testing.T) {
|
||||
|
||||
dir := rtest.TempDir(t)
|
||||
|
||||
be, err := Open(context.Background(), Config{Path: dir, Connections: 2})
|
||||
be, err := Open(context.Background(), Config{Path: dir, Connections: 2}, t.Logf)
|
||||
rtest.OK(t, err)
|
||||
defer func() {
|
||||
rtest.OK(t, be.Close())
|
||||
|
||||
@@ -66,7 +66,7 @@ func empty(t testing.TB, dir string) {
|
||||
func openclose(t testing.TB, dir string) {
|
||||
cfg := local.Config{Path: dir}
|
||||
|
||||
be, err := local.Open(context.TODO(), cfg)
|
||||
be, err := local.Open(context.TODO(), cfg, t.Logf)
|
||||
if err != nil {
|
||||
t.Logf("Open returned error %v", err)
|
||||
}
|
||||
|
||||
@@ -33,16 +33,16 @@ type Factory interface {
|
||||
Scheme() string
|
||||
ParseConfig(s string) (interface{}, error)
|
||||
StripPassword(s string) string
|
||||
Create(ctx context.Context, cfg interface{}, rt http.RoundTripper, lim limiter.Limiter) (backend.Backend, error)
|
||||
Open(ctx context.Context, cfg interface{}, rt http.RoundTripper, lim limiter.Limiter) (backend.Backend, error)
|
||||
Create(ctx context.Context, cfg interface{}, rt http.RoundTripper, lim limiter.Limiter, errorLog func(string, ...interface{})) (backend.Backend, error)
|
||||
Open(ctx context.Context, cfg interface{}, rt http.RoundTripper, lim limiter.Limiter, errorLog func(string, ...interface{})) (backend.Backend, error)
|
||||
}
|
||||
|
||||
type genericBackendFactory[C any, T backend.Backend] struct {
|
||||
scheme string
|
||||
parseConfigFn func(s string) (*C, error)
|
||||
stripPasswordFn func(s string) string
|
||||
createFn func(ctx context.Context, cfg C, rt http.RoundTripper, lim limiter.Limiter) (T, error)
|
||||
openFn func(ctx context.Context, cfg C, rt http.RoundTripper, lim limiter.Limiter) (T, error)
|
||||
createFn func(ctx context.Context, cfg C, rt http.RoundTripper, lim limiter.Limiter, errorLog func(string, ...interface{})) (T, error)
|
||||
openFn func(ctx context.Context, cfg C, rt http.RoundTripper, lim limiter.Limiter, errorLog func(string, ...interface{})) (T, error)
|
||||
}
|
||||
|
||||
func (f *genericBackendFactory[C, T]) Scheme() string {
|
||||
@@ -58,29 +58,29 @@ func (f *genericBackendFactory[C, T]) StripPassword(s string) string {
|
||||
}
|
||||
return s
|
||||
}
|
||||
func (f *genericBackendFactory[C, T]) Create(ctx context.Context, cfg interface{}, rt http.RoundTripper, lim limiter.Limiter) (backend.Backend, error) {
|
||||
return f.createFn(ctx, *cfg.(*C), rt, lim)
|
||||
func (f *genericBackendFactory[C, T]) Create(ctx context.Context, cfg interface{}, rt http.RoundTripper, lim limiter.Limiter, errorLog func(string, ...interface{})) (backend.Backend, error) {
|
||||
return f.createFn(ctx, *cfg.(*C), rt, lim, errorLog)
|
||||
}
|
||||
func (f *genericBackendFactory[C, T]) Open(ctx context.Context, cfg interface{}, rt http.RoundTripper, lim limiter.Limiter) (backend.Backend, error) {
|
||||
return f.openFn(ctx, *cfg.(*C), rt, lim)
|
||||
func (f *genericBackendFactory[C, T]) Open(ctx context.Context, cfg interface{}, rt http.RoundTripper, lim limiter.Limiter, errorLog func(string, ...interface{})) (backend.Backend, error) {
|
||||
return f.openFn(ctx, *cfg.(*C), rt, lim, errorLog)
|
||||
}
|
||||
|
||||
func NewHTTPBackendFactory[C any, T backend.Backend](
|
||||
scheme string,
|
||||
parseConfigFn func(s string) (*C, error),
|
||||
stripPasswordFn func(s string) string,
|
||||
createFn func(ctx context.Context, cfg C, rt http.RoundTripper) (T, error),
|
||||
openFn func(ctx context.Context, cfg C, rt http.RoundTripper) (T, error)) Factory {
|
||||
createFn func(ctx context.Context, cfg C, rt http.RoundTripper, errorLog func(string, ...interface{})) (T, error),
|
||||
openFn func(ctx context.Context, cfg C, rt http.RoundTripper, errorLog func(string, ...interface{})) (T, error)) Factory {
|
||||
|
||||
return &genericBackendFactory[C, T]{
|
||||
scheme: scheme,
|
||||
parseConfigFn: parseConfigFn,
|
||||
stripPasswordFn: stripPasswordFn,
|
||||
createFn: func(ctx context.Context, cfg C, rt http.RoundTripper, _ limiter.Limiter) (T, error) {
|
||||
return createFn(ctx, cfg, rt)
|
||||
createFn: func(ctx context.Context, cfg C, rt http.RoundTripper, _ limiter.Limiter, errorLog func(string, ...interface{})) (T, error) {
|
||||
return createFn(ctx, cfg, rt, errorLog)
|
||||
},
|
||||
openFn: func(ctx context.Context, cfg C, rt http.RoundTripper, _ limiter.Limiter) (T, error) {
|
||||
return openFn(ctx, cfg, rt)
|
||||
openFn: func(ctx context.Context, cfg C, rt http.RoundTripper, _ limiter.Limiter, errorLog func(string, ...interface{})) (T, error) {
|
||||
return openFn(ctx, cfg, rt, errorLog)
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -89,18 +89,18 @@ func NewLimitedBackendFactory[C any, T backend.Backend](
|
||||
scheme string,
|
||||
parseConfigFn func(s string) (*C, error),
|
||||
stripPasswordFn func(s string) string,
|
||||
createFn func(ctx context.Context, cfg C, lim limiter.Limiter) (T, error),
|
||||
openFn func(ctx context.Context, cfg C, lim limiter.Limiter) (T, error)) Factory {
|
||||
createFn func(ctx context.Context, cfg C, lim limiter.Limiter, errorLog func(string, ...interface{})) (T, error),
|
||||
openFn func(ctx context.Context, cfg C, lim limiter.Limiter, errorLog func(string, ...interface{})) (T, error)) Factory {
|
||||
|
||||
return &genericBackendFactory[C, T]{
|
||||
scheme: scheme,
|
||||
parseConfigFn: parseConfigFn,
|
||||
stripPasswordFn: stripPasswordFn,
|
||||
createFn: func(ctx context.Context, cfg C, _ http.RoundTripper, lim limiter.Limiter) (T, error) {
|
||||
return createFn(ctx, cfg, lim)
|
||||
createFn: func(ctx context.Context, cfg C, _ http.RoundTripper, lim limiter.Limiter, errorLog func(string, ...interface{})) (T, error) {
|
||||
return createFn(ctx, cfg, lim, errorLog)
|
||||
},
|
||||
openFn: func(ctx context.Context, cfg C, _ http.RoundTripper, lim limiter.Limiter) (T, error) {
|
||||
return openFn(ctx, cfg, lim)
|
||||
openFn: func(ctx context.Context, cfg C, _ http.RoundTripper, lim limiter.Limiter, errorLog func(string, ...interface{})) (T, error) {
|
||||
return openFn(ctx, cfg, lim, errorLog)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -33,10 +33,10 @@ func NewFactory() location.Factory {
|
||||
return &struct{}{}, nil
|
||||
},
|
||||
location.NoPassword,
|
||||
func(_ context.Context, _ struct{}, _ http.RoundTripper) (*MemoryBackend, error) {
|
||||
func(_ context.Context, _ struct{}, _ http.RoundTripper, _ func(string, ...interface{})) (*MemoryBackend, error) {
|
||||
return be, nil
|
||||
},
|
||||
func(_ context.Context, _ struct{}, _ http.RoundTripper) (*MemoryBackend, error) {
|
||||
func(_ context.Context, _ struct{}, _ http.RoundTripper, _ func(string, ...interface{})) (*MemoryBackend, error) {
|
||||
return be, nil
|
||||
},
|
||||
)
|
||||
|
||||
@@ -43,7 +43,7 @@ func NewFactory() location.Factory {
|
||||
}
|
||||
|
||||
// run starts command with args and initializes the StdioConn.
|
||||
func run(command string, args ...string) (*StdioConn, *sync.WaitGroup, chan struct{}, func() error, error) {
|
||||
func run(errorLog func(string, ...interface{}), command string, args ...string) (*StdioConn, *sync.WaitGroup, chan struct{}, func() error, error) {
|
||||
cmd := exec.Command(command, args...)
|
||||
|
||||
p, err := cmd.StderrPipe()
|
||||
@@ -61,7 +61,7 @@ func run(command string, args ...string) (*StdioConn, *sync.WaitGroup, chan stru
|
||||
defer close(waitCh)
|
||||
sc := bufio.NewScanner(p)
|
||||
for sc.Scan() {
|
||||
fmt.Fprintf(os.Stderr, "rclone: %v\n", sc.Text())
|
||||
errorLog("rclone: %v\n", sc.Text())
|
||||
}
|
||||
debug.Log("command has exited, closing waitCh")
|
||||
}()
|
||||
@@ -140,7 +140,7 @@ func wrapConn(c *StdioConn, lim limiter.Limiter) *wrappedConn {
|
||||
}
|
||||
|
||||
// New initializes a Backend and starts the process.
|
||||
func newBackend(ctx context.Context, cfg Config, lim limiter.Limiter) (*Backend, error) {
|
||||
func newBackend(ctx context.Context, cfg Config, lim limiter.Limiter, errorLog func(string, ...interface{})) (*Backend, error) {
|
||||
var (
|
||||
args []string
|
||||
err error
|
||||
@@ -170,7 +170,7 @@ func newBackend(ctx context.Context, cfg Config, lim limiter.Limiter) (*Backend,
|
||||
arg0, args := args[0], args[1:]
|
||||
|
||||
debug.Log("running command: %v %v", arg0, args)
|
||||
stdioConn, wg, waitCh, bg, err := run(arg0, args...)
|
||||
stdioConn, wg, waitCh, bg, err := run(errorLog, arg0, args...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -263,8 +263,8 @@ func newBackend(ctx context.Context, cfg Config, lim limiter.Limiter) (*Backend,
|
||||
}
|
||||
|
||||
// Open starts an rclone process with the given config.
|
||||
func Open(ctx context.Context, cfg Config, lim limiter.Limiter) (*Backend, error) {
|
||||
be, err := newBackend(ctx, cfg, lim)
|
||||
func Open(ctx context.Context, cfg Config, lim limiter.Limiter, errorLog func(string, ...interface{})) (*Backend, error) {
|
||||
be, err := newBackend(ctx, cfg, lim, errorLog)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -279,7 +279,7 @@ func Open(ctx context.Context, cfg Config, lim limiter.Limiter) (*Backend, error
|
||||
URL: url,
|
||||
}
|
||||
|
||||
restBackend, err := rest.Open(ctx, restConfig, debug.RoundTripper(be.tr))
|
||||
restBackend, err := rest.Open(ctx, restConfig, debug.RoundTripper(be.tr), errorLog)
|
||||
if err != nil {
|
||||
_ = be.Close()
|
||||
return nil, err
|
||||
@@ -290,8 +290,8 @@ func Open(ctx context.Context, cfg Config, lim limiter.Limiter) (*Backend, error
|
||||
}
|
||||
|
||||
// Create initializes a new restic repo with rclone.
|
||||
func Create(ctx context.Context, cfg Config, lim limiter.Limiter) (*Backend, error) {
|
||||
be, err := newBackend(ctx, cfg, lim)
|
||||
func Create(ctx context.Context, cfg Config, lim limiter.Limiter, errorLog func(string, ...interface{})) (*Backend, error) {
|
||||
be, err := newBackend(ctx, cfg, lim, errorLog)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -308,7 +308,7 @@ func Create(ctx context.Context, cfg Config, lim limiter.Limiter) (*Backend, err
|
||||
URL: url,
|
||||
}
|
||||
|
||||
restBackend, err := rest.Create(ctx, restConfig, debug.RoundTripper(be.tr))
|
||||
restBackend, err := rest.Create(ctx, restConfig, debug.RoundTripper(be.tr), errorLog)
|
||||
if err != nil {
|
||||
_ = be.Close()
|
||||
return nil, err
|
||||
|
||||
@@ -15,7 +15,7 @@ func TestRcloneExit(t *testing.T) {
|
||||
dir := rtest.TempDir(t)
|
||||
cfg := NewConfig()
|
||||
cfg.Remote = dir
|
||||
be, err := Open(context.TODO(), cfg, nil)
|
||||
be, err := Open(context.TODO(), cfg, nil, t.Logf)
|
||||
var e *exec.Error
|
||||
if errors.As(err, &e) && e.Err == exec.ErrNotFound {
|
||||
t.Skipf("program %q not found", e.Name)
|
||||
@@ -45,7 +45,7 @@ func TestRcloneFailedStart(t *testing.T) {
|
||||
cfg := NewConfig()
|
||||
// exits with exit code 1
|
||||
cfg.Program = "false"
|
||||
_, err := Open(context.TODO(), cfg, nil)
|
||||
_, err := Open(context.TODO(), cfg, nil, t.Logf)
|
||||
var e *exec.ExitError
|
||||
if !errors.As(err, &e) {
|
||||
// unexpected error
|
||||
|
||||
@@ -55,7 +55,7 @@ const (
|
||||
)
|
||||
|
||||
// Open opens the REST backend with the given config.
|
||||
func Open(_ context.Context, cfg Config, rt http.RoundTripper) (*Backend, error) {
|
||||
func Open(_ context.Context, cfg Config, rt http.RoundTripper, _ func(string, ...interface{})) (*Backend, error) {
|
||||
// use url without trailing slash for layout
|
||||
url := cfg.URL.String()
|
||||
if url[len(url)-1] == '/' {
|
||||
@@ -84,8 +84,8 @@ func drainAndClose(resp *http.Response) error {
|
||||
}
|
||||
|
||||
// Create creates a new REST on server configured in config.
|
||||
func Create(ctx context.Context, cfg Config, rt http.RoundTripper) (*Backend, error) {
|
||||
be, err := Open(ctx, cfg, rt)
|
||||
func Create(ctx context.Context, cfg Config, rt http.RoundTripper, errorLog func(string, ...interface{})) (*Backend, error) {
|
||||
be, err := Open(ctx, cfg, rt, errorLog)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -117,7 +117,7 @@ func TestListAPI(t *testing.T) {
|
||||
URL: srvURL,
|
||||
}
|
||||
|
||||
be, err := rest.Open(context.TODO(), cfg, http.DefaultTransport)
|
||||
be, err := rest.Open(context.TODO(), cfg, http.DefaultTransport, t.Logf)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -106,7 +106,7 @@ func runRESTServer(ctx context.Context, t testing.TB, dir, reqListenAddr string)
|
||||
matched = true
|
||||
}
|
||||
}
|
||||
_, _ = fmt.Fprintln(os.Stdout, line) // print all output to console
|
||||
t.Log(line)
|
||||
}
|
||||
}()
|
||||
|
||||
|
||||
@@ -195,13 +195,13 @@ func getCredentials(cfg Config, tr http.RoundTripper) (*credentials.Credentials,
|
||||
|
||||
// Open opens the S3 backend at bucket and region. The bucket is created if it
|
||||
// does not exist yet.
|
||||
func Open(_ context.Context, cfg Config, rt http.RoundTripper) (backend.Backend, error) {
|
||||
func Open(_ context.Context, cfg Config, rt http.RoundTripper, _ func(string, ...interface{})) (backend.Backend, error) {
|
||||
return open(cfg, rt)
|
||||
}
|
||||
|
||||
// Create opens the S3 backend at bucket and region and creates the bucket if
|
||||
// it does not exist yet.
|
||||
func Create(ctx context.Context, cfg Config, rt http.RoundTripper) (backend.Backend, error) {
|
||||
func Create(ctx context.Context, cfg Config, rt http.RoundTripper, _ func(string, ...interface{})) (backend.Backend, error) {
|
||||
be, err := open(cfg, rt)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "open")
|
||||
|
||||
@@ -117,9 +117,9 @@ func newMinioTestSuite(t testing.TB) (*test.Suite[s3.Config], func()) {
|
||||
return &cfg, nil
|
||||
},
|
||||
|
||||
Factory: location.NewHTTPBackendFactory("s3", s3.ParseConfig, location.NoPassword, func(ctx context.Context, cfg s3.Config, rt http.RoundTripper) (be backend.Backend, err error) {
|
||||
Factory: location.NewHTTPBackendFactory("s3", s3.ParseConfig, location.NoPassword, func(ctx context.Context, cfg s3.Config, rt http.RoundTripper, errorLog func(string, ...interface{})) (be backend.Backend, err error) {
|
||||
for i := 0; i < 50; i++ {
|
||||
be, err = s3.Create(ctx, cfg, rt)
|
||||
be, err = s3.Create(ctx, cfg, rt, errorLog)
|
||||
if err != nil {
|
||||
t.Logf("s3 open: try %d: error %v", i, err)
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
@@ -39,7 +39,7 @@ func TestLayout(t *testing.T) {
|
||||
Command: fmt.Sprintf("%q -e", sftpServer),
|
||||
Path: repo,
|
||||
Connections: 5,
|
||||
})
|
||||
}, t.Logf)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -51,7 +51,7 @@ func NewFactory() location.Factory {
|
||||
return location.NewLimitedBackendFactory("sftp", ParseConfig, location.NoPassword, limiter.WrapBackendConstructor(Create), limiter.WrapBackendConstructor(Open))
|
||||
}
|
||||
|
||||
func startClient(cfg Config) (*SFTP, error) {
|
||||
func startClient(cfg Config, errorLog func(string, ...interface{})) (*SFTP, error) {
|
||||
program, args, err := buildSSHCommand(cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -71,7 +71,7 @@ func startClient(cfg Config) (*SFTP, error) {
|
||||
go func() {
|
||||
sc := bufio.NewScanner(stderr)
|
||||
for sc.Scan() {
|
||||
fmt.Fprintf(os.Stderr, "subprocess %v: %v\n", program, sc.Text())
|
||||
errorLog("subprocess %v: %v\n", program, sc.Text())
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -144,10 +144,10 @@ func (r *SFTP) clientError() error {
|
||||
|
||||
// Open opens an sftp backend as described by the config by running
|
||||
// "ssh" with the appropriate arguments (or cfg.Command, if set).
|
||||
func Open(_ context.Context, cfg Config) (*SFTP, error) {
|
||||
func Open(_ context.Context, cfg Config, errorLog func(string, ...interface{})) (*SFTP, error) {
|
||||
debug.Log("open backend with config %#v", cfg)
|
||||
|
||||
sftp, err := startClient(cfg)
|
||||
sftp, err := startClient(cfg, errorLog)
|
||||
if err != nil {
|
||||
debug.Log("unable to start program: %v", err)
|
||||
return nil, err
|
||||
@@ -240,8 +240,8 @@ func buildSSHCommand(cfg Config) (cmd string, args []string, err error) {
|
||||
|
||||
// Create creates an sftp backend as described by the config by running "ssh"
|
||||
// with the appropriate arguments (or cfg.Command, if set).
|
||||
func Create(ctx context.Context, cfg Config) (*SFTP, error) {
|
||||
sftp, err := startClient(cfg)
|
||||
func Create(ctx context.Context, cfg Config, errorLog func(string, ...interface{})) (*SFTP, error) {
|
||||
sftp, err := startClient(cfg, errorLog)
|
||||
if err != nil {
|
||||
debug.Log("unable to start program: %v", err)
|
||||
return nil, err
|
||||
|
||||
@@ -42,7 +42,7 @@ func NewFactory() location.Factory {
|
||||
|
||||
// Open opens the swift backend at a container in region. The container is
|
||||
// created if it does not exist yet.
|
||||
func Open(ctx context.Context, cfg Config, rt http.RoundTripper) (backend.Backend, error) {
|
||||
func Open(ctx context.Context, cfg Config, rt http.RoundTripper, _ func(string, ...interface{})) (backend.Backend, error) {
|
||||
debug.Log("config %#v", cfg)
|
||||
|
||||
be := &beSwift{
|
||||
|
||||
@@ -155,13 +155,13 @@ func (s *Suite[C]) RunBenchmarks(b *testing.B) {
|
||||
s.cleanup(b)
|
||||
}
|
||||
|
||||
func (s *Suite[C]) createOrError() (backend.Backend, error) {
|
||||
func (s *Suite[C]) createOrError(t testing.TB) (backend.Backend, error) {
|
||||
tr, err := backend.Transport(backend.TransportOptions{})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot create transport for tests: %v", err)
|
||||
}
|
||||
|
||||
be, err := s.Factory.Create(context.TODO(), s.Config, tr, nil)
|
||||
be, err := s.Factory.Create(context.TODO(), s.Config, tr, nil, t.Logf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -179,7 +179,7 @@ func (s *Suite[C]) createOrError() (backend.Backend, error) {
|
||||
}
|
||||
|
||||
func (s *Suite[C]) create(t testing.TB) backend.Backend {
|
||||
be, err := s.createOrError()
|
||||
be, err := s.createOrError(t)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -192,7 +192,7 @@ func (s *Suite[C]) open(t testing.TB) backend.Backend {
|
||||
t.Fatalf("cannot create transport for tests: %v", err)
|
||||
}
|
||||
|
||||
be, err := s.Factory.Open(context.TODO(), s.Config, tr, nil)
|
||||
be, err := s.Factory.Open(context.TODO(), s.Config, tr, nil, func(string, ...interface{}) {})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -79,7 +79,7 @@ func (s *Suite[C]) TestCreateWithConfig(t *testing.T) {
|
||||
store(t, b, backend.ConfigFile, []byte("test config"))
|
||||
|
||||
// now create the backend again, this must fail
|
||||
_, err = s.createOrError()
|
||||
_, err = s.createOrError(t)
|
||||
if err == nil {
|
||||
t.Fatalf("expected error not found for creating a backend with an existing config file")
|
||||
}
|
||||
|
||||
@@ -87,7 +87,7 @@ func TestLoadRawBrokenWithCache(t *testing.T) {
|
||||
c := cache.TestNewCache(t)
|
||||
repo, err := repository.New(b, repository.Options{})
|
||||
rtest.OK(t, err)
|
||||
repo.UseCache(c)
|
||||
repo.UseCache(c, t.Logf)
|
||||
|
||||
data := rtest.Random(23, 10*KiB)
|
||||
id := restic.Hash(data)
|
||||
|
||||
@@ -161,13 +161,13 @@ func (r *Repository) packSize() uint {
|
||||
}
|
||||
|
||||
// UseCache replaces the backend with the wrapped cache.
|
||||
func (r *Repository) UseCache(c *cache.Cache) {
|
||||
func (r *Repository) UseCache(c *cache.Cache, errorLog func(string, ...interface{})) {
|
||||
if c == nil {
|
||||
return
|
||||
}
|
||||
debug.Log("using cache")
|
||||
r.cache = c
|
||||
r.be = c.Wrap(r.be)
|
||||
r.be = c.Wrap(r.be, errorLog)
|
||||
}
|
||||
|
||||
func (r *Repository) Cache() *cache.Cache {
|
||||
|
||||
@@ -207,7 +207,7 @@ func TestLoadBlobBroken(t *testing.T) {
|
||||
|
||||
// setup cache after saving the blob to make sure that the damageOnceBackend damages the cached data
|
||||
c := cache.TestNewCache(t)
|
||||
repo.UseCache(c)
|
||||
repo.UseCache(c, t.Logf)
|
||||
|
||||
data, err := repo.LoadBlob(context.TODO(), restic.TreeBlob, id, nil)
|
||||
rtest.OK(t, err)
|
||||
@@ -355,7 +355,7 @@ func TestRepositoryLoadUnpackedRetryBroken(t *testing.T) {
|
||||
repodir, cleanup := rtest.Env(t, repoFixture)
|
||||
defer cleanup()
|
||||
|
||||
be, err := local.Open(context.TODO(), local.Config{Path: repodir, Connections: 2})
|
||||
be, err := local.Open(context.TODO(), local.Config{Path: repodir, Connections: 2}, t.Logf)
|
||||
rtest.OK(t, err)
|
||||
repo := repository.TestOpenBackend(t, &damageOnceBackend{Backend: be})
|
||||
|
||||
@@ -446,7 +446,7 @@ func TestListPack(t *testing.T) {
|
||||
|
||||
// setup cache after saving the blob to make sure that the damageOnceBackend damages the cached data
|
||||
c := cache.TestNewCache(t)
|
||||
repo.UseCache(c)
|
||||
repo.UseCache(c, t.Logf)
|
||||
|
||||
// Forcibly cache pack file
|
||||
packID := repo.LookupBlob(restic.TreeBlob, id)[0].PackID
|
||||
|
||||
@@ -91,7 +91,7 @@ func TestRepositoryWithVersion(t testing.TB, version uint) (*Repository, restic.
|
||||
if dir != "" {
|
||||
_, err := os.Stat(dir)
|
||||
if err != nil {
|
||||
lbe, err := local.Create(context.TODO(), local.Config{Path: dir})
|
||||
lbe, err := local.Create(context.TODO(), local.Config{Path: dir}, t.Logf)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating local backend at %v: %v", dir, err)
|
||||
}
|
||||
@@ -115,7 +115,7 @@ func TestFromFixture(t testing.TB, repoFixture string) (*Repository, backend.Bac
|
||||
// TestOpenLocal opens a local repository.
|
||||
func TestOpenLocal(t testing.TB, dir string) (*Repository, backend.Backend) {
|
||||
var be backend.Backend
|
||||
be, err := local.Open(context.TODO(), local.Config{Path: dir, Connections: 2})
|
||||
be, err := local.Open(context.TODO(), local.Config{Path: dir, Connections: 2}, t.Logf)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user