mirror of
https://github.com/restic/restic.git
synced 2025-10-09 07:33:53 +00:00
Merge pull request #3831 from MichaelEischer/move-code
Move code out of the restic package and consolidate backend specific code
This commit is contained in:
@@ -7,8 +7,8 @@ import (
|
||||
"io/ioutil"
|
||||
"testing"
|
||||
|
||||
"github.com/restic/restic/internal/backend/mock"
|
||||
"github.com/restic/restic/internal/errors"
|
||||
"github.com/restic/restic/internal/mock"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
"github.com/restic/restic/internal/test"
|
||||
)
|
||||
|
29
internal/backend/limiter/limiter.go
Normal file
29
internal/backend/limiter/limiter.go
Normal file
@@ -0,0 +1,29 @@
|
||||
package limiter
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
// Limiter defines an interface that implementors can use to rate limit I/O
|
||||
// according to some policy defined and configured by the implementor.
|
||||
type Limiter interface {
|
||||
// Upstream returns a rate limited reader that is intended to be used in
|
||||
// uploads.
|
||||
Upstream(r io.Reader) io.Reader
|
||||
|
||||
// UpstreamWriter returns a rate limited writer that is intended to be used
|
||||
// in uploads.
|
||||
UpstreamWriter(w io.Writer) io.Writer
|
||||
|
||||
// Downstream returns a rate limited reader that is intended to be used
|
||||
// for downloads.
|
||||
Downstream(r io.Reader) io.Reader
|
||||
|
||||
// Downstream returns a rate limited reader that is intended to be used
|
||||
// for downloads.
|
||||
DownstreamWriter(r io.Writer) io.Writer
|
||||
|
||||
// Transport returns an http.RoundTripper limited with the limiter.
|
||||
Transport(http.RoundTripper) http.RoundTripper
|
||||
}
|
71
internal/backend/limiter/limiter_backend.go
Normal file
71
internal/backend/limiter/limiter_backend.go
Normal file
@@ -0,0 +1,71 @@
|
||||
package limiter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
"github.com/restic/restic/internal/restic"
|
||||
)
|
||||
|
||||
// LimitBackend wraps a Backend and applies rate limiting to Load() and Save()
|
||||
// calls on the backend.
|
||||
func LimitBackend(be restic.Backend, l Limiter) restic.Backend {
|
||||
return rateLimitedBackend{
|
||||
Backend: be,
|
||||
limiter: l,
|
||||
}
|
||||
}
|
||||
|
||||
type rateLimitedBackend struct {
|
||||
restic.Backend
|
||||
limiter Limiter
|
||||
}
|
||||
|
||||
func (r rateLimitedBackend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
|
||||
limited := limitedRewindReader{
|
||||
RewindReader: rd,
|
||||
limited: r.limiter.Upstream(rd),
|
||||
}
|
||||
|
||||
return r.Backend.Save(ctx, h, limited)
|
||||
}
|
||||
|
||||
type limitedRewindReader struct {
|
||||
restic.RewindReader
|
||||
|
||||
limited io.Reader
|
||||
}
|
||||
|
||||
func (l limitedRewindReader) Read(b []byte) (int, error) {
|
||||
return l.limited.Read(b)
|
||||
}
|
||||
|
||||
func (r rateLimitedBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64, consumer func(rd io.Reader) error) error {
|
||||
return r.Backend.Load(ctx, h, length, offset, func(rd io.Reader) error {
|
||||
return consumer(newDownstreamLimitedReader(rd, r.limiter))
|
||||
})
|
||||
}
|
||||
|
||||
type limitedReader struct {
|
||||
io.Reader
|
||||
writerTo io.WriterTo
|
||||
limiter Limiter
|
||||
}
|
||||
|
||||
func newDownstreamLimitedReader(rd io.Reader, limiter Limiter) io.Reader {
|
||||
lrd := limiter.Downstream(rd)
|
||||
if wt, ok := rd.(io.WriterTo); ok {
|
||||
lrd = &limitedReader{
|
||||
Reader: lrd,
|
||||
writerTo: wt,
|
||||
limiter: limiter,
|
||||
}
|
||||
}
|
||||
return lrd
|
||||
}
|
||||
|
||||
func (l *limitedReader) WriteTo(w io.Writer) (int64, error) {
|
||||
return l.writerTo.WriteTo(l.limiter.DownstreamWriter(w))
|
||||
}
|
||||
|
||||
var _ restic.Backend = (*rateLimitedBackend)(nil)
|
109
internal/backend/limiter/limiter_backend_test.go
Normal file
109
internal/backend/limiter/limiter_backend_test.go
Normal file
@@ -0,0 +1,109 @@
|
||||
package limiter
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"fmt"
|
||||
"io"
|
||||
"testing"
|
||||
|
||||
"github.com/restic/restic/internal/backend/mock"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
rtest "github.com/restic/restic/internal/test"
|
||||
)
|
||||
|
||||
func randomBytes(t *testing.T, size int) []byte {
|
||||
data := make([]byte, size)
|
||||
_, err := io.ReadFull(rand.Reader, data)
|
||||
rtest.OK(t, err)
|
||||
return data
|
||||
}
|
||||
|
||||
func TestLimitBackendSave(t *testing.T) {
|
||||
testHandle := restic.Handle{Type: restic.PackFile, Name: "test"}
|
||||
data := randomBytes(t, 1234)
|
||||
|
||||
be := mock.NewBackend()
|
||||
be.SaveFn = func(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
|
||||
buf := new(bytes.Buffer)
|
||||
_, err := io.Copy(buf, rd)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
if !bytes.Equal(data, buf.Bytes()) {
|
||||
return fmt.Errorf("data mismatch")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
limiter := NewStaticLimiter(Limits{42 * 1024, 42 * 1024})
|
||||
limbe := LimitBackend(be, limiter)
|
||||
|
||||
rd := restic.NewByteReader(data, nil)
|
||||
err := limbe.Save(context.TODO(), testHandle, rd)
|
||||
rtest.OK(t, err)
|
||||
}
|
||||
|
||||
type tracedReadWriteToCloser struct {
|
||||
io.Reader
|
||||
io.WriterTo
|
||||
Traced bool
|
||||
}
|
||||
|
||||
func newTracedReadWriteToCloser(rd *bytes.Reader) *tracedReadWriteToCloser {
|
||||
return &tracedReadWriteToCloser{Reader: rd, WriterTo: rd}
|
||||
}
|
||||
|
||||
func (r *tracedReadWriteToCloser) WriteTo(w io.Writer) (n int64, err error) {
|
||||
r.Traced = true
|
||||
return r.WriterTo.WriteTo(w)
|
||||
}
|
||||
|
||||
func (r *tracedReadWriteToCloser) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestLimitBackendLoad(t *testing.T) {
|
||||
testHandle := restic.Handle{Type: restic.PackFile, Name: "test"}
|
||||
data := randomBytes(t, 1234)
|
||||
|
||||
for _, test := range []struct {
|
||||
innerWriteTo, outerWriteTo bool
|
||||
}{{false, false}, {false, true}, {true, false}, {true, true}} {
|
||||
be := mock.NewBackend()
|
||||
src := newTracedReadWriteToCloser(bytes.NewReader(data))
|
||||
be.OpenReaderFn = func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
|
||||
if length != 0 || offset != 0 {
|
||||
return nil, fmt.Errorf("Not supported")
|
||||
}
|
||||
// test both code paths in WriteTo of limitedReadCloser
|
||||
if test.innerWriteTo {
|
||||
return src, nil
|
||||
}
|
||||
return newTracedReadCloser(src), nil
|
||||
}
|
||||
limiter := NewStaticLimiter(Limits{42 * 1024, 42 * 1024})
|
||||
limbe := LimitBackend(be, limiter)
|
||||
|
||||
err := limbe.Load(context.TODO(), testHandle, 0, 0, func(rd io.Reader) error {
|
||||
dataRead := new(bytes.Buffer)
|
||||
// test both Read and WriteTo
|
||||
if !test.outerWriteTo {
|
||||
rd = newTracedReadCloser(rd)
|
||||
}
|
||||
_, err := io.Copy(dataRead, rd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !bytes.Equal(data, dataRead.Bytes()) {
|
||||
return fmt.Errorf("read broken data")
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
rtest.OK(t, err)
|
||||
rtest.Assert(t, src.Traced == (test.innerWriteTo && test.outerWriteTo),
|
||||
"unexpected/missing writeTo call innerWriteTo %v outerWriteTo %v",
|
||||
test.innerWriteTo, test.outerWriteTo)
|
||||
}
|
||||
}
|
114
internal/backend/limiter/static_limiter.go
Normal file
114
internal/backend/limiter/static_limiter.go
Normal file
@@ -0,0 +1,114 @@
|
||||
package limiter
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net/http"
|
||||
|
||||
"github.com/juju/ratelimit"
|
||||
)
|
||||
|
||||
type staticLimiter struct {
|
||||
upstream *ratelimit.Bucket
|
||||
downstream *ratelimit.Bucket
|
||||
}
|
||||
|
||||
// Limits represents static upload and download limits.
|
||||
// For both, zero means unlimited.
|
||||
type Limits struct {
|
||||
UploadKb int
|
||||
DownloadKb int
|
||||
}
|
||||
|
||||
// NewStaticLimiter constructs a Limiter with a fixed (static) upload and
|
||||
// download rate cap
|
||||
func NewStaticLimiter(l Limits) Limiter {
|
||||
var (
|
||||
upstreamBucket *ratelimit.Bucket
|
||||
downstreamBucket *ratelimit.Bucket
|
||||
)
|
||||
|
||||
if l.UploadKb > 0 {
|
||||
upstreamBucket = ratelimit.NewBucketWithRate(toByteRate(l.UploadKb), int64(toByteRate(l.UploadKb)))
|
||||
}
|
||||
|
||||
if l.DownloadKb > 0 {
|
||||
downstreamBucket = ratelimit.NewBucketWithRate(toByteRate(l.DownloadKb), int64(toByteRate(l.DownloadKb)))
|
||||
}
|
||||
|
||||
return staticLimiter{
|
||||
upstream: upstreamBucket,
|
||||
downstream: downstreamBucket,
|
||||
}
|
||||
}
|
||||
|
||||
func (l staticLimiter) Upstream(r io.Reader) io.Reader {
|
||||
return l.limitReader(r, l.upstream)
|
||||
}
|
||||
|
||||
func (l staticLimiter) UpstreamWriter(w io.Writer) io.Writer {
|
||||
return l.limitWriter(w, l.upstream)
|
||||
}
|
||||
|
||||
func (l staticLimiter) Downstream(r io.Reader) io.Reader {
|
||||
return l.limitReader(r, l.downstream)
|
||||
}
|
||||
|
||||
func (l staticLimiter) DownstreamWriter(w io.Writer) io.Writer {
|
||||
return l.limitWriter(w, l.downstream)
|
||||
}
|
||||
|
||||
type roundTripper func(*http.Request) (*http.Response, error)
|
||||
|
||||
func (rt roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
return rt(req)
|
||||
}
|
||||
|
||||
func (l staticLimiter) roundTripper(rt http.RoundTripper, req *http.Request) (*http.Response, error) {
|
||||
type readCloser struct {
|
||||
io.Reader
|
||||
io.Closer
|
||||
}
|
||||
|
||||
if req.Body != nil {
|
||||
req.Body = &readCloser{
|
||||
Reader: l.Upstream(req.Body),
|
||||
Closer: req.Body,
|
||||
}
|
||||
}
|
||||
|
||||
res, err := rt.RoundTrip(req)
|
||||
|
||||
if res != nil && res.Body != nil {
|
||||
res.Body = &readCloser{
|
||||
Reader: l.Downstream(res.Body),
|
||||
Closer: res.Body,
|
||||
}
|
||||
}
|
||||
|
||||
return res, err
|
||||
}
|
||||
|
||||
// Transport returns an HTTP transport limited with the limiter l.
|
||||
func (l staticLimiter) Transport(rt http.RoundTripper) http.RoundTripper {
|
||||
return roundTripper(func(req *http.Request) (*http.Response, error) {
|
||||
return l.roundTripper(rt, req)
|
||||
})
|
||||
}
|
||||
|
||||
func (l staticLimiter) limitReader(r io.Reader, b *ratelimit.Bucket) io.Reader {
|
||||
if b == nil {
|
||||
return r
|
||||
}
|
||||
return ratelimit.Reader(r, b)
|
||||
}
|
||||
|
||||
func (l staticLimiter) limitWriter(w io.Writer, b *ratelimit.Bucket) io.Writer {
|
||||
if b == nil {
|
||||
return w
|
||||
}
|
||||
return ratelimit.Writer(w, b)
|
||||
}
|
||||
|
||||
func toByteRate(val int) float64 {
|
||||
return float64(val) * 1024.
|
||||
}
|
105
internal/backend/limiter/static_limiter_test.go
Normal file
105
internal/backend/limiter/static_limiter_test.go
Normal file
@@ -0,0 +1,105 @@
|
||||
package limiter
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/rand"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"testing"
|
||||
|
||||
"github.com/restic/restic/internal/test"
|
||||
)
|
||||
|
||||
func TestLimiterWrapping(t *testing.T) {
|
||||
reader := bytes.NewReader([]byte{})
|
||||
writer := new(bytes.Buffer)
|
||||
|
||||
for _, limits := range []Limits{
|
||||
{0, 0},
|
||||
{42, 0},
|
||||
{0, 42},
|
||||
{42, 42},
|
||||
} {
|
||||
limiter := NewStaticLimiter(limits)
|
||||
|
||||
mustWrapUpstream := limits.UploadKb > 0
|
||||
test.Equals(t, limiter.Upstream(reader) != reader, mustWrapUpstream)
|
||||
test.Equals(t, limiter.UpstreamWriter(writer) != writer, mustWrapUpstream)
|
||||
|
||||
mustWrapDownstream := limits.DownloadKb > 0
|
||||
test.Equals(t, limiter.Downstream(reader) != reader, mustWrapDownstream)
|
||||
test.Equals(t, limiter.DownstreamWriter(writer) != writer, mustWrapDownstream)
|
||||
}
|
||||
}
|
||||
|
||||
type tracedReadCloser struct {
|
||||
io.Reader
|
||||
Closed bool
|
||||
}
|
||||
|
||||
func newTracedReadCloser(rd io.Reader) *tracedReadCloser {
|
||||
return &tracedReadCloser{Reader: rd}
|
||||
}
|
||||
|
||||
func (r *tracedReadCloser) Close() error {
|
||||
r.Closed = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestRoundTripperReader(t *testing.T) {
|
||||
limiter := NewStaticLimiter(Limits{42 * 1024, 42 * 1024})
|
||||
data := make([]byte, 1234)
|
||||
_, err := io.ReadFull(rand.Reader, data)
|
||||
test.OK(t, err)
|
||||
|
||||
send := newTracedReadCloser(bytes.NewReader(data))
|
||||
var recv *tracedReadCloser
|
||||
|
||||
rt := limiter.Transport(roundTripper(func(req *http.Request) (*http.Response, error) {
|
||||
buf := new(bytes.Buffer)
|
||||
_, err := io.Copy(buf, req.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = req.Body.Close()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
recv = newTracedReadCloser(bytes.NewReader(buf.Bytes()))
|
||||
return &http.Response{Body: recv}, nil
|
||||
}))
|
||||
|
||||
res, err := rt.RoundTrip(&http.Request{Body: send})
|
||||
test.OK(t, err)
|
||||
|
||||
out := new(bytes.Buffer)
|
||||
n, err := io.Copy(out, res.Body)
|
||||
test.OK(t, err)
|
||||
test.Equals(t, int64(len(data)), n)
|
||||
test.OK(t, res.Body.Close())
|
||||
|
||||
test.Assert(t, send.Closed, "request body not closed")
|
||||
test.Assert(t, recv.Closed, "result body not closed")
|
||||
test.Assert(t, bytes.Equal(data, out.Bytes()), "data ping-pong failed")
|
||||
}
|
||||
|
||||
func TestRoundTripperCornerCases(t *testing.T) {
|
||||
limiter := NewStaticLimiter(Limits{42 * 1024, 42 * 1024})
|
||||
|
||||
rt := limiter.Transport(roundTripper(func(req *http.Request) (*http.Response, error) {
|
||||
return &http.Response{}, nil
|
||||
}))
|
||||
|
||||
res, err := rt.RoundTrip(&http.Request{})
|
||||
test.OK(t, err)
|
||||
test.Assert(t, res != nil, "round tripper returned no response")
|
||||
|
||||
rt = limiter.Transport(roundTripper(func(req *http.Request) (*http.Response, error) {
|
||||
return nil, fmt.Errorf("error")
|
||||
}))
|
||||
|
||||
_, err = rt.RoundTrip(&http.Request{})
|
||||
test.Assert(t, err != nil, "round tripper lost an error")
|
||||
}
|
165
internal/backend/mock/backend.go
Normal file
165
internal/backend/mock/backend.go
Normal file
@@ -0,0 +1,165 @@
|
||||
package mock
|
||||
|
||||
import (
|
||||
"context"
|
||||
"hash"
|
||||
"io"
|
||||
|
||||
"github.com/restic/restic/internal/errors"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
)
|
||||
|
||||
// Backend implements a mock backend.
|
||||
type Backend struct {
|
||||
CloseFn func() error
|
||||
IsNotExistFn func(err error) bool
|
||||
SaveFn func(ctx context.Context, h restic.Handle, rd restic.RewindReader) error
|
||||
OpenReaderFn func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error)
|
||||
StatFn func(ctx context.Context, h restic.Handle) (restic.FileInfo, error)
|
||||
ListFn func(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error
|
||||
RemoveFn func(ctx context.Context, h restic.Handle) error
|
||||
TestFn func(ctx context.Context, h restic.Handle) (bool, error)
|
||||
DeleteFn func(ctx context.Context) error
|
||||
ConnectionsFn func() uint
|
||||
LocationFn func() string
|
||||
HasherFn func() hash.Hash
|
||||
HasAtomicReplaceFn func() bool
|
||||
}
|
||||
|
||||
// NewBackend returns new mock Backend instance
|
||||
func NewBackend() *Backend {
|
||||
be := &Backend{}
|
||||
return be
|
||||
}
|
||||
|
||||
// Close the backend.
|
||||
func (m *Backend) Close() error {
|
||||
if m.CloseFn == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return m.CloseFn()
|
||||
}
|
||||
|
||||
func (m *Backend) Connections() uint {
|
||||
if m.ConnectionsFn == nil {
|
||||
return 2
|
||||
}
|
||||
|
||||
return m.ConnectionsFn()
|
||||
}
|
||||
|
||||
// Location returns a location string.
|
||||
func (m *Backend) Location() string {
|
||||
if m.LocationFn == nil {
|
||||
return ""
|
||||
}
|
||||
|
||||
return m.LocationFn()
|
||||
}
|
||||
|
||||
// Hasher may return a hash function for calculating a content hash for the backend
|
||||
func (m *Backend) Hasher() hash.Hash {
|
||||
if m.HasherFn == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return m.HasherFn()
|
||||
}
|
||||
|
||||
// HasAtomicReplace returns whether Save() can atomically replace files
|
||||
func (m *Backend) HasAtomicReplace() bool {
|
||||
if m.HasAtomicReplaceFn == nil {
|
||||
return false
|
||||
}
|
||||
return m.HasAtomicReplaceFn()
|
||||
}
|
||||
|
||||
// IsNotExist returns true if the error is caused by a missing file.
|
||||
func (m *Backend) IsNotExist(err error) bool {
|
||||
if m.IsNotExistFn == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
return m.IsNotExistFn(err)
|
||||
}
|
||||
|
||||
// Save data in the backend.
|
||||
func (m *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
|
||||
if m.SaveFn == nil {
|
||||
return errors.New("not implemented")
|
||||
}
|
||||
|
||||
return m.SaveFn(ctx, h, rd)
|
||||
}
|
||||
|
||||
// Load runs fn with a reader that yields the contents of the file at h at the
|
||||
// given offset.
|
||||
func (m *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
|
||||
rd, err := m.openReader(ctx, h, length, offset)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = fn(rd)
|
||||
if err != nil {
|
||||
_ = rd.Close() // ignore secondary errors closing the reader
|
||||
return err
|
||||
}
|
||||
return rd.Close()
|
||||
}
|
||||
|
||||
func (m *Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
|
||||
if m.OpenReaderFn == nil {
|
||||
return nil, errors.New("not implemented")
|
||||
}
|
||||
|
||||
return m.OpenReaderFn(ctx, h, length, offset)
|
||||
}
|
||||
|
||||
// Stat an object in the backend.
|
||||
func (m *Backend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) {
|
||||
if m.StatFn == nil {
|
||||
return restic.FileInfo{}, errors.New("not implemented")
|
||||
}
|
||||
|
||||
return m.StatFn(ctx, h)
|
||||
}
|
||||
|
||||
// List items of type t.
|
||||
func (m *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error {
|
||||
if m.ListFn == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return m.ListFn(ctx, t, fn)
|
||||
}
|
||||
|
||||
// Remove data from the backend.
|
||||
func (m *Backend) Remove(ctx context.Context, h restic.Handle) error {
|
||||
if m.RemoveFn == nil {
|
||||
return errors.New("not implemented")
|
||||
}
|
||||
|
||||
return m.RemoveFn(ctx, h)
|
||||
}
|
||||
|
||||
// Test for the existence of a specific item.
|
||||
func (m *Backend) Test(ctx context.Context, h restic.Handle) (bool, error) {
|
||||
if m.TestFn == nil {
|
||||
return false, errors.New("not implemented")
|
||||
}
|
||||
|
||||
return m.TestFn(ctx, h)
|
||||
}
|
||||
|
||||
// Delete all data.
|
||||
func (m *Backend) Delete(ctx context.Context) error {
|
||||
if m.DeleteFn == nil {
|
||||
return errors.New("not implemented")
|
||||
}
|
||||
|
||||
return m.DeleteFn(ctx)
|
||||
}
|
||||
|
||||
// Make sure that Backend implements the backend interface.
|
||||
var _ restic.Backend = &Backend{}
|
@@ -16,10 +16,10 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/restic/restic/internal/backend"
|
||||
"github.com/restic/restic/internal/backend/limiter"
|
||||
"github.com/restic/restic/internal/backend/rest"
|
||||
"github.com/restic/restic/internal/debug"
|
||||
"github.com/restic/restic/internal/errors"
|
||||
"github.com/restic/restic/internal/limiter"
|
||||
"golang.org/x/net/context/ctxhttp"
|
||||
"golang.org/x/net/http2"
|
||||
)
|
||||
|
44
internal/backend/readerat.go
Normal file
44
internal/backend/readerat.go
Normal file
@@ -0,0 +1,44 @@
|
||||
package backend
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
"github.com/restic/restic/internal/debug"
|
||||
"github.com/restic/restic/internal/errors"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
)
|
||||
|
||||
type backendReaderAt struct {
|
||||
ctx context.Context
|
||||
be restic.Backend
|
||||
h restic.Handle
|
||||
}
|
||||
|
||||
func (brd backendReaderAt) ReadAt(p []byte, offset int64) (n int, err error) {
|
||||
return ReadAt(brd.ctx, brd.be, brd.h, offset, p)
|
||||
}
|
||||
|
||||
// ReaderAt returns an io.ReaderAt for a file in the backend. The returned reader
|
||||
// should not escape the caller function to avoid unexpected interactions with the
|
||||
// embedded context
|
||||
func ReaderAt(ctx context.Context, be restic.Backend, h restic.Handle) io.ReaderAt {
|
||||
return backendReaderAt{ctx: ctx, be: be, h: h}
|
||||
}
|
||||
|
||||
// ReadAt reads from the backend handle h at the given position.
|
||||
func ReadAt(ctx context.Context, be restic.Backend, h restic.Handle, offset int64, p []byte) (n int, err error) {
|
||||
debug.Log("ReadAt(%v) at %v, len %v", h, offset, len(p))
|
||||
|
||||
err = be.Load(ctx, h, len(p), offset, func(rd io.Reader) (ierr error) {
|
||||
n, ierr = io.ReadFull(rd, p)
|
||||
|
||||
return ierr
|
||||
})
|
||||
if err != nil {
|
||||
return 0, errors.Wrapf(err, "ReadFull(%v)", h)
|
||||
}
|
||||
|
||||
debug.Log("ReadAt(%v) ReadFull returned %v bytes", h, n)
|
||||
return n, nil
|
||||
}
|
@@ -10,8 +10,8 @@ import (
|
||||
|
||||
"github.com/restic/restic/internal/backend"
|
||||
"github.com/restic/restic/internal/backend/mem"
|
||||
"github.com/restic/restic/internal/backend/mock"
|
||||
"github.com/restic/restic/internal/errors"
|
||||
"github.com/restic/restic/internal/mock"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
rtest "github.com/restic/restic/internal/test"
|
||||
)
|
||||
|
Reference in New Issue
Block a user