azure: remove saveSmall, use only PutBlob API
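In effect, the commit removes the upload-method option ('auto'/'single'/'blocks') together with its Config.Validate check, deletes saveSmall (which uploaded a single staged block via StageBlock + CommitBlockList), and keeps saveSingleBlob in its place. Save now picks the code path purely by size: blobs of up to singleBlobMaxSize (5000 MiB) are written with a single Put Blob request, larger ones via the block-based saveLarge.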
@@ -41,8 +41,8 @@ type Backend struct {
 	accessTier blob.AccessTier
 }
 
-const saveLargeSize = 256 * 1024 * 1024
-const singleBlobMaxSize = 5000 * 1024 * 1024 // 5000 MiB - max size for Put Blob API in service version 2019-12-12+
+const singleBlobMaxSize = 5000 * 1024 * 1024 // 5000 MiB - max size for Put Blob API in service version 2019-12-12+
+const singleBlockMaxSize = 4000 * 1024 * 1024 // 4000 MiB - max size for StageBlock API in service version 2019-12-12+
 const defaultListMaxItems = 5000
 
 // make sure that *Backend implements backend.Backend
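These constants follow the documented Azure Blob Storage limits: since service version 2019-12-12, a single Put Blob request may carry up to 5000 MiB, while block blobs assembled from staged blocks accept up to 4000 MiB per block.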
@@ -55,11 +55,6 @@ func NewFactory() location.Factory {
 func open(cfg Config, rt http.RoundTripper) (*Backend, error) {
 	debug.Log("open, config %#v", cfg)
 
-	// Validate configuration
-	if err := cfg.Validate(); err != nil {
-		return nil, err
-	}
-
 	var client *azContainer.Client
 	var err error
 
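Dropping the cfg.Validate() call is safe here because the commit deletes Config.Validate entirely (see the config changes below); its only check concerned the now-removed upload-method option.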
@@ -262,46 +257,25 @@ func (be *Backend) Save(ctx context.Context, h backend.Handle, rd backend.Rewind
 	}
 
 	var err error
-	uploadMethod := strings.ToLower(be.cfg.UploadMethod)
 	fileSize := rd.Length()
 
-	switch uploadMethod {
-	case "single":
-		// Always use single blob upload
-		if fileSize > singleBlobMaxSize {
-			return errors.Errorf("file size %d exceeds single blob limit of %d MiB", fileSize, singleBlobMaxSize/1024/1024)
-		}
-		err = be.saveSingleBlob(ctx, objName, rd, accessTier)
-
-	case "blocks":
-		// Legacy block-based upload method
-		if fileSize < saveLargeSize {
-			err = be.saveSmall(ctx, objName, rd, accessTier)
-		} else {
-			err = be.saveLarge(ctx, objName, rd, accessTier)
-		}
-
-	case "auto", "":
-		// Automatic selection: use single blob for files <= 5000 MiB, blocks for larger files
-		if fileSize <= singleBlobMaxSize {
-			err = be.saveSingleBlob(ctx, objName, rd, accessTier)
-		} else {
-			err = be.saveLarge(ctx, objName, rd, accessTier)
-		}
-
-	default:
-		return errors.Errorf("invalid upload method %q, must be 'auto', 'single', or 'blocks'", uploadMethod)
+	// If the file size is less than or equal to the max size for a single blob, use the single blob upload
+	// otherwise, use the block-based upload
+	if fileSize <= singleBlobMaxSize {
+		err = be.saveSingleBlob(ctx, objName, rd, accessTier)
+	} else {
+		err = be.saveLarge(ctx, objName, rd, accessTier)
 	}
 
 	return err
 }
 
-func (be *Backend) saveSmall(ctx context.Context, objName string, rd backend.RewindReader, accessTier blob.AccessTier) error {
+// saveSingleBlob uploads data using a single Put Blob operation.
+// This method is more efficient for files under 5000 MiB as it requires only one API call
+// instead of the two calls (StageBlock + CommitBlockList) required by the block-based approach.
+func (be *Backend) saveSingleBlob(ctx context.Context, objName string, rd backend.RewindReader, accessTier blob.AccessTier) error {
 	blockBlobClient := be.container.NewBlockBlobClient(objName)
 
-	// upload it as a new "block", use the base64 hash for the ID
-	id := base64.StdEncoding.EncodeToString(rd.Hash())
-
 	buf := make([]byte, rd.Length())
 	_, err := io.ReadFull(rd, buf)
 	if err != nil {
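For scale: the cut-over point is singleBlobMaxSize = 5000 * 1024 * 1024 = 5,242,880,000 bytes, so a typical 16 MiB pack file is written with one Put Blob request, while for example a 6 GiB blob (6,442,450,944 bytes) takes the block-based saveLarge path below.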
@@ -309,24 +283,20 @@ func (be *Backend) saveSmall(ctx context.Context, objName string, rd backend.Rew
 	}
 
 	reader := bytes.NewReader(buf)
-	_, err = blockBlobClient.StageBlock(ctx, id, streaming.NopCloser(reader), &blockblob.StageBlockOptions{
+	opts := &blockblob.UploadOptions{
+		Tier: &accessTier,
 		TransactionalValidation: blob.TransferValidationTypeMD5(rd.Hash()),
-	})
-	if err != nil {
-		return errors.Wrap(err, "StageBlock")
 	}
 
-	blocks := []string{id}
-	_, err = blockBlobClient.CommitBlockList(ctx, blocks, &blockblob.CommitBlockListOptions{
-		Tier: &accessTier,
-	})
-	return errors.Wrap(err, "CommitBlockList")
+	debug.Log("Upload single blob %v with %d bytes", objName, len(buf))
+	_, err = blockBlobClient.Upload(ctx, streaming.NopCloser(reader), opts)
+	return errors.Wrap(err, "Upload")
 }
 
 func (be *Backend) saveLarge(ctx context.Context, objName string, rd backend.RewindReader, accessTier blob.AccessTier) error {
 	blockBlobClient := be.container.NewBlockBlobClient(objName)
 
-	buf := make([]byte, 100*1024*1024)
+	buf := make([]byte, singleBlockMaxSize)
 	blocks := []string{}
 	uploadedBytes := 0
 
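For context, a minimal standalone sketch of the Put Blob call that saveSingleBlob now issues, using the same Azure SDK for Go packages; the connection string, container name, blob name, and Cool access tier are placeholder assumptions, not values from this commit:

package main

import (
	"bytes"
	"context"
	"crypto/md5"
	"log"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
)

func main() {
	// Placeholder credentials and names - replace with real values.
	const connStr = "DefaultEndpointsProtocol=https;AccountName=...;AccountKey=...;EndpointSuffix=core.windows.net"
	client, err := blockblob.NewClientFromConnectionString(connStr, "mycontainer", "myblob", nil)
	if err != nil {
		log.Fatal(err)
	}

	payload := []byte("hello, blob")
	sum := md5.Sum(payload)
	tier := blob.AccessTierCool // example tier; restic derives this from the access-tier option

	// Put Blob: the whole payload goes up in one request, and the service
	// verifies the received bytes against the supplied MD5 hash.
	_, err = client.Upload(context.TODO(),
		streaming.NopCloser(bytes.NewReader(payload)),
		&blockblob.UploadOptions{
			Tier:                    &tier,
			TransactionalValidation: blob.TransferValidationTypeMD5(sum[:]),
		})
	if err != nil {
		log.Fatal(err)
	}
}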
@@ -378,29 +348,6 @@ func (be *Backend) saveLarge(ctx context.Context, objName string, rd backend.Rew
 	return errors.Wrap(err, "CommitBlockList")
 }
 
-// saveSingleBlob uploads data using a single Put Blob operation.
-// This method is more efficient for files under 5000 MiB as it requires only one API call
-// instead of the two calls (StageBlock + CommitBlockList) required by the block-based approach.
-func (be *Backend) saveSingleBlob(ctx context.Context, objName string, rd backend.RewindReader, accessTier blob.AccessTier) error {
-	blockBlobClient := be.container.NewBlockBlobClient(objName)
-
-	buf := make([]byte, rd.Length())
-	_, err := io.ReadFull(rd, buf)
-	if err != nil {
-		return errors.Wrap(err, "ReadFull")
-	}
-
-	reader := bytes.NewReader(buf)
-	opts := &blockblob.UploadOptions{
-		Tier: &accessTier,
-		TransactionalValidation: blob.TransferValidationTypeMD5(rd.Hash()),
-	}
-
-	debug.Log("Upload single blob %v with %d bytes", objName, len(buf))
-	_, err = blockBlobClient.Upload(ctx, streaming.NopCloser(reader), opts)
-	return errors.Wrap(err, "Upload")
-}
-
 // Load runs fn with a reader that yields the contents of the file at h at the
 // given offset.
 func (be *Backend) Load(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
@@ -22,16 +22,14 @@ type Config struct {
 	Container string
 	Prefix    string
 
 	Connections uint   `option:"connections" help:"set a limit for the number of concurrent connections (default: 5)"`
 	AccessTier  string `option:"access-tier" help:"set the access tier for the blob storage (default: inferred from the storage account defaults)"`
-	UploadMethod string `option:"upload-method" help:"blob upload method: 'auto' (single blob for <=5000 MiB), 'single' (always single blob), or 'blocks' (legacy block-based) (default: auto)"`
 }
 
 // NewConfig returns a new Config with the default values filled in.
 func NewConfig() Config {
 	return Config{
 		Connections: 5,
-		UploadMethod: "auto",
 	}
 }
 
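With the field gone, NewConfig no longer seeds a default upload method, and the Validate method in the next hunk loses its only purpose.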
@@ -87,16 +85,3 @@ func (cfg *Config) ApplyEnvironment(prefix string) {
 		cfg.EndpointSuffix = os.Getenv(prefix + "AZURE_ENDPOINT_SUFFIX")
 	}
 }
-
-// Validate checks the configuration for errors.
-func (cfg *Config) Validate() error {
-	// Normalize upload method to lowercase
-	uploadMethod := strings.ToLower(cfg.UploadMethod)
-	if uploadMethod != "auto" && uploadMethod != "single" && uploadMethod != "blocks" && uploadMethod != "" {
-		return errors.Errorf("invalid upload method %q, must be 'auto', 'single', or 'blocks'", cfg.UploadMethod)
-	}
-	if uploadMethod != "" {
-		cfg.UploadMethod = uploadMethod
-	}
-	return nil
-}