cache: move to backend package

Michael Eischer
2024-05-24 23:04:06 +02:00
parent 80132e71d8
commit 8e5d7d719c
15 changed files with 6 additions and 6 deletions

internal/backend/cache/backend.go vendored Normal file

@@ -0,0 +1,215 @@
package cache
import (
"context"
"io"
"sync"
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/debug"
)
// Backend wraps a backend.Backend and adds a cache.
type Backend struct {
backend.Backend
*Cache
// inProgress contains the handle for all files that are currently
// downloaded. The channel in the value is closed as soon as the download
// is finished.
inProgressMutex sync.Mutex
inProgress map[backend.Handle]chan struct{}
}
// ensure Backend implements backend.Backend
var _ backend.Backend = &Backend{}
func newBackend(be backend.Backend, c *Cache) *Backend {
return &Backend{
Backend: be,
Cache: c,
inProgress: make(map[backend.Handle]chan struct{}),
}
}
// Remove deletes a file from the backend and the cache if it has been cached.
func (b *Backend) Remove(ctx context.Context, h backend.Handle) error {
debug.Log("cache Remove(%v)", h)
err := b.Backend.Remove(ctx, h)
if err != nil {
return err
}
_, err = b.Cache.remove(h)
return err
}
func autoCacheTypes(h backend.Handle) bool {
switch h.Type {
case backend.IndexFile, backend.SnapshotFile:
return true
case backend.PackFile:
return h.IsMetadata
}
return false
}
// Save stores a new file in the backend and the cache.
func (b *Backend) Save(ctx context.Context, h backend.Handle, rd backend.RewindReader) error {
if !autoCacheTypes(h) {
return b.Backend.Save(ctx, h, rd)
}
debug.Log("Save(%v): auto-store in the cache", h)
// make sure the reader is at the start
err := rd.Rewind()
if err != nil {
return err
}
// first, save in the backend
err = b.Backend.Save(ctx, h, rd)
if err != nil {
return err
}
// next, save in the cache
err = rd.Rewind()
if err != nil {
return err
}
err = b.Cache.save(h, rd)
if err != nil {
debug.Log("unable to save %v to cache: %v", h, err)
return err
}
return nil
}
func (b *Backend) cacheFile(ctx context.Context, h backend.Handle) error {
finish := make(chan struct{})
b.inProgressMutex.Lock()
other, alreadyDownloading := b.inProgress[h]
if !alreadyDownloading {
b.inProgress[h] = finish
}
b.inProgressMutex.Unlock()
if alreadyDownloading {
debug.Log("readahead %v is already performed by somebody else, delegating...", h)
<-other
debug.Log("download %v finished", h)
return nil
}
defer func() {
// signal other waiting goroutines that the file may now be cached
close(finish)
// remove the finish channel from the map
b.inProgressMutex.Lock()
delete(b.inProgress, h)
b.inProgressMutex.Unlock()
}()
// test again, maybe the file was cached in the meantime
if !b.Cache.Has(h) {
// nope, it's still not in the cache, pull it from the repo and save it
err := b.Backend.Load(ctx, h, 0, 0, func(rd io.Reader) error {
return b.Cache.save(h, rd)
})
if err != nil {
// try to remove from the cache, ignore errors
_, _ = b.Cache.remove(h)
}
return err
}
return nil
}
// loadFromCache will try to load the file from the cache.
func (b *Backend) loadFromCache(h backend.Handle, length int, offset int64, consumer func(rd io.Reader) error) (bool, error) {
rd, inCache, err := b.Cache.load(h, length, offset)
if err != nil {
return inCache, err
}
err = consumer(rd)
if err != nil {
_ = rd.Close() // ignore secondary errors
return true, err
}
return true, rd.Close()
}
// Load loads a file from the cache or the backend.
func (b *Backend) Load(ctx context.Context, h backend.Handle, length int, offset int64, consumer func(rd io.Reader) error) error {
b.inProgressMutex.Lock()
waitForFinish, inProgress := b.inProgress[h]
b.inProgressMutex.Unlock()
if inProgress {
debug.Log("downloading %v is already in progress, waiting for finish", h)
<-waitForFinish
debug.Log("downloading %v finished", h)
}
// try loading from cache without checking that the handle is actually cached
inCache, err := b.loadFromCache(h, length, offset, consumer)
if inCache {
if err != nil {
debug.Log("error loading %v from cache: %v", h, err)
}
// the caller must explicitly use cache.Forget() to remove the cache entry
return err
}
// if we don't automatically cache this file type, fall back to the backend
if !autoCacheTypes(h) {
debug.Log("Load(%v, %v, %v): delegating to backend", h, length, offset)
return b.Backend.Load(ctx, h, length, offset, consumer)
}
debug.Log("auto-store %v in the cache", h)
err = b.cacheFile(ctx, h)
if err != nil {
return err
}
inCache, err = b.loadFromCache(h, length, offset, consumer)
if inCache {
if err != nil {
debug.Log("error loading %v from cache: %v", h, err)
}
return err
}
debug.Log("error caching %v: %v, falling back to backend", h, err)
return b.Backend.Load(ctx, h, length, offset, consumer)
}
// Stat tests whether the backend has a file. If it does not exist but still
// exists in the cache, it is removed from the cache.
func (b *Backend) Stat(ctx context.Context, h backend.Handle) (backend.FileInfo, error) {
debug.Log("cache Stat(%v)", h)
fi, err := b.Backend.Stat(ctx, h)
if err != nil && b.Backend.IsNotExist(err) {
// try to remove from the cache, ignore errors
_, _ = b.Cache.remove(h)
}
return fi, err
}
// IsNotExist returns true if the error is caused by a non-existing file.
func (b *Backend) IsNotExist(err error) bool {
return b.Backend.IsNotExist(err)
}
func (b *Backend) Unwrap() backend.Backend {
return b.Backend
}
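
A minimal usage sketch for the wrapper (written as if inside the restic module, since these packages are internal; the cache base directory and payload are made up): an index file stored directly in the backend is cached as a side effect of loading it through the wrapped backend.

package example

import (
	"context"
	"fmt"
	"io"

	"github.com/restic/restic/internal/backend"
	"github.com/restic/restic/internal/backend/cache"
	"github.com/restic/restic/internal/backend/mem"
	"github.com/restic/restic/internal/restic"
)

func wrapAndLoad() error {
	be := mem.New()

	// open (or create) a cache for this repository ID below a made-up base directory
	c, err := cache.New(restic.NewRandomID().String(), "/tmp/restic-cache-example")
	if err != nil {
		return err
	}
	wrapped := c.Wrap(be)

	// store an index file directly in the backend, as the tests do
	data := []byte("example index payload, comfortably longer than an empty ciphertext")
	id := restic.Hash(data)
	h := backend.Handle{Type: backend.IndexFile, Name: id.String()}
	if err := be.Save(context.TODO(), h, backend.NewByteReader(data, be.Hasher())); err != nil {
		return err
	}

	// loading through the wrapper downloads the file once and stores it in the cache
	err = wrapped.Load(context.TODO(), h, 0, 0, func(rd io.Reader) error {
		_, err := io.Copy(io.Discard, rd)
		return err
	})
	fmt.Println("cached after load:", c.Has(h)) // expected: true
	return err
}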

internal/backend/cache/backend_test.go vendored Normal file

@@ -0,0 +1,240 @@
package cache
import (
"bytes"
"context"
"io"
"math/rand"
"strings"
"sync"
"testing"
"time"
"github.com/pkg/errors"
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/backend/mem"
backendtest "github.com/restic/restic/internal/backend/test"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/test"
)
func loadAndCompare(t testing.TB, be backend.Backend, h backend.Handle, data []byte) {
buf, err := backendtest.LoadAll(context.TODO(), be, h)
if err != nil {
t.Fatal(err)
}
if len(buf) != len(data) {
t.Fatalf("wrong number of bytes read, want %v, got %v", len(data), len(buf))
}
if !bytes.Equal(buf, data) {
t.Fatalf("wrong data returned, want:\n %02x\ngot:\n %02x", data[:16], buf[:16])
}
}
func save(t testing.TB, be backend.Backend, h backend.Handle, data []byte) {
err := be.Save(context.TODO(), h, backend.NewByteReader(data, be.Hasher()))
if err != nil {
t.Fatal(err)
}
}
func remove(t testing.TB, be backend.Backend, h backend.Handle) {
err := be.Remove(context.TODO(), h)
if err != nil {
t.Fatal(err)
}
}
func randomData(n int) (backend.Handle, []byte) {
data := test.Random(rand.Int(), n)
id := restic.Hash(data)
h := backend.Handle{
Type: backend.IndexFile,
Name: id.String(),
}
return h, data
}
func TestBackend(t *testing.T) {
be := mem.New()
c := TestNewCache(t)
wbe := c.Wrap(be)
h, data := randomData(5234142)
// save directly in backend
save(t, be, h, data)
if c.Has(h) {
t.Errorf("cache has file too early")
}
// load data via cache
loadAndCompare(t, wbe, h, data)
if !c.Has(h) {
t.Errorf("cache doesn't have file after load")
}
// remove via cache
remove(t, wbe, h)
if c.Has(h) {
t.Errorf("cache has file after remove")
}
// save via cache
save(t, wbe, h, data)
if !c.Has(h) {
t.Errorf("cache doesn't have file after load")
}
// load data directly from backend
loadAndCompare(t, be, h, data)
// load data via cache
loadAndCompare(t, wbe, h, data)
// remove directly
remove(t, be, h)
if !c.Has(h) {
t.Errorf("file not in cache any more")
}
// run stat
_, err := wbe.Stat(context.TODO(), h)
if err == nil {
t.Errorf("expected not-found error for removed file, got nil")
}
if !wbe.IsNotExist(err) {
t.Errorf("Stat() returned error that does not match IsNotExist(): %v", err)
}
if c.Has(h) {
t.Errorf("removed file still in cache after stat")
}
}
type loadCountingBackend struct {
backend.Backend
ctr int
}
func (l *loadCountingBackend) Load(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
l.ctr++
return l.Backend.Load(ctx, h, length, offset, fn)
}
func TestOutOfBoundsAccess(t *testing.T) {
be := &loadCountingBackend{Backend: mem.New()}
c := TestNewCache(t)
wbe := c.Wrap(be)
h, data := randomData(50)
save(t, be, h, data)
// load out of bounds
err := wbe.Load(context.TODO(), h, 100, 100, func(rd io.Reader) error {
t.Error("cache returned non-existent file section")
return errors.New("broken")
})
test.Assert(t, strings.Contains(err.Error(), " is too short"), "expected too short error, got %v", err)
test.Equals(t, 1, be.ctr, "expected file to be loaded only once")
// file must nevertheless get cached
if !c.Has(h) {
t.Errorf("cache doesn't have file after load")
}
// start within bounds, but request too large chunk
err = wbe.Load(context.TODO(), h, 100, 0, func(rd io.Reader) error {
t.Error("cache returned non-existent file section")
return errors.New("broken")
})
test.Assert(t, strings.Contains(err.Error(), " is too short"), "expected too short error, got %v", err)
test.Equals(t, 1, be.ctr, "expected file to be loaded only once")
}
func TestForget(t *testing.T) {
be := &loadCountingBackend{Backend: mem.New()}
c := TestNewCache(t)
wbe := c.Wrap(be)
h, data := randomData(50)
save(t, be, h, data)
loadAndCompare(t, wbe, h, data)
test.Equals(t, 1, be.ctr, "expected file to be loaded once")
// must still exist even if load returns an error
exp := errors.New("error")
err := wbe.Load(context.TODO(), h, 0, 0, func(rd io.Reader) error {
return exp
})
test.Equals(t, exp, err, "wrong error")
test.Assert(t, c.Has(h), "missing cache entry")
test.OK(t, c.Forget(h))
test.Assert(t, !c.Has(h), "cache entry should have been removed")
// cache it again
loadAndCompare(t, wbe, h, data)
test.Assert(t, c.Has(h), "missing cache entry")
// forget must delete file only once
err = c.Forget(h)
test.Assert(t, strings.Contains(err.Error(), "circuit breaker prevents repeated deletion of cached file"), "wrong error message %q", err)
test.Assert(t, c.Has(h), "cache entry should still exist")
}
type loadErrorBackend struct {
backend.Backend
loadError error
}
func (be loadErrorBackend) Load(_ context.Context, _ backend.Handle, _ int, _ int64, _ func(rd io.Reader) error) error {
time.Sleep(10 * time.Millisecond)
return be.loadError
}
func TestErrorBackend(t *testing.T) {
be := mem.New()
c := TestNewCache(t)
h, data := randomData(5234142)
// save directly in backend
save(t, be, h, data)
testErr := errors.New("test error")
errBackend := loadErrorBackend{
Backend: be,
loadError: testErr,
}
loadTest := func(wg *sync.WaitGroup, be backend.Backend) {
defer wg.Done()
buf, err := backendtest.LoadAll(context.TODO(), be, h)
if err == testErr {
return
}
if err != nil {
t.Error(err)
return
}
if !bytes.Equal(buf, data) {
t.Errorf("data does not match")
}
time.Sleep(time.Millisecond)
}
wrappedBE := c.Wrap(errBackend)
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go loadTest(&wg, wrappedBE)
}
wg.Wait()
}

internal/backend/cache/cache.go vendored Normal file

@@ -0,0 +1,248 @@
package cache
import (
"fmt"
"os"
"path/filepath"
"regexp"
"strconv"
"sync"
"time"
"github.com/pkg/errors"
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/fs"
"github.com/restic/restic/internal/restic"
)
// Cache manages a local cache.
type Cache struct {
path string
Base string
Created bool
forgotten sync.Map
}
const dirMode = 0700
const fileMode = 0644
func readVersion(dir string) (v uint, err error) {
buf, err := os.ReadFile(filepath.Join(dir, "version"))
if err != nil {
return 0, errors.Wrap(err, "readVersion")
}
ver, err := strconv.ParseUint(string(buf), 10, 32)
if err != nil {
return 0, errors.Wrap(err, "readVersion")
}
return uint(ver), nil
}
const cacheVersion = 1
var cacheLayoutPaths = map[restic.FileType]string{
restic.PackFile: "data",
restic.SnapshotFile: "snapshots",
restic.IndexFile: "index",
}
const cachedirTagSignature = "Signature: 8a477f597d28d172789f06886806bc55\n"
func writeCachedirTag(dir string) error {
tagfile := filepath.Join(dir, "CACHEDIR.TAG")
f, err := fs.OpenFile(tagfile, os.O_CREATE|os.O_EXCL|os.O_WRONLY, fileMode)
if err != nil {
if errors.Is(err, os.ErrExist) {
return nil
}
return errors.WithStack(err)
}
debug.Log("Create CACHEDIR.TAG at %v", dir)
if _, err := f.Write([]byte(cachedirTagSignature)); err != nil {
_ = f.Close()
return errors.WithStack(err)
}
return errors.WithStack(f.Close())
}
// New returns a new cache for the repo ID at basedir. If basedir is the empty
// string, the default cache location (according to the XDG standard) is used.
func New(id string, basedir string) (c *Cache, err error) {
if basedir == "" {
basedir, err = DefaultDir()
if err != nil {
return nil, err
}
}
err = fs.MkdirAll(basedir, dirMode)
if err != nil {
return nil, errors.WithStack(err)
}
// create base dir and tag it as a cache directory
if err = writeCachedirTag(basedir); err != nil {
return nil, err
}
cachedir := filepath.Join(basedir, id)
debug.Log("using cache dir %v", cachedir)
created := false
v, err := readVersion(cachedir)
switch {
case err == nil:
if v > cacheVersion {
return nil, errors.New("cache version is newer")
}
// Update the timestamp so that we can detect old cache dirs.
err = updateTimestamp(cachedir)
if err != nil {
return nil, err
}
case errors.Is(err, os.ErrNotExist):
// Create the repo cache dir. The parent exists, so Mkdir suffices.
err := fs.Mkdir(cachedir, dirMode)
switch {
case err == nil:
created = true
case errors.Is(err, os.ErrExist):
default:
return nil, errors.WithStack(err)
}
default:
return nil, errors.Wrap(err, "readVersion")
}
if v < cacheVersion {
err = os.WriteFile(filepath.Join(cachedir, "version"), []byte(fmt.Sprintf("%d", cacheVersion)), fileMode)
if err != nil {
return nil, errors.WithStack(err)
}
}
for _, p := range cacheLayoutPaths {
if err = fs.MkdirAll(filepath.Join(cachedir, p), dirMode); err != nil {
return nil, errors.WithStack(err)
}
}
c = &Cache{
path: cachedir,
Base: basedir,
Created: created,
}
return c, nil
}
// updateTimestamp sets the modification timestamp (mtime and atime) for the
// directory d to the current time.
func updateTimestamp(d string) error {
t := time.Now()
return fs.Chtimes(d, t, t)
}
// MaxCacheAge is the default age (30 days) after which cache directories are considered old.
const MaxCacheAge = 30 * 24 * time.Hour
func validCacheDirName(s string) bool {
r := regexp.MustCompile(`^[a-fA-F0-9]{64}$|^restic-check-cache-[0-9]+$`)
return r.MatchString(s)
}
// listCacheDirs returns the list of cache directories.
func listCacheDirs(basedir string) ([]os.FileInfo, error) {
f, err := fs.Open(basedir)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
err = nil
}
return nil, err
}
entries, err := f.Readdir(-1)
if err != nil {
return nil, err
}
err = f.Close()
if err != nil {
return nil, err
}
result := make([]os.FileInfo, 0, len(entries))
for _, entry := range entries {
if !entry.IsDir() {
continue
}
if !validCacheDirName(entry.Name()) {
continue
}
result = append(result, entry)
}
return result, nil
}
// All returns a list of cache directories.
func All(basedir string) (dirs []os.FileInfo, err error) {
return listCacheDirs(basedir)
}
// OlderThan returns the list of cache directories older than max.
func OlderThan(basedir string, max time.Duration) ([]os.FileInfo, error) {
entries, err := listCacheDirs(basedir)
if err != nil {
return nil, err
}
var oldCacheDirs []os.FileInfo
for _, fi := range entries {
if !IsOld(fi.ModTime(), max) {
continue
}
oldCacheDirs = append(oldCacheDirs, fi)
}
debug.Log("%d old cache dirs found", len(oldCacheDirs))
return oldCacheDirs, nil
}
// Old returns a list of cache directories with a modification time of more
// than 30 days ago.
func Old(basedir string) ([]os.FileInfo, error) {
return OlderThan(basedir, MaxCacheAge)
}
// IsOld returns true if the timestamp is considered old.
func IsOld(t time.Time, maxAge time.Duration) bool {
oldest := time.Now().Add(-maxAge)
return t.Before(oldest)
}
// Wrap returns a backend with a cache.
func (c *Cache) Wrap(be backend.Backend) backend.Backend {
return newBackend(be, c)
}
// BaseDir returns the base directory.
func (c *Cache) BaseDir() string {
return c.Base
}
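
A short sketch of the ageing helpers, assuming the default base directory from dir.go below: Old returns the cache directories that have not been used within MaxCacheAge (30 days).

package example

import (
	"fmt"

	"github.com/restic/restic/internal/backend/cache"
)

func listStaleCaches() error {
	basedir, err := cache.DefaultDir()
	if err != nil {
		return err
	}
	// Old is shorthand for OlderThan(basedir, MaxCacheAge)
	stale, err := cache.Old(basedir)
	if err != nil {
		return err
	}
	for _, fi := range stale {
		fmt.Printf("cache dir %s last used %s\n", fi.Name(), fi.ModTime().Format("2006-01-02"))
	}
	return nil
}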

internal/backend/cache/cache_test.go vendored Normal file

@@ -0,0 +1,46 @@
package cache
import (
"os"
"path/filepath"
"testing"
"github.com/restic/restic/internal/restic"
rtest "github.com/restic/restic/internal/test"
)
func TestNew(t *testing.T) {
parent := rtest.TempDir(t)
basedir := filepath.Join(parent, "cache")
id := restic.NewRandomID().String()
tagFile := filepath.Join(basedir, "CACHEDIR.TAG")
versionFile := filepath.Join(basedir, id, "version")
const (
stepCreate = iota
stepComplete
stepRmTag
stepRmVersion
stepEnd
)
for step := stepCreate; step < stepEnd; step++ {
switch step {
case stepRmTag:
rtest.OK(t, os.Remove(tagFile))
case stepRmVersion:
rtest.OK(t, os.Remove(versionFile))
}
c, err := New(id, basedir)
rtest.OK(t, err)
rtest.Equals(t, basedir, c.Base)
rtest.Equals(t, step == stepCreate, c.Created)
for _, name := range []string{tagFile, versionFile} {
info, err := os.Lstat(name)
rtest.OK(t, err)
rtest.Assert(t, info.Mode().IsRegular(), "")
}
}
}

internal/backend/cache/dir.go vendored Normal file

@@ -0,0 +1,28 @@
package cache
import (
"fmt"
"os"
"path/filepath"
)
// EnvDir returns the value of the RESTIC_CACHE_DIR environment variable.
func EnvDir() string {
return os.Getenv("RESTIC_CACHE_DIR")
}
// DefaultDir returns $RESTIC_CACHE_DIR, or the default cache directory
// for the current OS if that variable is not set.
func DefaultDir() (cachedir string, err error) {
cachedir = EnvDir()
if cachedir != "" {
return cachedir, nil
}
cachedir, err = os.UserCacheDir()
if err != nil {
return "", fmt.Errorf("unable to locate cache directory: %v", err)
}
return filepath.Join(cachedir, "restic"), nil
}

internal/backend/cache/dir_test.go vendored Normal file

@@ -0,0 +1,31 @@
package cache
import (
"os"
"testing"
rtest "github.com/restic/restic/internal/test"
)
// DefaultDir should honor RESTIC_CACHE_DIR on all platforms.
func TestCacheDirEnv(t *testing.T) {
cachedir := os.Getenv("RESTIC_CACHE_DIR")
if cachedir == "" {
cachedir = "/doesnt/exist"
err := os.Setenv("RESTIC_CACHE_DIR", cachedir)
if err != nil {
t.Fatal(err)
}
defer func() {
err := os.Unsetenv("RESTIC_CACHE_DIR")
if err != nil {
t.Fatal(err)
}
}()
}
dir, err := DefaultDir()
rtest.Equals(t, cachedir, dir)
rtest.OK(t, err)
}

internal/backend/cache/file.go vendored Normal file

@@ -0,0 +1,241 @@
package cache
import (
"fmt"
"io"
"os"
"path/filepath"
"runtime"
"github.com/pkg/errors"
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/backend/util"
"github.com/restic/restic/internal/crypto"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/fs"
"github.com/restic/restic/internal/restic"
)
func (c *Cache) filename(h backend.Handle) string {
if len(h.Name) < 2 {
panic("Name is empty or too short")
}
subdir := h.Name[:2]
return filepath.Join(c.path, cacheLayoutPaths[h.Type], subdir, h.Name)
}
func (c *Cache) canBeCached(t backend.FileType) bool {
if c == nil {
return false
}
_, ok := cacheLayoutPaths[t]
return ok
}
// load returns a reader that yields the contents of the file with the
// given handle. The returned reader must be closed after use. If an error is
// returned, the ReadCloser is nil. The bool return value indicates whether the
// requested file exists in the cache; it can be true even when no reader is
// returned because length or offset are out of bounds.
func (c *Cache) load(h backend.Handle, length int, offset int64) (io.ReadCloser, bool, error) {
debug.Log("Load(%v, %v, %v) from cache", h, length, offset)
if !c.canBeCached(h.Type) {
return nil, false, errors.New("cannot be cached")
}
f, err := fs.Open(c.filename(h))
if err != nil {
return nil, false, errors.WithStack(err)
}
fi, err := f.Stat()
if err != nil {
_ = f.Close()
return nil, true, errors.WithStack(err)
}
size := fi.Size()
if size <= int64(crypto.CiphertextLength(0)) {
_ = f.Close()
return nil, true, errors.Errorf("cached file %v is truncated", h)
}
if size < offset+int64(length) {
_ = f.Close()
return nil, true, errors.Errorf("cached file %v is too short", h)
}
if offset > 0 {
if _, err = f.Seek(offset, io.SeekStart); err != nil {
_ = f.Close()
return nil, true, err
}
}
if length <= 0 {
return f, true, nil
}
return util.LimitReadCloser(f, int64(length)), true, nil
}
// save saves a file in the cache.
func (c *Cache) save(h backend.Handle, rd io.Reader) error {
debug.Log("Save to cache: %v", h)
if rd == nil {
return errors.New("Save() called with nil reader")
}
if !c.canBeCached(h.Type) {
return errors.New("cannot be cached")
}
finalname := c.filename(h)
dir := filepath.Dir(finalname)
err := fs.Mkdir(dir, 0700)
if err != nil && !errors.Is(err, os.ErrExist) {
return err
}
// First save to a temporary location. This allows multiple concurrent
// restics to use a single cache dir.
f, err := os.CreateTemp(dir, "tmp-")
if err != nil {
return err
}
n, err := io.Copy(f, rd)
if err != nil {
_ = f.Close()
_ = fs.Remove(f.Name())
return errors.Wrap(err, "Copy")
}
if n <= int64(crypto.CiphertextLength(0)) {
_ = f.Close()
_ = fs.Remove(f.Name())
debug.Log("trying to cache truncated file %v, removing", h)
return nil
}
// Close, then rename. Windows doesn't like the reverse order.
if err = f.Close(); err != nil {
_ = fs.Remove(f.Name())
return errors.WithStack(err)
}
err = fs.Rename(f.Name(), finalname)
if err != nil {
_ = fs.Remove(f.Name())
}
if runtime.GOOS == "windows" && errors.Is(err, os.ErrPermission) {
// On Windows, renaming over an existing file is ok
// (os.Rename is MoveFileExW with MOVEFILE_REPLACE_EXISTING
// since Go 1.5), but not when someone else has the file open.
//
// When we get Access denied, we assume that's the case
// and the other process has written the desired contents to f.
err = nil
}
return errors.WithStack(err)
}
// Forget removes a file from the cache. Each file is forgotten at most once
// per restic run to prevent repeatedly caching and deleting a broken file.
func (c *Cache) Forget(h backend.Handle) error {
h.IsMetadata = false
if _, ok := c.forgotten.Load(h); ok {
// Delete a file at most once while restic runs.
// This prevents repeatedly caching and forgetting broken files
return fmt.Errorf("circuit breaker prevents repeated deletion of cached file %v", h)
}
removed, err := c.remove(h)
if removed {
c.forgotten.Store(h, struct{}{})
}
return err
}
// remove deletes a file. When the file is not cached, no error is returned.
func (c *Cache) remove(h backend.Handle) (bool, error) {
if !c.canBeCached(h.Type) {
return false, nil
}
err := fs.Remove(c.filename(h))
removed := err == nil
if errors.Is(err, os.ErrNotExist) {
err = nil
}
return removed, err
}
// Clear removes all files of type t from the cache that are not contained in
// the set valid.
func (c *Cache) Clear(t restic.FileType, valid restic.IDSet) error {
debug.Log("Clearing cache for %v: %v valid files", t, len(valid))
if !c.canBeCached(t) {
return nil
}
list, err := c.list(t)
if err != nil {
return err
}
for id := range list {
if valid.Has(id) {
continue
}
// ignore ErrNotExist to gracefully handle multiple processes running Clear() concurrently
if err = fs.Remove(c.filename(backend.Handle{Type: t, Name: id.String()})); err != nil && !errors.Is(err, os.ErrNotExist) {
return err
}
}
return nil
}
func isFile(fi os.FileInfo) bool {
return fi.Mode()&(os.ModeType|os.ModeCharDevice) == 0
}
// list returns a list of all files of type t in the cache.
func (c *Cache) list(t restic.FileType) (restic.IDSet, error) {
if !c.canBeCached(t) {
return nil, errors.New("cannot be cached")
}
list := restic.NewIDSet()
dir := filepath.Join(c.path, cacheLayoutPaths[t])
err := filepath.Walk(dir, func(name string, fi os.FileInfo, err error) error {
if err != nil {
return errors.Wrap(err, "Walk")
}
if !isFile(fi) {
return nil
}
id, err := restic.ParseID(filepath.Base(name))
if err != nil {
return nil
}
list.Insert(id)
return nil
})
return list, err
}
// Has returns true if the file is cached.
func (c *Cache) Has(h backend.Handle) bool {
if !c.canBeCached(h.Type) {
return false
}
_, err := fs.Stat(c.filename(h))
return err == nil
}
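
A minimal sketch of the file-level API, assuming the set of valid snapshot IDs was obtained elsewhere (e.g. by listing the repository): Clear drops every cached snapshot file that is not in the set, and Has checks individual entries afterwards.

package example

import (
	"fmt"

	"github.com/restic/restic/internal/backend"
	"github.com/restic/restic/internal/backend/cache"
	"github.com/restic/restic/internal/restic"
)

func pruneSnapshots(c *cache.Cache, valid restic.IDSet) error {
	// remove every cached snapshot file whose ID is not in valid
	if err := c.Clear(restic.SnapshotFile, valid); err != nil {
		return err
	}
	for id := range valid {
		h := backend.Handle{Type: backend.SnapshotFile, Name: id.String()}
		fmt.Printf("snapshot %v still cached: %v\n", id.Str(), c.Has(h))
	}
	return nil
}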

internal/backend/cache/file_test.go vendored Normal file

@@ -0,0 +1,288 @@
package cache
import (
"bytes"
"fmt"
"io"
"math/rand"
"os"
"runtime"
"testing"
"time"
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/fs"
"github.com/restic/restic/internal/restic"
rtest "github.com/restic/restic/internal/test"
"golang.org/x/sync/errgroup"
)
func generateRandomFiles(t testing.TB, tpe backend.FileType, c *Cache) restic.IDSet {
ids := restic.NewIDSet()
for i := 0; i < rand.Intn(15)+10; i++ {
buf := rtest.Random(rand.Int(), 1<<19)
id := restic.Hash(buf)
h := backend.Handle{Type: tpe, Name: id.String()}
if c.Has(h) {
t.Errorf("index %v present before save", id)
}
err := c.save(h, bytes.NewReader(buf))
if err != nil {
t.Fatal(err)
}
ids.Insert(id)
}
return ids
}
// randomID returns a random ID from s.
func randomID(s restic.IDSet) restic.ID {
for id := range s {
return id
}
panic("set is empty")
}
func load(t testing.TB, c *Cache, h backend.Handle) []byte {
rd, inCache, err := c.load(h, 0, 0)
if err != nil {
t.Fatal(err)
}
rtest.Equals(t, true, inCache, "expected inCache flag to be true")
if rd == nil {
t.Fatalf("load() returned nil reader")
}
buf, err := io.ReadAll(rd)
if err != nil {
t.Fatal(err)
}
if err = rd.Close(); err != nil {
t.Fatal(err)
}
return buf
}
func listFiles(t testing.TB, c *Cache, tpe restic.FileType) restic.IDSet {
list, err := c.list(tpe)
if err != nil {
t.Errorf("listing failed: %v", err)
}
return list
}
func clearFiles(t testing.TB, c *Cache, tpe restic.FileType, valid restic.IDSet) {
if err := c.Clear(tpe, valid); err != nil {
t.Error(err)
}
}
func TestFiles(t *testing.T) {
seed := time.Now().Unix()
t.Logf("seed is %v", seed)
rand.Seed(seed)
c := TestNewCache(t)
var tests = []restic.FileType{
restic.SnapshotFile,
restic.PackFile,
restic.IndexFile,
}
for _, tpe := range tests {
t.Run(tpe.String(), func(t *testing.T) {
ids := generateRandomFiles(t, tpe, c)
id := randomID(ids)
h := backend.Handle{Type: tpe, Name: id.String()}
id2 := restic.Hash(load(t, c, h))
if !id.Equal(id2) {
t.Errorf("wrong data returned, want %v, got %v", id.Str(), id2.Str())
}
if !c.Has(h) {
t.Errorf("cache thinks index %v isn't present", id.Str())
}
list := listFiles(t, c, tpe)
if !ids.Equals(list) {
t.Errorf("wrong list of index IDs returned, want:\n %v\ngot:\n %v", ids, list)
}
clearFiles(t, c, tpe, restic.NewIDSet(id))
list2 := listFiles(t, c, tpe)
ids.Delete(id)
want := restic.NewIDSet(id)
if !list2.Equals(want) {
t.Errorf("Clear() kept the wrong set of files, want:\n %v\ngot:\n %v", want, list2)
}
clearFiles(t, c, tpe, restic.NewIDSet())
want = restic.NewIDSet()
list3 := listFiles(t, c, tpe)
if !list3.Equals(want) {
t.Errorf("Clear() returned a wrong list, want:\n %v\ngot:\n %v", want, list3)
}
})
}
}
func TestFileLoad(t *testing.T) {
seed := time.Now().Unix()
t.Logf("seed is %v", seed)
rand.Seed(seed)
c := TestNewCache(t)
// save about 5 MiB of data in the cache
data := rtest.Random(rand.Int(), 5234142)
id := restic.ID{}
copy(id[:], data)
h := backend.Handle{
Type: restic.PackFile,
Name: id.String(),
}
if err := c.save(h, bytes.NewReader(data)); err != nil {
t.Fatalf("Save() returned error: %v", err)
}
var tests = []struct {
offset int64
length int
}{
{0, 0},
{5, 0},
{32*1024 + 5, 0},
{0, 123},
{0, 64*1024 + 234},
{100, 5234142 - 100},
}
for _, test := range tests {
t.Run(fmt.Sprintf("%v/%v", test.length, test.offset), func(t *testing.T) {
rd, inCache, err := c.load(h, test.length, test.offset)
if err != nil {
t.Fatal(err)
}
rtest.Equals(t, true, inCache, "expected inCache flag to be true")
buf, err := io.ReadAll(rd)
if err != nil {
t.Fatal(err)
}
if err = rd.Close(); err != nil {
t.Fatal(err)
}
o := int(test.offset)
l := test.length
if test.length == 0 {
l = len(data) - o
}
if l > len(data)-o {
l = len(data) - o
}
if len(buf) != l {
t.Fatalf("wrong number of bytes returned: want %d, got %d", l, len(buf))
}
if !bytes.Equal(buf, data[o:o+l]) {
t.Fatalf("wrong data returned, want:\n %02x\ngot:\n %02x", data[o:o+16], buf[:16])
}
})
}
}
// Simulate multiple processes writing to a cache, using goroutines.
//
// The possibility of sharing a cache between multiple concurrent restic
// processes isn't guaranteed in the docs and doesn't always work on Windows, hence the
// check on GOOS. Cache sharing is considered a "nice to have" on POSIX, for now.
//
// The cache first creates a temporary file and then renames it to its final name.
// On Windows renaming internally creates a file handle with a shareMode which
// includes FILE_SHARE_DELETE. The Go runtime opens files without FILE_SHARE_DELETE,
// thus Open(fn) will fail until the file handle used for renaming was closed.
// See https://devblogs.microsoft.com/oldnewthing/20211022-00/?p=105822
// for hints on how to fix this properly.
func TestFileSaveConcurrent(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("may not work due to FILE_SHARE_DELETE issue")
}
const nproc = 40
var (
c = TestNewCache(t)
data = rtest.Random(1, 10000)
g errgroup.Group
id restic.ID
)
rand.Read(id[:])
h := backend.Handle{
Type: restic.PackFile,
Name: id.String(),
}
for i := 0; i < nproc/2; i++ {
g.Go(func() error { return c.save(h, bytes.NewReader(data)) })
// Can't use load because only the main goroutine may call t.Fatal.
g.Go(func() error {
// The timing is hard to get right, but the main thing we want to
// ensure is ENOENT or nil error.
time.Sleep(time.Duration(100+rand.Intn(200)) * time.Millisecond)
f, _, err := c.load(h, 0, 0)
t.Logf("Load error: %v", err)
switch {
case err == nil:
case errors.Is(err, os.ErrNotExist):
return nil
default:
return err
}
defer func() { _ = f.Close() }()
read, err := io.ReadAll(f)
if err == nil && !bytes.Equal(read, data) {
err = errors.New("mismatch between Save and Load")
}
return err
})
}
rtest.OK(t, g.Wait())
saved := load(t, c, h)
rtest.Equals(t, data, saved)
}
func TestFileSaveAfterDamage(t *testing.T) {
c := TestNewCache(t)
rtest.OK(t, fs.RemoveAll(c.path))
// save a few bytes of data in the cache
data := rtest.Random(123456789, 42)
id := restic.Hash(data)
h := backend.Handle{
Type: restic.PackFile,
Name: id.String(),
}
if err := c.save(h, bytes.NewReader(data)); err == nil {
t.Fatal("Missing error when saving to deleted cache directory")
}
}

internal/backend/cache/testing.go vendored Normal file

@@ -0,0 +1,20 @@
package cache
import (
"testing"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/test"
)
// TestNewCache returns a cache in a temporary directory which is removed when
// the test ends.
func TestNewCache(t testing.TB) *Cache {
dir := test.TempDir(t)
t.Logf("created new cache at %v", dir)
cache, err := New(restic.NewRandomID().String(), dir)
if err != nil {
t.Fatal(err)
}
return cache
}
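
A sketch of how the helper is typically used from another test in this package (the handle and payload are made up, mirroring the pattern of the tests above):

package cache

import (
	"bytes"
	"testing"

	"github.com/restic/restic/internal/backend"
	"github.com/restic/restic/internal/restic"
	"github.com/restic/restic/internal/test"
)

func TestCacheRoundTrip(t *testing.T) {
	c := TestNewCache(t) // temporary cache, removed together with the test's temp dir

	data := test.Random(42, 4096)
	id := restic.Hash(data)
	h := backend.Handle{Type: backend.IndexFile, Name: id.String()}

	test.OK(t, c.save(h, bytes.NewReader(data)))
	test.Assert(t, c.Has(h), "expected %v to be cached after save", h)
}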