mirror of
https://github.com/tailscale/tailscale.git
synced 2024-11-26 03:25:35 +00:00
6ada33db77
It is possible that upon a cold-start, we enqueue a partial file for deletion that is resumed shortly after startup. If the file transfer happens to last longer than deleteDelay, we will delete the partial file, which is unfortunate. The client spent a long time uploading a file, only for it to be accidentally deleted. It's a very rare race, but also a frustrating one if it happens to manifest. Fix the code to only delete partial files that do not have an active puts against it. We also fix a minor bug in ResumeReader where we read b[:blockSize] instead of b[:cs.Size]. The former is the fixed size of 64KiB, while the latter is usually 64KiB, but may be less for the last block. Updates tailscale/corp#14772 Signed-off-by: Joe Tsai <joetsai@digital-static.net>
169 lines
4.9 KiB
Go
169 lines
4.9 KiB
Go
// Copyright (c) Tailscale Inc & AUTHORS
|
|
// SPDX-License-Identifier: BSD-3-Clause
|
|
|
|
package taildrop
|
|
|
|
import (
|
|
"bytes"
|
|
"crypto/sha256"
|
|
"encoding/hex"
|
|
"fmt"
|
|
"io"
|
|
"io/fs"
|
|
"os"
|
|
"slices"
|
|
"strings"
|
|
)
|
|
|
|
var (
|
|
blockSize = int64(64 << 10)
|
|
hashAlgorithm = "sha256"
|
|
)
|
|
|
|
// BlockChecksum represents the checksum for a single block.
|
|
type BlockChecksum struct {
|
|
Checksum Checksum `json:"checksum"`
|
|
Algorithm string `json:"algo"` // always "sha256" for now
|
|
Size int64 `json:"size"` // always (64<<10) for now
|
|
}
|
|
|
|
// Checksum is an opaque checksum that is comparable.
|
|
type Checksum struct{ cs [sha256.Size]byte }
|
|
|
|
func hash(b []byte) Checksum {
|
|
return Checksum{sha256.Sum256(b)}
|
|
}
|
|
func (cs Checksum) String() string {
|
|
return hex.EncodeToString(cs.cs[:])
|
|
}
|
|
func (cs Checksum) AppendText(b []byte) ([]byte, error) {
|
|
return hexAppendEncode(b, cs.cs[:]), nil
|
|
}
|
|
func (cs Checksum) MarshalText() ([]byte, error) {
|
|
return hexAppendEncode(nil, cs.cs[:]), nil
|
|
}
|
|
func (cs *Checksum) UnmarshalText(b []byte) error {
|
|
if len(b) != 2*len(cs.cs) {
|
|
return fmt.Errorf("invalid hex length: %d", len(b))
|
|
}
|
|
_, err := hex.Decode(cs.cs[:], b)
|
|
return err
|
|
}
|
|
|
|
// TODO(https://go.dev/issue/53693): Use hex.AppendEncode instead.
|
|
func hexAppendEncode(dst, src []byte) []byte {
|
|
n := hex.EncodedLen(len(src))
|
|
dst = slices.Grow(dst, n)
|
|
hex.Encode(dst[len(dst):][:n], src)
|
|
return dst[:len(dst)+n]
|
|
}
|
|
|
|
// PartialFiles returns a list of partial files in [Handler.Dir]
|
|
// that were sent (or is actively being sent) by the provided id.
|
|
func (m *Manager) PartialFiles(id ClientID) (ret []string, err error) {
|
|
if m == nil || m.opts.Dir == "" {
|
|
return nil, ErrNoTaildrop
|
|
}
|
|
if m.opts.DirectFileMode && m.opts.AvoidFinalRename {
|
|
return nil, nil // resuming is not supported for users that peek at our file structure
|
|
}
|
|
|
|
suffix := id.partialSuffix()
|
|
if err := rangeDir(m.opts.Dir, func(de fs.DirEntry) bool {
|
|
if name := de.Name(); strings.HasSuffix(name, suffix) {
|
|
ret = append(ret, name)
|
|
}
|
|
return true
|
|
}); err != nil {
|
|
return ret, redactError(err)
|
|
}
|
|
return ret, nil
|
|
}
|
|
|
|
// HashPartialFile returns a function that hashes the next block in the file,
|
|
// starting from the beginning of the file.
|
|
// It returns (BlockChecksum{}, io.EOF) when the stream is complete.
|
|
// It is the caller's responsibility to call close.
|
|
func (m *Manager) HashPartialFile(id ClientID, baseName string) (next func() (BlockChecksum, error), close func() error, err error) {
|
|
if m == nil || m.opts.Dir == "" {
|
|
return nil, nil, ErrNoTaildrop
|
|
}
|
|
noopNext := func() (BlockChecksum, error) { return BlockChecksum{}, io.EOF }
|
|
noopClose := func() error { return nil }
|
|
if m.opts.DirectFileMode && m.opts.AvoidFinalRename {
|
|
return noopNext, noopClose, nil // resuming is not supported for users that peek at our file structure
|
|
}
|
|
|
|
dstFile, err := joinDir(m.opts.Dir, baseName)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
f, err := os.Open(dstFile + id.partialSuffix())
|
|
if err != nil {
|
|
if os.IsNotExist(err) {
|
|
return noopNext, noopClose, nil
|
|
}
|
|
return nil, nil, redactError(err)
|
|
}
|
|
|
|
b := make([]byte, blockSize) // TODO: Pool this?
|
|
next = func() (BlockChecksum, error) {
|
|
switch n, err := io.ReadFull(f, b); {
|
|
case err != nil && err != io.EOF && err != io.ErrUnexpectedEOF:
|
|
return BlockChecksum{}, redactError(err)
|
|
case n == 0:
|
|
return BlockChecksum{}, io.EOF
|
|
default:
|
|
return BlockChecksum{hash(b[:n]), hashAlgorithm, int64(n)}, nil
|
|
}
|
|
}
|
|
close = f.Close
|
|
return next, close, nil
|
|
}
|
|
|
|
// ResumeReader reads and discards the leading content of r
|
|
// that matches the content based on the checksums that exist.
|
|
// It returns the number of bytes consumed,
|
|
// and returns an [io.Reader] representing the remaining content.
|
|
func ResumeReader(r io.Reader, hashNext func() (BlockChecksum, error)) (int64, io.Reader, error) {
|
|
if hashNext == nil {
|
|
return 0, r, nil
|
|
}
|
|
|
|
var offset int64
|
|
b := make([]byte, 0, blockSize)
|
|
for {
|
|
// Obtain the next block checksum from the remote peer.
|
|
cs, err := hashNext()
|
|
switch {
|
|
case err == io.EOF:
|
|
return offset, io.MultiReader(bytes.NewReader(b), r), nil
|
|
case err != nil:
|
|
return offset, io.MultiReader(bytes.NewReader(b), r), err
|
|
case cs.Algorithm != hashAlgorithm || cs.Size < 0 || cs.Size > blockSize:
|
|
return offset, io.MultiReader(bytes.NewReader(b), r), fmt.Errorf("invalid block size or hashing algorithm")
|
|
}
|
|
|
|
// Read the contents of the next block.
|
|
n, err := io.ReadFull(r, b[:cs.Size])
|
|
b = b[:n]
|
|
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
|
err = nil
|
|
}
|
|
if len(b) == 0 || err != nil {
|
|
// This should not occur in practice.
|
|
// It implies that an error occurred reading r,
|
|
// or that the partial file on the remote side is fully complete.
|
|
return offset, io.MultiReader(bytes.NewReader(b), r), err
|
|
}
|
|
|
|
// Compare the local and remote block checksums.
|
|
// If it mismatches, then resume from this point.
|
|
if cs.Checksum != hash(b) {
|
|
return offset, io.MultiReader(bytes.NewReader(b), r), nil
|
|
}
|
|
offset += int64(len(b))
|
|
b = b[:0]
|
|
}
|
|
}
|