limiter: move to internal/backend

This commit is contained in:
Michael Eischer
2022-06-12 14:38:19 +02:00
parent 163ab9c025
commit a0cef9f247
7 changed files with 2 additions and 2 deletions

View File

@@ -0,0 +1,29 @@
package limiter
import (
"io"
"net/http"
)
// Limiter defines an interface that implementors can use to rate limit I/O
// according to some policy defined and configured by the implementor.
type Limiter interface {
// Upstream returns a rate limited reader that is intended to be used in
// uploads.
Upstream(r io.Reader) io.Reader
// UpstreamWriter returns a rate limited writer that is intended to be used
// in uploads.
UpstreamWriter(w io.Writer) io.Writer
// Downstream returns a rate limited reader that is intended to be used
// for downloads.
Downstream(r io.Reader) io.Reader
// Downstream returns a rate limited reader that is intended to be used
// for downloads.
DownstreamWriter(r io.Writer) io.Writer
// Transport returns an http.RoundTripper limited with the limiter.
Transport(http.RoundTripper) http.RoundTripper
}

View File

@@ -0,0 +1,71 @@
package limiter
import (
"context"
"io"
"github.com/restic/restic/internal/restic"
)
// LimitBackend wraps a Backend and applies rate limiting to Load() and Save()
// calls on the backend.
func LimitBackend(be restic.Backend, l Limiter) restic.Backend {
return rateLimitedBackend{
Backend: be,
limiter: l,
}
}
type rateLimitedBackend struct {
restic.Backend
limiter Limiter
}
func (r rateLimitedBackend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
limited := limitedRewindReader{
RewindReader: rd,
limited: r.limiter.Upstream(rd),
}
return r.Backend.Save(ctx, h, limited)
}
type limitedRewindReader struct {
restic.RewindReader
limited io.Reader
}
func (l limitedRewindReader) Read(b []byte) (int, error) {
return l.limited.Read(b)
}
func (r rateLimitedBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64, consumer func(rd io.Reader) error) error {
return r.Backend.Load(ctx, h, length, offset, func(rd io.Reader) error {
return consumer(newDownstreamLimitedReader(rd, r.limiter))
})
}
type limitedReader struct {
io.Reader
writerTo io.WriterTo
limiter Limiter
}
func newDownstreamLimitedReader(rd io.Reader, limiter Limiter) io.Reader {
lrd := limiter.Downstream(rd)
if wt, ok := rd.(io.WriterTo); ok {
lrd = &limitedReader{
Reader: lrd,
writerTo: wt,
limiter: limiter,
}
}
return lrd
}
func (l *limitedReader) WriteTo(w io.Writer) (int64, error) {
return l.writerTo.WriteTo(l.limiter.DownstreamWriter(w))
}
var _ restic.Backend = (*rateLimitedBackend)(nil)

View File

@@ -0,0 +1,109 @@
package limiter
import (
"bytes"
"context"
"crypto/rand"
"fmt"
"io"
"testing"
"github.com/restic/restic/internal/backend/mock"
"github.com/restic/restic/internal/restic"
rtest "github.com/restic/restic/internal/test"
)
func randomBytes(t *testing.T, size int) []byte {
data := make([]byte, size)
_, err := io.ReadFull(rand.Reader, data)
rtest.OK(t, err)
return data
}
func TestLimitBackendSave(t *testing.T) {
testHandle := restic.Handle{Type: restic.PackFile, Name: "test"}
data := randomBytes(t, 1234)
be := mock.NewBackend()
be.SaveFn = func(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
buf := new(bytes.Buffer)
_, err := io.Copy(buf, rd)
if err != nil {
return nil
}
if !bytes.Equal(data, buf.Bytes()) {
return fmt.Errorf("data mismatch")
}
return nil
}
limiter := NewStaticLimiter(Limits{42 * 1024, 42 * 1024})
limbe := LimitBackend(be, limiter)
rd := restic.NewByteReader(data, nil)
err := limbe.Save(context.TODO(), testHandle, rd)
rtest.OK(t, err)
}
type tracedReadWriteToCloser struct {
io.Reader
io.WriterTo
Traced bool
}
func newTracedReadWriteToCloser(rd *bytes.Reader) *tracedReadWriteToCloser {
return &tracedReadWriteToCloser{Reader: rd, WriterTo: rd}
}
func (r *tracedReadWriteToCloser) WriteTo(w io.Writer) (n int64, err error) {
r.Traced = true
return r.WriterTo.WriteTo(w)
}
func (r *tracedReadWriteToCloser) Close() error {
return nil
}
func TestLimitBackendLoad(t *testing.T) {
testHandle := restic.Handle{Type: restic.PackFile, Name: "test"}
data := randomBytes(t, 1234)
for _, test := range []struct {
innerWriteTo, outerWriteTo bool
}{{false, false}, {false, true}, {true, false}, {true, true}} {
be := mock.NewBackend()
src := newTracedReadWriteToCloser(bytes.NewReader(data))
be.OpenReaderFn = func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
if length != 0 || offset != 0 {
return nil, fmt.Errorf("Not supported")
}
// test both code paths in WriteTo of limitedReadCloser
if test.innerWriteTo {
return src, nil
}
return newTracedReadCloser(src), nil
}
limiter := NewStaticLimiter(Limits{42 * 1024, 42 * 1024})
limbe := LimitBackend(be, limiter)
err := limbe.Load(context.TODO(), testHandle, 0, 0, func(rd io.Reader) error {
dataRead := new(bytes.Buffer)
// test both Read and WriteTo
if !test.outerWriteTo {
rd = newTracedReadCloser(rd)
}
_, err := io.Copy(dataRead, rd)
if err != nil {
return err
}
if !bytes.Equal(data, dataRead.Bytes()) {
return fmt.Errorf("read broken data")
}
return nil
})
rtest.OK(t, err)
rtest.Assert(t, src.Traced == (test.innerWriteTo && test.outerWriteTo),
"unexpected/missing writeTo call innerWriteTo %v outerWriteTo %v",
test.innerWriteTo, test.outerWriteTo)
}
}

View File

@@ -0,0 +1,114 @@
package limiter
import (
"io"
"net/http"
"github.com/juju/ratelimit"
)
type staticLimiter struct {
upstream *ratelimit.Bucket
downstream *ratelimit.Bucket
}
// Limits represents static upload and download limits.
// For both, zero means unlimited.
type Limits struct {
UploadKb int
DownloadKb int
}
// NewStaticLimiter constructs a Limiter with a fixed (static) upload and
// download rate cap
func NewStaticLimiter(l Limits) Limiter {
var (
upstreamBucket *ratelimit.Bucket
downstreamBucket *ratelimit.Bucket
)
if l.UploadKb > 0 {
upstreamBucket = ratelimit.NewBucketWithRate(toByteRate(l.UploadKb), int64(toByteRate(l.UploadKb)))
}
if l.DownloadKb > 0 {
downstreamBucket = ratelimit.NewBucketWithRate(toByteRate(l.DownloadKb), int64(toByteRate(l.DownloadKb)))
}
return staticLimiter{
upstream: upstreamBucket,
downstream: downstreamBucket,
}
}
func (l staticLimiter) Upstream(r io.Reader) io.Reader {
return l.limitReader(r, l.upstream)
}
func (l staticLimiter) UpstreamWriter(w io.Writer) io.Writer {
return l.limitWriter(w, l.upstream)
}
func (l staticLimiter) Downstream(r io.Reader) io.Reader {
return l.limitReader(r, l.downstream)
}
func (l staticLimiter) DownstreamWriter(w io.Writer) io.Writer {
return l.limitWriter(w, l.downstream)
}
type roundTripper func(*http.Request) (*http.Response, error)
func (rt roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
return rt(req)
}
func (l staticLimiter) roundTripper(rt http.RoundTripper, req *http.Request) (*http.Response, error) {
type readCloser struct {
io.Reader
io.Closer
}
if req.Body != nil {
req.Body = &readCloser{
Reader: l.Upstream(req.Body),
Closer: req.Body,
}
}
res, err := rt.RoundTrip(req)
if res != nil && res.Body != nil {
res.Body = &readCloser{
Reader: l.Downstream(res.Body),
Closer: res.Body,
}
}
return res, err
}
// Transport returns an HTTP transport limited with the limiter l.
func (l staticLimiter) Transport(rt http.RoundTripper) http.RoundTripper {
return roundTripper(func(req *http.Request) (*http.Response, error) {
return l.roundTripper(rt, req)
})
}
func (l staticLimiter) limitReader(r io.Reader, b *ratelimit.Bucket) io.Reader {
if b == nil {
return r
}
return ratelimit.Reader(r, b)
}
func (l staticLimiter) limitWriter(w io.Writer, b *ratelimit.Bucket) io.Writer {
if b == nil {
return w
}
return ratelimit.Writer(w, b)
}
func toByteRate(val int) float64 {
return float64(val) * 1024.
}

View File

@@ -0,0 +1,105 @@
package limiter
import (
"bytes"
"crypto/rand"
"fmt"
"io"
"net/http"
"testing"
"github.com/restic/restic/internal/test"
)
func TestLimiterWrapping(t *testing.T) {
reader := bytes.NewReader([]byte{})
writer := new(bytes.Buffer)
for _, limits := range []Limits{
{0, 0},
{42, 0},
{0, 42},
{42, 42},
} {
limiter := NewStaticLimiter(limits)
mustWrapUpstream := limits.UploadKb > 0
test.Equals(t, limiter.Upstream(reader) != reader, mustWrapUpstream)
test.Equals(t, limiter.UpstreamWriter(writer) != writer, mustWrapUpstream)
mustWrapDownstream := limits.DownloadKb > 0
test.Equals(t, limiter.Downstream(reader) != reader, mustWrapDownstream)
test.Equals(t, limiter.DownstreamWriter(writer) != writer, mustWrapDownstream)
}
}
type tracedReadCloser struct {
io.Reader
Closed bool
}
func newTracedReadCloser(rd io.Reader) *tracedReadCloser {
return &tracedReadCloser{Reader: rd}
}
func (r *tracedReadCloser) Close() error {
r.Closed = true
return nil
}
func TestRoundTripperReader(t *testing.T) {
limiter := NewStaticLimiter(Limits{42 * 1024, 42 * 1024})
data := make([]byte, 1234)
_, err := io.ReadFull(rand.Reader, data)
test.OK(t, err)
send := newTracedReadCloser(bytes.NewReader(data))
var recv *tracedReadCloser
rt := limiter.Transport(roundTripper(func(req *http.Request) (*http.Response, error) {
buf := new(bytes.Buffer)
_, err := io.Copy(buf, req.Body)
if err != nil {
return nil, err
}
err = req.Body.Close()
if err != nil {
return nil, err
}
recv = newTracedReadCloser(bytes.NewReader(buf.Bytes()))
return &http.Response{Body: recv}, nil
}))
res, err := rt.RoundTrip(&http.Request{Body: send})
test.OK(t, err)
out := new(bytes.Buffer)
n, err := io.Copy(out, res.Body)
test.OK(t, err)
test.Equals(t, int64(len(data)), n)
test.OK(t, res.Body.Close())
test.Assert(t, send.Closed, "request body not closed")
test.Assert(t, recv.Closed, "result body not closed")
test.Assert(t, bytes.Equal(data, out.Bytes()), "data ping-pong failed")
}
func TestRoundTripperCornerCases(t *testing.T) {
limiter := NewStaticLimiter(Limits{42 * 1024, 42 * 1024})
rt := limiter.Transport(roundTripper(func(req *http.Request) (*http.Response, error) {
return &http.Response{}, nil
}))
res, err := rt.RoundTrip(&http.Request{})
test.OK(t, err)
test.Assert(t, res != nil, "round tripper returned no response")
rt = limiter.Transport(roundTripper(func(req *http.Request) (*http.Response, error) {
return nil, fmt.Errorf("error")
}))
_, err = rt.RoundTrip(&http.Request{})
test.Assert(t, err != nil, "round tripper lost an error")
}

View File

@@ -16,10 +16,10 @@ import (
"time"
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/backend/limiter"
"github.com/restic/restic/internal/backend/rest"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/limiter"
"golang.org/x/net/context/ctxhttp"
"golang.org/x/net/http2"
)