Swap deprecated GCS lib with replacement

This commit is contained in:
Ingo Gottwald
2020-10-03 11:03:41 +02:00
parent 7c3c6fa431
commit 8b8e230771
3 changed files with 85 additions and 116 deletions

View File

@@ -3,13 +3,13 @@ package gs
import (
"context"
"fmt"
"io"
"net/http"
"os"
"path"
"strings"
"cloud.google.com/go/storage"
"github.com/pkg/errors"
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/debug"
@@ -18,8 +18,8 @@ import (
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
"google.golang.org/api/googleapi"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
storage "google.golang.org/api/storage/v1"
)
// Backend stores data in a GCS bucket.
@@ -30,10 +30,11 @@ import (
// * storage.objects.get
// * storage.objects.list
type Backend struct {
service *storage.Service
gcsClient *storage.Client
projectID string
sem *backend.Semaphore
bucketName string
bucket *storage.BucketHandle
prefix string
listMaxItems int
backend.Layout
@@ -42,7 +43,7 @@ type Backend struct {
// Ensure that *Backend implements restic.Backend.
var _ restic.Backend = &Backend{}
func getStorageService(rt http.RoundTripper) (*storage.Service, error) {
func getStorageClient(rt http.RoundTripper) (*storage.Client, error) {
// create a new HTTP client
httpClient := &http.Client{
Transport: rt,
@@ -59,20 +60,28 @@ func getStorageService(rt http.RoundTripper) (*storage.Service, error) {
})
} else {
var err error
ts, err = google.DefaultTokenSource(ctx, storage.DevstorageReadWriteScope)
ts, err = google.DefaultTokenSource(ctx, storage.ScopeReadWrite)
if err != nil {
return nil, err
}
}
client := oauth2.NewClient(ctx, ts)
oauthClient := oauth2.NewClient(ctx, ts)
service, err := storage.NewService(ctx, option.WithHTTPClient(client))
gcsClient, err := storage.NewClient(ctx, option.WithHTTPClient(oauthClient))
if err != nil {
return nil, err
}
return service, nil
return gcsClient, nil
}
func (be *Backend) bucketExists(ctx context.Context, bucket *storage.BucketHandle) (bool, error) {
_, err := bucket.Attrs(ctx)
if err == storage.ErrBucketNotExist {
return false, nil
}
return err == nil, err
}
const defaultListMaxItems = 1000
@@ -80,9 +89,9 @@ const defaultListMaxItems = 1000
func open(cfg Config, rt http.RoundTripper) (*Backend, error) {
debug.Log("open, config %#v", cfg)
service, err := getStorageService(rt)
gcsClient, err := getStorageClient(rt)
if err != nil {
return nil, errors.Wrap(err, "getStorageService")
return nil, errors.Wrap(err, "getStorageClient")
}
sem, err := backend.NewSemaphore(cfg.Connections)
@@ -91,10 +100,11 @@ func open(cfg Config, rt http.RoundTripper) (*Backend, error) {
}
be := &Backend{
service: service,
gcsClient: gcsClient,
projectID: cfg.ProjectID,
sem: sem,
bucketName: cfg.Bucket,
bucket: gcsClient.Bucket(cfg.Bucket),
prefix: cfg.Prefix,
Layout: &backend.DefaultLayout{
Path: cfg.Prefix,
@@ -123,47 +133,19 @@ func Create(cfg Config, rt http.RoundTripper) (restic.Backend, error) {
}
// Try to determine if the bucket exists. If it does not, try to create it.
//
// A Get call has three typical error cases:
//
// * nil: Bucket exists and we have access to the metadata (returned).
//
// * 403: Bucket exists and we do not have access to the metadata. We
// don't have storage.buckets.get permission to the bucket, but we may
// still be able to access objects in the bucket.
//
// * 404: Bucket doesn't exist.
//
// Determining if the bucket is accessible is best-effort because the
// 403 case is ambiguous.
if _, err := be.service.Buckets.Get(be.bucketName).Do(); err != nil {
gerr, ok := err.(*googleapi.Error)
if !ok {
// Don't know what to do with this error.
return nil, errors.Wrap(err, "service.Buckets.Get")
ctx := context.Background()
exists, err := be.bucketExists(ctx, be.bucket)
if err != nil {
return nil, errors.Wrap(err, "service.Buckets.Get")
}
if !exists {
// Bucket doesn't exist, try to create it.
if err := be.bucket.Create(ctx, be.projectID, nil); err != nil {
// Always an error, as the bucket definitely doesn't exist.
return nil, errors.Wrap(err, "service.Buckets.Insert")
}
switch gerr.Code {
case 403:
// Bucket exists, but we don't know if it is
// accessible. Optimistically assume it is; if not,
// future Backend calls will fail.
debug.Log("Unable to determine if bucket %s is accessible (err %v). Continuing as if it is.", be.bucketName, err)
case 404:
// Bucket doesn't exist, try to create it.
bucket := &storage.Bucket{
Name: be.bucketName,
}
if _, err := be.service.Buckets.Insert(be.projectID, bucket).Do(); err != nil {
// Always an error, as the bucket definitely
// doesn't exist.
return nil, errors.Wrap(err, "service.Buckets.Insert")
}
default:
// Don't know what to do with this error.
return nil, errors.Wrap(err, "service.Buckets.Get")
}
}
return be, nil
@@ -245,13 +227,10 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe
//
// restic typically writes small blobs (4MB-30MB), so the resumable
// uploads are not providing significant benefit anyways.
cs := googleapi.ChunkSize(0)
info, err := be.service.Objects.Insert(be.bucketName,
&storage.Object{
Name: objName,
Size: uint64(rd.Length()),
}).Media(rd, cs).Do()
w := be.bucket.Object(objName).NewWriter(ctx)
w.ChunkSize = 0
wbytes, err := io.Copy(w, rd)
w.Close()
be.sem.ReleaseToken()
@@ -260,7 +239,7 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe
return errors.Wrap(err, "service.Objects.Insert")
}
debug.Log("%v -> %v bytes", objName, info.Size)
debug.Log("%v -> %v bytes", objName, wbytes)
return nil
}
@@ -295,29 +274,23 @@ func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int,
if length < 0 {
return nil, errors.Errorf("invalid length %d", length)
}
if length == 0 {
// negative length indicates read till end to GCS lib
length = -1
}
objName := be.Filename(h)
be.sem.GetToken()
var byteRange string
if length > 0 {
byteRange = fmt.Sprintf("bytes=%d-%d", offset, offset+int64(length-1))
} else {
byteRange = fmt.Sprintf("bytes=%d-", offset)
}
req := be.service.Objects.Get(be.bucketName, objName)
// https://cloud.google.com/storage/docs/json_api/v1/parameters#range
req.Header().Set("Range", byteRange)
res, err := req.Download()
r, err := be.bucket.Object(objName).NewRangeReader(ctx, offset, int64(length))
if err != nil {
be.sem.ReleaseToken()
return nil, err
}
closeRd := wrapReader{
ReadCloser: res.Body,
ReadCloser: r,
f: func() {
debug.Log("Close()")
be.sem.ReleaseToken()
@@ -334,15 +307,15 @@ func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf
objName := be.Filename(h)
be.sem.GetToken()
obj, err := be.service.Objects.Get(be.bucketName, objName).Do()
attr, err := be.bucket.Object(objName).Attrs(ctx)
be.sem.ReleaseToken()
if err != nil {
debug.Log("GetObject() err %v", err)
debug.Log("GetObjectAttributes() err %v", err)
return restic.FileInfo{}, errors.Wrap(err, "service.Objects.Get")
}
return restic.FileInfo{Size: int64(obj.Size), Name: h.Name}, nil
return restic.FileInfo{Size: attr.Size, Name: h.Name}, nil
}
// Test returns true if a blob of the given type and name exists in the backend.
@@ -351,7 +324,7 @@ func (be *Backend) Test(ctx context.Context, h restic.Handle) (bool, error) {
objName := be.Filename(h)
be.sem.GetToken()
_, err := be.service.Objects.Get(be.bucketName, objName).Do()
_, err := be.bucket.Object(objName).Attrs(ctx)
be.sem.ReleaseToken()
if err == nil {
@@ -366,13 +339,11 @@ func (be *Backend) Remove(ctx context.Context, h restic.Handle) error {
objName := be.Filename(h)
be.sem.GetToken()
err := be.service.Objects.Delete(be.bucketName, objName).Do()
err := be.bucket.Object(objName).Delete(ctx)
be.sem.ReleaseToken()
if er, ok := err.(*googleapi.Error); ok {
if er.Code == 404 {
err = nil
}
if err == storage.ErrObjectNotExist {
err = nil
}
debug.Log("Remove(%v) at %v -> err %v", h, objName, err)
@@ -394,47 +365,36 @@ func (be *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.F
ctx, cancel := context.WithCancel(ctx)
defer cancel()
listReq := be.service.Objects.List(be.bucketName).Context(ctx).Prefix(prefix).MaxResults(int64(be.listMaxItems))
itr := be.bucket.Objects(ctx, &storage.Query{Prefix: prefix})
for {
be.sem.GetToken()
obj, err := listReq.Do()
attrs, err := itr.Next()
be.sem.ReleaseToken()
if err == iterator.Done {
break
}
if err != nil {
return err
}
m := strings.TrimPrefix(attrs.Name, prefix)
if m == "" {
continue
}
fi := restic.FileInfo{
Name: path.Base(m),
Size: int64(attrs.Size),
}
err = fn(fi)
if err != nil {
return err
}
debug.Log("returned %v items", len(obj.Items))
for _, item := range obj.Items {
m := strings.TrimPrefix(item.Name, prefix)
if m == "" {
continue
}
if ctx.Err() != nil {
return ctx.Err()
}
fi := restic.FileInfo{
Name: path.Base(m),
Size: int64(item.Size),
}
err := fn(fi)
if err != nil {
return err
}
if ctx.Err() != nil {
return ctx.Err()
}
if ctx.Err() != nil {
return ctx.Err()
}
if obj.NextPageToken == "" {
break
}
listReq.PageToken(obj.NextPageToken)
}
return ctx.Err()