mirror of
https://github.com/restic/restic.git
synced 2025-08-12 21:27:41 +00:00
Merge pull request #1336 from rmdashrf/rate-limit-backup
Add basic rate limiting to backup
This commit is contained in:
17
internal/limiter/limiter.go
Normal file
17
internal/limiter/limiter.go
Normal file
@@ -0,0 +1,17 @@
|
||||
package limiter
|
||||
|
||||
import (
|
||||
"io"
|
||||
)
|
||||
|
||||
// Limiter defines an interface that implementors can use to rate limit I/O
|
||||
// according to some policy defined and configured by the implementor.
|
||||
type Limiter interface {
|
||||
// Upstream returns a rate limited reader that is intended to be used in
|
||||
// uploads.
|
||||
Upstream(r io.Reader) io.Reader
|
||||
|
||||
// Downstream returns a rate limited reader that is intended to be used
|
||||
// for downloads.
|
||||
Downstream(r io.Reader) io.Reader
|
||||
}
|
53
internal/limiter/limiter_backend.go
Normal file
53
internal/limiter/limiter_backend.go
Normal file
@@ -0,0 +1,53 @@
|
||||
package limiter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
"github.com/restic/restic/internal/restic"
|
||||
)
|
||||
|
||||
// LimitBackend wraps a Backend and applies rate limiting to Load() and Save()
|
||||
// calls on the backend.
|
||||
func LimitBackend(be restic.Backend, l Limiter) restic.Backend {
|
||||
return rateLimitedBackend{
|
||||
Backend: be,
|
||||
limiter: l,
|
||||
}
|
||||
}
|
||||
|
||||
type rateLimitedBackend struct {
|
||||
restic.Backend
|
||||
limiter Limiter
|
||||
}
|
||||
|
||||
func (r rateLimitedBackend) Save(ctx context.Context, h restic.Handle, rd io.Reader) error {
|
||||
return r.Backend.Save(ctx, h, r.limiter.Upstream(rd))
|
||||
}
|
||||
|
||||
func (r rateLimitedBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
|
||||
rc, err := r.Backend.Load(ctx, h, length, offset)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return limitedReadCloser{
|
||||
original: rc,
|
||||
limited: r.limiter.Downstream(rc),
|
||||
}, nil
|
||||
}
|
||||
|
||||
type limitedReadCloser struct {
|
||||
original io.ReadCloser
|
||||
limited io.Reader
|
||||
}
|
||||
|
||||
func (l limitedReadCloser) Read(b []byte) (n int, err error) {
|
||||
return l.limited.Read(b)
|
||||
}
|
||||
|
||||
func (l limitedReadCloser) Close() error {
|
||||
return l.original.Close()
|
||||
}
|
||||
|
||||
var _ restic.Backend = (*rateLimitedBackend)(nil)
|
53
internal/limiter/static_limiter.go
Normal file
53
internal/limiter/static_limiter.go
Normal file
@@ -0,0 +1,53 @@
|
||||
package limiter
|
||||
|
||||
import (
|
||||
"io"
|
||||
|
||||
"github.com/juju/ratelimit"
|
||||
)
|
||||
|
||||
type staticLimiter struct {
|
||||
upstream *ratelimit.Bucket
|
||||
downstream *ratelimit.Bucket
|
||||
}
|
||||
|
||||
// NewStaticLimiter constructs a Limiter with a fixed (static) upload and
|
||||
// download rate cap
|
||||
func NewStaticLimiter(uploadKb, downloadKb int) Limiter {
|
||||
var (
|
||||
upstreamBucket *ratelimit.Bucket
|
||||
downstreamBucket *ratelimit.Bucket
|
||||
)
|
||||
|
||||
if uploadKb > 0 {
|
||||
upstreamBucket = ratelimit.NewBucketWithRate(toByteRate(uploadKb), int64(toByteRate(uploadKb)))
|
||||
}
|
||||
|
||||
if downloadKb > 0 {
|
||||
downstreamBucket = ratelimit.NewBucketWithRate(toByteRate(downloadKb), int64(toByteRate(downloadKb)))
|
||||
}
|
||||
|
||||
return staticLimiter{
|
||||
upstream: upstreamBucket,
|
||||
downstream: downstreamBucket,
|
||||
}
|
||||
}
|
||||
|
||||
func (l staticLimiter) Upstream(r io.Reader) io.Reader {
|
||||
return l.limit(r, l.upstream)
|
||||
}
|
||||
|
||||
func (l staticLimiter) Downstream(r io.Reader) io.Reader {
|
||||
return l.limit(r, l.downstream)
|
||||
}
|
||||
|
||||
func (l staticLimiter) limit(r io.Reader, b *ratelimit.Bucket) io.Reader {
|
||||
if b == nil {
|
||||
return r
|
||||
}
|
||||
return ratelimit.Reader(r, b)
|
||||
}
|
||||
|
||||
func toByteRate(val int) float64 {
|
||||
return float64(val) * 1024.
|
||||
}
|
Reference in New Issue
Block a user