mirror of
https://github.com/tailscale/tailscale.git
synced 2025-03-28 03:52:35 +00:00
taildrop: switch hashing to be streaming based (#9861)
While the previous logic was correct, it did not perform well. Resuming is a dance between the client and server, where 1. the client requests hashes for a partial file, 2. the server then computes those hashes, 3. the client computes hashes locally and compares them. 4. goto 1 while the partial file still has data While step 2 is running, the client is sitting idle. While step 3 is running, the server is sitting idle. By streaming over the block hash immediately after the server computes it, the client can start checking the hash, while the server works on the next hash (in a pipelined manner). This performs dramatically better and also uses less memory as we don't need to hold a list of hashes, but only need to handle one hash at a time. There are two detriments to this approach: * The HTTP API relies on a JSON stream, which is not a standard REST-like pattern. However, since we implement both client and server, this is fine. * While the stream is on-going, we hold an open file handle on the server side while the file is being hashed. On really slow streams, this could hold a file open forever. Updates tailscale/corp#14772 Signed-off-by: Joe Tsai <joetsai@digital-static.net> Co-authored-by: Rhea Ghosh <rhea@tailscale.com>
This commit is contained in:
parent
7971333603
commit
3f27087e9d
@ -649,34 +649,46 @@ func (h *peerAPIHandler) handlePeerPut(w http.ResponseWriter, r *http.Request) {
|
|||||||
http.Error(w, taildrop.ErrInvalidFileName.Error(), http.StatusBadRequest)
|
http.Error(w, taildrop.ErrInvalidFileName.Error(), http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
enc := json.NewEncoder(w)
|
||||||
switch r.Method {
|
switch r.Method {
|
||||||
case "GET":
|
case "GET":
|
||||||
var resp any
|
|
||||||
var err error
|
|
||||||
id := taildrop.ClientID(h.peerNode.StableID())
|
id := taildrop.ClientID(h.peerNode.StableID())
|
||||||
|
|
||||||
if prefix == "" {
|
if prefix == "" {
|
||||||
resp, err = h.ps.taildrop.PartialFiles(id)
|
// List all the partial files.
|
||||||
} else {
|
files, err := h.ps.taildrop.PartialFiles(id)
|
||||||
ranges, ok := httphdr.ParseRange(r.Header.Get("Range"))
|
if err != nil {
|
||||||
if !ok || len(ranges) != 1 || ranges[0].Length < 0 {
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
http.Error(w, "invalid Range header", http.StatusBadRequest)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
offset := ranges[0].Start
|
if err := enc.Encode(files); err != nil {
|
||||||
length := ranges[0].Length
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
if length == 0 {
|
h.logf("json.Encoder.Encode error: %v", err)
|
||||||
length = -1 // httphdr.Range.Length == 0 implies reading the rest of file
|
return
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Stream all the block hashes for the specified file.
|
||||||
|
next, close, err := h.ps.taildrop.HashPartialFile(id, baseName)
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer close()
|
||||||
|
for {
|
||||||
|
switch cs, err := next(); {
|
||||||
|
case err == io.EOF:
|
||||||
|
return
|
||||||
|
case err != nil:
|
||||||
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
h.logf("HashPartialFile.next error: %v", err)
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
if err := enc.Encode(cs); err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
h.logf("json.Encoder.Encode error: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
resp, err = h.ps.taildrop.HashPartialFile(id, baseName, offset, length)
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err := json.NewEncoder(w).Encode(resp); err != nil {
|
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
case "PUT":
|
case "PUT":
|
||||||
t0 := h.ps.b.clock.Now()
|
t0 := h.ps.b.clock.Now()
|
||||||
|
@ -1320,38 +1320,36 @@ func (h *Handler) serveFilePut(w http.ResponseWriter, r *http.Request) {
|
|||||||
// Before we PUT a file we check to see if there are any existing partial file and if so,
|
// Before we PUT a file we check to see if there are any existing partial file and if so,
|
||||||
// we resume the upload from where we left off by sending the remaining file instead of
|
// we resume the upload from where we left off by sending the remaining file instead of
|
||||||
// the full file.
|
// the full file.
|
||||||
offset, remainingBody, err := taildrop.ResumeReader(r.Body, func(offset, length int64) (taildrop.FileChecksums, error) {
|
var offset int64
|
||||||
client := &http.Client{
|
var resumeDuration time.Duration
|
||||||
Transport: h.b.Dialer().PeerAPITransport(),
|
remainingBody := io.Reader(r.Body)
|
||||||
Timeout: 10 * time.Second,
|
client := &http.Client{
|
||||||
}
|
Transport: h.b.Dialer().PeerAPITransport(),
|
||||||
req, err := http.NewRequestWithContext(r.Context(), "GET", dstURL.String()+"/v0/put/"+filenameEscaped, nil)
|
Timeout: 10 * time.Second,
|
||||||
if err != nil {
|
}
|
||||||
return taildrop.FileChecksums{}, err
|
req, err := http.NewRequestWithContext(r.Context(), "GET", dstURL.String()+"/v0/put/"+filenameEscaped, nil)
|
||||||
}
|
|
||||||
|
|
||||||
rangeHdr, ok := httphdr.FormatRange([]httphdr.Range{{Start: offset, Length: length}})
|
|
||||||
if !ok {
|
|
||||||
return taildrop.FileChecksums{}, fmt.Errorf("invalid offset and length")
|
|
||||||
}
|
|
||||||
req.Header.Set("Range", rangeHdr)
|
|
||||||
switch resp, err := client.Do(req); {
|
|
||||||
case err != nil:
|
|
||||||
return taildrop.FileChecksums{}, err
|
|
||||||
case resp.StatusCode == http.StatusMethodNotAllowed || resp.StatusCode == http.StatusNotFound:
|
|
||||||
return taildrop.FileChecksums{}, nil // implies remote peer on older version
|
|
||||||
case resp.StatusCode != http.StatusOK:
|
|
||||||
return taildrop.FileChecksums{}, fmt.Errorf("unexpected status code %d", resp.StatusCode)
|
|
||||||
default:
|
|
||||||
var checksums taildrop.FileChecksums
|
|
||||||
err = json.NewDecoder(resp.Body).Decode(&checksums)
|
|
||||||
return checksums, err
|
|
||||||
}
|
|
||||||
})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// ResumeReader ensures that the returned offset and reader are consistent
|
http.Error(w, "bogus peer URL", http.StatusInternalServerError)
|
||||||
// even if an error is encountered. Thus, we can still proceed.
|
return
|
||||||
h.logf("reader could not be fully resumed: %v", err)
|
}
|
||||||
|
switch resp, err := client.Do(req); {
|
||||||
|
case err != nil:
|
||||||
|
h.logf("could not fetch remote hashes: %v", err)
|
||||||
|
case resp.StatusCode == http.StatusMethodNotAllowed || resp.StatusCode == http.StatusNotFound:
|
||||||
|
// noop; implies older peerapi without resume support
|
||||||
|
case resp.StatusCode != http.StatusOK:
|
||||||
|
h.logf("fetch remote hashes status code: %d", resp.StatusCode)
|
||||||
|
default:
|
||||||
|
resumeStart := time.Now()
|
||||||
|
dec := json.NewDecoder(resp.Body)
|
||||||
|
offset, remainingBody, err = taildrop.ResumeReader(r.Body, func() (out taildrop.BlockChecksum, err error) {
|
||||||
|
err = dec.Decode(&out)
|
||||||
|
return out, err
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
h.logf("reader could not be fully resumed: %v", err)
|
||||||
|
}
|
||||||
|
resumeDuration = time.Since(resumeStart).Round(time.Millisecond)
|
||||||
}
|
}
|
||||||
|
|
||||||
outReq, err := http.NewRequestWithContext(r.Context(), "PUT", "http://peer/v0/put/"+filenameEscaped, remainingBody)
|
outReq, err := http.NewRequestWithContext(r.Context(), "PUT", "http://peer/v0/put/"+filenameEscaped, remainingBody)
|
||||||
@ -1361,6 +1359,7 @@ func (h *Handler) serveFilePut(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
outReq.ContentLength = r.ContentLength
|
outReq.ContentLength = r.ContentLength
|
||||||
if offset > 0 {
|
if offset > 0 {
|
||||||
|
h.logf("resuming put at offset %d after %v", offset, resumeDuration)
|
||||||
rangeHdr, _ := httphdr.FormatRange([]httphdr.Range{{offset, 0}})
|
rangeHdr, _ := httphdr.FormatRange([]httphdr.Range{{offset, 0}})
|
||||||
outReq.Header.Set("Range", rangeHdr)
|
outReq.Header.Set("Range", rangeHdr)
|
||||||
if outReq.ContentLength >= 0 {
|
if outReq.ContentLength >= 0 {
|
||||||
|
@ -20,23 +20,11 @@ var (
|
|||||||
hashAlgorithm = "sha256"
|
hashAlgorithm = "sha256"
|
||||||
)
|
)
|
||||||
|
|
||||||
// FileChecksums represents checksums into partially received file.
|
// BlockChecksum represents the checksum for a single block.
|
||||||
type FileChecksums struct {
|
type BlockChecksum struct {
|
||||||
// Offset is the offset into the file.
|
Checksum Checksum `json:"checksum"`
|
||||||
Offset int64 `json:"offset"`
|
Algorithm string `json:"algo"` // always "sha256" for now
|
||||||
// Length is the length of content being hashed in the file.
|
Size int64 `json:"size"` // always (64<<10) for now
|
||||||
Length int64 `json:"length"`
|
|
||||||
// Checksums is a list of checksums of BlockSize-sized blocks
|
|
||||||
// starting from Offset. The number of checksums is the Length
|
|
||||||
// divided by BlockSize rounded up to the nearest integer.
|
|
||||||
// All blocks except for the last one are guaranteed to be checksums
|
|
||||||
// over BlockSize-sized blocks.
|
|
||||||
Checksums []Checksum `json:"checksums"`
|
|
||||||
// Algorithm is the hashing algorithm used to compute checksums.
|
|
||||||
Algorithm string `json:"algorithm"` // always "sha256" for now
|
|
||||||
// BlockSize is the size of each block.
|
|
||||||
// The last block may be smaller than this, but never zero.
|
|
||||||
BlockSize int64 `json:"blockSize"` // always (64<<10) for now
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Checksum is an opaque checksum that is comparable.
|
// Checksum is an opaque checksum that is comparable.
|
||||||
@ -92,113 +80,89 @@ func (m *Manager) PartialFiles(id ClientID) (ret []string, err error) {
|
|||||||
return ret, nil
|
return ret, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// HashPartialFile hashes the contents of a partial file sent by id,
|
// HashPartialFile returns a function that hashes the next block in the file,
|
||||||
// starting at the specified offset and for the specified length.
|
// starting from the beginning of the file.
|
||||||
// If length is negative, it hashes the entire file.
|
// It returns (BlockChecksum{}, io.EOF) when the stream is complete.
|
||||||
// If the length exceeds the remaining file length, then it hashes until EOF.
|
// It is the caller's responsibility to call close.
|
||||||
// If [FileHashes.Length] is less than length and no error occurred,
|
func (m *Manager) HashPartialFile(id ClientID, baseName string) (next func() (BlockChecksum, error), close func() error, err error) {
|
||||||
// then it implies that all remaining content in the file has been hashed.
|
|
||||||
func (m *Manager) HashPartialFile(id ClientID, baseName string, offset, length int64) (FileChecksums, error) {
|
|
||||||
if m == nil || m.opts.Dir == "" {
|
if m == nil || m.opts.Dir == "" {
|
||||||
return FileChecksums{}, ErrNoTaildrop
|
return nil, nil, ErrNoTaildrop
|
||||||
}
|
}
|
||||||
|
noopNext := func() (BlockChecksum, error) { return BlockChecksum{}, io.EOF }
|
||||||
|
noopClose := func() error { return nil }
|
||||||
if m.opts.DirectFileMode && m.opts.AvoidFinalRename {
|
if m.opts.DirectFileMode && m.opts.AvoidFinalRename {
|
||||||
return FileChecksums{}, nil // resuming is not supported for users that peek at our file structure
|
return noopNext, noopClose, nil // resuming is not supported for users that peek at our file structure
|
||||||
}
|
}
|
||||||
|
|
||||||
dstFile, err := joinDir(m.opts.Dir, baseName)
|
dstFile, err := joinDir(m.opts.Dir, baseName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return FileChecksums{}, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
f, err := os.Open(dstFile + id.partialSuffix())
|
f, err := os.Open(dstFile + id.partialSuffix())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
return FileChecksums{}, nil
|
return noopNext, noopClose, nil
|
||||||
}
|
}
|
||||||
return FileChecksums{}, redactError(err)
|
return nil, nil, redactError(err)
|
||||||
}
|
}
|
||||||
defer f.Close()
|
|
||||||
|
|
||||||
if _, err := f.Seek(offset, io.SeekStart); err != nil {
|
|
||||||
return FileChecksums{}, redactError(err)
|
|
||||||
}
|
|
||||||
checksums := FileChecksums{
|
|
||||||
Offset: offset,
|
|
||||||
Algorithm: hashAlgorithm,
|
|
||||||
BlockSize: blockSize,
|
|
||||||
}
|
|
||||||
b := make([]byte, blockSize) // TODO: Pool this?
|
b := make([]byte, blockSize) // TODO: Pool this?
|
||||||
r := io.Reader(f)
|
next = func() (BlockChecksum, error) {
|
||||||
if length >= 0 {
|
switch n, err := io.ReadFull(f, b); {
|
||||||
r = io.LimitReader(f, length)
|
|
||||||
}
|
|
||||||
for {
|
|
||||||
switch n, err := io.ReadFull(r, b); {
|
|
||||||
case err != nil && err != io.EOF && err != io.ErrUnexpectedEOF:
|
case err != nil && err != io.EOF && err != io.ErrUnexpectedEOF:
|
||||||
return checksums, redactError(err)
|
return BlockChecksum{}, redactError(err)
|
||||||
case n == 0:
|
case n == 0:
|
||||||
return checksums, nil
|
return BlockChecksum{}, io.EOF
|
||||||
default:
|
default:
|
||||||
checksums.Checksums = append(checksums.Checksums, hash(b[:n]))
|
return BlockChecksum{hash(b[:n]), hashAlgorithm, int64(n)}, nil
|
||||||
checksums.Length += int64(n)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
close = f.Close
|
||||||
|
return next, close, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ResumeReader reads and discards the leading content of r
|
// ResumeReader reads and discards the leading content of r
|
||||||
// that matches the content based on the checksums that exist.
|
// that matches the content based on the checksums that exist.
|
||||||
// It returns the number of bytes consumed,
|
// It returns the number of bytes consumed,
|
||||||
// and returns an [io.Reader] representing the remaining content.
|
// and returns an [io.Reader] representing the remaining content.
|
||||||
func ResumeReader(r io.Reader, hashFile func(offset, length int64) (FileChecksums, error)) (int64, io.Reader, error) {
|
func ResumeReader(r io.Reader, hashNext func() (BlockChecksum, error)) (int64, io.Reader, error) {
|
||||||
if hashFile == nil {
|
if hashNext == nil {
|
||||||
return 0, r, nil
|
return 0, r, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ask for checksums of a particular content length,
|
|
||||||
// where the amount of memory needed to represent the checksums themselves
|
|
||||||
// is exactly equal to the blockSize.
|
|
||||||
numBlocks := blockSize / sha256.Size
|
|
||||||
hashLength := blockSize * numBlocks
|
|
||||||
|
|
||||||
var offset int64
|
var offset int64
|
||||||
b := make([]byte, 0, blockSize)
|
b := make([]byte, 0, blockSize)
|
||||||
for {
|
for {
|
||||||
// Request a list of checksums for the partial file starting at offset.
|
// Obtain the next block checksum from the remote peer.
|
||||||
checksums, err := hashFile(offset, hashLength)
|
cs, err := hashNext()
|
||||||
if len(checksums.Checksums) == 0 || err != nil {
|
switch {
|
||||||
|
case err == io.EOF:
|
||||||
|
return offset, io.MultiReader(bytes.NewReader(b), r), nil
|
||||||
|
case err != nil:
|
||||||
return offset, io.MultiReader(bytes.NewReader(b), r), err
|
return offset, io.MultiReader(bytes.NewReader(b), r), err
|
||||||
} else if checksums.BlockSize != blockSize || checksums.Algorithm != hashAlgorithm {
|
case cs.Algorithm != hashAlgorithm || cs.Size < 0 || cs.Size > blockSize:
|
||||||
return offset, io.MultiReader(bytes.NewReader(b), r), fmt.Errorf("invalid block size or hashing algorithm")
|
return offset, io.MultiReader(bytes.NewReader(b), r), fmt.Errorf("invalid block size or hashing algorithm")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read from r, comparing each block with the provided checksums.
|
// Read the contents of the next block.
|
||||||
for _, want := range checksums.Checksums {
|
n, err := io.ReadFull(r, b[:blockSize])
|
||||||
// Read a block from r.
|
b = b[:n]
|
||||||
n, err := io.ReadFull(r, b[:blockSize])
|
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||||
b = b[:n]
|
err = nil
|
||||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
}
|
||||||
err = nil
|
if len(b) == 0 || err != nil {
|
||||||
}
|
// This should not occur in practice.
|
||||||
if len(b) == 0 || err != nil {
|
// It implies that an error occurred reading r,
|
||||||
// This should not occur in practice.
|
// or that the partial file on the remote side is fully complete.
|
||||||
// It implies that an error occurred reading r,
|
return offset, io.MultiReader(bytes.NewReader(b), r), err
|
||||||
// or that the partial file on the remote side is fully complete.
|
|
||||||
return offset, io.MultiReader(bytes.NewReader(b), r), err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Compare the local and remote block checksums.
|
|
||||||
// If it mismatches, then resume from this point.
|
|
||||||
got := hash(b)
|
|
||||||
if got != want {
|
|
||||||
return offset, io.MultiReader(bytes.NewReader(b), r), nil
|
|
||||||
}
|
|
||||||
offset += int64(len(b))
|
|
||||||
b = b[:0]
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// We hashed the remainder of the partial file, so stop.
|
// Compare the local and remote block checksums.
|
||||||
if checksums.Length < hashLength {
|
// If it mismatches, then resume from this point.
|
||||||
|
if cs.Checksum != hash(b) {
|
||||||
return offset, io.MultiReader(bytes.NewReader(b), r), nil
|
return offset, io.MultiReader(bytes.NewReader(b), r), nil
|
||||||
}
|
}
|
||||||
|
offset += int64(len(b))
|
||||||
|
b = b[:0]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -26,11 +26,12 @@ func TestResume(t *testing.T) {
|
|||||||
want := make([]byte, 12345)
|
want := make([]byte, 12345)
|
||||||
must.Get(io.ReadFull(rn, want))
|
must.Get(io.ReadFull(rn, want))
|
||||||
|
|
||||||
t.Run("resume-noop", func(t *testing.T) {
|
t.Run("resume-noexist", func(t *testing.T) {
|
||||||
r := io.Reader(bytes.NewReader(want))
|
r := io.Reader(bytes.NewReader(want))
|
||||||
offset, r, err := ResumeReader(r, func(offset, length int64) (FileChecksums, error) {
|
next, close, err := m.HashPartialFile("", "foo")
|
||||||
return m.HashPartialFile("", "foo", offset, length)
|
must.Do(err)
|
||||||
})
|
defer close()
|
||||||
|
offset, r, err := ResumeReader(r, next)
|
||||||
must.Do(err)
|
must.Do(err)
|
||||||
must.Get(m.PutFile("", "foo", r, offset, -1))
|
must.Get(m.PutFile("", "foo", r, offset, -1))
|
||||||
got := must.Get(os.ReadFile(must.Get(joinDir(m.opts.Dir, "foo"))))
|
got := must.Get(os.ReadFile(must.Get(joinDir(m.opts.Dir, "foo"))))
|
||||||
@ -43,9 +44,10 @@ func TestResume(t *testing.T) {
|
|||||||
rn := rand.New(rand.NewSource(0))
|
rn := rand.New(rand.NewSource(0))
|
||||||
for {
|
for {
|
||||||
r := io.Reader(bytes.NewReader(want))
|
r := io.Reader(bytes.NewReader(want))
|
||||||
offset, r, err := ResumeReader(r, func(offset, length int64) (FileChecksums, error) {
|
next, close, err := m.HashPartialFile("", "foo")
|
||||||
return m.HashPartialFile("", "foo", offset, length)
|
must.Do(err)
|
||||||
})
|
defer close()
|
||||||
|
offset, r, err := ResumeReader(r, next)
|
||||||
must.Do(err)
|
must.Do(err)
|
||||||
numWant := rn.Int63n(min(int64(len(want))-offset, 1000) + 1)
|
numWant := rn.Int63n(min(int64(len(want))-offset, 1000) + 1)
|
||||||
if offset < int64(len(want)) {
|
if offset < int64(len(want)) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user