taildrop: improve the functionality and reliability of put (#9762)

Changes made:
* Move all HTTP related functionality from taildrop to ipnlocal.
* Add two arguments to taildrop.Manager.PutFile to specify
  an opaque client ID and a resume offset (both unused for now).
* Cleanup the logic of taildrop.Manager.PutFile
  to be easier to follow.
* Implement file conflict handling where duplicate files are renamed
  (e.g., "IMG_1234.jpg" -> "IMG_1234 (2).jpg").
* Implement file de-duplication where "renaming" a partial file
  simply deletes it if it already exists with the same contents.
* Detect conflicting active puts where a second concurrent put
  results in an error.

Updates tailscale/corp#14772

Signed-off-by: Joe Tsai <joetsai@digital-static.net>
Co-authored-by: Rhea Ghosh <rhea@tailscale.com>
This commit is contained in:
Joe Tsai
2023-10-12 09:28:46 -07:00
committed by GitHub
parent 1294b89792
commit 37c646d9d3
7 changed files with 409 additions and 187 deletions

View File

@@ -4,11 +4,10 @@
package taildrop
import (
"crypto/sha256"
"errors"
"io"
"net/http"
"net/url"
"os"
"strings"
"sync"
"time"
@@ -17,10 +16,14 @@ import (
"tailscale.com/version/distro"
)
type incomingFileKey struct {
id ClientID
name string // e.g., "foo.jpeg"
}
type incomingFile struct {
clock tstime.Clock
name string // "foo.jpg"
started time.Time
size int64 // or -1 if unknown; never 0
w io.Writer // underlying writer
@@ -33,13 +36,6 @@ type incomingFile struct {
lastNotify time.Time
}
func (f *incomingFile) markAndNotifyDone() {
f.mu.Lock()
f.done = true
f.mu.Unlock()
f.sendFileNotify()
}
func (f *incomingFile) Write(p []byte) (n int, err error) {
n, err = f.w.Write(p)
@@ -62,123 +58,197 @@ func (f *incomingFile) Write(p []byte) (n int, err error) {
return n, err
}
// HandlePut receives a file.
// It handles an HTTP PUT request to the "/v0/put/{filename}" endpoint,
// where {filename} is a base filename.
// It returns the number of bytes received and whether it was received successfully.
func (h *Handler) HandlePut(w http.ResponseWriter, r *http.Request) (finalSize int64, success bool) {
if !envknob.CanTaildrop() {
http.Error(w, "Taildrop disabled on device", http.StatusForbidden)
return finalSize, success
// PutFile stores a file into [Manager.Dir] from a given client id.
// The baseName must be a base filename without any slashes.
// The length is the expected length of content to read from r,
// it may be negative to indicate that it is unknown.
//
// If there is a failure reading from r, then the partial file is not deleted
// for some period of time. The [Manager.PartialFiles] and [Manager.HashPartialFile]
// methods may be used to list all partial files and to compute the hash for a
// specific partial file. This allows the client to determine whether to resume
// a partial file. While resuming, PutFile may be called again with a non-zero
// offset to specify where to resume receiving data at.
func (m *Manager) PutFile(id ClientID, baseName string, r io.Reader, offset, length int64) (int64, error) {
switch {
case m == nil || m.Dir == "":
return 0, ErrNoTaildrop
case !envknob.CanTaildrop():
return 0, ErrNoTaildrop
case distro.Get() == distro.Unraid && !m.DirectFileMode:
return 0, ErrNotAccessible
}
if r.Method != "PUT" {
http.Error(w, "expected method PUT", http.StatusMethodNotAllowed)
return finalSize, success
}
if h == nil || h.Dir == "" {
http.Error(w, errNoTaildrop.Error(), http.StatusInternalServerError)
return finalSize, success
}
if distro.Get() == distro.Unraid && !h.DirectFileMode {
http.Error(w, "Taildrop folder not configured or accessible", http.StatusInternalServerError)
return finalSize, success
}
rawPath := r.URL.EscapedPath()
suffix, ok := strings.CutPrefix(rawPath, "/v0/put/")
dstPath, ok := m.joinDir(baseName)
if !ok {
http.Error(w, "misconfigured internals", http.StatusInternalServerError)
return finalSize, success
}
if suffix == "" {
http.Error(w, "empty filename", http.StatusBadRequest)
return finalSize, success
}
if strings.Contains(suffix, "/") {
http.Error(w, "directories not supported", http.StatusBadRequest)
return finalSize, success
}
baseName, err := url.PathUnescape(suffix)
if err != nil {
http.Error(w, "bad path encoding", http.StatusBadRequest)
return finalSize, success
}
dstFile, ok := h.diskPath(baseName)
if !ok {
http.Error(w, "bad filename", http.StatusBadRequest)
return finalSize, success
}
// TODO(bradfitz): prevent same filename being sent by two peers at once
// prevent same filename being sent twice
if _, err := os.Stat(dstFile); err == nil {
http.Error(w, "file exists", http.StatusConflict)
return finalSize, success
return 0, ErrInvalidFileName
}
partialFile := dstFile + partialSuffix
f, err := os.Create(partialFile)
if err != nil {
h.Logf("put Create error: %v", redactErr(err))
http.Error(w, err.Error(), http.StatusInternalServerError)
return finalSize, success
redactAndLogError := func(action string, err error) error {
err = redactErr(err)
m.Logf("put %v error: %v", action, err)
return err
}
defer func() {
if !success {
os.Remove(partialFile)
}
}()
var inFile *incomingFile
sendFileNotify := h.SendFileNotify
avoidPartialRename := m.DirectFileMode && m.AvoidFinalRename
if avoidPartialRename {
// Users using AvoidFinalRename are depending on the exact filename
// of the partial files. So avoid injecting the id into it.
id = ""
}
// Check whether there is an in-progress transfer for the file.
sendFileNotify := m.SendFileNotify
if sendFileNotify == nil {
sendFileNotify = func() {} // avoid nil panics below
}
if r.ContentLength != 0 {
inFile = &incomingFile{
clock: h.Clock,
name: baseName,
started: h.Clock.Now(),
size: r.ContentLength,
w: f,
partialPath := dstPath + id.partialSuffix()
inFileKey := incomingFileKey{id, baseName}
inFile, loaded := m.incomingFiles.LoadOrInit(inFileKey, func() *incomingFile {
inFile := &incomingFile{
clock: m.Clock,
started: m.Clock.Now(),
size: length,
sendFileNotify: sendFileNotify,
}
if h.DirectFileMode {
inFile.partialPath = partialFile
if m.DirectFileMode {
inFile.partialPath = partialPath
}
h.incomingFiles.Store(inFile, struct{}{})
defer h.incomingFiles.Delete(inFile)
n, err := io.Copy(inFile, r.Body)
return inFile
})
if loaded {
return 0, ErrFileExists
}
defer m.incomingFiles.Delete(inFileKey)
// Create (if not already) the partial file with read-write permissions.
f, err := os.OpenFile(partialPath, os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
return 0, redactAndLogError("Create", err)
}
defer func() {
f.Close() // best-effort to cleanup dangling file handles
if err != nil {
err = redactErr(err)
f.Close()
h.Logf("put Copy error: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return finalSize, success
if avoidPartialRename {
os.Remove(partialPath) // best-effort
return
}
// TODO: We need to delete partialPath eventually.
// However, this must be done after some period of time.
}
finalSize = n
}
if err := redactErr(f.Close()); err != nil {
h.Logf("put Close error: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return finalSize, success
}
if h.DirectFileMode && h.AvoidFinalRename {
if inFile != nil { // non-zero length; TODO: notify even for zero length
inFile.markAndNotifyDone()
}()
inFile.w = f
// A positive offset implies that we are resuming an existing file.
// Seek to the appropriate offset and truncate the file.
if offset != 0 {
currLength, err := f.Seek(0, io.SeekEnd)
if err != nil {
return 0, redactAndLogError("Seek", err)
}
} else {
if err := os.Rename(partialFile, dstFile); err != nil {
err = redactErr(err)
h.Logf("put final rename: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return finalSize, success
if offset < 0 || offset > currLength {
return 0, redactAndLogError("Seek", err)
}
if _, err := f.Seek(offset, io.SeekStart); err != nil {
return 0, redactAndLogError("Seek", err)
}
if err := f.Truncate(offset); err != nil {
return 0, redactAndLogError("Truncate", err)
}
}
// TODO: set modtime
// TODO: some real response
success = true
io.WriteString(w, "{}\n")
h.knownEmpty.Store(false)
// Copy the contents of the file.
copyLength, err := io.Copy(inFile, r)
if err != nil {
return 0, redactAndLogError("Copy", err)
}
if length >= 0 && copyLength != length {
return 0, redactAndLogError("Copy", errors.New("copied an unexpected number of bytes"))
}
if err := f.Close(); err != nil {
return 0, redactAndLogError("Close", err)
}
fileLength := offset + copyLength
// Return early for avoidPartialRename since users of AvoidFinalRename
// are depending on the exact naming of partial files.
if avoidPartialRename {
inFile.mu.Lock()
inFile.done = true
inFile.mu.Unlock()
m.knownEmpty.Store(false)
sendFileNotify()
return fileLength, nil
}
// File has been successfully received, rename the partial file
// to the final destination filename. If a file of that name already exists,
// then try multiple times with variations of the filename.
computePartialSum := sync.OnceValues(func() ([sha256.Size]byte, error) {
return sha256File(partialPath)
})
maxRetries := 10
for ; maxRetries > 0; maxRetries-- {
// Atomically rename the partial file as the destination file if it doesn't exist.
// Otherwise, it returns the length of the current destination file.
// The operation is atomic.
dstLength, err := func() (int64, error) {
m.renameMu.Lock()
defer m.renameMu.Unlock()
switch fi, err := os.Stat(dstPath); {
case os.IsNotExist(err):
return -1, os.Rename(partialPath, dstPath)
case err != nil:
return -1, err
default:
return fi.Size(), nil
}
}()
if err != nil {
return 0, redactAndLogError("Rename", err)
}
if dstLength < 0 {
break // we successfully renamed; so stop
}
// Avoid the final rename if a destination file has the same contents.
if dstLength == fileLength {
partialSum, err := computePartialSum()
if err != nil {
return 0, redactAndLogError("Rename", err)
}
dstSum, err := sha256File(dstPath)
if err != nil {
return 0, redactAndLogError("Rename", err)
}
if dstSum == partialSum {
if err := os.Remove(partialPath); err != nil {
return 0, redactAndLogError("Remove", err)
}
break // we successfully found a content match; so stop
}
}
// Choose a new destination filename and try again.
dstPath = NextFilename(dstPath)
}
if maxRetries <= 0 {
return 0, errors.New("too many retries trying to rename partial file")
}
m.knownEmpty.Store(false)
sendFileNotify()
return finalSize, success
return fileLength, nil
}
func sha256File(file string) (out [sha256.Size]byte, err error) {
h := sha256.New()
f, err := os.Open(file)
if err != nil {
return out, err
}
defer f.Close()
if _, err := io.Copy(h, f); err != nil {
return out, err
}
return [sha256.Size]byte(h.Sum(nil)), nil
}