Merge pull request #3704 from MichaelEischer/compression-migrations

Support migration to repository format with compression
Alexander Neumann
2022-05-29 15:52:21 +02:00
committed by GitHub
26 changed files with 439 additions and 38 deletions

View File

@@ -125,6 +125,11 @@ func (be *Backend) Hasher() hash.Hash {
return md5.New()
}
// HasAtomicReplace returns whether Save() can atomically replace files
func (be *Backend) HasAtomicReplace() bool {
return true
}
// Path returns the path in the bucket that is used for this backend.
func (be *Backend) Path() string {
return be.prefix

View File

@@ -147,6 +147,11 @@ func (be *b2Backend) Hasher() hash.Hash {
return nil
}
// HasAtomicReplace returns whether Save() can atomically replace files
func (be *b2Backend) HasAtomicReplace() bool {
return true
}
// IsNotExist returns true if the error is caused by a non-existing file.
func (be *b2Backend) IsNotExist(err error) bool {
return b2.IsNotExist(errors.Cause(err))

View File

@@ -67,6 +67,10 @@ func (be *Backend) Hasher() hash.Hash {
return be.b.Hasher()
}
func (be *Backend) HasAtomicReplace() bool {
return be.b.HasAtomicReplace()
}
func (be *Backend) IsNotExist(err error) bool {
return be.b.IsNotExist(err)
}

View File

@@ -201,6 +201,11 @@ func (be *Backend) Hasher() hash.Hash {
return md5.New()
}
// HasAtomicReplace returns whether Save() can atomically replace files
func (be *Backend) HasAtomicReplace() bool {
return true
}
// Path returns the path in the bucket that is used for this backend.
func (be *Backend) Path() string {
return be.prefix

View File

@@ -102,6 +102,11 @@ func (b *Local) Hasher() hash.Hash {
return nil
}
// HasAtomicReplace returns whether Save() can atomically replace files
func (b *Local) HasAtomicReplace() bool {
return true
}
// IsNotExist returns true if the error is caused by a non-existing file.
func (b *Local) IsNotExist(err error) bool {
return errors.Is(err, os.ErrNotExist)

View File

@@ -268,6 +268,11 @@ func (be *MemoryBackend) Hasher() hash.Hash {
return md5.New()
}
// HasAtomicReplace returns whether Save() can atomically replace files
func (be *MemoryBackend) HasAtomicReplace() bool {
return false
}
// Delete removes all data in the backend.
func (be *MemoryBackend) Delete(ctx context.Context) error {
be.m.Lock()

View File

@@ -121,6 +121,12 @@ func (b *Backend) Hasher() hash.Hash {
return nil
}
// HasAtomicReplace returns whether Save() can atomically replace files
func (b *Backend) HasAtomicReplace() bool {
// rest-server prevents overwriting
return false
}
// Save stores data in the backend at the handle.
func (b *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
if err := h.Valid(); err != nil {

View File

@@ -269,6 +269,11 @@ func (be *Backend) Hasher() hash.Hash {
return nil
}
// HasAtomicReplace returns whether Save() can atomically replace files
func (be *Backend) HasAtomicReplace() bool {
return true
}
// Path returns the path in the bucket that is used for this backend.
func (be *Backend) Path() string {
return be.cfg.Prefix

View File

@@ -267,6 +267,12 @@ func (r *SFTP) Hasher() hash.Hash {
return nil
}
// HasAtomicReplace returns whether Save() can atomically replace files
func (r *SFTP) HasAtomicReplace() bool {
// we use sftp's 'Rename()' in 'Save()' which does not allow overwriting
return false
}
// Join joins the given paths and cleans them afterwards. This always uses
// forward slashes, which is required by sftp.
func Join(parts ...string) string {

View File

@@ -129,6 +129,11 @@ func (be *beSwift) Hasher() hash.Hash {
return md5.New()
}
// HasAtomicReplace returns whether Save() can atomically replace files
func (be *beSwift) HasAtomicReplace() bool {
return true
}
// Load runs fn with a reader that yields the contents of the file at h at the
// given offset.
func (be *beSwift) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {

View File

@@ -63,7 +63,7 @@ type ErrDuplicatePacks struct {
Indexes restic.IDSet
}
func (e ErrDuplicatePacks) Error() string {
func (e *ErrDuplicatePacks) Error() string {
return fmt.Sprintf("pack %v contained in several indexes: %v", e.PackID.Str(), e.Indexes)
}
@@ -73,7 +73,7 @@ type ErrOldIndexFormat struct {
restic.ID
}
func (err ErrOldIndexFormat) Error() string {
func (err *ErrOldIndexFormat) Error() string {
return fmt.Sprintf("index %v has old format", err.ID.Str())
}
@@ -93,7 +93,7 @@ func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) {
if oldFormat {
debug.Log("index %v has old format", id.Str())
hints = append(hints, ErrOldIndexFormat{id})
hints = append(hints, &ErrOldIndexFormat{id})
}
err = errors.Wrapf(err, "error loading index %v", id.Str())
@@ -137,7 +137,7 @@ func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) {
for packID := range c.packs {
debug.Log(" check pack %v: contained in %d indexes", packID, len(packToIndex[packID]))
if len(packToIndex[packID]) > 1 {
hints = append(hints, ErrDuplicatePacks{
hints = append(hints, &ErrDuplicatePacks{
PackID: packID,
Indexes: packToIndex[packID],
})
@@ -257,7 +257,7 @@ type TreeError struct {
Errors []error
}
func (e TreeError) Error() string {
func (e *TreeError) Error() string {
return fmt.Sprintf("tree %v: %v", e.ID.Str(), e.Errors)
}
@@ -276,7 +276,7 @@ func (c *Checker) checkTreeWorker(ctx context.Context, trees <-chan restic.TreeI
if len(errs) == 0 {
continue
}
treeError := TreeError{ID: job.ID, Errors: errs}
treeError := &TreeError{ID: job.ID, Errors: errs}
select {
case <-ctx.Done():
return
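
Since Error() now uses pointer receivers and the hints carry pointer values, callers that inspect them have to match the pointer types. A minimal sketch from a caller's perspective, assuming the standard library errors and fmt packages and a hints slice as returned by LoadIndex (the helper name is hypothetical, not part of this change):

// reportDuplicatePacks illustrates matching the pointer-based error type
// with errors.As; it is shown only for orientation.
func reportDuplicatePacks(hints []error) {
	for _, hint := range hints {
		var dup *checker.ErrDuplicatePacks
		if errors.As(hint, &dup) {
			fmt.Printf("pack %v appears in %d indexes\n", dup.PackID, len(dup.Indexes))
		}
	}
}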

View File

@@ -289,7 +289,7 @@ func TestDuplicatePacksInIndex(t *testing.T) {
found := false
for _, hint := range hints {
if _, ok := hint.(checker.ErrDuplicatePacks); ok {
if _, ok := hint.(*checker.ErrDuplicatePacks); ok {
found = true
} else {
t.Errorf("got unexpected hint: %v", hint)

View File

@@ -6,11 +6,16 @@ import (
"github.com/restic/restic/internal/restic"
)
type RepositoryCheckOptions struct {
}
// Migration implements a data migration.
type Migration interface {
// Check returns true if the migration can be applied to a repo.
Check(context.Context, restic.Repository) (bool, error)
RepoCheckOptions() *RepositoryCheckOptions
// Apply runs the migration.
Apply(context.Context, restic.Repository) error

View File

@@ -37,6 +37,10 @@ func (m *S3Layout) Check(ctx context.Context, repo restic.Repository) (bool, err
return true, nil
}
func (m *S3Layout) RepoCheckOptions() *RepositoryCheckOptions {
return nil
}
func retry(max int, fail func(err error), f func() error) error {
var err error
for i := 0; i < max; i++ {

View File

@@ -0,0 +1,126 @@
package migrations
import (
"context"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"github.com/restic/restic/internal/restic"
)
func init() {
register(&UpgradeRepoV2{})
}
type UpgradeRepoV2Error struct {
UploadNewConfigError error
ReuploadOldConfigError error
BackupFilePath string
}
func (err *UpgradeRepoV2Error) Error() string {
if err.ReuploadOldConfigError != nil {
return fmt.Sprintf("error uploading config (%v), re-uploading the old config file failed as well (%v), but there is a backup of the config file in %v", err.UploadNewConfigError, err.ReuploadOldConfigError, err.BackupFilePath)
}
return fmt.Sprintf("error uploading config (%v); re-uploading the old config succeeded, and there is a backup of the config file in %v", err.UploadNewConfigError, err.BackupFilePath)
}
func (err *UpgradeRepoV2Error) Unwrap() error {
// consider the original upload error as the primary cause
return err.UploadNewConfigError
}
type UpgradeRepoV2 struct{}
func (*UpgradeRepoV2) Name() string {
return "upgrade_repo_v2"
}
func (*UpgradeRepoV2) Desc() string {
return "upgrade a repository to version 2"
}
func (*UpgradeRepoV2) Check(ctx context.Context, repo restic.Repository) (bool, error) {
isV1 := repo.Config().Version == 1
return isV1, nil
}
func (*UpgradeRepoV2) RepoCheckOptions() *RepositoryCheckOptions {
return &RepositoryCheckOptions{}
}
func (*UpgradeRepoV2) upgrade(ctx context.Context, repo restic.Repository) error {
h := restic.Handle{Type: restic.ConfigFile}
if !repo.Backend().HasAtomicReplace() {
// remove the original file for backends which do not support atomic overwriting
err := repo.Backend().Remove(ctx, h)
if err != nil {
return fmt.Errorf("remove config failed: %w", err)
}
}
// upgrade config
cfg := repo.Config()
cfg.Version = 2
_, err := repo.SaveJSONUnpacked(ctx, restic.ConfigFile, cfg)
if err != nil {
return fmt.Errorf("save new config file failed: %w", err)
}
return nil
}
func (m *UpgradeRepoV2) Apply(ctx context.Context, repo restic.Repository) error {
tempdir, err := ioutil.TempDir("", "restic-migrate-upgrade-repo-v2-")
if err != nil {
return fmt.Errorf("create temp dir failed: %w", err)
}
h := restic.Handle{Type: restic.ConfigFile}
// read raw config file and save it to a temp dir, just in case
var rawConfigFile []byte
err = repo.Backend().Load(ctx, h, 0, 0, func(rd io.Reader) (err error) {
rawConfigFile, err = ioutil.ReadAll(rd)
return err
})
if err != nil {
return fmt.Errorf("load config file failed: %w", err)
}
backupFileName := filepath.Join(tempdir, "config")
err = ioutil.WriteFile(backupFileName, rawConfigFile, 0600)
if err != nil {
return fmt.Errorf("write config file backup to %v failed: %w", tempdir, err)
}
// run the upgrade
err = m.upgrade(ctx, repo)
if err != nil {
// build an error we can return to the caller
repoError := &UpgradeRepoV2Error{
UploadNewConfigError: err,
BackupFilePath: backupFileName,
}
// try to recover by re-uploading the original config file
_ = repo.Backend().Remove(ctx, h)
err = repo.Backend().Save(ctx, h, restic.NewByteReader(rawConfigFile, nil))
if err != nil {
repoError.ReuploadOldConfigError = err
}
return repoError
}
_ = os.Remove(backupFileName)
_ = os.Remove(tempdir)
return nil
}
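
For orientation, a minimal sketch of driving this migration by hand from within the migrations package. The helper name is made up, error handling is trimmed, and the actual restic migrate command goes through the registered migration list instead; errors.As here is the standard library function:

// upgradeIfNeeded is a hypothetical helper: it checks whether the migration
// applies and, on failure, points the user at the config backup.
func upgradeIfNeeded(ctx context.Context, repo restic.Repository) error {
	m := &UpgradeRepoV2{}
	ok, err := m.Check(ctx, repo)
	if err != nil || !ok {
		// either the check failed or the repository is already version 2
		return err
	}
	if err := m.Apply(ctx, repo); err != nil {
		var upgradeErr *UpgradeRepoV2Error
		if errors.As(err, &upgradeErr) {
			fmt.Printf("upgrade failed, config backup kept at %v\n", upgradeErr.BackupFilePath)
		}
		return err
	}
	return nil
}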

View File

@@ -0,0 +1,112 @@
package migrations
import (
"context"
"os"
"path/filepath"
"sync"
"testing"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/repository"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/test"
)
func TestUpgradeRepoV2(t *testing.T) {
repo, cleanup := repository.TestRepositoryWithVersion(t, 1)
defer cleanup()
if repo.Config().Version != 1 {
t.Fatal("test repo has wrong version")
}
m := &UpgradeRepoV2{}
ok, err := m.Check(context.Background(), repo)
if err != nil {
t.Fatal(err)
}
if !ok {
t.Fatal("migration check returned false")
}
err = m.Apply(context.Background(), repo)
if err != nil {
t.Fatal(err)
}
}
type failBackend struct {
restic.Backend
mu sync.Mutex
ConfigFileSavesUntilError uint
}
func (be *failBackend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
if h.Type != restic.ConfigFile {
return be.Backend.Save(ctx, h, rd)
}
be.mu.Lock()
if be.ConfigFileSavesUntilError == 0 {
be.mu.Unlock()
return errors.New("failure induced for testing")
}
be.ConfigFileSavesUntilError--
be.mu.Unlock()
return be.Backend.Save(ctx, h, rd)
}
func TestUpgradeRepoV2Failure(t *testing.T) {
be, cleanup := repository.TestBackend(t)
defer cleanup()
// wrap backend so that it fails upgrading the config after the initial write
be = &failBackend{
ConfigFileSavesUntilError: 1,
Backend: be,
}
repo, cleanup := repository.TestRepositoryWithBackend(t, be, 1)
defer cleanup()
if repo.Config().Version != 1 {
t.Fatal("test repo has wrong version")
}
m := &UpgradeRepoV2{}
ok, err := m.Check(context.Background(), repo)
if err != nil {
t.Fatal(err)
}
if !ok {
t.Fatal("migration check returned false")
}
err = m.Apply(context.Background(), repo)
if err == nil {
t.Fatal("expected error returned from Apply(), got nil")
}
upgradeErr := err.(*UpgradeRepoV2Error)
if upgradeErr.UploadNewConfigError == nil {
t.Fatal("expected upload error, got nil")
}
if upgradeErr.ReuploadOldConfigError == nil {
t.Fatal("expected reupload error, got nil")
}
if upgradeErr.BackupFilePath == "" {
t.Fatal("no backup file path found")
}
test.OK(t, os.Remove(upgradeErr.BackupFilePath))
test.OK(t, os.Remove(filepath.Dir(upgradeErr.BackupFilePath)))
}

View File

@@ -11,18 +11,19 @@ import (
// Backend implements a mock backend.
type Backend struct {
CloseFn func() error
IsNotExistFn func(err error) bool
SaveFn func(ctx context.Context, h restic.Handle, rd restic.RewindReader) error
OpenReaderFn func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error)
StatFn func(ctx context.Context, h restic.Handle) (restic.FileInfo, error)
ListFn func(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error
RemoveFn func(ctx context.Context, h restic.Handle) error
TestFn func(ctx context.Context, h restic.Handle) (bool, error)
DeleteFn func(ctx context.Context) error
ConnectionsFn func() uint
LocationFn func() string
HasherFn func() hash.Hash
CloseFn func() error
IsNotExistFn func(err error) bool
SaveFn func(ctx context.Context, h restic.Handle, rd restic.RewindReader) error
OpenReaderFn func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error)
StatFn func(ctx context.Context, h restic.Handle) (restic.FileInfo, error)
ListFn func(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error
RemoveFn func(ctx context.Context, h restic.Handle) error
TestFn func(ctx context.Context, h restic.Handle) (bool, error)
DeleteFn func(ctx context.Context) error
ConnectionsFn func() uint
LocationFn func() string
HasherFn func() hash.Hash
HasAtomicReplaceFn func() bool
}
// NewBackend returns new mock Backend instance
@@ -66,6 +67,14 @@ func (m *Backend) Hasher() hash.Hash {
return m.HasherFn()
}
// HasAtomicReplace returns whether Save() can atomically replace files
func (m *Backend) HasAtomicReplace() bool {
if m.HasAtomicReplaceFn == nil {
return false
}
return m.HasAtomicReplaceFn()
}
// IsNotExist returns true if the error is caused by a missing file.
func (m *Backend) IsNotExist(err error) bool {
if m.IsNotExistFn == nil {
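
A hedged sketch of exercising the new hook in a unit test, assuming NewBackend() returns a mock with HasAtomicReplaceFn left unset (the test name and expectations are illustrative only):

func TestHasAtomicReplaceMock(t *testing.T) {
	be := mock.NewBackend()
	// with no HasAtomicReplaceFn configured, the mock reports false
	if be.HasAtomicReplace() {
		t.Fatal("expected HasAtomicReplace to default to false")
	}
	be.HasAtomicReplaceFn = func() bool { return true }
	if !be.HasAtomicReplace() {
		t.Fatal("expected HasAtomicReplace to return true")
	}
}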

View File

@@ -24,6 +24,9 @@ type Backend interface {
// Hasher may return a hash function for calculating a content hash for the backend
Hasher() hash.Hash
// HasAtomicReplace returns whether Save() can atomically replace files
HasAtomicReplace() bool
// Test a boolean value whether a File with the name and type exists.
Test(ctx context.Context, h Handle) (bool, error)
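
To make the new contract concrete, a hedged sketch of the caller-side pattern it enables, mirroring the upgrade migration above (the overwrite helper is hypothetical, not part of this change):

// overwrite replaces the file at h; on backends that cannot replace files
// atomically it removes the old file first, as the upgrade migration does.
func overwrite(ctx context.Context, be restic.Backend, h restic.Handle, rd restic.RewindReader) error {
	if !be.HasAtomicReplace() {
		if err := be.Remove(ctx, h); err != nil {
			return fmt.Errorf("remove before overwrite failed: %w", err)
		}
	}
	return be.Save(ctx, h, rd)
}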

View File

@@ -23,7 +23,7 @@ const MaxRepoVersion = 2
// StableRepoVersion is the version that is written to the config when a repository
// is newly created with Init().
const StableRepoVersion = 1
const StableRepoVersion = 2
// JSONUnpackedLoader loads unpacked JSON.
type JSONUnpackedLoader interface {