Basic rate limiting implementation.

Added `--limit-upload` and `--limit-download` flags to rate limit
backups and restores.
This commit is contained in:
rmdashrf
2017-10-08 11:28:03 -07:00
parent 8ceb22fe8a
commit 32637a0328
12 changed files with 1179 additions and 4 deletions

View File

@@ -0,0 +1,17 @@
package limiter
import (
"io"
)
// Limiter defines an interface that implementors can use to rate limit I/O
// according to some policy defined and configured by the implementor.
type Limiter interface {
// Upstream returns a rate limited reader that is intended to be used in
// uploads.
Upstream(r io.Reader) io.Reader
// Downstream returns a rate limited reader that is intended to be used
// for downloads.
Downstream(r io.Reader) io.Reader
}

View File

@@ -0,0 +1,53 @@
package limiter
import (
"context"
"io"
"github.com/restic/restic/internal/restic"
)
// LimitBackend wraps a Backend and applies rate limiting to Load() and Save()
// calls on the backend.
func LimitBackend(be restic.Backend, l Limiter) restic.Backend {
return rateLimitedBackend{
Backend: be,
limiter: l,
}
}
type rateLimitedBackend struct {
restic.Backend
limiter Limiter
}
func (r rateLimitedBackend) Save(ctx context.Context, h restic.Handle, rd io.Reader) error {
return r.Backend.Save(ctx, h, r.limiter.Upstream(rd))
}
func (r rateLimitedBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
rc, err := r.Backend.Load(ctx, h, length, offset)
if err != nil {
return nil, err
}
return limitedReadCloser{
original: rc,
limited: r.limiter.Downstream(rc),
}, nil
}
type limitedReadCloser struct {
original io.ReadCloser
limited io.Reader
}
func (l limitedReadCloser) Read(b []byte) (n int, err error) {
return l.limited.Read(b)
}
func (l limitedReadCloser) Close() error {
return l.original.Close()
}
var _ restic.Backend = (*rateLimitedBackend)(nil)

View File

@@ -0,0 +1,53 @@
package limiter
import (
"io"
"github.com/juju/ratelimit"
)
type staticLimiter struct {
upstream *ratelimit.Bucket
downstream *ratelimit.Bucket
}
// NewStaticLimiter constructs a Limiter with a fixed (static) upload and
// download rate cap
func NewStaticLimiter(uploadKb, downloadKb int) Limiter {
var (
upstreamBucket *ratelimit.Bucket
downstreamBucket *ratelimit.Bucket
)
if uploadKb > 0 {
upstreamBucket = ratelimit.NewBucketWithRate(toByteRate(uploadKb), int64(toByteRate(uploadKb)))
}
if downloadKb > 0 {
downstreamBucket = ratelimit.NewBucketWithRate(toByteRate(downloadKb), int64(toByteRate(downloadKb)))
}
return staticLimiter{
upstream: upstreamBucket,
downstream: downstreamBucket,
}
}
func (l staticLimiter) Upstream(r io.Reader) io.Reader {
return l.limit(r, l.upstream)
}
func (l staticLimiter) Downstream(r io.Reader) io.Reader {
return l.limit(r, l.downstream)
}
func (l staticLimiter) limit(r io.Reader, b *ratelimit.Bucket) io.Reader {
if b == nil {
return r
}
return ratelimit.Reader(r, b)
}
func toByteRate(val int) float64 {
return float64(val) * 1024.
}