349 lines
7.7 KiB
Go
Raw Normal View History

2018-03-13 22:30:51 +01:00
package rclone
import (
"bufio"
2018-03-13 22:30:51 +01:00
"context"
"crypto/tls"
"fmt"
2018-05-22 20:48:17 +02:00
"io"
"math/rand"
2018-03-13 22:30:51 +01:00
"net"
"net/http"
"net/url"
"os"
"os/exec"
"sync"
"syscall"
2018-03-13 22:30:51 +01:00
"time"
"github.com/cenkalti/backoff/v4"
2018-03-13 22:30:51 +01:00
"github.com/restic/restic/internal/backend"
2022-06-12 14:38:19 +02:00
"github.com/restic/restic/internal/backend/limiter"
"github.com/restic/restic/internal/backend/location"
2018-03-13 22:30:51 +01:00
"github.com/restic/restic/internal/backend/rest"
"github.com/restic/restic/internal/backend/util"
2018-03-13 22:30:51 +01:00
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"golang.org/x/net/http2"
)
// Backend is used to access data stored somewhere via rclone.
type Backend struct {
*rest.Backend
tr *http2.Transport
cmd *exec.Cmd
waitCh <-chan struct{}
waitResult error
wg *sync.WaitGroup
conn *StdioConn
2018-03-13 22:30:51 +01:00
}
func NewFactory() location.Factory {
return location.NewLimitedBackendFactory("rclone", ParseConfig, location.NoPassword, Create, Open)
}
2018-03-13 22:30:51 +01:00
// run starts command with args and initializes the StdioConn.
func run(command string, args ...string) (*StdioConn, *sync.WaitGroup, chan struct{}, func() error, error) {
2018-03-13 22:30:51 +01:00
cmd := exec.Command(command, args...)
p, err := cmd.StderrPipe()
if err != nil {
return nil, nil, nil, nil, err
}
var wg sync.WaitGroup
waitCh := make(chan struct{})
// start goroutine to add a prefix to all messages printed by to stderr by rclone
wg.Add(1)
go func() {
defer wg.Done()
defer close(waitCh)
sc := bufio.NewScanner(p)
for sc.Scan() {
fmt.Fprintf(os.Stderr, "rclone: %v\n", sc.Text())
}
debug.Log("command has exited, closing waitCh")
}()
2018-03-13 22:30:51 +01:00
r, stdin, err := os.Pipe()
if err != nil {
return nil, nil, nil, nil, err
2018-03-13 22:30:51 +01:00
}
stdout, w, err := os.Pipe()
if err != nil {
2021-01-30 16:46:34 +01:00
// close first pipe and ignore subsequent errors
_ = r.Close()
_ = stdin.Close()
return nil, nil, nil, nil, err
2018-03-13 22:30:51 +01:00
}
cmd.Stdin = r
cmd.Stdout = w
bg, err := util.StartForeground(cmd)
// close rclone side of pipes
errR := r.Close()
errW := w.Close()
// return first error
if err == nil {
err = errR
}
if err == nil {
err = errW
}
2018-03-13 22:30:51 +01:00
if err != nil {
2024-07-26 19:07:14 +02:00
if errors.Is(err, exec.ErrDot) {
return nil, nil, nil, nil, errors.Errorf("cannot implicitly run relative executable %v found in current directory, use -o rclone.program=./<program> to override", cmd.Path)
}
return nil, nil, nil, nil, err
2018-03-13 22:30:51 +01:00
}
c := &StdioConn{
receive: stdout,
send: stdin,
cmd: cmd,
2018-03-13 22:30:51 +01:00
}
return c, &wg, waitCh, bg, nil
2018-03-13 22:30:51 +01:00
}
2018-05-22 20:48:17 +02:00
// wrappedConn adds bandwidth limiting capabilities to the StdioConn by
// wrapping the Read/Write methods.
type wrappedConn struct {
	*StdioConn
	// Reader shadows the embedded StdioConn's Read; wrapConn points it at a
	// rate-limited reader when a limiter is configured.
	io.Reader
	// Writer shadows the embedded StdioConn's Write; wrapConn points it at a
	// rate-limited writer when a limiter is configured.
	io.Writer
}
func (c *wrappedConn) Read(p []byte) (int, error) {
2018-05-22 20:48:17 +02:00
return c.Reader.Read(p)
}
func (c *wrappedConn) Write(p []byte) (int, error) {
2018-05-22 20:48:17 +02:00
return c.Writer.Write(p)
}
func wrapConn(c *StdioConn, lim limiter.Limiter) *wrappedConn {
wc := &wrappedConn{
2018-05-22 20:48:17 +02:00
StdioConn: c,
Reader: c,
Writer: c,
}
if lim != nil {
wc.Reader = lim.Downstream(c)
wc.Writer = lim.UpstreamWriter(c)
}
return wc
}
2018-03-13 22:30:51 +01:00
// New initializes a Backend and starts the process.
func newBackend(ctx context.Context, cfg Config, lim limiter.Limiter) (*Backend, error) {
2018-03-13 22:30:51 +01:00
var (
args []string
err error
)
// build program args, start with the program
if cfg.Program != "" {
a, err := backend.SplitShellStrings(cfg.Program)
if err != nil {
return nil, err
}
args = append(args, a...)
}
// then add the arguments
if cfg.Args != "" {
a, err := backend.SplitShellStrings(cfg.Args)
if err != nil {
return nil, err
}
args = append(args, a...)
}
// finally, add the remote
args = append(args, cfg.Remote)
arg0, args := args[0], args[1:]
debug.Log("running command: %v %v", arg0, args)
stdioConn, wg, waitCh, bg, err := run(arg0, args...)
2018-03-13 22:30:51 +01:00
if err != nil {
return nil, err
}
2018-05-22 20:48:17 +02:00
var conn net.Conn = stdioConn
if lim != nil {
conn = wrapConn(stdioConn, lim)
}
dialCount := 0
2018-03-13 22:30:51 +01:00
tr := &http2.Transport{
AllowHTTP: true, // this is not really HTTP, just stdin/stdout
DialTLSContext: func(_ context.Context, network, address string, _ *tls.Config) (net.Conn, error) {
2018-03-13 22:30:51 +01:00
debug.Log("new connection requested, %v %v", network, address)
if dialCount > 0 {
// the connection to the child process is already closed
return nil, backoff.Permanent(errors.New("rclone stdio connection already closed"))
}
dialCount++
2018-03-13 22:30:51 +01:00
return conn, nil
},
}
cmd := stdioConn.cmd
2018-03-13 22:30:51 +01:00
be := &Backend{
tr: tr,
cmd: cmd,
waitCh: waitCh,
2018-05-22 20:48:17 +02:00
conn: stdioConn,
wg: wg,
2018-03-13 22:30:51 +01:00
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
wg.Add(1)
2018-03-13 22:30:51 +01:00
go func() {
defer wg.Done()
<-waitCh
cancel()
// according to the documentation of StdErrPipe, Wait() must only be called after the former has completed
2018-03-13 22:30:51 +01:00
err := cmd.Wait()
debug.Log("Wait returned %v", err)
be.waitResult = err
2021-01-30 16:46:34 +01:00
// close our side of the pipes to rclone, ignore errors
_ = stdioConn.CloseAll()
2018-03-13 22:30:51 +01:00
}()
// send an HTTP request to the base URL, see if the server is there
client := http.Client{
2018-10-21 19:58:40 +02:00
Transport: debug.RoundTripper(tr),
2021-11-07 17:47:18 +01:00
Timeout: cfg.Timeout,
2018-03-13 22:30:51 +01:00
}
// request a random file which does not exist. we just want to test when
// rclone is able to accept HTTP requests.
url := fmt.Sprintf("http://localhost/file-%d", rand.Uint64())
2021-01-30 20:43:53 +01:00
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
2018-03-13 22:30:51 +01:00
if err != nil {
return nil, err
}
req.Header.Set("Accept", rest.ContentTypeV2)
res, err := client.Do(req)
2018-03-13 22:30:51 +01:00
if err != nil {
2021-01-30 16:46:34 +01:00
// ignore subsequent errors
_ = bg()
2018-03-13 22:30:51 +01:00
_ = cmd.Process.Kill()
// wait for rclone to exit
wg.Wait()
// try to return the program exit code if communication with rclone has failed
if be.waitResult != nil && (errors.Is(err, context.Canceled) || errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, syscall.EPIPE) || errors.Is(err, os.ErrClosed)) {
err = be.waitResult
}
return nil, fmt.Errorf("error talking HTTP to rclone: %w", err)
2018-03-13 22:30:51 +01:00
}
2024-01-06 15:38:57 +01:00
_ = res.Body.Close()
2018-03-13 22:30:51 +01:00
debug.Log("HTTP status %q returned, moving instance to background", res.Status)
2021-01-30 16:46:34 +01:00
err = bg()
if err != nil {
return nil, fmt.Errorf("error moving process to background: %w", err)
}
2018-03-13 22:30:51 +01:00
return be, nil
}
// Open starts an rclone process with the given config.
func Open(ctx context.Context, cfg Config, lim limiter.Limiter) (*Backend, error) {
be, err := newBackend(ctx, cfg, lim)
2018-03-13 22:30:51 +01:00
if err != nil {
return nil, err
}
url, err := url.Parse("http://localhost/")
if err != nil {
return nil, err
}
restConfig := rest.Config{
Connections: cfg.Connections,
2018-03-13 22:30:51 +01:00
URL: url,
}
restBackend, err := rest.Open(ctx, restConfig, debug.RoundTripper(be.tr))
2018-03-13 22:30:51 +01:00
if err != nil {
_ = be.Close()
2018-03-13 22:30:51 +01:00
return nil, err
}
be.Backend = restBackend
return be, nil
}
// Create initializes a new restic repo with rclone.
func Create(ctx context.Context, cfg Config, lim limiter.Limiter) (*Backend, error) {
be, err := newBackend(ctx, cfg, lim)
2018-03-13 22:30:51 +01:00
if err != nil {
return nil, err
}
debug.Log("new backend created")
url, err := url.Parse("http://localhost/")
if err != nil {
return nil, err
}
restConfig := rest.Config{
Connections: cfg.Connections,
2018-03-13 22:30:51 +01:00
URL: url,
}
restBackend, err := rest.Create(ctx, restConfig, debug.RoundTripper(be.tr))
2018-03-13 22:30:51 +01:00
if err != nil {
_ = be.Close()
2018-03-13 22:30:51 +01:00
return nil, err
}
be.Backend = restBackend
return be, nil
}
const waitForExit = 5 * time.Second
2018-03-13 22:30:51 +01:00
// Close terminates the backend.
func (be *Backend) Close() error {
2018-03-15 19:00:25 +01:00
debug.Log("exiting rclone")
2018-03-13 22:30:51 +01:00
be.tr.CloseIdleConnections()
select {
case <-be.waitCh:
debug.Log("rclone exited")
case <-time.After(waitForExit):
debug.Log("timeout, closing file descriptors")
err := be.conn.CloseAll()
if err != nil {
return err
}
}
be.wg.Wait()
2018-03-13 22:30:51 +01:00
debug.Log("wait for rclone returned: %v", be.waitResult)
return be.waitResult
}
feat(backends/s3): add warmup support before repacks and restores (#5173) * feat(backends/s3): add warmup support before repacks and restores This commit introduces basic support for transitioning pack files stored in cold storage to hot storage on S3 and S3-compatible providers. To prevent unexpected behavior for existing users, the feature is gated behind new flags: - `s3.enable-restore`: opt-in flag (defaults to false) - `s3.restore-days`: number of days for the restored objects to remain in hot storage (defaults to `7`) - `s3.restore-timeout`: maximum time to wait for a single restoration (default to `1 day`) - `s3.restore-tier`: retrieval tier at which the restore will be processed. (default to `Standard`) As restoration times can be lengthy, this implementation preemptively restores selected packs to prevent incessant restore-delays during downloads. This is slightly sub-optimal as we could process packs out-of-order (as soon as they're transitioned), but this would really add too much complexity for a marginal gain in speed. To maintain simplicity and prevent resources exhautions with lots of packs, no new concurrency mechanisms or goroutines were added. This just hooks gracefully into the existing routines. **Limitations:** - Tests against the backend were not written due to the lack of cold storage class support in MinIO. Testing was done manually on Scaleway's S3-compatible object storage. If necessary, we could explore testing with LocalStack or mocks, though this requires further discussion. - Currently, this feature only warms up before restores and repacks (prune/copy), as those are the two main use-cases I came across. Support for other commands may be added in future iterations, as long as affected packs can be calculated in advance. - The feature is gated behind a new alpha `s3-restore` feature flag to make it explicit that the feature is still wet behind the ears. - There is no explicit user notification for ongoing pack restorations. 
While I think it is not necessary because of the opt-in flag, showing some notice may improve usability (but would probably require major refactoring in the progress bar which I didn't want to start). Another possibility would be to add a flag to send restores requests and fail early. See https://github.com/restic/restic/issues/3202 * ui: warn user when files are warming up from cold storage * refactor: remove the PacksWarmer struct It's easier to handle multiple handles in the backend directly, and it may open the door to reducing the number of requests made to the backend in the future.
2025-02-01 19:26:27 +01:00
// Properties returns the properties of the underlying REST backend with
// HasFlakyErrors enabled.
func (be *Backend) Properties() backend.Properties {
	properties := be.Backend.Properties()
	properties.HasFlakyErrors = true
	return properties
}