index: move to repository package

This commit is contained in:
Michael Eischer
2024-05-24 23:06:44 +02:00
parent 8e5d7d719c
commit 50ec408302
19 changed files with 12 additions and 12 deletions

View File

@@ -0,0 +1,156 @@
package index
import (
"context"
"sort"
"github.com/restic/restic/internal/restic"
)
type associatedSetSub[T any] struct {
value []T
isSet []bool
}
// AssociatedSet is a memory efficient implementation of a BlobSet that can
// store a small data item for each BlobHandle. It relies on a special property
// of our MasterIndex implementation. A BlobHandle can be permanently identified
// using an offset that never changes as MasterIndex entries cannot be modified (only added).
//
// The AssociatedSet thus can use an array with the size of the MasterIndex to store
// its data. Access to an individual entry is possible by looking up the BlobHandle's
// offset from the MasterIndex.
//
// BlobHandles that are not part of the MasterIndex can be stored by placing them in
// an overflow set that is expected to be empty in the normal case.
type AssociatedSet[T any] struct {
byType [restic.NumBlobTypes]associatedSetSub[T]
overflow map[restic.BlobHandle]T
idx *MasterIndex
}
func NewAssociatedSet[T any](mi *MasterIndex) *AssociatedSet[T] {
a := AssociatedSet[T]{
overflow: make(map[restic.BlobHandle]T),
idx: mi,
}
for typ := range a.byType {
if typ == 0 {
continue
}
// index starts counting at 1
count := mi.stableLen(restic.BlobType(typ)) + 1
a.byType[typ].value = make([]T, count)
a.byType[typ].isSet = make([]bool, count)
}
return &a
}
func (a *AssociatedSet[T]) Get(bh restic.BlobHandle) (T, bool) {
if val, ok := a.overflow[bh]; ok {
return val, true
}
idx := a.idx.blobIndex(bh)
bt := &a.byType[bh.Type]
if idx >= len(bt.value) || idx == -1 {
var zero T
return zero, false
}
has := bt.isSet[idx]
if has {
return bt.value[idx], has
}
var zero T
return zero, false
}
func (a *AssociatedSet[T]) Has(bh restic.BlobHandle) bool {
_, ok := a.Get(bh)
return ok
}
func (a *AssociatedSet[T]) Set(bh restic.BlobHandle, val T) {
if _, ok := a.overflow[bh]; ok {
a.overflow[bh] = val
return
}
idx := a.idx.blobIndex(bh)
bt := &a.byType[bh.Type]
if idx >= len(bt.value) || idx == -1 {
a.overflow[bh] = val
} else {
bt.value[idx] = val
bt.isSet[idx] = true
}
}
func (a *AssociatedSet[T]) Insert(bh restic.BlobHandle) {
var zero T
a.Set(bh, zero)
}
func (a *AssociatedSet[T]) Delete(bh restic.BlobHandle) {
if _, ok := a.overflow[bh]; ok {
delete(a.overflow, bh)
return
}
idx := a.idx.blobIndex(bh)
bt := &a.byType[bh.Type]
if idx < len(bt.value) && idx != -1 {
bt.isSet[idx] = false
}
}
func (a *AssociatedSet[T]) Len() int {
count := 0
a.For(func(_ restic.BlobHandle, _ T) {
count++
})
return count
}
func (a *AssociatedSet[T]) For(cb func(bh restic.BlobHandle, val T)) {
for k, v := range a.overflow {
cb(k, v)
}
_ = a.idx.Each(context.Background(), func(pb restic.PackedBlob) {
if _, ok := a.overflow[pb.BlobHandle]; ok {
// already reported via overflow set
return
}
val, known := a.Get(pb.BlobHandle)
if known {
cb(pb.BlobHandle, val)
}
})
}
// List returns a sorted slice of all BlobHandle in the set.
func (a *AssociatedSet[T]) List() restic.BlobHandles {
list := make(restic.BlobHandles, 0)
a.For(func(bh restic.BlobHandle, _ T) {
list = append(list, bh)
})
return list
}
func (a *AssociatedSet[T]) String() string {
list := a.List()
sort.Sort(list)
str := list.String()
if len(str) < 2 {
return "{}"
}
return "{" + str[1:len(str)-1] + "}"
}

View File

@@ -0,0 +1,154 @@
package index
import (
"context"
"testing"
"github.com/restic/restic/internal/crypto"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/test"
)
type noopSaver struct{}
func (n *noopSaver) Connections() uint {
return 2
}
func (n *noopSaver) SaveUnpacked(ctx context.Context, t restic.FileType, buf []byte) (restic.ID, error) {
return restic.Hash(buf), nil
}
func makeFakePackedBlob() (restic.BlobHandle, restic.PackedBlob) {
bh := restic.NewRandomBlobHandle()
blob := restic.PackedBlob{
PackID: restic.NewRandomID(),
Blob: restic.Blob{
BlobHandle: bh,
Length: uint(crypto.CiphertextLength(10)),
Offset: 0,
},
}
return bh, blob
}
func TestAssociatedSet(t *testing.T) {
bh, blob := makeFakePackedBlob()
mi := NewMasterIndex()
mi.StorePack(blob.PackID, []restic.Blob{blob.Blob})
test.OK(t, mi.SaveIndex(context.TODO(), &noopSaver{}))
bs := NewAssociatedSet[uint8](mi)
test.Equals(t, bs.Len(), 0)
test.Equals(t, bs.List(), restic.BlobHandles{})
// check non existent
test.Equals(t, bs.Has(bh), false)
_, ok := bs.Get(bh)
test.Equals(t, false, ok)
// test insert
bs.Insert(bh)
test.Equals(t, bs.Has(bh), true)
test.Equals(t, bs.Len(), 1)
test.Equals(t, bs.List(), restic.BlobHandles{bh})
test.Equals(t, 0, len(bs.overflow))
// test set
bs.Set(bh, 42)
test.Equals(t, bs.Has(bh), true)
test.Equals(t, bs.Len(), 1)
val, ok := bs.Get(bh)
test.Equals(t, true, ok)
test.Equals(t, uint8(42), val)
s := bs.String()
test.Assert(t, len(s) > 10, "invalid string: %v", s)
// test remove
bs.Delete(bh)
test.Equals(t, bs.Len(), 0)
test.Equals(t, bs.Has(bh), false)
test.Equals(t, bs.List(), restic.BlobHandles{})
test.Equals(t, "{}", bs.String())
// test set
bs.Set(bh, 43)
test.Equals(t, bs.Has(bh), true)
test.Equals(t, bs.Len(), 1)
val, ok = bs.Get(bh)
test.Equals(t, true, ok)
test.Equals(t, uint8(43), val)
test.Equals(t, 0, len(bs.overflow))
// test update
bs.Set(bh, 44)
val, ok = bs.Get(bh)
test.Equals(t, true, ok)
test.Equals(t, uint8(44), val)
test.Equals(t, 0, len(bs.overflow))
// test overflow blob
of := restic.NewRandomBlobHandle()
test.Equals(t, false, bs.Has(of))
// set
bs.Set(of, 7)
test.Equals(t, 1, len(bs.overflow))
test.Equals(t, bs.Len(), 2)
// get
val, ok = bs.Get(of)
test.Equals(t, true, ok)
test.Equals(t, uint8(7), val)
test.Equals(t, bs.List(), restic.BlobHandles{of, bh})
// update
bs.Set(of, 8)
val, ok = bs.Get(of)
test.Equals(t, true, ok)
test.Equals(t, uint8(8), val)
test.Equals(t, 1, len(bs.overflow))
// delete
bs.Delete(of)
test.Equals(t, bs.Len(), 1)
test.Equals(t, bs.Has(of), false)
test.Equals(t, bs.List(), restic.BlobHandles{bh})
test.Equals(t, 0, len(bs.overflow))
}
func TestAssociatedSetWithExtendedIndex(t *testing.T) {
_, blob := makeFakePackedBlob()
mi := NewMasterIndex()
mi.StorePack(blob.PackID, []restic.Blob{blob.Blob})
test.OK(t, mi.SaveIndex(context.TODO(), &noopSaver{}))
bs := NewAssociatedSet[uint8](mi)
// add new blobs to index after building the set
of, blob2 := makeFakePackedBlob()
mi.StorePack(blob2.PackID, []restic.Blob{blob2.Blob})
test.OK(t, mi.SaveIndex(context.TODO(), &noopSaver{}))
// non-existant
test.Equals(t, false, bs.Has(of))
// set
bs.Set(of, 5)
test.Equals(t, 1, len(bs.overflow))
test.Equals(t, bs.Len(), 1)
// get
val, ok := bs.Get(of)
test.Equals(t, true, ok)
test.Equals(t, uint8(5), val)
test.Equals(t, bs.List(), restic.BlobHandles{of})
// update
bs.Set(of, 8)
val, ok = bs.Get(of)
test.Equals(t, true, ok)
test.Equals(t, uint8(8), val)
test.Equals(t, 1, len(bs.overflow))
// delete
bs.Delete(of)
test.Equals(t, bs.Len(), 0)
test.Equals(t, bs.Has(of), false)
test.Equals(t, bs.List(), restic.BlobHandles{})
test.Equals(t, 0, len(bs.overflow))
}

View File

@@ -0,0 +1,588 @@
package index
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"math"
"sync"
"time"
"github.com/restic/restic/internal/crypto"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/feature"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/debug"
)
// In large repositories, millions of blobs are stored in the repository
// and restic needs to store an index entry for each blob in memory for
// most operations.
// Hence the index data structure defined here is one of the main contributions
// to the total memory requirements of restic.
//
// We store the index entries in indexMaps. In these maps, entries take 56
// bytes each, plus 8/4 = 2 bytes of unused pointers on average, not counting
// malloc and header struct overhead and ignoring duplicates (those are only
// present in edge cases and are also removed by prune runs).
//
// In the index entries, we need to reference the packID. As one pack may
// contain many blobs the packIDs are saved in a separate array and only the index
// within this array is saved in the indexEntry
//
// We assume on average a minimum of 8 blobs per pack; BP=8.
// (Note that for large files there should be 3 blobs per pack as the average chunk
// size is 1.5 MB and the minimum pack size is 4 MB)
//
// We have the following sizes:
// indexEntry: 56 bytes (on amd64)
// each packID: 32 bytes
//
// To save N index entries, we therefore need:
// N * (56 + 2) bytes + N * 32 bytes / BP = N * 62 bytes,
// i.e., fewer than 64 bytes per blob in an index.
// Index holds lookup tables for id -> pack.
type Index struct {
m sync.RWMutex
byType [restic.NumBlobTypes]indexMap
packs restic.IDs
final bool // set to true for all indexes read from the backend ("finalized")
ids restic.IDs // set to the IDs of the contained finalized indexes
created time.Time
}
// NewIndex returns a new index.
func NewIndex() *Index {
return &Index{
created: time.Now(),
}
}
// addToPacks saves the given pack ID and return the index.
// This procedere allows to use pack IDs which can be easily garbage collected after.
func (idx *Index) addToPacks(id restic.ID) int {
idx.packs = append(idx.packs, id)
return len(idx.packs) - 1
}
func (idx *Index) store(packIndex int, blob restic.Blob) {
// assert that offset and length fit into uint32!
if blob.Offset > math.MaxUint32 || blob.Length > math.MaxUint32 || blob.UncompressedLength > math.MaxUint32 {
panic("offset or length does not fit in uint32. You have packs > 4GB!")
}
m := &idx.byType[blob.Type]
m.add(blob.ID, packIndex, uint32(blob.Offset), uint32(blob.Length), uint32(blob.UncompressedLength))
}
// Final returns true iff the index is already written to the repository, it is
// finalized.
func (idx *Index) Final() bool {
idx.m.RLock()
defer idx.m.RUnlock()
return idx.final
}
const (
indexMaxBlobs = 50000
indexMaxAge = 10 * time.Minute
)
// IndexFull returns true iff the index is "full enough" to be saved as a preliminary index.
var IndexFull = func(idx *Index) bool {
idx.m.RLock()
defer idx.m.RUnlock()
debug.Log("checking whether index %p is full", idx)
var blobs uint
for typ := range idx.byType {
blobs += idx.byType[typ].len()
}
age := time.Since(idx.created)
switch {
case age >= indexMaxAge:
debug.Log("index %p is old enough", idx, age)
return true
case blobs >= indexMaxBlobs:
debug.Log("index %p has %d blobs", idx, blobs)
return true
}
debug.Log("index %p only has %d blobs and is too young (%v)", idx, blobs, age)
return false
}
// StorePack remembers the ids of all blobs of a given pack
// in the index
func (idx *Index) StorePack(id restic.ID, blobs []restic.Blob) {
idx.m.Lock()
defer idx.m.Unlock()
if idx.final {
panic("store new item in finalized index")
}
debug.Log("%v", blobs)
packIndex := idx.addToPacks(id)
for _, blob := range blobs {
idx.store(packIndex, blob)
}
}
func (idx *Index) toPackedBlob(e *indexEntry, t restic.BlobType) restic.PackedBlob {
return restic.PackedBlob{
Blob: restic.Blob{
BlobHandle: restic.BlobHandle{
ID: e.id,
Type: t},
Length: uint(e.length),
Offset: uint(e.offset),
UncompressedLength: uint(e.uncompressedLength),
},
PackID: idx.packs[e.packIndex],
}
}
// Lookup queries the index for the blob ID and returns all entries including
// duplicates. Adds found entries to blobs and returns the result.
func (idx *Index) Lookup(bh restic.BlobHandle, pbs []restic.PackedBlob) []restic.PackedBlob {
idx.m.RLock()
defer idx.m.RUnlock()
idx.byType[bh.Type].foreachWithID(bh.ID, func(e *indexEntry) {
pbs = append(pbs, idx.toPackedBlob(e, bh.Type))
})
return pbs
}
// Has returns true iff the id is listed in the index.
func (idx *Index) Has(bh restic.BlobHandle) bool {
idx.m.RLock()
defer idx.m.RUnlock()
return idx.byType[bh.Type].get(bh.ID) != nil
}
// LookupSize returns the length of the plaintext content of the blob with the
// given id.
func (idx *Index) LookupSize(bh restic.BlobHandle) (plaintextLength uint, found bool) {
idx.m.RLock()
defer idx.m.RUnlock()
e := idx.byType[bh.Type].get(bh.ID)
if e == nil {
return 0, false
}
if e.uncompressedLength != 0 {
return uint(e.uncompressedLength), true
}
return uint(crypto.PlaintextLength(int(e.length))), true
}
// Each passes all blobs known to the index to the callback fn. This blocks any
// modification of the index.
func (idx *Index) Each(ctx context.Context, fn func(restic.PackedBlob)) error {
idx.m.RLock()
defer idx.m.RUnlock()
for typ := range idx.byType {
m := &idx.byType[typ]
m.foreach(func(e *indexEntry) bool {
if ctx.Err() != nil {
return false
}
fn(idx.toPackedBlob(e, restic.BlobType(typ)))
return true
})
}
return ctx.Err()
}
type EachByPackResult struct {
PackID restic.ID
Blobs []restic.Blob
}
// EachByPack returns a channel that yields all blobs known to the index
// grouped by packID but ignoring blobs with a packID in packPlacklist for
// finalized indexes.
// This filtering is used when rebuilding the index where we need to ignore packs
// from the finalized index which have been re-read into a non-finalized index.
// When the context is cancelled, the background goroutine
// terminates. This blocks any modification of the index.
func (idx *Index) EachByPack(ctx context.Context, packBlacklist restic.IDSet) <-chan EachByPackResult {
idx.m.RLock()
ch := make(chan EachByPackResult)
go func() {
defer idx.m.RUnlock()
defer close(ch)
byPack := make(map[restic.ID][restic.NumBlobTypes][]*indexEntry)
for typ := range idx.byType {
m := &idx.byType[typ]
m.foreach(func(e *indexEntry) bool {
packID := idx.packs[e.packIndex]
if !idx.final || !packBlacklist.Has(packID) {
v := byPack[packID]
v[typ] = append(v[typ], e)
byPack[packID] = v
}
return true
})
}
for packID, packByType := range byPack {
var result EachByPackResult
result.PackID = packID
for typ, pack := range packByType {
for _, e := range pack {
result.Blobs = append(result.Blobs, idx.toPackedBlob(e, restic.BlobType(typ)).Blob)
}
}
// allow GC once entry is no longer necessary
delete(byPack, packID)
select {
case <-ctx.Done():
return
case ch <- result:
}
}
}()
return ch
}
// Packs returns all packs in this index
func (idx *Index) Packs() restic.IDSet {
idx.m.RLock()
defer idx.m.RUnlock()
packs := restic.NewIDSet()
for _, packID := range idx.packs {
packs.Insert(packID)
}
return packs
}
type packJSON struct {
ID restic.ID `json:"id"`
Blobs []blobJSON `json:"blobs"`
}
type blobJSON struct {
ID restic.ID `json:"id"`
Type restic.BlobType `json:"type"`
Offset uint `json:"offset"`
Length uint `json:"length"`
UncompressedLength uint `json:"uncompressed_length,omitempty"`
}
// generatePackList returns a list of packs.
func (idx *Index) generatePackList() ([]packJSON, error) {
list := make([]packJSON, 0, len(idx.packs))
packs := make(map[restic.ID]int, len(list)) // Maps to index in list.
for typ := range idx.byType {
m := &idx.byType[typ]
m.foreach(func(e *indexEntry) bool {
packID := idx.packs[e.packIndex]
if packID.IsNull() {
panic("null pack id")
}
i, ok := packs[packID]
if !ok {
i = len(list)
list = append(list, packJSON{ID: packID})
packs[packID] = i
}
p := &list[i]
// add blob
p.Blobs = append(p.Blobs, blobJSON{
ID: e.id,
Type: restic.BlobType(typ),
Offset: uint(e.offset),
Length: uint(e.length),
UncompressedLength: uint(e.uncompressedLength),
})
return true
})
}
return list, nil
}
type jsonIndex struct {
// removed: Supersedes restic.IDs `json:"supersedes,omitempty"`
Packs []packJSON `json:"packs"`
}
// Encode writes the JSON serialization of the index to the writer w.
func (idx *Index) Encode(w io.Writer) error {
debug.Log("encoding index")
idx.m.RLock()
defer idx.m.RUnlock()
list, err := idx.generatePackList()
if err != nil {
return err
}
enc := json.NewEncoder(w)
idxJSON := jsonIndex{
Packs: list,
}
return enc.Encode(idxJSON)
}
// SaveIndex saves an index in the repository.
func (idx *Index) SaveIndex(ctx context.Context, repo restic.SaverUnpacked) (restic.ID, error) {
buf := bytes.NewBuffer(nil)
err := idx.Encode(buf)
if err != nil {
return restic.ID{}, err
}
id, err := repo.SaveUnpacked(ctx, restic.IndexFile, buf.Bytes())
ierr := idx.SetID(id)
if ierr != nil {
// logic bug
panic(ierr)
}
return id, err
}
// Finalize sets the index to final.
func (idx *Index) Finalize() {
debug.Log("finalizing index")
idx.m.Lock()
defer idx.m.Unlock()
idx.final = true
}
// IDs returns the IDs of the index, if available. If the index is not yet
// finalized, an error is returned.
func (idx *Index) IDs() (restic.IDs, error) {
idx.m.RLock()
defer idx.m.RUnlock()
if !idx.final {
return nil, errors.New("index not finalized")
}
return idx.ids, nil
}
// SetID sets the ID the index has been written to. This requires that
// Finalize() has been called before, otherwise an error is returned.
func (idx *Index) SetID(id restic.ID) error {
idx.m.Lock()
defer idx.m.Unlock()
if !idx.final {
return errors.New("index is not final")
}
if len(idx.ids) > 0 {
return errors.New("ID already set")
}
debug.Log("ID set to %v", id)
idx.ids = append(idx.ids, id)
return nil
}
// Dump writes the pretty-printed JSON representation of the index to w.
func (idx *Index) Dump(w io.Writer) error {
debug.Log("dumping index")
idx.m.RLock()
defer idx.m.RUnlock()
list, err := idx.generatePackList()
if err != nil {
return err
}
outer := jsonIndex{
Packs: list,
}
buf, err := json.MarshalIndent(outer, "", " ")
if err != nil {
return err
}
_, err = w.Write(append(buf, '\n'))
if err != nil {
return errors.Wrap(err, "Write")
}
debug.Log("done")
return nil
}
// merge() merges indexes, i.e. idx.merge(idx2) merges the contents of idx2 into idx.
// During merging exact duplicates are removed; idx2 is not changed by this method.
func (idx *Index) merge(idx2 *Index) error {
idx.m.Lock()
defer idx.m.Unlock()
idx2.m.Lock()
defer idx2.m.Unlock()
if !idx2.final {
return errors.New("index to merge is not final")
}
packlen := len(idx.packs)
// first append packs as they might be accessed when looking for duplicates below
idx.packs = append(idx.packs, idx2.packs...)
// copy all index entries of idx2 to idx
for typ := range idx2.byType {
m2 := &idx2.byType[typ]
m := &idx.byType[typ]
// helper func to test if identical entry is contained in idx
hasIdenticalEntry := func(e2 *indexEntry) (found bool) {
m.foreachWithID(e2.id, func(e *indexEntry) {
b := idx.toPackedBlob(e, restic.BlobType(typ))
b2 := idx2.toPackedBlob(e2, restic.BlobType(typ))
if b == b2 {
found = true
}
})
return found
}
m2.foreach(func(e2 *indexEntry) bool {
if !hasIdenticalEntry(e2) {
// packIndex needs to be changed as idx2.pack was appended to idx.pack, see above
m.add(e2.id, e2.packIndex+packlen, e2.offset, e2.length, e2.uncompressedLength)
}
return true
})
}
idx.ids = append(idx.ids, idx2.ids...)
return nil
}
// isErrOldIndex returns true if the error may be caused by an old index
// format.
func isErrOldIndex(err error) bool {
e, ok := err.(*json.UnmarshalTypeError)
return ok && e.Value == "array"
}
// DecodeIndex unserializes an index from buf.
func DecodeIndex(buf []byte, id restic.ID) (idx *Index, oldFormat bool, err error) {
debug.Log("Start decoding index")
idxJSON := &jsonIndex{}
err = json.Unmarshal(buf, idxJSON)
if err != nil {
debug.Log("Error %v", err)
if isErrOldIndex(err) {
if feature.Flag.Enabled(feature.DeprecateLegacyIndex) {
return nil, false, fmt.Errorf("index seems to use the legacy format. update it using `restic repair index`")
}
debug.Log("index is probably old format, trying that")
idx, err = decodeOldIndex(buf)
idx.ids = append(idx.ids, id)
return idx, err == nil, err
}
return nil, false, errors.Wrap(err, "DecodeIndex")
}
idx = NewIndex()
for _, pack := range idxJSON.Packs {
packID := idx.addToPacks(pack.ID)
for _, blob := range pack.Blobs {
idx.store(packID, restic.Blob{
BlobHandle: restic.BlobHandle{
Type: blob.Type,
ID: blob.ID},
Offset: blob.Offset,
Length: blob.Length,
UncompressedLength: blob.UncompressedLength,
})
}
}
idx.ids = append(idx.ids, id)
idx.final = true
debug.Log("done")
return idx, false, nil
}
// DecodeOldIndex loads and unserializes an index in the old format from rd.
func decodeOldIndex(buf []byte) (idx *Index, err error) {
debug.Log("Start decoding old index")
list := []*packJSON{}
err = json.Unmarshal(buf, &list)
if err != nil {
debug.Log("Error %#v", err)
return nil, errors.Wrap(err, "Decode")
}
idx = NewIndex()
for _, pack := range list {
packID := idx.addToPacks(pack.ID)
for _, blob := range pack.Blobs {
idx.store(packID, restic.Blob{
BlobHandle: restic.BlobHandle{
Type: blob.Type,
ID: blob.ID},
Offset: blob.Offset,
Length: blob.Length,
// no compressed length in the old index format
})
}
}
idx.final = true
debug.Log("done")
return idx, nil
}
func (idx *Index) BlobIndex(bh restic.BlobHandle) int {
idx.m.RLock()
defer idx.m.RUnlock()
return idx.byType[bh.Type].firstIndex(bh.ID)
}
func (idx *Index) Len(t restic.BlobType) uint {
idx.m.RLock()
defer idx.m.RUnlock()
return idx.byType[t].len()
}

View File

@@ -0,0 +1,36 @@
package index
import (
"context"
"runtime"
"sync"
"github.com/restic/restic/internal/restic"
)
// ForAllIndexes loads all index files in parallel and calls the given callback.
// It is guaranteed that the function is not run concurrently. If the callback
// returns an error, this function is cancelled and also returns that error.
func ForAllIndexes(ctx context.Context, lister restic.Lister, repo restic.LoaderUnpacked,
fn func(id restic.ID, index *Index, oldFormat bool, err error) error) error {
// decoding an index can take quite some time such that this can be both CPU- or IO-bound
// as the whole index is kept in memory anyways, a few workers too much don't matter
workerCount := repo.Connections() + uint(runtime.GOMAXPROCS(0))
var m sync.Mutex
return restic.ParallelList(ctx, lister, restic.IndexFile, workerCount, func(ctx context.Context, id restic.ID, _ int64) error {
var err error
var idx *Index
oldFormat := false
buf, err := repo.LoadUnpacked(ctx, restic.IndexFile, id)
if err == nil {
idx, oldFormat, err = DecodeIndex(buf, id)
}
m.Lock()
defer m.Unlock()
return fn(id, idx, oldFormat, err)
})
}

View File

@@ -0,0 +1,48 @@
package index_test
import (
"context"
"path/filepath"
"testing"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/repository"
"github.com/restic/restic/internal/repository/index"
"github.com/restic/restic/internal/restic"
rtest "github.com/restic/restic/internal/test"
)
var repoFixture = filepath.Join("..", "testdata", "test-repo.tar.gz")
func TestRepositoryForAllIndexes(t *testing.T) {
repo, _, cleanup := repository.TestFromFixture(t, repoFixture)
defer cleanup()
expectedIndexIDs := restic.NewIDSet()
rtest.OK(t, repo.List(context.TODO(), restic.IndexFile, func(id restic.ID, size int64) error {
expectedIndexIDs.Insert(id)
return nil
}))
// check that all expected indexes are loaded without errors
indexIDs := restic.NewIDSet()
var indexErr error
rtest.OK(t, index.ForAllIndexes(context.TODO(), repo, repo, func(id restic.ID, index *index.Index, oldFormat bool, err error) error {
if err != nil {
indexErr = err
}
indexIDs.Insert(id)
return nil
}))
rtest.OK(t, indexErr)
rtest.Equals(t, expectedIndexIDs, indexIDs)
// must failed with the returned error
iterErr := errors.New("error to pass upwards")
err := index.ForAllIndexes(context.TODO(), repo, repo, func(id restic.ID, index *index.Index, oldFormat bool, err error) error {
return iterErr
})
rtest.Equals(t, iterErr, err)
}

View File

@@ -0,0 +1,670 @@
package index_test
import (
"bytes"
"context"
"fmt"
"math/rand"
"sync"
"testing"
"github.com/restic/restic/internal/feature"
"github.com/restic/restic/internal/repository/index"
"github.com/restic/restic/internal/restic"
rtest "github.com/restic/restic/internal/test"
)
func TestIndexSerialize(t *testing.T) {
tests := []restic.PackedBlob{}
idx := index.NewIndex()
// create 50 packs with 20 blobs each
for i := 0; i < 50; i++ {
packID := restic.NewRandomID()
var blobs []restic.Blob
pos := uint(0)
for j := 0; j < 20; j++ {
length := uint(i*100 + j)
uncompressedLength := uint(0)
if i >= 25 {
// test a mix of compressed and uncompressed packs
uncompressedLength = 2 * length
}
pb := restic.PackedBlob{
Blob: restic.Blob{
BlobHandle: restic.NewRandomBlobHandle(),
Offset: pos,
Length: length,
UncompressedLength: uncompressedLength,
},
PackID: packID,
}
blobs = append(blobs, pb.Blob)
tests = append(tests, pb)
pos += length
}
idx.StorePack(packID, blobs)
}
wr := bytes.NewBuffer(nil)
err := idx.Encode(wr)
rtest.OK(t, err)
idx2ID := restic.NewRandomID()
idx2, oldFormat, err := index.DecodeIndex(wr.Bytes(), idx2ID)
rtest.OK(t, err)
rtest.Assert(t, idx2 != nil,
"nil returned for decoded index")
rtest.Assert(t, !oldFormat, "new index format recognized as old format")
indexID, err := idx2.IDs()
rtest.OK(t, err)
rtest.Equals(t, indexID, restic.IDs{idx2ID})
wr2 := bytes.NewBuffer(nil)
err = idx2.Encode(wr2)
rtest.OK(t, err)
for _, testBlob := range tests {
list := idx.Lookup(testBlob.BlobHandle, nil)
if len(list) != 1 {
t.Errorf("expected one result for blob %v, got %v: %v", testBlob.ID.Str(), len(list), list)
}
result := list[0]
rtest.Equals(t, testBlob, result)
list2 := idx2.Lookup(testBlob.BlobHandle, nil)
if len(list2) != 1 {
t.Errorf("expected one result for blob %v, got %v: %v", testBlob.ID.Str(), len(list2), list2)
}
result2 := list2[0]
rtest.Equals(t, testBlob, result2)
}
// add more blobs to idx
newtests := []restic.PackedBlob{}
for i := 0; i < 10; i++ {
packID := restic.NewRandomID()
var blobs []restic.Blob
pos := uint(0)
for j := 0; j < 10; j++ {
length := uint(i*100 + j)
pb := restic.PackedBlob{
Blob: restic.Blob{
BlobHandle: restic.NewRandomBlobHandle(),
Offset: pos,
Length: length,
},
PackID: packID,
}
blobs = append(blobs, pb.Blob)
newtests = append(newtests, pb)
pos += length
}
idx.StorePack(packID, blobs)
}
// finalize; serialize idx, unserialize to idx3
idx.Finalize()
wr3 := bytes.NewBuffer(nil)
err = idx.Encode(wr3)
rtest.OK(t, err)
rtest.Assert(t, idx.Final(),
"index not final after encoding")
id := restic.NewRandomID()
rtest.OK(t, idx.SetID(id))
ids, err := idx.IDs()
rtest.OK(t, err)
rtest.Equals(t, restic.IDs{id}, ids)
idx3, oldFormat, err := index.DecodeIndex(wr3.Bytes(), id)
rtest.OK(t, err)
rtest.Assert(t, idx3 != nil,
"nil returned for decoded index")
rtest.Assert(t, idx3.Final(),
"decoded index is not final")
rtest.Assert(t, !oldFormat, "new index format recognized as old format")
// all new blobs must be in the index
for _, testBlob := range newtests {
list := idx3.Lookup(testBlob.BlobHandle, nil)
if len(list) != 1 {
t.Errorf("expected one result for blob %v, got %v: %v", testBlob.ID.Str(), len(list), list)
}
blob := list[0]
rtest.Equals(t, testBlob, blob)
}
}
func TestIndexSize(t *testing.T) {
idx := index.NewIndex()
packs := 200
blobCount := 100
for i := 0; i < packs; i++ {
packID := restic.NewRandomID()
var blobs []restic.Blob
pos := uint(0)
for j := 0; j < blobCount; j++ {
length := uint(i*100 + j)
blobs = append(blobs, restic.Blob{
BlobHandle: restic.NewRandomBlobHandle(),
Offset: pos,
Length: length,
})
pos += length
}
idx.StorePack(packID, blobs)
}
wr := bytes.NewBuffer(nil)
err := idx.Encode(wr)
rtest.OK(t, err)
rtest.Equals(t, uint(packs*blobCount), idx.Len(restic.DataBlob))
rtest.Equals(t, uint(0), idx.Len(restic.TreeBlob))
t.Logf("Index file size for %d blobs in %d packs is %d", blobCount*packs, packs, wr.Len())
}
// example index serialization from doc/Design.rst
var docExampleV1 = []byte(`
{
"supersedes": [
"ed54ae36197f4745ebc4b54d10e0f623eaaaedd03013eb7ae90df881b7781452"
],
"packs": [
{
"id": "73d04e6125cf3c28a299cc2f3cca3b78ceac396e4fcf9575e34536b26782413c",
"blobs": [
{
"id": "3ec79977ef0cf5de7b08cd12b874cd0f62bbaf7f07f3497a5b1bbcc8cb39b1ce",
"type": "data",
"offset": 0,
"length": 38
},{
"id": "9ccb846e60d90d4eb915848add7aa7ea1e4bbabfc60e573db9f7bfb2789afbae",
"type": "tree",
"offset": 38,
"length": 112
},
{
"id": "d3dc577b4ffd38cc4b32122cabf8655a0223ed22edfd93b353dc0c3f2b0fdf66",
"type": "data",
"offset": 150,
"length": 123
}
]
}
]
}
`)
var docExampleV2 = []byte(`
{
"supersedes": [
"ed54ae36197f4745ebc4b54d10e0f623eaaaedd03013eb7ae90df881b7781452"
],
"packs": [
{
"id": "73d04e6125cf3c28a299cc2f3cca3b78ceac396e4fcf9575e34536b26782413c",
"blobs": [
{
"id": "3ec79977ef0cf5de7b08cd12b874cd0f62bbaf7f07f3497a5b1bbcc8cb39b1ce",
"type": "data",
"offset": 0,
"length": 38
},
{
"id": "9ccb846e60d90d4eb915848add7aa7ea1e4bbabfc60e573db9f7bfb2789afbae",
"type": "tree",
"offset": 38,
"length": 112,
"uncompressed_length": 511
},
{
"id": "d3dc577b4ffd38cc4b32122cabf8655a0223ed22edfd93b353dc0c3f2b0fdf66",
"type": "data",
"offset": 150,
"length": 123,
"uncompressed_length": 234
}
]
}
]
}
`)
var docOldExample = []byte(`
[ {
"id": "73d04e6125cf3c28a299cc2f3cca3b78ceac396e4fcf9575e34536b26782413c",
"blobs": [
{
"id": "3ec79977ef0cf5de7b08cd12b874cd0f62bbaf7f07f3497a5b1bbcc8cb39b1ce",
"type": "data",
"offset": 0,
"length": 38
},{
"id": "9ccb846e60d90d4eb915848add7aa7ea1e4bbabfc60e573db9f7bfb2789afbae",
"type": "tree",
"offset": 38,
"length": 112
},
{
"id": "d3dc577b4ffd38cc4b32122cabf8655a0223ed22edfd93b353dc0c3f2b0fdf66",
"type": "data",
"offset": 150,
"length": 123
}
]
} ]
`)
var exampleTests = []struct {
id, packID restic.ID
tpe restic.BlobType
offset, length uint
uncompressedLength uint
}{
{
restic.TestParseID("3ec79977ef0cf5de7b08cd12b874cd0f62bbaf7f07f3497a5b1bbcc8cb39b1ce"),
restic.TestParseID("73d04e6125cf3c28a299cc2f3cca3b78ceac396e4fcf9575e34536b26782413c"),
restic.DataBlob, 0, 38, 0,
}, {
restic.TestParseID("9ccb846e60d90d4eb915848add7aa7ea1e4bbabfc60e573db9f7bfb2789afbae"),
restic.TestParseID("73d04e6125cf3c28a299cc2f3cca3b78ceac396e4fcf9575e34536b26782413c"),
restic.TreeBlob, 38, 112, 511,
}, {
restic.TestParseID("d3dc577b4ffd38cc4b32122cabf8655a0223ed22edfd93b353dc0c3f2b0fdf66"),
restic.TestParseID("73d04e6125cf3c28a299cc2f3cca3b78ceac396e4fcf9575e34536b26782413c"),
restic.DataBlob, 150, 123, 234,
},
}
var exampleLookupTest = struct {
packID restic.ID
blobs map[restic.ID]restic.BlobType
}{
restic.TestParseID("73d04e6125cf3c28a299cc2f3cca3b78ceac396e4fcf9575e34536b26782413c"),
map[restic.ID]restic.BlobType{
restic.TestParseID("3ec79977ef0cf5de7b08cd12b874cd0f62bbaf7f07f3497a5b1bbcc8cb39b1ce"): restic.DataBlob,
restic.TestParseID("9ccb846e60d90d4eb915848add7aa7ea1e4bbabfc60e573db9f7bfb2789afbae"): restic.TreeBlob,
restic.TestParseID("d3dc577b4ffd38cc4b32122cabf8655a0223ed22edfd93b353dc0c3f2b0fdf66"): restic.DataBlob,
},
}
func TestIndexUnserialize(t *testing.T) {
for _, task := range []struct {
idxBytes []byte
version int
}{
{docExampleV1, 1},
{docExampleV2, 2},
} {
idx, oldFormat, err := index.DecodeIndex(task.idxBytes, restic.NewRandomID())
rtest.OK(t, err)
rtest.Assert(t, !oldFormat, "new index format recognized as old format")
for _, test := range exampleTests {
list := idx.Lookup(restic.BlobHandle{ID: test.id, Type: test.tpe}, nil)
if len(list) != 1 {
t.Errorf("expected one result for blob %v, got %v: %v", test.id.Str(), len(list), list)
}
blob := list[0]
t.Logf("looking for blob %v/%v, got %v", test.tpe, test.id.Str(), blob)
rtest.Equals(t, test.packID, blob.PackID)
rtest.Equals(t, test.tpe, blob.Type)
rtest.Equals(t, test.offset, blob.Offset)
rtest.Equals(t, test.length, blob.Length)
if task.version == 1 {
rtest.Equals(t, uint(0), blob.UncompressedLength)
} else if task.version == 2 {
rtest.Equals(t, test.uncompressedLength, blob.UncompressedLength)
} else {
t.Fatal("Invalid index version")
}
}
blobs := listPack(t, idx, exampleLookupTest.packID)
if len(blobs) != len(exampleLookupTest.blobs) {
t.Fatalf("expected %d blobs in pack, got %d", len(exampleLookupTest.blobs), len(blobs))
}
for _, blob := range blobs {
b, ok := exampleLookupTest.blobs[blob.ID]
if !ok {
t.Errorf("unexpected blob %v found", blob.ID.Str())
}
if blob.Type != b {
t.Errorf("unexpected type for blob %v: want %v, got %v", blob.ID.Str(), b, blob.Type)
}
}
}
}
func listPack(t testing.TB, idx *index.Index, id restic.ID) (pbs []restic.PackedBlob) {
rtest.OK(t, idx.Each(context.TODO(), func(pb restic.PackedBlob) {
if pb.PackID.Equal(id) {
pbs = append(pbs, pb)
}
}))
return pbs
}
var (
benchmarkIndexJSON []byte
benchmarkIndexJSONOnce sync.Once
)
func initBenchmarkIndexJSON() {
idx, _ := createRandomIndex(rand.New(rand.NewSource(0)), 200000)
var buf bytes.Buffer
err := idx.Encode(&buf)
if err != nil {
panic(err)
}
benchmarkIndexJSON = buf.Bytes()
}
func BenchmarkDecodeIndex(b *testing.B) {
benchmarkIndexJSONOnce.Do(initBenchmarkIndexJSON)
id := restic.NewRandomID()
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, _, err := index.DecodeIndex(benchmarkIndexJSON, id)
rtest.OK(b, err)
}
}
func BenchmarkDecodeIndexParallel(b *testing.B) {
benchmarkIndexJSONOnce.Do(initBenchmarkIndexJSON)
id := restic.NewRandomID()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_, _, err := index.DecodeIndex(benchmarkIndexJSON, id)
rtest.OK(b, err)
}
})
}
func BenchmarkEncodeIndex(b *testing.B) {
for _, n := range []int{100, 1000, 10000} {
idx, _ := createRandomIndex(rand.New(rand.NewSource(0)), n)
b.Run(fmt.Sprint(n), func(b *testing.B) {
buf := new(bytes.Buffer)
err := idx.Encode(buf)
rtest.OK(b, err)
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
buf.Reset()
_ = idx.Encode(buf)
}
})
}
}
func TestIndexUnserializeOld(t *testing.T) {
defer feature.TestSetFlag(t, feature.Flag, feature.DeprecateLegacyIndex, false)()
idx, oldFormat, err := index.DecodeIndex(docOldExample, restic.NewRandomID())
rtest.OK(t, err)
rtest.Assert(t, oldFormat, "old index format recognized as new format")
for _, test := range exampleTests {
list := idx.Lookup(restic.BlobHandle{ID: test.id, Type: test.tpe}, nil)
if len(list) != 1 {
t.Errorf("expected one result for blob %v, got %v: %v", test.id.Str(), len(list), list)
}
blob := list[0]
rtest.Equals(t, test.packID, blob.PackID)
rtest.Equals(t, test.tpe, blob.Type)
rtest.Equals(t, test.offset, blob.Offset)
rtest.Equals(t, test.length, blob.Length)
}
}
func TestIndexPacks(t *testing.T) {
idx := index.NewIndex()
packs := restic.NewIDSet()
for i := 0; i < 20; i++ {
packID := restic.NewRandomID()
idx.StorePack(packID, []restic.Blob{
{
BlobHandle: restic.NewRandomBlobHandle(),
Offset: 0,
Length: 23,
},
})
packs.Insert(packID)
}
idxPacks := idx.Packs()
rtest.Assert(t, packs.Equals(idxPacks), "packs in index do not match packs added to index")
}
const maxPackSize = 16 * 1024 * 1024
// This function generates a (insecure) random ID, similar to NewRandomID
func NewRandomTestID(rng *rand.Rand) restic.ID {
id := restic.ID{}
rng.Read(id[:])
return id
}
func createRandomIndex(rng *rand.Rand, packfiles int) (idx *index.Index, lookupBh restic.BlobHandle) {
idx = index.NewIndex()
// create index with given number of pack files
for i := 0; i < packfiles; i++ {
packID := NewRandomTestID(rng)
var blobs []restic.Blob
offset := 0
for offset < maxPackSize {
size := 2000 + rng.Intn(4*1024*1024)
id := NewRandomTestID(rng)
blobs = append(blobs, restic.Blob{
BlobHandle: restic.BlobHandle{
Type: restic.DataBlob,
ID: id,
},
Length: uint(size),
UncompressedLength: uint(2 * size),
Offset: uint(offset),
})
offset += size
}
idx.StorePack(packID, blobs)
if i == 0 {
lookupBh = restic.BlobHandle{
Type: restic.DataBlob,
ID: blobs[rng.Intn(len(blobs))].ID,
}
}
}
return idx, lookupBh
}
func BenchmarkIndexHasUnknown(b *testing.B) {
idx, _ := createRandomIndex(rand.New(rand.NewSource(0)), 200000)
lookupBh := restic.NewRandomBlobHandle()
b.ResetTimer()
for i := 0; i < b.N; i++ {
idx.Has(lookupBh)
}
}
func BenchmarkIndexHasKnown(b *testing.B) {
idx, lookupBh := createRandomIndex(rand.New(rand.NewSource(0)), 200000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
idx.Has(lookupBh)
}
}
func BenchmarkIndexAlloc(b *testing.B) {
rng := rand.New(rand.NewSource(0))
b.ReportAllocs()
for i := 0; i < b.N; i++ {
createRandomIndex(rng, 200000)
}
}
func BenchmarkIndexAllocParallel(b *testing.B) {
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
rng := rand.New(rand.NewSource(0))
for pb.Next() {
createRandomIndex(rng, 200000)
}
})
}
func TestIndexHas(t *testing.T) {
tests := []restic.PackedBlob{}
idx := index.NewIndex()
// create 50 packs with 20 blobs each
for i := 0; i < 50; i++ {
packID := restic.NewRandomID()
var blobs []restic.Blob
pos := uint(0)
for j := 0; j < 20; j++ {
length := uint(i*100 + j)
uncompressedLength := uint(0)
if i >= 25 {
// test a mix of compressed and uncompressed packs
uncompressedLength = 2 * length
}
pb := restic.PackedBlob{
Blob: restic.Blob{
BlobHandle: restic.NewRandomBlobHandle(),
Offset: pos,
Length: length,
UncompressedLength: uncompressedLength,
},
PackID: packID,
}
blobs = append(blobs, pb.Blob)
tests = append(tests, pb)
pos += length
}
idx.StorePack(packID, blobs)
}
for _, testBlob := range tests {
rtest.Assert(t, idx.Has(testBlob.BlobHandle), "Index reports not having data blob added to it")
}
rtest.Assert(t, !idx.Has(restic.NewRandomBlobHandle()), "Index reports having a data blob not added to it")
rtest.Assert(t, !idx.Has(restic.BlobHandle{ID: tests[0].ID, Type: restic.TreeBlob}), "Index reports having a tree blob added to it with the same id as a data blob")
}
func TestMixedEachByPack(t *testing.T) {
idx := index.NewIndex()
expected := make(map[restic.ID]int)
// create 50 packs with 2 blobs each
for i := 0; i < 50; i++ {
packID := restic.NewRandomID()
expected[packID] = 1
blobs := []restic.Blob{
{
BlobHandle: restic.BlobHandle{Type: restic.DataBlob, ID: restic.NewRandomID()},
Offset: 0,
Length: 42,
},
{
BlobHandle: restic.BlobHandle{Type: restic.TreeBlob, ID: restic.NewRandomID()},
Offset: 42,
Length: 43,
},
}
idx.StorePack(packID, blobs)
}
reported := make(map[restic.ID]int)
for bp := range idx.EachByPack(context.TODO(), restic.NewIDSet()) {
reported[bp.PackID]++
rtest.Equals(t, 2, len(bp.Blobs)) // correct blob count
if bp.Blobs[0].Offset > bp.Blobs[1].Offset {
bp.Blobs[1], bp.Blobs[0] = bp.Blobs[0], bp.Blobs[1]
}
b0 := bp.Blobs[0]
rtest.Assert(t, b0.Type == restic.DataBlob && b0.Offset == 0 && b0.Length == 42, "wrong blob", b0)
b1 := bp.Blobs[1]
rtest.Assert(t, b1.Type == restic.TreeBlob && b1.Offset == 42 && b1.Length == 43, "wrong blob", b1)
}
rtest.Equals(t, expected, reported)
}
func TestEachByPackIgnoes(t *testing.T) {
idx := index.NewIndex()
ignores := restic.NewIDSet()
expected := make(map[restic.ID]int)
// create 50 packs with one blob each
for i := 0; i < 50; i++ {
packID := restic.NewRandomID()
if i < 3 {
ignores.Insert(packID)
} else {
expected[packID] = 1
}
blobs := []restic.Blob{
{
BlobHandle: restic.BlobHandle{Type: restic.DataBlob, ID: restic.NewRandomID()},
Offset: 0,
Length: 42,
},
}
idx.StorePack(packID, blobs)
}
idx.Finalize()
reported := make(map[restic.ID]int)
for bp := range idx.EachByPack(context.TODO(), ignores) {
reported[bp.PackID]++
rtest.Equals(t, 1, len(bp.Blobs)) // correct blob count
b0 := bp.Blobs[0]
rtest.Assert(t, b0.Type == restic.DataBlob && b0.Offset == 0 && b0.Length == 42, "wrong blob", b0)
}
rtest.Equals(t, expected, reported)
}

View File

@@ -0,0 +1,258 @@
package index
import (
"hash/maphash"
"github.com/restic/restic/internal/restic"
)
// An indexMap is a chained hash table that maps blob IDs to indexEntries.
// It allows storing multiple entries with the same key.
//
// IndexMap uses some optimizations that are not compatible with supporting
// deletions.
//
// The buckets in this hash table contain only pointers, rather than inlined
// key-value pairs like the standard Go map. This way, only a pointer array
// needs to be resized when the table grows, preventing memory usage spikes.
type indexMap struct {
// The number of buckets is always a power of two and never zero.
buckets []uint
numentries uint
mh maphash.Hash
blockList hashedArrayTree
}
const (
growthFactor = 2 // Must be a power of 2.
maxLoad = 4 // Max. number of entries per bucket.
)
// add inserts an indexEntry for the given arguments into the map,
// using id as the key.
func (m *indexMap) add(id restic.ID, packIdx int, offset, length uint32, uncompressedLength uint32) {
switch {
case m.numentries == 0: // Lazy initialization.
m.init()
case m.numentries >= maxLoad*uint(len(m.buckets)):
m.grow()
}
h := m.hash(id)
e, idx := m.newEntry()
e.id = id
e.next = m.buckets[h] // Prepend to existing chain.
e.packIndex = packIdx
e.offset = offset
e.length = length
e.uncompressedLength = uncompressedLength
m.buckets[h] = idx
m.numentries++
}
// foreach calls fn for all entries in the map, until fn returns false.
func (m *indexMap) foreach(fn func(*indexEntry) bool) {
blockCount := m.blockList.Size()
for i := uint(1); i < blockCount; i++ {
if !fn(m.resolve(i)) {
return
}
}
}
// foreachWithID calls fn for all entries with the given id.
func (m *indexMap) foreachWithID(id restic.ID, fn func(*indexEntry)) {
if len(m.buckets) == 0 {
return
}
h := m.hash(id)
ei := m.buckets[h]
for ei != 0 {
e := m.resolve(ei)
ei = e.next
if e.id != id {
continue
}
fn(e)
}
}
// get returns the first entry for the given id.
func (m *indexMap) get(id restic.ID) *indexEntry {
if len(m.buckets) == 0 {
return nil
}
h := m.hash(id)
ei := m.buckets[h]
for ei != 0 {
e := m.resolve(ei)
if e.id == id {
return e
}
ei = e.next
}
return nil
}
// firstIndex returns the index of the first entry for ID id.
// This index is guaranteed to never change.
func (m *indexMap) firstIndex(id restic.ID) int {
if len(m.buckets) == 0 {
return -1
}
idx := -1
h := m.hash(id)
ei := m.buckets[h]
for ei != 0 {
e := m.resolve(ei)
cur := ei
ei = e.next
if e.id != id {
continue
}
if int(cur) < idx || idx == -1 {
// casting from uint to int is unproblematic as we'd run out of memory
// before this can result in an overflow.
idx = int(cur)
}
}
return idx
}
func (m *indexMap) grow() {
m.buckets = make([]uint, growthFactor*len(m.buckets))
blockCount := m.blockList.Size()
for i := uint(1); i < blockCount; i++ {
e := m.resolve(i)
h := m.hash(e.id)
e.next = m.buckets[h]
m.buckets[h] = i
}
}
func (m *indexMap) hash(id restic.ID) uint {
// We use maphash to prevent backups of specially crafted inputs
// from degrading performance.
// While SHA-256 should be collision-resistant, for hash table indices
// we use only a few bits of it and finding collisions for those is
// much easier than breaking the whole algorithm.
mh := maphash.Hash{}
mh.SetSeed(m.mh.Seed())
_, _ = mh.Write(id[:])
h := uint(mh.Sum64())
return h & uint(len(m.buckets)-1)
}
func (m *indexMap) init() {
const initialBuckets = 64
m.buckets = make([]uint, initialBuckets)
// first entry in blockList serves as null byte
m.blockList = *newHAT()
m.newEntry()
}
func (m *indexMap) len() uint { return m.numentries }
func (m *indexMap) newEntry() (*indexEntry, uint) {
return m.blockList.Alloc()
}
func (m *indexMap) resolve(idx uint) *indexEntry {
return m.blockList.Ref(idx)
}
type indexEntry struct {
id restic.ID
next uint
packIndex int // Position in containing Index's packs field.
offset uint32
length uint32
uncompressedLength uint32
}
type hashedArrayTree struct {
mask uint
maskShift uint
blockSize uint
size uint
blockList [][]indexEntry
}
func newHAT() *hashedArrayTree {
// start with a small block size
blockSizePower := uint(2)
blockSize := uint(1 << blockSizePower)
return &hashedArrayTree{
mask: blockSize - 1,
maskShift: blockSizePower,
blockSize: blockSize,
size: 0,
blockList: make([][]indexEntry, blockSize),
}
}
func (h *hashedArrayTree) Alloc() (*indexEntry, uint) {
h.grow()
size := h.size
idx, subIdx := h.index(size)
h.size++
return &h.blockList[idx][subIdx], size
}
func (h *hashedArrayTree) index(pos uint) (idx uint, subIdx uint) {
subIdx = pos & h.mask
idx = pos >> h.maskShift
return
}
func (h *hashedArrayTree) Ref(pos uint) *indexEntry {
if pos >= h.size {
panic("array index out of bounds")
}
idx, subIdx := h.index(pos)
return &h.blockList[idx][subIdx]
}
func (h *hashedArrayTree) Size() uint {
return h.size
}
func (h *hashedArrayTree) grow() {
idx, subIdx := h.index(h.size)
if int(idx) == len(h.blockList) {
// blockList is too short -> double list and block size
h.blockSize *= 2
h.mask = h.mask*2 + 1
h.maskShift++
idx = idx / 2
oldBlocks := h.blockList
h.blockList = make([][]indexEntry, h.blockSize)
// pairwise merging of blocks
for i := 0; i < len(oldBlocks); i += 2 {
block := make([]indexEntry, 0, h.blockSize)
block = append(block, oldBlocks[i]...)
block = append(block, oldBlocks[i+1]...)
h.blockList[i/2] = block
// allow GC
oldBlocks[i] = nil
oldBlocks[i+1] = nil
}
}
if subIdx == 0 {
// new index entry batch
h.blockList[idx] = make([]indexEntry, h.blockSize)
}
}

View File

@@ -0,0 +1,187 @@
package index
import (
"math/rand"
"testing"
"time"
"github.com/restic/restic/internal/restic"
rtest "github.com/restic/restic/internal/test"
)
func TestIndexMapBasic(t *testing.T) {
t.Parallel()
var (
id restic.ID
m indexMap
r = rand.New(rand.NewSource(98765))
)
for i := 1; i <= 400; i++ {
r.Read(id[:])
rtest.Assert(t, m.get(id) == nil, "%v retrieved but not added", id)
m.add(id, 0, 0, 0, 0)
rtest.Assert(t, m.get(id) != nil, "%v added but not retrieved", id)
rtest.Equals(t, uint(i), m.len())
}
}
func TestIndexMapForeach(t *testing.T) {
t.Parallel()
const N = 10
var m indexMap
// Don't crash on empty map.
m.foreach(func(*indexEntry) bool { return true })
for i := 0; i < N; i++ {
var id restic.ID
id[0] = byte(i)
m.add(id, i, uint32(i), uint32(i), uint32(i/2))
}
seen := make(map[int]struct{})
m.foreach(func(e *indexEntry) bool {
i := int(e.id[0])
rtest.Assert(t, i < N, "unknown id %v in indexMap", e.id)
rtest.Equals(t, i, e.packIndex)
rtest.Equals(t, i, int(e.length))
rtest.Equals(t, i, int(e.offset))
rtest.Equals(t, i/2, int(e.uncompressedLength))
seen[i] = struct{}{}
return true
})
rtest.Equals(t, N, len(seen))
ncalls := 0
m.foreach(func(*indexEntry) bool {
ncalls++
return false
})
rtest.Equals(t, 1, ncalls)
}
func TestIndexMapForeachWithID(t *testing.T) {
t.Parallel()
const ndups = 3
var (
id restic.ID
m indexMap
r = rand.New(rand.NewSource(1234321))
)
r.Read(id[:])
// No result (and no crash) for empty map.
n := 0
m.foreachWithID(id, func(*indexEntry) { n++ })
rtest.Equals(t, 0, n)
// Test insertion and retrieval of duplicates.
for i := 0; i < ndups; i++ {
m.add(id, i, 0, 0, 0)
}
for i := 0; i < 100; i++ {
var otherid restic.ID
r.Read(otherid[:])
m.add(otherid, -1, 0, 0, 0)
}
n = 0
var packs [ndups]bool
m.foreachWithID(id, func(e *indexEntry) {
packs[e.packIndex] = true
n++
})
rtest.Equals(t, ndups, n)
for i := range packs {
rtest.Assert(t, packs[i], "duplicate from pack %d not retrieved", i)
}
}
func TestHashedArrayTree(t *testing.T) {
hat := newHAT()
const testSize = 1024
for i := uint(0); i < testSize; i++ {
rtest.Assert(t, hat.Size() == i, "expected hat size %v got %v", i, hat.Size())
e, idx := hat.Alloc()
rtest.Assert(t, idx == i, "expected entry at idx %v got %v", i, idx)
e.length = uint32(i)
}
for i := uint(0); i < testSize; i++ {
e := hat.Ref(i)
rtest.Assert(t, e.length == uint32(i), "expected entry to contain %v got %v", uint32(i), e.length)
}
}
func BenchmarkIndexMapHash(b *testing.B) {
var m indexMap
m.add(restic.ID{}, 0, 0, 0, 0) // Trigger lazy initialization.
ids := make([]restic.ID, 128) // 4 KiB.
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := range ids {
r.Read(ids[i][:])
}
b.ReportAllocs()
b.SetBytes(int64(len(restic.ID{}) * len(ids)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
for _, id := range ids {
m.hash(id)
}
}
}
func TestIndexMapFirstIndex(t *testing.T) {
t.Parallel()
var (
id restic.ID
m indexMap
r = rand.New(rand.NewSource(98765))
fi = make(map[restic.ID]int)
)
for i := 1; i <= 400; i++ {
r.Read(id[:])
rtest.Equals(t, -1, m.firstIndex(id), "wrong firstIndex for nonexistant id")
m.add(id, 0, 0, 0, 0)
idx := m.firstIndex(id)
rtest.Equals(t, i, idx, "unexpected index for id")
fi[id] = idx
}
// iterate over blobs, as this is a hashmap the order is effectively random
for id, idx := range fi {
rtest.Equals(t, idx, m.firstIndex(id), "wrong index returned")
}
}
func TestIndexMapFirstIndexDuplicates(t *testing.T) {
t.Parallel()
var (
id restic.ID
m indexMap
r = rand.New(rand.NewSource(98765))
)
r.Read(id[:])
for i := 1; i <= 10; i++ {
m.add(id, 0, 0, 0, 0)
}
idx := m.firstIndex(id)
rtest.Equals(t, 1, idx, "unexpected index for id")
}

View File

@@ -0,0 +1,653 @@
package index
import (
"context"
"fmt"
"runtime"
"sync"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/ui/progress"
"golang.org/x/sync/errgroup"
)
// MasterIndex is a collection of indexes and IDs of chunks that are in the process of being saved.
type MasterIndex struct {
idx []*Index
pendingBlobs restic.BlobSet
idxMutex sync.RWMutex
}
// NewMasterIndex creates a new master index.
func NewMasterIndex() *MasterIndex {
mi := &MasterIndex{pendingBlobs: restic.NewBlobSet()}
mi.clear()
return mi
}
func (mi *MasterIndex) clear() {
// Always add an empty final index, such that MergeFinalIndexes can merge into this.
mi.idx = []*Index{NewIndex()}
mi.idx[0].Finalize()
}
// Lookup queries all known Indexes for the ID and returns all matches.
func (mi *MasterIndex) Lookup(bh restic.BlobHandle) (pbs []restic.PackedBlob) {
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
for _, idx := range mi.idx {
pbs = idx.Lookup(bh, pbs)
}
return pbs
}
// LookupSize queries all known Indexes for the ID and returns the first match.
func (mi *MasterIndex) LookupSize(bh restic.BlobHandle) (uint, bool) {
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
for _, idx := range mi.idx {
if size, found := idx.LookupSize(bh); found {
return size, found
}
}
return 0, false
}
// AddPending adds a given blob to list of pending Blobs
// Before doing so it checks if this blob is already known.
// Returns true if adding was successful and false if the blob
// was already known
func (mi *MasterIndex) AddPending(bh restic.BlobHandle) bool {
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
// Check if blob is pending or in index
if mi.pendingBlobs.Has(bh) {
return false
}
for _, idx := range mi.idx {
if idx.Has(bh) {
return false
}
}
// really not known -> insert
mi.pendingBlobs.Insert(bh)
return true
}
// Has queries all known Indexes for the ID and returns the first match.
// Also returns true if the ID is pending.
func (mi *MasterIndex) Has(bh restic.BlobHandle) bool {
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
// also return true if blob is pending
if mi.pendingBlobs.Has(bh) {
return true
}
for _, idx := range mi.idx {
if idx.Has(bh) {
return true
}
}
return false
}
// IDs returns the IDs of all indexes contained in the index.
func (mi *MasterIndex) IDs() restic.IDSet {
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
ids := restic.NewIDSet()
for _, idx := range mi.idx {
if !idx.Final() {
continue
}
indexIDs, err := idx.IDs()
if err != nil {
debug.Log("not using index, ID() returned error %v", err)
continue
}
for _, id := range indexIDs {
ids.Insert(id)
}
}
return ids
}
// Packs returns all packs that are covered by the index.
// If packBlacklist is given, those packs are only contained in the
// resulting IDSet if they are contained in a non-final (newly written) index.
func (mi *MasterIndex) Packs(packBlacklist restic.IDSet) restic.IDSet {
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
packs := restic.NewIDSet()
for _, idx := range mi.idx {
idxPacks := idx.Packs()
if idx.final && len(packBlacklist) > 0 {
idxPacks = idxPacks.Sub(packBlacklist)
}
packs.Merge(idxPacks)
}
return packs
}
// Insert adds a new index to the MasterIndex.
func (mi *MasterIndex) Insert(idx *Index) {
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
mi.idx = append(mi.idx, idx)
}
// StorePack remembers the id and pack in the index.
func (mi *MasterIndex) StorePack(id restic.ID, blobs []restic.Blob) {
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
// delete blobs from pending
for _, blob := range blobs {
mi.pendingBlobs.Delete(restic.BlobHandle{Type: blob.Type, ID: blob.ID})
}
for _, idx := range mi.idx {
if !idx.Final() {
idx.StorePack(id, blobs)
return
}
}
newIdx := NewIndex()
newIdx.StorePack(id, blobs)
mi.idx = append(mi.idx, newIdx)
}
// finalizeNotFinalIndexes finalizes all indexes that
// have not yet been saved and returns that list
func (mi *MasterIndex) finalizeNotFinalIndexes() []*Index {
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
var list []*Index
for _, idx := range mi.idx {
if !idx.Final() {
idx.Finalize()
list = append(list, idx)
}
}
debug.Log("return %d indexes", len(list))
return list
}
// finalizeFullIndexes finalizes all indexes that are full and returns that list.
func (mi *MasterIndex) finalizeFullIndexes() []*Index {
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
var list []*Index
debug.Log("checking %d indexes", len(mi.idx))
for _, idx := range mi.idx {
if idx.Final() {
continue
}
if IndexFull(idx) {
debug.Log("index %p is full", idx)
idx.Finalize()
list = append(list, idx)
} else {
debug.Log("index %p not full", idx)
}
}
debug.Log("return %d indexes", len(list))
return list
}
// Each runs fn on all blobs known to the index. When the context is cancelled,
// the index iteration return immediately. This blocks any modification of the index.
func (mi *MasterIndex) Each(ctx context.Context, fn func(restic.PackedBlob)) error {
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
for _, idx := range mi.idx {
if err := idx.Each(ctx, fn); err != nil {
return err
}
}
return nil
}
// MergeFinalIndexes merges all final indexes together.
// After calling, there will be only one big final index in MasterIndex
// containing all final index contents.
// Indexes that are not final are left untouched.
// This merging can only be called after all index files are loaded - as
// removing of superseded index contents is only possible for unmerged indexes.
func (mi *MasterIndex) MergeFinalIndexes() error {
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
// The first index is always final and the one to merge into
newIdx := mi.idx[:1]
for i := 1; i < len(mi.idx); i++ {
idx := mi.idx[i]
// clear reference in masterindex as it may become stale
mi.idx[i] = nil
// do not merge indexes that have no id set
ids, _ := idx.IDs()
if !idx.Final() || len(ids) == 0 {
newIdx = append(newIdx, idx)
} else {
err := mi.idx[0].merge(idx)
if err != nil {
return fmt.Errorf("MergeFinalIndexes: %w", err)
}
}
}
mi.idx = newIdx
return nil
}
func (mi *MasterIndex) Load(ctx context.Context, r restic.ListerLoaderUnpacked, p *progress.Counter, cb func(id restic.ID, idx *Index, oldFormat bool, err error) error) error {
indexList, err := restic.MemorizeList(ctx, r, restic.IndexFile)
if err != nil {
return err
}
if p != nil {
var numIndexFiles uint64
err := indexList.List(ctx, restic.IndexFile, func(_ restic.ID, _ int64) error {
numIndexFiles++
return nil
})
if err != nil {
return err
}
p.SetMax(numIndexFiles)
defer p.Done()
}
err = ForAllIndexes(ctx, indexList, r, func(id restic.ID, idx *Index, oldFormat bool, err error) error {
if p != nil {
p.Add(1)
}
if cb != nil {
err = cb(id, idx, oldFormat, err)
}
if err != nil {
return err
}
// special case to allow check to ignore index loading errors
if idx == nil {
return nil
}
mi.Insert(idx)
return nil
})
if err != nil {
return err
}
return mi.MergeFinalIndexes()
}
type MasterIndexRewriteOpts struct {
SaveProgress *progress.Counter
DeleteProgress func() *progress.Counter
DeleteReport func(id restic.ID, err error)
}
// Rewrite removes packs whose ID is in excludePacks from all known indexes.
// It also removes the rewritten index files and those listed in extraObsolete.
// If oldIndexes is not nil, then only the indexes in this set are processed.
// This is used by repair index to only rewrite and delete the old indexes.
//
// Must not be called concurrently to any other MasterIndex operation.
func (mi *MasterIndex) Rewrite(ctx context.Context, repo restic.Unpacked, excludePacks restic.IDSet, oldIndexes restic.IDSet, extraObsolete restic.IDs, opts MasterIndexRewriteOpts) error {
for _, idx := range mi.idx {
if !idx.Final() {
panic("internal error - index must be saved before calling MasterIndex.Rewrite")
}
}
var indexes restic.IDSet
if oldIndexes != nil {
// repair index adds new index entries for already existing pack files
// only remove the old (possibly broken) entries by only processing old indexes
indexes = oldIndexes
} else {
indexes = mi.IDs()
}
p := opts.SaveProgress
p.SetMax(uint64(len(indexes)))
// reset state which is not necessary for Rewrite and just consumes a lot of memory
// the index state would be invalid after Rewrite completes anyways
mi.clear()
runtime.GC()
// copy excludePacks to prevent unintended sideeffects
excludePacks = excludePacks.Clone()
debug.Log("start rebuilding index of %d indexes, excludePacks: %v", len(indexes), excludePacks)
wg, wgCtx := errgroup.WithContext(ctx)
idxCh := make(chan restic.ID)
wg.Go(func() error {
defer close(idxCh)
for id := range indexes {
select {
case idxCh <- id:
case <-wgCtx.Done():
return wgCtx.Err()
}
}
return nil
})
var rewriteWg sync.WaitGroup
type rewriteTask struct {
idx *Index
oldFormat bool
}
rewriteCh := make(chan rewriteTask)
loader := func() error {
defer rewriteWg.Done()
for id := range idxCh {
buf, err := repo.LoadUnpacked(wgCtx, restic.IndexFile, id)
if err != nil {
return fmt.Errorf("LoadUnpacked(%v): %w", id.Str(), err)
}
idx, oldFormat, err := DecodeIndex(buf, id)
if err != nil {
return err
}
select {
case rewriteCh <- rewriteTask{idx, oldFormat}:
case <-wgCtx.Done():
return wgCtx.Err()
}
}
return nil
}
// loading an index can take quite some time such that this is probably CPU-bound
// the index files are probably already cached at this point
loaderCount := runtime.GOMAXPROCS(0)
// run workers on ch
for i := 0; i < loaderCount; i++ {
rewriteWg.Add(1)
wg.Go(loader)
}
wg.Go(func() error {
rewriteWg.Wait()
close(rewriteCh)
return nil
})
obsolete := restic.NewIDSet(extraObsolete...)
saveCh := make(chan *Index)
wg.Go(func() error {
defer close(saveCh)
newIndex := NewIndex()
for task := range rewriteCh {
// always rewrite indexes using the old format, that include a pack that must be removed or that are not full
if !task.oldFormat && len(task.idx.Packs().Intersect(excludePacks)) == 0 && IndexFull(task.idx) {
// make sure that each pack is only stored exactly once in the index
excludePacks.Merge(task.idx.Packs())
// index is already up to date
p.Add(1)
continue
}
ids, err := task.idx.IDs()
if err != nil || len(ids) != 1 {
panic("internal error, index has no ID")
}
obsolete.Merge(restic.NewIDSet(ids...))
for pbs := range task.idx.EachByPack(wgCtx, excludePacks) {
newIndex.StorePack(pbs.PackID, pbs.Blobs)
if IndexFull(newIndex) {
select {
case saveCh <- newIndex:
case <-wgCtx.Done():
return wgCtx.Err()
}
newIndex = NewIndex()
}
}
if wgCtx.Err() != nil {
return wgCtx.Err()
}
// make sure that each pack is only stored exactly once in the index
excludePacks.Merge(task.idx.Packs())
p.Add(1)
}
select {
case saveCh <- newIndex:
case <-wgCtx.Done():
}
return nil
})
// a worker receives an index from ch, and saves the index
worker := func() error {
for idx := range saveCh {
idx.Finalize()
if _, err := idx.SaveIndex(wgCtx, repo); err != nil {
return err
}
}
return nil
}
// encoding an index can take quite some time such that this can be CPU- or IO-bound
// do not add repo.Connections() here as there are already the loader goroutines.
workerCount := runtime.GOMAXPROCS(0)
// run workers on ch
for i := 0; i < workerCount; i++ {
wg.Go(worker)
}
err := wg.Wait()
p.Done()
if err != nil {
return fmt.Errorf("failed to rewrite indexes: %w", err)
}
p = nil
if opts.DeleteProgress != nil {
p = opts.DeleteProgress()
}
defer p.Done()
return restic.ParallelRemove(ctx, repo, obsolete, restic.IndexFile, func(id restic.ID, err error) error {
if opts.DeleteReport != nil {
opts.DeleteReport(id, err)
}
return err
}, p)
}
// SaveFallback saves all known indexes to index files, leaving out any
// packs whose ID is contained in packBlacklist from finalized indexes.
// It is only intended for use by prune with the UnsafeRecovery option.
//
// Must not be called concurrently to any other MasterIndex operation.
func (mi *MasterIndex) SaveFallback(ctx context.Context, repo restic.SaverRemoverUnpacked, excludePacks restic.IDSet, p *progress.Counter) error {
p.SetMax(uint64(len(mi.Packs(excludePacks))))
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
debug.Log("start rebuilding index of %d indexes, excludePacks: %v", len(mi.idx), excludePacks)
obsolete := restic.NewIDSet()
wg, wgCtx := errgroup.WithContext(ctx)
ch := make(chan *Index)
wg.Go(func() error {
defer close(ch)
newIndex := NewIndex()
for _, idx := range mi.idx {
if idx.Final() {
ids, err := idx.IDs()
if err != nil {
panic("internal error - finalized index without ID")
}
debug.Log("adding index ids %v to supersedes field", ids)
obsolete.Merge(restic.NewIDSet(ids...))
}
for pbs := range idx.EachByPack(wgCtx, excludePacks) {
newIndex.StorePack(pbs.PackID, pbs.Blobs)
p.Add(1)
if IndexFull(newIndex) {
select {
case ch <- newIndex:
case <-wgCtx.Done():
return wgCtx.Err()
}
newIndex = NewIndex()
}
}
if wgCtx.Err() != nil {
return wgCtx.Err()
}
}
select {
case ch <- newIndex:
case <-wgCtx.Done():
}
return nil
})
// a worker receives an index from ch, and saves the index
worker := func() error {
for idx := range ch {
idx.Finalize()
if _, err := idx.SaveIndex(wgCtx, repo); err != nil {
return err
}
}
return nil
}
// keep concurrency bounded as we're on a fallback path
workerCount := int(repo.Connections())
// run workers on ch
for i := 0; i < workerCount; i++ {
wg.Go(worker)
}
err := wg.Wait()
p.Done()
// the index no longer matches to stored state
mi.clear()
return err
}
// saveIndex saves all indexes in the backend.
func (mi *MasterIndex) saveIndex(ctx context.Context, r restic.SaverUnpacked, indexes ...*Index) error {
for i, idx := range indexes {
debug.Log("Saving index %d", i)
sid, err := idx.SaveIndex(ctx, r)
if err != nil {
return err
}
debug.Log("Saved index %d as %v", i, sid)
}
return mi.MergeFinalIndexes()
}
// SaveIndex saves all new indexes in the backend.
func (mi *MasterIndex) SaveIndex(ctx context.Context, r restic.SaverUnpacked) error {
return mi.saveIndex(ctx, r, mi.finalizeNotFinalIndexes()...)
}
// SaveFullIndex saves all full indexes in the backend.
func (mi *MasterIndex) SaveFullIndex(ctx context.Context, r restic.SaverUnpacked) error {
return mi.saveIndex(ctx, r, mi.finalizeFullIndexes()...)
}
// ListPacks returns the blobs of the specified pack files grouped by pack file.
func (mi *MasterIndex) ListPacks(ctx context.Context, packs restic.IDSet) <-chan restic.PackBlobs {
out := make(chan restic.PackBlobs)
go func() {
defer close(out)
// only resort a part of the index to keep the memory overhead bounded
for i := byte(0); i < 16; i++ {
packBlob := make(map[restic.ID][]restic.Blob)
for pack := range packs {
if pack[0]&0xf == i {
packBlob[pack] = nil
}
}
if len(packBlob) == 0 {
continue
}
err := mi.Each(ctx, func(pb restic.PackedBlob) {
if packs.Has(pb.PackID) && pb.PackID[0]&0xf == i {
packBlob[pb.PackID] = append(packBlob[pb.PackID], pb.Blob)
}
})
if err != nil {
return
}
// pass on packs
for packID, pbs := range packBlob {
// allow GC
packBlob[packID] = nil
select {
case out <- restic.PackBlobs{PackID: packID, Blobs: pbs}:
case <-ctx.Done():
return
}
}
}
}()
return out
}
// Only for use by AssociatedSet
func (mi *MasterIndex) blobIndex(h restic.BlobHandle) int {
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
// other indexes are ignored as their ids can change when merged into the main index
return mi.idx[0].BlobIndex(h)
}
// Only for use by AssociatedSet
func (mi *MasterIndex) stableLen(t restic.BlobType) uint {
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
// other indexes are ignored as their ids can change when merged into the main index
return mi.idx[0].Len(t)
}

View File

@@ -0,0 +1,461 @@
package index_test
import (
"context"
"fmt"
"math/rand"
"runtime"
"testing"
"time"
"github.com/restic/restic/internal/checker"
"github.com/restic/restic/internal/crypto"
"github.com/restic/restic/internal/repository"
"github.com/restic/restic/internal/repository/index"
"github.com/restic/restic/internal/restic"
rtest "github.com/restic/restic/internal/test"
)
func TestMasterIndex(t *testing.T) {
bhInIdx1 := restic.NewRandomBlobHandle()
bhInIdx2 := restic.NewRandomBlobHandle()
bhInIdx12 := restic.BlobHandle{ID: restic.NewRandomID(), Type: restic.TreeBlob}
blob1 := restic.PackedBlob{
PackID: restic.NewRandomID(),
Blob: restic.Blob{
BlobHandle: bhInIdx1,
Length: uint(crypto.CiphertextLength(10)),
Offset: 0,
},
}
blob2 := restic.PackedBlob{
PackID: restic.NewRandomID(),
Blob: restic.Blob{
BlobHandle: bhInIdx2,
Length: uint(crypto.CiphertextLength(100)),
Offset: 10,
UncompressedLength: 200,
},
}
blob12a := restic.PackedBlob{
PackID: restic.NewRandomID(),
Blob: restic.Blob{
BlobHandle: bhInIdx12,
Length: uint(crypto.CiphertextLength(123)),
Offset: 110,
UncompressedLength: 80,
},
}
blob12b := restic.PackedBlob{
PackID: restic.NewRandomID(),
Blob: restic.Blob{
BlobHandle: bhInIdx12,
Length: uint(crypto.CiphertextLength(123)),
Offset: 50,
UncompressedLength: 80,
},
}
idx1 := index.NewIndex()
idx1.StorePack(blob1.PackID, []restic.Blob{blob1.Blob})
idx1.StorePack(blob12a.PackID, []restic.Blob{blob12a.Blob})
idx2 := index.NewIndex()
idx2.StorePack(blob2.PackID, []restic.Blob{blob2.Blob})
idx2.StorePack(blob12b.PackID, []restic.Blob{blob12b.Blob})
mIdx := index.NewMasterIndex()
mIdx.Insert(idx1)
mIdx.Insert(idx2)
// test idInIdx1
found := mIdx.Has(bhInIdx1)
rtest.Equals(t, true, found)
blobs := mIdx.Lookup(bhInIdx1)
rtest.Equals(t, []restic.PackedBlob{blob1}, blobs)
size, found := mIdx.LookupSize(bhInIdx1)
rtest.Equals(t, true, found)
rtest.Equals(t, uint(10), size)
// test idInIdx2
found = mIdx.Has(bhInIdx2)
rtest.Equals(t, true, found)
blobs = mIdx.Lookup(bhInIdx2)
rtest.Equals(t, []restic.PackedBlob{blob2}, blobs)
size, found = mIdx.LookupSize(bhInIdx2)
rtest.Equals(t, true, found)
rtest.Equals(t, uint(200), size)
// test idInIdx12
found = mIdx.Has(bhInIdx12)
rtest.Equals(t, true, found)
blobs = mIdx.Lookup(bhInIdx12)
rtest.Equals(t, 2, len(blobs))
// test Lookup result for blob12a
found = false
if blobs[0] == blob12a || blobs[1] == blob12a {
found = true
}
rtest.Assert(t, found, "blob12a not found in result")
// test Lookup result for blob12b
found = false
if blobs[0] == blob12b || blobs[1] == blob12b {
found = true
}
rtest.Assert(t, found, "blob12a not found in result")
size, found = mIdx.LookupSize(bhInIdx12)
rtest.Equals(t, true, found)
rtest.Equals(t, uint(80), size)
// test not in index
found = mIdx.Has(restic.BlobHandle{ID: restic.NewRandomID(), Type: restic.TreeBlob})
rtest.Assert(t, !found, "Expected no blobs when fetching with a random id")
blobs = mIdx.Lookup(restic.NewRandomBlobHandle())
rtest.Assert(t, blobs == nil, "Expected no blobs when fetching with a random id")
_, found = mIdx.LookupSize(restic.NewRandomBlobHandle())
rtest.Assert(t, !found, "Expected no blobs when fetching with a random id")
}
func TestMasterMergeFinalIndexes(t *testing.T) {
bhInIdx1 := restic.NewRandomBlobHandle()
bhInIdx2 := restic.NewRandomBlobHandle()
blob1 := restic.PackedBlob{
PackID: restic.NewRandomID(),
Blob: restic.Blob{
BlobHandle: bhInIdx1,
Length: 10,
Offset: 0,
},
}
blob2 := restic.PackedBlob{
PackID: restic.NewRandomID(),
Blob: restic.Blob{
BlobHandle: bhInIdx2,
Length: 100,
Offset: 10,
UncompressedLength: 200,
},
}
idx1 := index.NewIndex()
idx1.StorePack(blob1.PackID, []restic.Blob{blob1.Blob})
idx2 := index.NewIndex()
idx2.StorePack(blob2.PackID, []restic.Blob{blob2.Blob})
mIdx := index.NewMasterIndex()
mIdx.Insert(idx1)
mIdx.Insert(idx2)
rtest.Equals(t, restic.NewIDSet(), mIdx.IDs())
finalIndexes, idxCount, ids := index.TestMergeIndex(t, mIdx)
rtest.Equals(t, []*index.Index{idx1, idx2}, finalIndexes)
rtest.Equals(t, 1, idxCount)
rtest.Equals(t, ids, mIdx.IDs())
blobCount := 0
rtest.OK(t, mIdx.Each(context.TODO(), func(pb restic.PackedBlob) {
blobCount++
}))
rtest.Equals(t, 2, blobCount)
blobs := mIdx.Lookup(bhInIdx1)
rtest.Equals(t, []restic.PackedBlob{blob1}, blobs)
blobs = mIdx.Lookup(bhInIdx2)
rtest.Equals(t, []restic.PackedBlob{blob2}, blobs)
blobs = mIdx.Lookup(restic.NewRandomBlobHandle())
rtest.Assert(t, blobs == nil, "Expected no blobs when fetching with a random id")
// merge another index containing identical blobs
idx3 := index.NewIndex()
idx3.StorePack(blob1.PackID, []restic.Blob{blob1.Blob})
idx3.StorePack(blob2.PackID, []restic.Blob{blob2.Blob})
mIdx.Insert(idx3)
finalIndexes, idxCount, newIDs := index.TestMergeIndex(t, mIdx)
rtest.Equals(t, []*index.Index{idx3}, finalIndexes)
rtest.Equals(t, 1, idxCount)
ids.Merge(newIDs)
rtest.Equals(t, ids, mIdx.IDs())
// Index should have same entries as before!
blobs = mIdx.Lookup(bhInIdx1)
rtest.Equals(t, []restic.PackedBlob{blob1}, blobs)
blobs = mIdx.Lookup(bhInIdx2)
rtest.Equals(t, []restic.PackedBlob{blob2}, blobs)
blobCount = 0
rtest.OK(t, mIdx.Each(context.TODO(), func(pb restic.PackedBlob) {
blobCount++
}))
rtest.Equals(t, 2, blobCount)
}
func createRandomMasterIndex(t testing.TB, rng *rand.Rand, num, size int) (*index.MasterIndex, restic.BlobHandle) {
mIdx := index.NewMasterIndex()
for i := 0; i < num-1; i++ {
idx, _ := createRandomIndex(rng, size)
mIdx.Insert(idx)
}
idx1, lookupBh := createRandomIndex(rng, size)
mIdx.Insert(idx1)
index.TestMergeIndex(t, mIdx)
return mIdx, lookupBh
}
func BenchmarkMasterIndexAlloc(b *testing.B) {
rng := rand.New(rand.NewSource(0))
b.ReportAllocs()
for i := 0; i < b.N; i++ {
createRandomMasterIndex(b, rng, 10000, 5)
}
}
func BenchmarkMasterIndexLookupSingleIndex(b *testing.B) {
mIdx, lookupBh := createRandomMasterIndex(b, rand.New(rand.NewSource(0)), 1, 200000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
mIdx.Lookup(lookupBh)
}
}
func BenchmarkMasterIndexLookupMultipleIndex(b *testing.B) {
mIdx, lookupBh := createRandomMasterIndex(b, rand.New(rand.NewSource(0)), 100, 10000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
mIdx.Lookup(lookupBh)
}
}
func BenchmarkMasterIndexLookupSingleIndexUnknown(b *testing.B) {
lookupBh := restic.NewRandomBlobHandle()
mIdx, _ := createRandomMasterIndex(b, rand.New(rand.NewSource(0)), 1, 200000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
mIdx.Lookup(lookupBh)
}
}
func BenchmarkMasterIndexLookupMultipleIndexUnknown(b *testing.B) {
lookupBh := restic.NewRandomBlobHandle()
mIdx, _ := createRandomMasterIndex(b, rand.New(rand.NewSource(0)), 100, 10000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
mIdx.Lookup(lookupBh)
}
}
func BenchmarkMasterIndexLookupParallel(b *testing.B) {
for _, numindices := range []int{25, 50, 100} {
var lookupBh restic.BlobHandle
b.StopTimer()
rng := rand.New(rand.NewSource(0))
mIdx, lookupBh := createRandomMasterIndex(b, rng, numindices, 10000)
b.StartTimer()
name := fmt.Sprintf("known,indices=%d", numindices)
b.Run(name, func(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
mIdx.Lookup(lookupBh)
}
})
})
lookupBh = restic.NewRandomBlobHandle()
name = fmt.Sprintf("unknown,indices=%d", numindices)
b.Run(name, func(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
mIdx.Lookup(lookupBh)
}
})
})
}
}
func BenchmarkMasterIndexLookupBlobSize(b *testing.B) {
rng := rand.New(rand.NewSource(0))
mIdx, lookupBh := createRandomMasterIndex(b, rand.New(rng), 5, 200000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
mIdx.LookupSize(lookupBh)
}
}
func BenchmarkMasterIndexEach(b *testing.B) {
rng := rand.New(rand.NewSource(0))
mIdx, _ := createRandomMasterIndex(b, rand.New(rng), 5, 200000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
entries := 0
rtest.OK(b, mIdx.Each(context.TODO(), func(pb restic.PackedBlob) {
entries++
}))
}
}
func BenchmarkMasterIndexGC(b *testing.B) {
mIdx, _ := createRandomMasterIndex(b, rand.New(rand.NewSource(0)), 100, 10000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
runtime.GC()
}
runtime.KeepAlive(mIdx)
}
var (
snapshotTime = time.Unix(1470492820, 207401672)
depth = 3
)
func createFilledRepo(t testing.TB, snapshots int, version uint) restic.Repository {
repo, _ := repository.TestRepositoryWithVersion(t, version)
for i := 0; i < snapshots; i++ {
restic.TestCreateSnapshot(t, repo, snapshotTime.Add(time.Duration(i)*time.Second), depth)
}
return repo
}
func TestIndexSave(t *testing.T) {
repository.TestAllVersions(t, testIndexSave)
}
func testIndexSave(t *testing.T, version uint) {
for _, test := range []struct {
name string
saver func(idx *index.MasterIndex, repo restic.Repository) error
}{
{"rewrite no-op", func(idx *index.MasterIndex, repo restic.Repository) error {
return idx.Rewrite(context.TODO(), repo, nil, nil, nil, index.MasterIndexRewriteOpts{})
}},
{"rewrite skip-all", func(idx *index.MasterIndex, repo restic.Repository) error {
return idx.Rewrite(context.TODO(), repo, nil, restic.NewIDSet(), nil, index.MasterIndexRewriteOpts{})
}},
{"SaveFallback", func(idx *index.MasterIndex, repo restic.Repository) error {
err := restic.ParallelRemove(context.TODO(), repo, idx.IDs(), restic.IndexFile, nil, nil)
if err != nil {
return nil
}
return idx.SaveFallback(context.TODO(), repo, restic.NewIDSet(), nil)
}},
} {
t.Run(test.name, func(t *testing.T) {
repo := createFilledRepo(t, 3, version)
idx := index.NewMasterIndex()
rtest.OK(t, idx.Load(context.TODO(), repo, nil, nil))
blobs := make(map[restic.PackedBlob]struct{})
rtest.OK(t, idx.Each(context.TODO(), func(pb restic.PackedBlob) {
blobs[pb] = struct{}{}
}))
rtest.OK(t, test.saver(idx, repo))
idx = index.NewMasterIndex()
rtest.OK(t, idx.Load(context.TODO(), repo, nil, nil))
rtest.OK(t, idx.Each(context.TODO(), func(pb restic.PackedBlob) {
if _, ok := blobs[pb]; ok {
delete(blobs, pb)
} else {
t.Fatalf("unexpected blobs %v", pb)
}
}))
rtest.Equals(t, 0, len(blobs), "saved index is missing blobs")
checker.TestCheckRepo(t, repo, false)
})
}
}
func TestIndexSavePartial(t *testing.T) {
repository.TestAllVersions(t, testIndexSavePartial)
}
func testIndexSavePartial(t *testing.T, version uint) {
repo := createFilledRepo(t, 3, version)
// capture blob list before adding fourth snapshot
idx := index.NewMasterIndex()
rtest.OK(t, idx.Load(context.TODO(), repo, nil, nil))
blobs := make(map[restic.PackedBlob]struct{})
rtest.OK(t, idx.Each(context.TODO(), func(pb restic.PackedBlob) {
blobs[pb] = struct{}{}
}))
// add+remove new snapshot and track its pack files
packsBefore := listPacks(t, repo)
sn := restic.TestCreateSnapshot(t, repo, snapshotTime.Add(time.Duration(4)*time.Second), depth)
rtest.OK(t, repo.RemoveUnpacked(context.TODO(), restic.SnapshotFile, *sn.ID()))
packsAfter := listPacks(t, repo)
newPacks := packsAfter.Sub(packsBefore)
// rewrite index and remove pack files of new snapshot
idx = index.NewMasterIndex()
rtest.OK(t, idx.Load(context.TODO(), repo, nil, nil))
rtest.OK(t, idx.Rewrite(context.TODO(), repo, newPacks, nil, nil, index.MasterIndexRewriteOpts{}))
// check blobs
idx = index.NewMasterIndex()
rtest.OK(t, idx.Load(context.TODO(), repo, nil, nil))
rtest.OK(t, idx.Each(context.TODO(), func(pb restic.PackedBlob) {
if _, ok := blobs[pb]; ok {
delete(blobs, pb)
} else {
t.Fatalf("unexpected blobs %v", pb)
}
}))
rtest.Equals(t, 0, len(blobs), "saved index is missing blobs")
// remove pack files to make check happy
rtest.OK(t, restic.ParallelRemove(context.TODO(), repo, newPacks, restic.PackFile, nil, nil))
checker.TestCheckRepo(t, repo, false)
}
func listPacks(t testing.TB, repo restic.Lister) restic.IDSet {
s := restic.NewIDSet()
rtest.OK(t, repo.List(context.TODO(), restic.PackFile, func(id restic.ID, _ int64) error {
s.Insert(id)
return nil
}))
return s
}

View File

@@ -0,0 +1,21 @@
package index
import (
"testing"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/test"
)
func TestMergeIndex(t testing.TB, mi *MasterIndex) ([]*Index, int, restic.IDSet) {
finalIndexes := mi.finalizeNotFinalIndexes()
ids := restic.NewIDSet()
for _, idx := range finalIndexes {
id := restic.NewRandomID()
ids.Insert(id)
test.OK(t, idx.SetID(id))
}
test.OK(t, mi.MergeFinalIndexes())
return finalIndexes, len(mi.idx), ids
}