Merge pull request #1439 from armhold/propagate-context

replace ad-hoc context.TODO() with gopts.ctx
This commit is contained in:
Alexander Neumann
2017-12-08 20:27:36 +01:00
32 changed files with 94 additions and 101 deletions

View File

@@ -103,7 +103,7 @@ func (r *Reader) Archive(ctx context.Context, name string, rd io.Reader, p *rest
debug.Log("snapshot saved as %v", id.Str())
err = repo.Flush()
err = repo.Flush(ctx)
if err != nil {
return nil, restic.ID{}, err
}

View File

@@ -764,7 +764,7 @@ func (arch *Archiver) Snapshot(ctx context.Context, p *restic.Progress, paths, t
debug.Log("workers terminated")
// flush repository
err = arch.repo.Flush()
err = arch.repo.Flush(ctx)
if err != nil {
return nil, restic.ID{}, err
}

View File

@@ -144,7 +144,7 @@ func testArchiverDuplication(t *testing.T) {
wg.Wait()
err = repo.Flush()
err = repo.Flush(context.Background())
if err != nil {
t.Fatal(err)
}

View File

@@ -248,7 +248,7 @@ func testParallelSaveWithDuplication(t *testing.T, seed int) {
rtest.OK(t, <-errChan)
}
rtest.OK(t, repo.Flush())
rtest.OK(t, repo.Flush(context.Background()))
rtest.OK(t, repo.SaveIndex(context.TODO()))
chkr := createAndInitChecker(t, repo)

View File

@@ -41,10 +41,10 @@ func newClient(ctx context.Context, cfg Config, rt http.RoundTripper) (*b2.Clien
}
// Open opens a connection to the B2 service.
func Open(cfg Config, rt http.RoundTripper) (restic.Backend, error) {
func Open(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backend, error) {
debug.Log("cfg %#v", cfg)
ctx, cancel := context.WithCancel(context.TODO())
ctx, cancel := context.WithCancel(ctx)
defer cancel()
client, err := newClient(ctx, cfg, rt)
@@ -79,10 +79,10 @@ func Open(cfg Config, rt http.RoundTripper) (restic.Backend, error) {
// Create opens a connection to the B2 service. If the bucket does not exist yet,
// it is created.
func Create(cfg Config, rt http.RoundTripper) (restic.Backend, error) {
func Create(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backend, error) {
debug.Log("cfg %#v", cfg)
ctx, cancel := context.WithCancel(context.TODO())
ctx, cancel := context.WithCancel(ctx)
defer cancel()
client, err := newClient(ctx, cfg, rt)
@@ -115,7 +115,7 @@ func Create(cfg Config, rt http.RoundTripper) (restic.Backend, error) {
sem: sem,
}
present, err := be.Test(context.TODO(), restic.Handle{Type: restic.ConfigFile})
present, err := be.Test(ctx, restic.Handle{Type: restic.ConfigFile})
if err != nil {
return nil, err
}

View File

@@ -45,19 +45,19 @@ func newB2TestSuite(t testing.TB) *test.Suite {
// CreateFn is a function that creates a temporary repository for the tests.
Create: func(config interface{}) (restic.Backend, error) {
cfg := config.(b2.Config)
return b2.Create(cfg, tr)
return b2.Create(context.Background(), cfg, tr)
},
// OpenFn is a function that opens a previously created temporary repository.
Open: func(config interface{}) (restic.Backend, error) {
cfg := config.(b2.Config)
return b2.Open(cfg, tr)
return b2.Open(context.Background(), cfg, tr)
},
// CleanupFn removes data created during the tests.
Cleanup: func(config interface{}) error {
cfg := config.(b2.Config)
be, err := b2.Open(cfg, tr)
be, err := b2.Open(context.Background(), cfg, tr)
if err != nil {
return err
}

View File

@@ -33,15 +33,17 @@ func NewRetryBackend(be restic.Backend, maxTries int, report func(string, error,
}
}
func (be *RetryBackend) retry(msg string, f func() error) error {
return backoff.RetryNotify(f,
backoff.WithMaxTries(backoff.NewExponentialBackOff(), uint64(be.MaxTries)),
func (be *RetryBackend) retry(ctx context.Context, msg string, f func() error) error {
err := backoff.RetryNotify(f,
backoff.WithContext(backoff.WithMaxTries(backoff.NewExponentialBackOff(), uint64(be.MaxTries)), ctx),
func(err error, d time.Duration) {
if be.Report != nil {
be.Report(msg, err, d)
}
},
)
return err
}
// Save stores the data in the backend under the given handle.
@@ -60,7 +62,7 @@ func (be *RetryBackend) Save(ctx context.Context, h restic.Handle, rd io.Reader)
return errors.Errorf("reader is not at the beginning (pos %v)", pos)
}
return be.retry(fmt.Sprintf("Save(%v)", h), func() error {
return be.retry(ctx, fmt.Sprintf("Save(%v)", h), func() error {
_, err := seeker.Seek(0, io.SeekStart)
if err != nil {
return err
@@ -87,7 +89,7 @@ func (be *RetryBackend) Save(ctx context.Context, h restic.Handle, rd io.Reader)
// is returned. rd must be closed after use. If an error is returned, the
// ReadCloser must be nil.
func (be *RetryBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (rd io.ReadCloser, err error) {
err = be.retry(fmt.Sprintf("Load(%v, %v, %v)", h, length, offset),
err = be.retry(ctx, fmt.Sprintf("Load(%v, %v, %v)", h, length, offset),
func() error {
var innerError error
rd, innerError = be.Backend.Load(ctx, h, length, offset)
@@ -99,7 +101,7 @@ func (be *RetryBackend) Load(ctx context.Context, h restic.Handle, length int, o
// Stat returns information about the File identified by h.
func (be *RetryBackend) Stat(ctx context.Context, h restic.Handle) (fi restic.FileInfo, err error) {
err = be.retry(fmt.Sprintf("Stat(%v)", h),
err = be.retry(ctx, fmt.Sprintf("Stat(%v)", h),
func() error {
var innerError error
fi, innerError = be.Backend.Stat(ctx, h)

View File

@@ -89,7 +89,7 @@ func (r *packerManager) insertPacker(p *Packer) {
}
// savePacker stores p in the backend.
func (r *Repository) savePacker(t restic.BlobType, p *Packer) error {
func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *Packer) error {
debug.Log("save packer for %v with %d blobs (%d bytes)\n", t, p.Packer.Count(), p.Packer.Size())
_, err := p.Packer.Finalize()
if err != nil {
@@ -104,7 +104,7 @@ func (r *Repository) savePacker(t restic.BlobType, p *Packer) error {
id := restic.IDFromHash(p.hw.Sum(nil))
h := restic.Handle{Type: restic.DataFile, Name: id.String()}
err = r.be.Save(context.TODO(), h, p.tmpfile)
err = r.be.Save(ctx, h, p.tmpfile)
if err != nil {
debug.Log("Save(%v) error: %v", h, err)
return err

View File

@@ -126,7 +126,7 @@ func Repack(ctx context.Context, repo restic.Repository, packs restic.IDSet, kee
}
}
if err := repo.Flush(); err != nil {
if err := repo.Flush(ctx); err != nil {
return nil, err
}

View File

@@ -55,13 +55,13 @@ func createRandomBlobs(t testing.TB, repo restic.Repository, blobs int, pData fl
}
if rand.Float32() < 0.2 {
if err = repo.Flush(); err != nil {
if err = repo.Flush(context.Background()); err != nil {
t.Fatalf("repo.Flush() returned error %v", err)
}
}
}
if err := repo.Flush(); err != nil {
if err := repo.Flush(context.Background()); err != nil {
t.Fatalf("repo.Flush() returned error %v", err)
}
}

View File

@@ -250,7 +250,7 @@ func (r *Repository) SaveAndEncrypt(ctx context.Context, t restic.BlobType, data
}
// else write the pack to the backend
return *id, r.savePacker(t, packer)
return *id, r.savePacker(ctx, t, packer)
}
// SaveJSONUnpacked serialises item as JSON and encrypts and saves it in the
@@ -289,7 +289,7 @@ func (r *Repository) SaveUnpacked(ctx context.Context, t restic.FileType, p []by
}
// Flush saves all remaining packs.
func (r *Repository) Flush() error {
func (r *Repository) Flush(ctx context.Context) error {
pms := []struct {
t restic.BlobType
pm *packerManager
@@ -303,7 +303,7 @@ func (r *Repository) Flush() error {
debug.Log("manually flushing %d packs", len(p.pm.packers))
for _, packer := range p.pm.packers {
err := r.savePacker(p.t, packer)
err := r.savePacker(ctx, p.t, packer)
if err != nil {
p.pm.pm.Unlock()
return err

View File

@@ -37,7 +37,7 @@ func TestSave(t *testing.T) {
rtest.Equals(t, id, sid)
rtest.OK(t, repo.Flush())
rtest.OK(t, repo.Flush(context.Background()))
// rtest.OK(t, repo.SaveIndex())
// read back
@@ -72,7 +72,7 @@ func TestSaveFrom(t *testing.T) {
rtest.OK(t, err)
rtest.Equals(t, id, id2)
rtest.OK(t, repo.Flush())
rtest.OK(t, repo.Flush(context.Background()))
// read back
buf := restic.NewBlobBuffer(size)
@@ -122,7 +122,7 @@ func TestLoadTree(t *testing.T) {
// archive a few files
sn := archiver.TestSnapshot(t, repo, rtest.BenchArchiveDirectory, nil)
rtest.OK(t, repo.Flush())
rtest.OK(t, repo.Flush(context.Background()))
_, err := repo.LoadTree(context.TODO(), *sn.Tree)
rtest.OK(t, err)
@@ -138,7 +138,7 @@ func BenchmarkLoadTree(t *testing.B) {
// archive a few files
sn := archiver.TestSnapshot(t, repo, rtest.BenchArchiveDirectory, nil)
rtest.OK(t, repo.Flush())
rtest.OK(t, repo.Flush(context.Background()))
t.ResetTimer()
@@ -159,7 +159,7 @@ func TestLoadBlob(t *testing.T) {
id, err := repo.SaveBlob(context.TODO(), restic.DataBlob, buf, restic.ID{})
rtest.OK(t, err)
rtest.OK(t, repo.Flush())
rtest.OK(t, repo.Flush(context.Background()))
// first, test with buffers that are too small
for _, testlength := range []int{length - 20, length, restic.CiphertextLength(length) - 1} {
@@ -204,7 +204,7 @@ func BenchmarkLoadBlob(b *testing.B) {
id, err := repo.SaveBlob(context.TODO(), restic.DataBlob, buf, restic.ID{})
rtest.OK(b, err)
rtest.OK(b, repo.Flush())
rtest.OK(b, repo.Flush(context.Background()))
b.ResetTimer()
b.SetBytes(int64(length))
@@ -352,7 +352,7 @@ func TestRepositoryIncrementalIndex(t *testing.T) {
// add 3 packs, write intermediate index
for i := 0; i < 3; i++ {
saveRandomDataBlobs(t, repo, 5, 1<<15)
rtest.OK(t, repo.Flush())
rtest.OK(t, repo.Flush(context.Background()))
}
rtest.OK(t, repo.SaveFullIndex(context.TODO()))
@@ -361,7 +361,7 @@ func TestRepositoryIncrementalIndex(t *testing.T) {
// add another 5 packs
for i := 0; i < 5; i++ {
saveRandomDataBlobs(t, repo, 5, 1<<15)
rtest.OK(t, repo.Flush())
rtest.OK(t, repo.Flush(context.Background()))
}
// save final index

View File

@@ -29,7 +29,7 @@ type Repository interface {
List(context.Context, FileType) <-chan ID
ListPack(context.Context, ID) ([]Blob, int64, error)
Flush() error
Flush(context.Context) error
SaveUnpacked(context.Context, FileType, []byte) (ID, error)
SaveJSONUnpacked(context.Context, FileType, interface{}) (ID, error)

View File

@@ -90,7 +90,7 @@ func saveSnapshot(t testing.TB, repo restic.Repository, snapshot Snapshot) (rest
treeID := saveDir(t, repo, snapshot.Nodes)
err := repo.Flush()
err := repo.Flush(ctx)
if err != nil {
t.Fatal(err)
}

View File

@@ -189,7 +189,7 @@ func TestCreateSnapshot(t testing.TB, repo Repository, at time.Time, depth int,
t.Logf("saved snapshot %v", id.Str())
err = repo.Flush()
err = repo.Flush(context.Background())
if err != nil {
t.Fatal(err)
}

View File

@@ -103,7 +103,7 @@ func TestLoadTree(t *testing.T) {
rtest.OK(t, err)
// save packs
rtest.OK(t, repo.Flush())
rtest.OK(t, repo.Flush(context.Background()))
// load tree again
tree2, err := repo.LoadTree(context.TODO(), id)

View File

@@ -29,7 +29,7 @@ func TestWalkTree(t *testing.T) {
rtest.OK(t, err)
// flush repo, write all packs
rtest.OK(t, repo.Flush())
rtest.OK(t, repo.Flush(context.Background()))
// start tree walker
treeJobs := make(chan walk.TreeJob)