{ipn/ipnlocal, taildrop}: move put logic to taildrop (#9680)

Cleaning up taildrop logic for sending files.

Updates tailscale/corp#14772

Signed-off-by: Rhea Ghosh <rhea@tailscale.com>
Co-authored-by: Joe Tsai <joetsai@digital-static.net>
This commit is contained in:
Rhea Ghosh
2023-10-06 09:47:03 -05:00
committed by GitHub
parent c761d102ea
commit 557ddced6c
7 changed files with 312 additions and 283 deletions

View File

@@ -25,7 +25,7 @@ func (s *Handler) HasFilesWaiting() bool {
if s == nil || s.RootDir == "" || s.DirectFileMode {
return false
}
if s.KnownEmpty.Load() {
if s.knownEmpty.Load() {
// Optimization: this is usually empty, so avoid opening
// the directory and checking. We can't cache the actual
// has-files-or-not values as the macOS/iOS client might
@@ -66,7 +66,7 @@ func (s *Handler) HasFilesWaiting() bool {
}
}
if err == io.EOF {
s.KnownEmpty.Store(true)
s.knownEmpty.Store(true)
}
if err != nil {
break

182
taildrop/send.go Normal file
View File

@@ -0,0 +1,182 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
package taildrop
import (
"io"
"net/http"
"net/url"
"os"
"strings"
"sync"
"time"
"tailscale.com/envknob"
"tailscale.com/tstime"
"tailscale.com/version/distro"
)
type incomingFile struct {
clock tstime.Clock
name string // "foo.jpg"
started time.Time
size int64 // or -1 if unknown; never 0
w io.Writer // underlying writer
sendFileNotify func() // called when done
partialPath string // non-empty in direct mode
mu sync.Mutex
copied int64
done bool
lastNotify time.Time
}
func (f *incomingFile) markAndNotifyDone() {
f.mu.Lock()
f.done = true
f.mu.Unlock()
f.sendFileNotify()
}
func (f *incomingFile) Write(p []byte) (n int, err error) {
n, err = f.w.Write(p)
var needNotify bool
defer func() {
if needNotify {
f.sendFileNotify()
}
}()
if n > 0 {
f.mu.Lock()
defer f.mu.Unlock()
f.copied += int64(n)
now := f.clock.Now()
if f.lastNotify.IsZero() || now.Sub(f.lastNotify) > time.Second {
f.lastNotify = now
needNotify = true
}
}
return n, err
}
// HandlePut receives a file.
// It returns the number of bytes received and whether it was received successfully.
func (h *Handler) HandlePut(w http.ResponseWriter, r *http.Request) (finalSize int64, success bool) {
if !envknob.CanTaildrop() {
http.Error(w, "Taildrop disabled on device", http.StatusForbidden)
return finalSize, success
}
if r.Method != "PUT" {
http.Error(w, "expected method PUT", http.StatusMethodNotAllowed)
return finalSize, success
}
if h == nil || h.RootDir == "" {
http.Error(w, ErrNoTaildrop.Error(), http.StatusInternalServerError)
return finalSize, success
}
if distro.Get() == distro.Unraid && !h.DirectFileMode {
http.Error(w, "Taildrop folder not configured or accessible", http.StatusInternalServerError)
return finalSize, success
}
rawPath := r.URL.EscapedPath()
suffix, ok := strings.CutPrefix(rawPath, "/v0/put/")
if !ok {
http.Error(w, "misconfigured internals", http.StatusInternalServerError)
return finalSize, success
}
if suffix == "" {
http.Error(w, "empty filename", http.StatusBadRequest)
return finalSize, success
}
if strings.Contains(suffix, "/") {
http.Error(w, "directories not supported", http.StatusBadRequest)
return finalSize, success
}
baseName, err := url.PathUnescape(suffix)
if err != nil {
http.Error(w, "bad path encoding", http.StatusBadRequest)
return finalSize, success
}
dstFile, ok := h.DiskPath(baseName)
if !ok {
http.Error(w, "bad filename", 400)
return finalSize, success
}
// TODO(bradfitz): prevent same filename being sent by two peers at once
// prevent same filename being sent twice
if _, err := os.Stat(dstFile); err == nil {
http.Error(w, "file exists", http.StatusConflict)
return finalSize, success
}
partialFile := dstFile + PartialSuffix
f, err := os.Create(partialFile)
if err != nil {
h.Logf("put Create error: %v", RedactErr(err))
http.Error(w, err.Error(), http.StatusInternalServerError)
return finalSize, success
}
defer func() {
if !success {
os.Remove(partialFile)
}
}()
var inFile *incomingFile
sendFileNotify := h.SendFileNotify
if sendFileNotify == nil {
sendFileNotify = func() {} // avoid nil panics below
}
if r.ContentLength != 0 {
inFile = &incomingFile{
clock: h.Clock,
name: baseName,
started: h.Clock.Now(),
size: r.ContentLength,
w: f,
sendFileNotify: sendFileNotify,
}
if h.DirectFileMode {
inFile.partialPath = partialFile
}
h.incomingFiles.Store(inFile, struct{}{})
defer h.incomingFiles.Delete(inFile)
n, err := io.Copy(inFile, r.Body)
if err != nil {
err = RedactErr(err)
f.Close()
h.Logf("put Copy error: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return finalSize, success
}
finalSize = n
}
if err := RedactErr(f.Close()); err != nil {
h.Logf("put Close error: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return finalSize, success
}
if h.DirectFileMode && !h.DirectFileDoFinalRename {
if inFile != nil { // non-zero length; TODO: notify even for zero length
inFile.markAndNotifyDone()
}
} else {
if err := os.Rename(partialFile, dstFile); err != nil {
err = RedactErr(err)
h.Logf("put final rename: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return finalSize, success
}
}
// TODO: set modtime
// TODO: some real response
success = true
io.WriteString(w, "{}\n")
h.knownEmpty.Store(false)
sendFileNotify()
return finalSize, success
}

View File

@@ -15,6 +15,8 @@ import (
"unicode"
"unicode/utf8"
"tailscale.com/ipn"
"tailscale.com/syncs"
"tailscale.com/tstime"
"tailscale.com/types/logger"
"tailscale.com/util/multierr"
@@ -41,7 +43,14 @@ type Handler struct {
// it's received.
DirectFileDoFinalRename bool
KnownEmpty atomic.Bool
// SendFileNotify is called periodically while a file is actively
// receiving the contents for the file. There is a final call
// to the function when reception completes.
SendFileNotify func()
knownEmpty atomic.Bool
incomingFiles syncs.Map[*incomingFile, struct{}]
}
var (
@@ -113,6 +122,27 @@ func (s *Handler) DiskPath(baseName string) (fullPath string, ok bool) {
return filepath.Join(s.RootDir, baseName), true
}
func (s *Handler) IncomingFiles() []ipn.PartialFile {
// Make sure we always set n.IncomingFiles non-nil so it gets encoded
// in JSON to clients. They distinguish between empty and non-nil
// to know whether a Notify should be able about files.
files := make([]ipn.PartialFile, 0)
s.incomingFiles.Range(func(f *incomingFile, _ struct{}) bool {
f.mu.Lock()
defer f.mu.Unlock()
files = append(files, ipn.PartialFile{
Name: f.name,
Started: f.started,
DeclaredSize: f.size,
Received: f.copied,
PartialPath: f.partialPath,
Done: f.done,
})
return true
})
return files
}
type redactedErr struct {
msg string
inner error

88
taildrop/taildrop_test.go Normal file
View File

@@ -0,0 +1,88 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
package taildrop
import (
"os"
"path/filepath"
"runtime"
"testing"
)
// Tests "foo.jpg.deleted" marks (for Windows).
func TestDeletedMarkers(t *testing.T) {
dir := t.TempDir()
h := &Handler{RootDir: dir}
nothingWaiting := func() {
t.Helper()
h.knownEmpty.Store(false)
if h.HasFilesWaiting() {
t.Fatal("unexpected files waiting")
}
}
touch := func(base string) {
t.Helper()
if err := TouchFile(filepath.Join(dir, base)); err != nil {
t.Fatal(err)
}
}
wantEmptyTempDir := func() {
t.Helper()
if fis, err := os.ReadDir(dir); err != nil {
t.Fatal(err)
} else if len(fis) > 0 && runtime.GOOS != "windows" {
for _, fi := range fis {
t.Errorf("unexpected file in tempdir: %q", fi.Name())
}
}
}
nothingWaiting()
wantEmptyTempDir()
touch("foo.jpg.deleted")
nothingWaiting()
wantEmptyTempDir()
touch("foo.jpg.deleted")
touch("foo.jpg")
nothingWaiting()
wantEmptyTempDir()
touch("foo.jpg.deleted")
touch("foo.jpg")
wf, err := h.WaitingFiles()
if err != nil {
t.Fatal(err)
}
if len(wf) != 0 {
t.Fatalf("WaitingFiles = %d; want 0", len(wf))
}
wantEmptyTempDir()
touch("foo.jpg.deleted")
touch("foo.jpg")
if rc, _, err := h.OpenFile("foo.jpg"); err == nil {
rc.Close()
t.Fatal("unexpected foo.jpg open")
}
wantEmptyTempDir()
// And verify basics still work in non-deleted cases.
touch("foo.jpg")
touch("bar.jpg.deleted")
if wf, err := h.WaitingFiles(); err != nil {
t.Error(err)
} else if len(wf) != 1 {
t.Errorf("WaitingFiles = %d; want 1", len(wf))
} else if wf[0].Name != "foo.jpg" {
t.Errorf("unexpected waiting file %+v", wf[0])
}
if rc, _, err := h.OpenFile("foo.jpg"); err != nil {
t.Fatal(err)
} else {
rc.Close()
}
}