mirror of
https://github.com/restic/restic.git
synced 2025-12-03 20:11:52 +00:00
repository: implement pack compression
This commit is contained in:
@@ -32,7 +32,7 @@ func NewPacker(k *crypto.Key, wr io.Writer) *Packer {
|
||||
|
||||
// Add saves the data read from rd as a new blob to the packer. Returned is the
|
||||
// number of bytes written to the pack.
|
||||
func (p *Packer) Add(t restic.BlobType, id restic.ID, data []byte) (int, error) {
|
||||
func (p *Packer) Add(t restic.BlobType, id restic.ID, data []byte, uncompressedLength int) (int, error) {
|
||||
p.m.Lock()
|
||||
defer p.m.Unlock()
|
||||
|
||||
@@ -41,20 +41,23 @@ func (p *Packer) Add(t restic.BlobType, id restic.ID, data []byte) (int, error)
|
||||
n, err := p.wr.Write(data)
|
||||
c.Length = uint(n)
|
||||
c.Offset = p.bytes
|
||||
c.UncompressedLength = uint(uncompressedLength)
|
||||
p.bytes += uint(n)
|
||||
p.blobs = append(p.blobs, c)
|
||||
|
||||
return n, errors.Wrap(err, "Write")
|
||||
}
|
||||
|
||||
var entrySize = uint(binary.Size(restic.BlobType(0)) + headerLengthSize + len(restic.ID{}))
|
||||
var entrySize = uint(binary.Size(restic.BlobType(0)) + 2*headerLengthSize + len(restic.ID{}))
|
||||
var plainEntrySize = uint(binary.Size(restic.BlobType(0)) + headerLengthSize + len(restic.ID{}))
|
||||
|
||||
// headerEntry describes the format of header entries. It serves only as
|
||||
// documentation.
|
||||
type headerEntry struct {
|
||||
Type uint8
|
||||
Length uint32
|
||||
ID restic.ID
|
||||
Type uint8
|
||||
Length uint32
|
||||
ID restic.ID
|
||||
CompressedLength uint32
|
||||
}
|
||||
|
||||
// Finalize writes the header for all added blobs and finalizes the pack.
|
||||
@@ -70,7 +73,7 @@ func (p *Packer) Finalize() (uint, error) {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
encryptedHeader := make([]byte, 0, len(header)+p.k.Overhead()+p.k.NonceSize())
|
||||
encryptedHeader := make([]byte, 0, restic.CiphertextLength(len(header)))
|
||||
nonce := crypto.NewRandomNonce()
|
||||
encryptedHeader = append(encryptedHeader, nonce...)
|
||||
encryptedHeader = p.k.Seal(encryptedHeader, nonce, header, nil)
|
||||
@@ -81,7 +84,7 @@ func (p *Packer) Finalize() (uint, error) {
|
||||
return 0, errors.Wrap(err, "Write")
|
||||
}
|
||||
|
||||
hdrBytes := restic.CiphertextLength(len(header))
|
||||
hdrBytes := len(encryptedHeader)
|
||||
if n != hdrBytes {
|
||||
return 0, errors.New("wrong number of bytes written")
|
||||
}
|
||||
@@ -104,11 +107,15 @@ func (p *Packer) makeHeader() ([]byte, error) {
|
||||
buf := make([]byte, 0, len(p.blobs)*int(entrySize))
|
||||
|
||||
for _, b := range p.blobs {
|
||||
switch b.Type {
|
||||
case restic.DataBlob:
|
||||
switch {
|
||||
case b.Type == restic.DataBlob && b.UncompressedLength == 0:
|
||||
buf = append(buf, 0)
|
||||
case restic.TreeBlob:
|
||||
case b.Type == restic.TreeBlob && b.UncompressedLength == 0:
|
||||
buf = append(buf, 1)
|
||||
case b.Type == restic.DataBlob && b.UncompressedLength != 0:
|
||||
buf = append(buf, 2)
|
||||
case b.Type == restic.TreeBlob && b.UncompressedLength != 0:
|
||||
buf = append(buf, 3)
|
||||
default:
|
||||
return nil, errors.Errorf("invalid blob type %v", b.Type)
|
||||
}
|
||||
@@ -116,6 +123,10 @@ func (p *Packer) makeHeader() ([]byte, error) {
|
||||
var lenLE [4]byte
|
||||
binary.LittleEndian.PutUint32(lenLE[:], uint32(b.Length))
|
||||
buf = append(buf, lenLE[:]...)
|
||||
if b.UncompressedLength != 0 {
|
||||
binary.LittleEndian.PutUint32(lenLE[:], uint32(b.UncompressedLength))
|
||||
buf = append(buf, lenLE[:]...)
|
||||
}
|
||||
buf = append(buf, b.ID[:]...)
|
||||
}
|
||||
|
||||
@@ -152,7 +163,7 @@ func (p *Packer) String() string {
|
||||
|
||||
var (
|
||||
// we require at least one entry in the header, and one blob for a pack file
|
||||
minFileSize = entrySize + crypto.Extension + uint(headerLengthSize)
|
||||
minFileSize = plainEntrySize + crypto.Extension + uint(headerLengthSize)
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -167,16 +178,11 @@ const (
|
||||
eagerEntries = 15
|
||||
)
|
||||
|
||||
// readRecords reads up to max records from the underlying ReaderAt, returning
|
||||
// the raw header, the total number of records in the header, and any error.
|
||||
// If the header contains fewer than max entries, the header is truncated to
|
||||
// readRecords reads up to bufsize bytes from the underlying ReaderAt, returning
|
||||
// the raw header, the total number of bytes in the header, and any error.
|
||||
// If the header contains fewer than bufsize bytes, the header is truncated to
|
||||
// the appropriate size.
|
||||
func readRecords(rd io.ReaderAt, size int64, max int) ([]byte, int, error) {
|
||||
var bufsize int
|
||||
bufsize += max * int(entrySize)
|
||||
bufsize += crypto.Extension
|
||||
bufsize += headerLengthSize
|
||||
|
||||
func readRecords(rd io.ReaderAt, size int64, bufsize int) ([]byte, int, error) {
|
||||
if bufsize > int(size) {
|
||||
bufsize = int(size)
|
||||
}
|
||||
@@ -197,8 +203,6 @@ func readRecords(rd io.ReaderAt, size int64, max int) ([]byte, int, error) {
|
||||
err = InvalidFileError{Message: "header length is zero"}
|
||||
case hlen < crypto.Extension:
|
||||
err = InvalidFileError{Message: "header length is too small"}
|
||||
case (hlen-crypto.Extension)%uint32(entrySize) != 0:
|
||||
err = InvalidFileError{Message: "header length is invalid"}
|
||||
case int64(hlen) > size-int64(headerLengthSize):
|
||||
err = InvalidFileError{Message: "header is larger than file"}
|
||||
case int64(hlen) > MaxHeaderSize-int64(headerLengthSize):
|
||||
@@ -208,8 +212,8 @@ func readRecords(rd io.ReaderAt, size int64, max int) ([]byte, int, error) {
|
||||
return nil, 0, errors.Wrap(err, "readHeader")
|
||||
}
|
||||
|
||||
total := (int(hlen) - crypto.Extension) / int(entrySize)
|
||||
if total < max {
|
||||
total := int(hlen + headerLengthSize)
|
||||
if total < bufsize {
|
||||
// truncate to the beginning of the pack header
|
||||
b = b[len(b)-int(hlen):]
|
||||
}
|
||||
@@ -230,11 +234,12 @@ func readHeader(rd io.ReaderAt, size int64) ([]byte, error) {
|
||||
// eagerly download eagerEntries header entries as part of header-length request.
|
||||
// only make second request if actual number of entries is greater than eagerEntries
|
||||
|
||||
b, c, err := readRecords(rd, size, eagerEntries)
|
||||
eagerSize := eagerEntries*int(entrySize) + headerSize
|
||||
b, c, err := readRecords(rd, size, eagerSize)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if c <= eagerEntries {
|
||||
if c <= eagerSize {
|
||||
// eager read sufficed, return what we got
|
||||
return b, nil
|
||||
}
|
||||
@@ -262,7 +267,7 @@ func List(k *crypto.Key, rd io.ReaderAt, size int64) (entries []restic.Blob, hdr
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
if len(buf) < k.NonceSize()+k.Overhead() {
|
||||
if len(buf) < restic.CiphertextLength(0) {
|
||||
return nil, 0, errors.New("invalid header, too small")
|
||||
}
|
||||
|
||||
@@ -274,11 +279,12 @@ func List(k *crypto.Key, rd io.ReaderAt, size int64) (entries []restic.Blob, hdr
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
entries = make([]restic.Blob, 0, uint(len(buf))/entrySize)
|
||||
// might over allocate a bit if all blobs have EntrySize but only by a few percent
|
||||
entries = make([]restic.Blob, 0, uint(len(buf))/plainEntrySize)
|
||||
|
||||
pos := uint(0)
|
||||
for len(buf) > 0 {
|
||||
entry, err := parseHeaderEntry(buf)
|
||||
entry, headerSize, err := parseHeaderEntry(buf)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
@@ -286,36 +292,60 @@ func List(k *crypto.Key, rd io.ReaderAt, size int64) (entries []restic.Blob, hdr
|
||||
|
||||
entries = append(entries, entry)
|
||||
pos += entry.Length
|
||||
buf = buf[entrySize:]
|
||||
buf = buf[headerSize:]
|
||||
}
|
||||
|
||||
return entries, hdrSize, nil
|
||||
}
|
||||
|
||||
func parseHeaderEntry(p []byte) (b restic.Blob, err error) {
|
||||
if uint(len(p)) < entrySize {
|
||||
func parseHeaderEntry(p []byte) (b restic.Blob, size uint, err error) {
|
||||
l := uint(len(p))
|
||||
size = plainEntrySize
|
||||
if l < plainEntrySize {
|
||||
err = errors.Errorf("parseHeaderEntry: buffer of size %d too short", len(p))
|
||||
return b, err
|
||||
return b, size, err
|
||||
}
|
||||
p = p[:entrySize]
|
||||
tpe := p[0]
|
||||
|
||||
switch p[0] {
|
||||
case 0:
|
||||
switch tpe {
|
||||
case 0, 2:
|
||||
b.Type = restic.DataBlob
|
||||
case 1:
|
||||
case 1, 3:
|
||||
b.Type = restic.TreeBlob
|
||||
default:
|
||||
return b, errors.Errorf("invalid type %d", p[0])
|
||||
return b, size, errors.Errorf("invalid type %d", tpe)
|
||||
}
|
||||
|
||||
b.Length = uint(binary.LittleEndian.Uint32(p[1:5]))
|
||||
copy(b.ID[:], p[5:])
|
||||
p = p[5:]
|
||||
if tpe == 2 || tpe == 3 {
|
||||
size = entrySize
|
||||
if l < entrySize {
|
||||
err = errors.Errorf("parseHeaderEntry: buffer of size %d too short", len(p))
|
||||
return b, size, err
|
||||
}
|
||||
b.UncompressedLength = uint(binary.LittleEndian.Uint32(p[0:4]))
|
||||
p = p[4:]
|
||||
}
|
||||
|
||||
return b, nil
|
||||
copy(b.ID[:], p[:])
|
||||
|
||||
return b, size, nil
|
||||
}
|
||||
|
||||
func CalculateEntrySize(blob restic.Blob) int {
|
||||
if blob.UncompressedLength != 0 {
|
||||
return int(entrySize)
|
||||
}
|
||||
return int(plainEntrySize)
|
||||
}
|
||||
|
||||
func CalculateHeaderSize(blobs []restic.Blob) int {
|
||||
return headerSize + len(blobs)*int(entrySize)
|
||||
size := headerSize
|
||||
for _, blob := range blobs {
|
||||
size += CalculateEntrySize(blob)
|
||||
}
|
||||
return size
|
||||
}
|
||||
|
||||
// Size returns the size of all packs computed by index information.
|
||||
@@ -333,7 +363,7 @@ func Size(ctx context.Context, mi restic.MasterIndex, onlyHdr bool) map[restic.I
|
||||
if !onlyHdr {
|
||||
size += int64(blob.Length)
|
||||
}
|
||||
packSize[blob.PackID] = size + int64(entrySize)
|
||||
packSize[blob.PackID] = size + int64(CalculateEntrySize(blob.Blob))
|
||||
}
|
||||
|
||||
return packSize
|
||||
|
||||
@@ -23,9 +23,10 @@ func TestParseHeaderEntry(t *testing.T) {
|
||||
buf := new(bytes.Buffer)
|
||||
_ = binary.Write(buf, binary.LittleEndian, &h)
|
||||
|
||||
b, err := parseHeaderEntry(buf.Bytes())
|
||||
b, size, err := parseHeaderEntry(buf.Bytes())
|
||||
rtest.OK(t, err)
|
||||
rtest.Equals(t, restic.DataBlob, b.Type)
|
||||
rtest.Equals(t, plainEntrySize, size)
|
||||
t.Logf("%v %v", h.ID, b.ID)
|
||||
rtest.Assert(t, bytes.Equal(h.ID[:], b.ID[:]), "id mismatch")
|
||||
rtest.Equals(t, uint(h.Length), b.Length)
|
||||
@@ -34,14 +35,14 @@ func TestParseHeaderEntry(t *testing.T) {
|
||||
buf.Reset()
|
||||
_ = binary.Write(buf, binary.LittleEndian, &h)
|
||||
|
||||
b, err = parseHeaderEntry(buf.Bytes())
|
||||
b, _, err = parseHeaderEntry(buf.Bytes())
|
||||
rtest.Assert(t, err != nil, "no error for invalid type")
|
||||
|
||||
h.Type = 0
|
||||
buf.Reset()
|
||||
_ = binary.Write(buf, binary.LittleEndian, &h)
|
||||
|
||||
b, err = parseHeaderEntry(buf.Bytes()[:entrySize-1])
|
||||
b, _, err = parseHeaderEntry(buf.Bytes()[:plainEntrySize-1])
|
||||
rtest.Assert(t, err != nil, "no error for short input")
|
||||
}
|
||||
|
||||
@@ -97,7 +98,8 @@ func TestReadHeaderEagerLoad(t *testing.T) {
|
||||
func TestReadRecords(t *testing.T) {
|
||||
testReadRecords := func(dataSize, entryCount, totalRecords int) {
|
||||
totalHeader := rtest.Random(0, totalRecords*int(entrySize)+crypto.Extension)
|
||||
off := len(totalHeader) - (entryCount*int(entrySize) + crypto.Extension)
|
||||
bufSize := entryCount*int(entrySize) + crypto.Extension
|
||||
off := len(totalHeader) - bufSize
|
||||
if off < 0 {
|
||||
off = 0
|
||||
}
|
||||
@@ -110,10 +112,10 @@ func TestReadRecords(t *testing.T) {
|
||||
|
||||
rd := bytes.NewReader(buf.Bytes())
|
||||
|
||||
header, count, err := readRecords(rd, int64(rd.Len()), entryCount)
|
||||
header, count, err := readRecords(rd, int64(rd.Len()), bufSize+4)
|
||||
rtest.OK(t, err)
|
||||
rtest.Equals(t, len(totalHeader)+4, count)
|
||||
rtest.Equals(t, expectedHeader, header)
|
||||
rtest.Equals(t, totalRecords, count)
|
||||
}
|
||||
|
||||
// basic
|
||||
|
||||
@@ -38,7 +38,7 @@ func newPack(t testing.TB, k *crypto.Key, lengths []int) ([]Buf, []byte, uint) {
|
||||
var buf bytes.Buffer
|
||||
p := pack.NewPacker(k, &buf)
|
||||
for _, b := range bufs {
|
||||
_, err := p.Add(restic.TreeBlob, b.id, b.data)
|
||||
_, err := p.Add(restic.TreeBlob, b.id, b.data, 2*len(b.data))
|
||||
rtest.OK(t, err)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user