backend: refactor backend Connections and HasAtomicReplace into Properties

This commit is contained in:
Michael Eischer 2025-02-16 22:27:58 +01:00
parent 5ddda7f5e9
commit c970e58739
18 changed files with 89 additions and 97 deletions

View File

@ -217,8 +217,11 @@ func (be *Backend) IsPermanentError(err error) bool {
return false return false
} }
func (be *Backend) Connections() uint { func (be *Backend) Properties() backend.Properties {
return be.connections return backend.Properties{
Connections: be.connections,
HasAtomicReplace: true,
}
} }
// Hasher may return a hash function for calculating a content hash for the backend // Hasher may return a hash function for calculating a content hash for the backend
@ -226,11 +229,6 @@ func (be *Backend) Hasher() hash.Hash {
return md5.New() return md5.New()
} }
// HasAtomicReplace returns whether Save() can atomically replace files
func (be *Backend) HasAtomicReplace() bool {
return true
}
// Path returns the path in the bucket that is used for this backend. // Path returns the path in the bucket that is used for this backend.
func (be *Backend) Path() string { func (be *Backend) Path() string {
return be.prefix return be.prefix

View File

@ -154,8 +154,11 @@ func (be *b2Backend) SetListMaxItems(i int) {
be.listMaxItems = i be.listMaxItems = i
} }
func (be *b2Backend) Connections() uint { func (be *b2Backend) Properties() backend.Properties {
return be.cfg.Connections return backend.Properties{
Connections: be.cfg.Connections,
HasAtomicReplace: true,
}
} }
// Hasher may return a hash function for calculating a content hash for the backend // Hasher may return a hash function for calculating a content hash for the backend
@ -163,11 +166,6 @@ func (be *b2Backend) Hasher() hash.Hash {
return nil return nil
} }
// HasAtomicReplace returns whether Save() can atomically replace files
func (be *b2Backend) HasAtomicReplace() bool {
return true
}
// IsNotExist returns true if the error is caused by a non-existing file. // IsNotExist returns true if the error is caused by a non-existing file.
func (be *b2Backend) IsNotExist(err error) bool { func (be *b2Backend) IsNotExist(err error) bool {
// blazer/b2 does not export its error types and values, // blazer/b2 does not export its error types and values,

View File

@ -17,15 +17,12 @@ var ErrNoRepository = fmt.Errorf("repository does not exist")
// the context package need not be wrapped, as context cancellation is checked // the context package need not be wrapped, as context cancellation is checked
// separately by the retrying logic. // separately by the retrying logic.
type Backend interface { type Backend interface {
// Connections returns the maximum number of concurrent backend operations. // Properties returns information about the backend
Connections() uint Properties() Properties
// Hasher may return a hash function for calculating a content hash for the backend // Hasher may return a hash function for calculating a content hash for the backend
Hasher() hash.Hash Hasher() hash.Hash
// HasAtomicReplace returns whether Save() can atomically replace files
HasAtomicReplace() bool
// Remove removes a File described by h. // Remove removes a File described by h.
Remove(ctx context.Context, h Handle) error Remove(ctx context.Context, h Handle) error
@ -92,6 +89,14 @@ type Backend interface {
WarmupWait(ctx context.Context, h []Handle) error WarmupWait(ctx context.Context, h []Handle) error
} }
type Properties struct {
// Connections states the maximum number of concurrent backend operations.
Connections uint
// HasAtomicReplace states whether Save() can atomically replace files
HasAtomicReplace bool
}
type Unwrapper interface { type Unwrapper interface {
// Unwrap returns the underlying backend or nil if there is none. // Unwrap returns the underlying backend or nil if there is none.
Unwrap() Backend Unwrap() Backend

View File

@ -42,8 +42,8 @@ func (be *Backend) Remove(_ context.Context, _ backend.Handle) error {
return nil return nil
} }
func (be *Backend) Connections() uint { func (be *Backend) Properties() backend.Properties {
return be.b.Connections() return be.b.Properties()
} }
// Delete removes all data in the backend. // Delete removes all data in the backend.
@ -59,10 +59,6 @@ func (be *Backend) Hasher() hash.Hash {
return be.b.Hasher() return be.b.Hasher()
} }
func (be *Backend) HasAtomicReplace() bool {
return be.b.HasAtomicReplace()
}
func (be *Backend) IsNotExist(err error) bool { func (be *Backend) IsNotExist(err error) bool {
return be.b.IsNotExist(err) return be.b.IsNotExist(err)
} }

View File

@ -186,8 +186,11 @@ func (be *Backend) IsPermanentError(err error) bool {
return false return false
} }
func (be *Backend) Connections() uint { func (be *Backend) Properties() backend.Properties {
return be.connections return backend.Properties{
Connections: be.connections,
HasAtomicReplace: true,
}
} }
// Hasher may return a hash function for calculating a content hash for the backend // Hasher may return a hash function for calculating a content hash for the backend
@ -195,11 +198,6 @@ func (be *Backend) Hasher() hash.Hash {
return md5.New() return md5.New()
} }
// HasAtomicReplace returns whether Save() can atomically replace files
func (be *Backend) HasAtomicReplace() bool {
return true
}
// Path returns the path in the bucket that is used for this backend. // Path returns the path in the bucket that is used for this backend.
func (be *Backend) Path() string { func (be *Backend) Path() string {
return be.prefix return be.prefix

View File

@ -84,8 +84,11 @@ func Create(_ context.Context, cfg Config) (*Local, error) {
return be, nil return be, nil
} }
func (b *Local) Connections() uint { func (b *Local) Properties() backend.Properties {
return b.Config.Connections return backend.Properties{
Connections: b.Config.Connections,
HasAtomicReplace: true,
}
} }
// Hasher may return a hash function for calculating a content hash for the backend // Hasher may return a hash function for calculating a content hash for the backend
@ -93,11 +96,6 @@ func (b *Local) Hasher() hash.Hash {
return nil return nil
} }
// HasAtomicReplace returns whether Save() can atomically replace files
func (b *Local) HasAtomicReplace() bool {
return true
}
// IsNotExist returns true if the error is caused by a non existing file. // IsNotExist returns true if the error is caused by a non existing file.
func (b *Local) IsNotExist(err error) bool { func (b *Local) IsNotExist(err error) bool {
return errors.Is(err, os.ErrNotExist) return errors.Is(err, os.ErrNotExist)

View File

@ -218,8 +218,11 @@ func (be *MemoryBackend) List(ctx context.Context, t backend.FileType, fn func(b
return ctx.Err() return ctx.Err()
} }
func (be *MemoryBackend) Connections() uint { func (be *MemoryBackend) Properties() backend.Properties {
return connectionCount return backend.Properties{
Connections: connectionCount,
HasAtomicReplace: false,
}
} }
// Hasher may return a hash function for calculating a content hash for the backend // Hasher may return a hash function for calculating a content hash for the backend
@ -227,11 +230,6 @@ func (be *MemoryBackend) Hasher() hash.Hash {
return xxhash.New() return xxhash.New()
} }
// HasAtomicReplace returns whether Save() can atomically replace files
func (be *MemoryBackend) HasAtomicReplace() bool {
return false
}
// Delete removes all data in the backend. // Delete removes all data in the backend.
func (be *MemoryBackend) Delete(ctx context.Context) error { func (be *MemoryBackend) Delete(ctx context.Context) error {
be.m.Lock() be.m.Lock()

View File

@ -22,9 +22,8 @@ type Backend struct {
DeleteFn func(ctx context.Context) error DeleteFn func(ctx context.Context) error
WarmupFn func(ctx context.Context, h []backend.Handle) ([]backend.Handle, error) WarmupFn func(ctx context.Context, h []backend.Handle) ([]backend.Handle, error)
WarmupWaitFn func(ctx context.Context, h []backend.Handle) error WarmupWaitFn func(ctx context.Context, h []backend.Handle) error
ConnectionsFn func() uint PropertiesFn func() backend.Properties
HasherFn func() hash.Hash HasherFn func() hash.Hash
HasAtomicReplaceFn func() bool
} }
// NewBackend returns new mock Backend instance // NewBackend returns new mock Backend instance
@ -42,12 +41,15 @@ func (m *Backend) Close() error {
return m.CloseFn() return m.CloseFn()
} }
func (m *Backend) Connections() uint { func (m *Backend) Properties() backend.Properties {
if m.ConnectionsFn == nil { if m.PropertiesFn == nil {
return 2 return backend.Properties{
Connections: 2,
HasAtomicReplace: false,
}
} }
return m.ConnectionsFn() return m.PropertiesFn()
} }
// Hasher may return a hash function for calculating a content hash for the backend // Hasher may return a hash function for calculating a content hash for the backend
@ -59,14 +61,6 @@ func (m *Backend) Hasher() hash.Hash {
return m.HasherFn() return m.HasherFn()
} }
// HasAtomicReplace returns whether Save() can atomically replace files
func (m *Backend) HasAtomicReplace() bool {
if m.HasAtomicReplaceFn == nil {
return false
}
return m.HasAtomicReplaceFn()
}
// IsNotExist returns true if the error is caused by a missing file. // IsNotExist returns true if the error is caused by a missing file.
func (m *Backend) IsNotExist(err error) bool { func (m *Backend) IsNotExist(err error) bool {
if m.IsNotExistFn == nil { if m.IsNotExistFn == nil {

View File

@ -116,8 +116,12 @@ func Create(ctx context.Context, cfg Config, rt http.RoundTripper) (*Backend, er
return be, nil return be, nil
} }
func (b *Backend) Connections() uint { func (b *Backend) Properties() backend.Properties {
return b.connections return backend.Properties{
Connections: b.connections,
// rest-server prevents overwriting
HasAtomicReplace: false,
}
} }
// Hasher may return a hash function for calculating a content hash for the backend // Hasher may return a hash function for calculating a content hash for the backend
@ -125,12 +129,6 @@ func (b *Backend) Hasher() hash.Hash {
return nil return nil
} }
// HasAtomicReplace returns whether Save() can atomically replace files
func (b *Backend) HasAtomicReplace() bool {
// rest-server prevents overwriting
return false
}
// Save stores data in the backend at the handle. // Save stores data in the backend at the handle.
func (b *Backend) Save(ctx context.Context, h backend.Handle, rd backend.RewindReader) error { func (b *Backend) Save(ctx context.Context, h backend.Handle, rd backend.RewindReader) error {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)

View File

@ -166,7 +166,7 @@ func (be *Backend) Save(ctx context.Context, h backend.Handle, rd backend.Rewind
return nil return nil
} }
if be.Backend.HasAtomicReplace() { if be.Backend.Properties().HasAtomicReplace {
debug.Log("Save(%v) failed with error: %v", h, err) debug.Log("Save(%v) failed with error: %v", h, err)
// there is no need to remove files from backends which can atomically replace files // there is no need to remove files from backends which can atomically replace files
// in fact if something goes wrong at the backend side the delete operation might delete the wrong instance of the file // in fact if something goes wrong at the backend side the delete operation might delete the wrong instance of the file

View File

@ -69,7 +69,12 @@ func TestBackendSaveRetryAtomic(t *testing.T) {
calledRemove = true calledRemove = true
return nil return nil
}, },
HasAtomicReplaceFn: func() bool { return true }, PropertiesFn: func() backend.Properties {
return backend.Properties{
Connections: 2,
HasAtomicReplace: true,
}
},
} }
TestFastRetries(t) TestFastRetries(t)

View File

@ -261,8 +261,11 @@ func (be *Backend) IsPermanentError(err error) bool {
return false return false
} }
func (be *Backend) Connections() uint { func (be *Backend) Properties() backend.Properties {
return be.cfg.Connections return backend.Properties{
Connections: be.cfg.Connections,
HasAtomicReplace: true,
}
} }
// Hasher may return a hash function for calculating a content hash for the backend // Hasher may return a hash function for calculating a content hash for the backend
@ -270,11 +273,6 @@ func (be *Backend) Hasher() hash.Hash {
return nil return nil
} }
// HasAtomicReplace returns whether Save() can atomically replace files
func (be *Backend) HasAtomicReplace() bool {
return true
}
// Path returns the path in the bucket that is used for this backend. // Path returns the path in the bucket that is used for this backend.
func (be *Backend) Path() string { func (be *Backend) Path() string {
return be.cfg.Prefix return be.cfg.Prefix

View File

@ -22,7 +22,7 @@ type connectionLimitedBackend struct {
// NewBackend creates a backend that limits the concurrent operations on the underlying backend // NewBackend creates a backend that limits the concurrent operations on the underlying backend
func NewBackend(be backend.Backend) backend.Backend { func NewBackend(be backend.Backend) backend.Backend {
sem, err := newSemaphore(be.Connections()) sem, err := newSemaphore(be.Properties().Connections)
if err != nil { if err != nil {
panic(err) panic(err)
} }

View File

@ -106,7 +106,12 @@ func concurrencyTester(t *testing.T, setup func(m *mock.Backend), handler func(b
m := mock.NewBackend() m := mock.NewBackend()
setup(m) setup(m)
m.ConnectionsFn = func() uint { return uint(expectBlocked) } m.PropertiesFn = func() backend.Properties {
return backend.Properties{
Connections: uint(expectBlocked),
HasAtomicReplace: false,
}
}
be := sema.NewBackend(m) be := sema.NewBackend(m)
var wg errgroup.Group var wg errgroup.Group
@ -206,7 +211,12 @@ func TestFreeze(t *testing.T) {
atomic.AddInt64(&counter, 1) atomic.AddInt64(&counter, 1)
return nil return nil
} }
m.ConnectionsFn = func() uint { return 2 } m.PropertiesFn = func() backend.Properties {
return backend.Properties{
Connections: 2,
HasAtomicReplace: false,
}
}
be := sema.NewBackend(m) be := sema.NewBackend(m)
fb := be.(backend.FreezeBackend) fb := be.(backend.FreezeBackend)

View File

@ -264,8 +264,11 @@ func Create(ctx context.Context, cfg Config) (*SFTP, error) {
return open(sftp, cfg) return open(sftp, cfg)
} }
func (r *SFTP) Connections() uint { func (r *SFTP) Properties() backend.Properties {
return r.Config.Connections return backend.Properties{
Connections: r.Config.Connections,
HasAtomicReplace: r.posixRename,
}
} }
// Hasher may return a hash function for calculating a content hash for the backend // Hasher may return a hash function for calculating a content hash for the backend
@ -273,11 +276,6 @@ func (r *SFTP) Hasher() hash.Hash {
return nil return nil
} }
// HasAtomicReplace returns whether Save() can atomically replace files
func (r *SFTP) HasAtomicReplace() bool {
return r.posixRename
}
// tempSuffix generates a random string suffix that should be sufficiently long // tempSuffix generates a random string suffix that should be sufficiently long
// to avoid accidental conflicts // to avoid accidental conflicts
func tempSuffix() string { func tempSuffix() string {

View File

@ -111,8 +111,11 @@ func (be *beSwift) createContainer(ctx context.Context, policy string) error {
return be.conn.ContainerCreate(ctx, be.container, h) return be.conn.ContainerCreate(ctx, be.container, h)
} }
func (be *beSwift) Connections() uint { func (be *beSwift) Properties() backend.Properties {
return be.connections return backend.Properties{
Connections: be.connections,
HasAtomicReplace: true,
}
} }
// Hasher may return a hash function for calculating a content hash for the backend // Hasher may return a hash function for calculating a content hash for the backend
@ -120,11 +123,6 @@ func (be *beSwift) Hasher() hash.Hash {
return md5.New() return md5.New()
} }
// HasAtomicReplace returns whether Save() can atomically replace files
func (be *beSwift) HasAtomicReplace() bool {
return true
}
// Load runs fn with a reader that yields the contents of the file at h at the // Load runs fn with a reader that yields the contents of the file at h at the
// given offset. // given offset.
func (be *beSwift) Load(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error { func (be *beSwift) Load(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error {

View File

@ -552,7 +552,7 @@ func (r *Repository) StartPackUploader(ctx context.Context, wg *errgroup.Group)
innerWg, ctx := errgroup.WithContext(ctx) innerWg, ctx := errgroup.WithContext(ctx)
r.packerWg = innerWg r.packerWg = innerWg
r.uploader = newPackerUploader(ctx, innerWg, r, r.be.Connections()) r.uploader = newPackerUploader(ctx, innerWg, r, r.Connections())
r.treePM = newPackerManager(r.key, restic.TreeBlob, r.packSize(), r.uploader.QueuePacker) r.treePM = newPackerManager(r.key, restic.TreeBlob, r.packSize(), r.uploader.QueuePacker)
r.dataPM = newPackerManager(r.key, restic.DataBlob, r.packSize(), r.uploader.QueuePacker) r.dataPM = newPackerManager(r.key, restic.DataBlob, r.packSize(), r.uploader.QueuePacker)
@ -587,7 +587,7 @@ func (r *Repository) flushPacks(ctx context.Context) error {
} }
func (r *Repository) Connections() uint { func (r *Repository) Connections() uint {
return r.be.Connections() return r.be.Properties().Connections
} }
func (r *Repository) LookupBlob(tpe restic.BlobType, id restic.ID) []restic.PackedBlob { func (r *Repository) LookupBlob(tpe restic.BlobType, id restic.ID) []restic.PackedBlob {

View File

@ -33,7 +33,7 @@ func (err *upgradeRepoV2Error) Unwrap() error {
func upgradeRepository(ctx context.Context, repo *Repository) error { func upgradeRepository(ctx context.Context, repo *Repository) error {
h := backend.Handle{Type: backend.ConfigFile} h := backend.Handle{Type: backend.ConfigFile}
if !repo.be.HasAtomicReplace() { if !repo.be.Properties().HasAtomicReplace {
// remove the original file for backends which do not support atomic overwriting // remove the original file for backends which do not support atomic overwriting
err := repo.be.Remove(ctx, h) err := repo.be.Remove(ctx, h)
if err != nil { if err != nil {