mirror of
https://github.com/tailscale/tailscale.git
synced 2025-08-14 06:57:31 +00:00
taildrop: merge taildrop and feature/taildrop packages together
Fixes #15812 Change-Id: I3bf0666bf9e7a9caea5f0f99fdb0eb2812157608 Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
This commit is contained in:

committed by
Brad Fitzpatrick

parent
068d5ab655
commit
5b597489bc
205
feature/taildrop/delete.go
Normal file
205
feature/taildrop/delete.go
Normal file
@@ -0,0 +1,205 @@
|
||||
// Copyright (c) Tailscale Inc & AUTHORS
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
package taildrop
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"context"
|
||||
"io/fs"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"tailscale.com/ipn"
|
||||
"tailscale.com/syncs"
|
||||
"tailscale.com/tstime"
|
||||
"tailscale.com/types/logger"
|
||||
)
|
||||
|
||||
// deleteDelay is the amount of time to wait before we delete a file.
|
||||
// A shorter value ensures timely deletion of deleted and partial files, while
|
||||
// a longer value provides more opportunity for partial files to be resumed.
|
||||
const deleteDelay = time.Hour
|
||||
|
||||
// fileDeleter manages asynchronous deletion of files after deleteDelay.
|
||||
type fileDeleter struct {
|
||||
logf logger.Logf
|
||||
clock tstime.DefaultClock
|
||||
dir string
|
||||
event func(string) // called for certain events; for testing only
|
||||
|
||||
mu sync.Mutex
|
||||
queue list.List
|
||||
byName map[string]*list.Element
|
||||
|
||||
emptySignal chan struct{} // signal that the queue is empty
|
||||
group syncs.WaitGroup
|
||||
shutdownCtx context.Context
|
||||
shutdown context.CancelFunc
|
||||
}
|
||||
|
||||
// deleteFile is a specific file to delete after deleteDelay.
|
||||
type deleteFile struct {
|
||||
name string
|
||||
inserted time.Time
|
||||
}
|
||||
|
||||
func (d *fileDeleter) Init(m *manager, eventHook func(string)) {
|
||||
d.logf = m.opts.Logf
|
||||
d.clock = m.opts.Clock
|
||||
d.dir = m.opts.Dir
|
||||
d.event = eventHook
|
||||
|
||||
d.byName = make(map[string]*list.Element)
|
||||
d.emptySignal = make(chan struct{})
|
||||
d.shutdownCtx, d.shutdown = context.WithCancel(context.Background())
|
||||
|
||||
// From a cold-start, load the list of partial and deleted files.
|
||||
//
|
||||
// Only run this if we have ever received at least one file
|
||||
// to avoid ever touching the taildrop directory on systems (e.g., MacOS)
|
||||
// that pop up a security dialog window upon first access.
|
||||
if m.opts.State == nil {
|
||||
return
|
||||
}
|
||||
if b, _ := m.opts.State.ReadState(ipn.TaildropReceivedKey); len(b) == 0 {
|
||||
return
|
||||
}
|
||||
d.group.Go(func() {
|
||||
d.event("start full-scan")
|
||||
defer d.event("end full-scan")
|
||||
rangeDir(d.dir, func(de fs.DirEntry) bool {
|
||||
switch {
|
||||
case d.shutdownCtx.Err() != nil:
|
||||
return false // terminate early
|
||||
case !de.Type().IsRegular():
|
||||
return true
|
||||
case strings.HasSuffix(de.Name(), partialSuffix):
|
||||
// Only enqueue the file for deletion if there is no active put.
|
||||
nameID := strings.TrimSuffix(de.Name(), partialSuffix)
|
||||
if i := strings.LastIndexByte(nameID, '.'); i > 0 {
|
||||
key := incomingFileKey{clientID(nameID[i+len("."):]), nameID[:i]}
|
||||
m.incomingFiles.LoadFunc(key, func(_ *incomingFile, loaded bool) {
|
||||
if !loaded {
|
||||
d.Insert(de.Name())
|
||||
}
|
||||
})
|
||||
} else {
|
||||
d.Insert(de.Name())
|
||||
}
|
||||
case strings.HasSuffix(de.Name(), deletedSuffix):
|
||||
// Best-effort immediate deletion of deleted files.
|
||||
name := strings.TrimSuffix(de.Name(), deletedSuffix)
|
||||
if os.Remove(filepath.Join(d.dir, name)) == nil {
|
||||
if os.Remove(filepath.Join(d.dir, de.Name())) == nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
// Otherwise, enqueue the file for later deletion.
|
||||
d.Insert(de.Name())
|
||||
}
|
||||
return true
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
// Insert enqueues baseName for eventual deletion.
|
||||
func (d *fileDeleter) Insert(baseName string) {
|
||||
d.mu.Lock()
|
||||
defer d.mu.Unlock()
|
||||
if d.shutdownCtx.Err() != nil {
|
||||
return
|
||||
}
|
||||
if _, ok := d.byName[baseName]; ok {
|
||||
return // already queued for deletion
|
||||
}
|
||||
d.byName[baseName] = d.queue.PushBack(&deleteFile{baseName, d.clock.Now()})
|
||||
if d.queue.Len() == 1 && d.shutdownCtx.Err() == nil {
|
||||
d.group.Go(func() { d.waitAndDelete(deleteDelay) })
|
||||
}
|
||||
}
|
||||
|
||||
// waitAndDelete is an asynchronous deletion goroutine.
|
||||
// At most one waitAndDelete routine is ever running at a time.
|
||||
// It is not started unless there is at least one file in the queue.
|
||||
func (d *fileDeleter) waitAndDelete(wait time.Duration) {
|
||||
tc, ch := d.clock.NewTimer(wait)
|
||||
defer tc.Stop() // cleanup the timer resource if we stop early
|
||||
d.event("start waitAndDelete")
|
||||
defer d.event("end waitAndDelete")
|
||||
select {
|
||||
case <-d.shutdownCtx.Done():
|
||||
case <-d.emptySignal:
|
||||
case now := <-ch:
|
||||
d.mu.Lock()
|
||||
defer d.mu.Unlock()
|
||||
|
||||
// Iterate over all files to delete, and delete anything old enough.
|
||||
var next *list.Element
|
||||
var failed []*list.Element
|
||||
for elem := d.queue.Front(); elem != nil; elem = next {
|
||||
next = elem.Next()
|
||||
file := elem.Value.(*deleteFile)
|
||||
if now.Sub(file.inserted) < deleteDelay {
|
||||
break // everything after this is recently inserted
|
||||
}
|
||||
|
||||
// Delete the expired file.
|
||||
if name, ok := strings.CutSuffix(file.name, deletedSuffix); ok {
|
||||
if err := os.Remove(filepath.Join(d.dir, name)); err != nil && !os.IsNotExist(err) {
|
||||
d.logf("could not delete: %v", redactError(err))
|
||||
failed = append(failed, elem)
|
||||
continue
|
||||
}
|
||||
}
|
||||
if err := os.Remove(filepath.Join(d.dir, file.name)); err != nil && !os.IsNotExist(err) {
|
||||
d.logf("could not delete: %v", redactError(err))
|
||||
failed = append(failed, elem)
|
||||
continue
|
||||
}
|
||||
d.queue.Remove(elem)
|
||||
delete(d.byName, file.name)
|
||||
d.event("deleted " + file.name)
|
||||
}
|
||||
for _, elem := range failed {
|
||||
elem.Value.(*deleteFile).inserted = now // retry after deleteDelay
|
||||
d.queue.MoveToBack(elem)
|
||||
}
|
||||
|
||||
// If there are still some files to delete, retry again later.
|
||||
if d.queue.Len() > 0 && d.shutdownCtx.Err() == nil {
|
||||
file := d.queue.Front().Value.(*deleteFile)
|
||||
retryAfter := deleteDelay - now.Sub(file.inserted)
|
||||
d.group.Go(func() { d.waitAndDelete(retryAfter) })
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Remove dequeues baseName from eventual deletion.
|
||||
func (d *fileDeleter) Remove(baseName string) {
|
||||
d.mu.Lock()
|
||||
defer d.mu.Unlock()
|
||||
if elem := d.byName[baseName]; elem != nil {
|
||||
d.queue.Remove(elem)
|
||||
delete(d.byName, baseName)
|
||||
// Signal to terminate any waitAndDelete goroutines.
|
||||
if d.queue.Len() == 0 {
|
||||
select {
|
||||
case <-d.shutdownCtx.Done():
|
||||
case d.emptySignal <- struct{}{}:
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Shutdown shuts down the deleter.
|
||||
// It blocks until all goroutines are stopped.
|
||||
func (d *fileDeleter) Shutdown() {
|
||||
d.mu.Lock() // acquire lock to ensure no new goroutines start after shutdown
|
||||
d.shutdown()
|
||||
d.mu.Unlock()
|
||||
d.group.Wait()
|
||||
}
|
152
feature/taildrop/delete_test.go
Normal file
152
feature/taildrop/delete_test.go
Normal file
@@ -0,0 +1,152 @@
|
||||
// Copyright (c) Tailscale Inc & AUTHORS
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
package taildrop
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"slices"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"tailscale.com/ipn"
|
||||
"tailscale.com/ipn/store/mem"
|
||||
"tailscale.com/tstest"
|
||||
"tailscale.com/tstime"
|
||||
"tailscale.com/util/must"
|
||||
)
|
||||
|
||||
func TestDeleter(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
must.Do(touchFile(filepath.Join(dir, "foo.partial")))
|
||||
must.Do(touchFile(filepath.Join(dir, "bar.partial")))
|
||||
must.Do(touchFile(filepath.Join(dir, "fizz")))
|
||||
must.Do(touchFile(filepath.Join(dir, "fizz.deleted")))
|
||||
must.Do(touchFile(filepath.Join(dir, "buzz.deleted"))) // lacks a matching "buzz" file
|
||||
|
||||
checkDirectory := func(want ...string) {
|
||||
t.Helper()
|
||||
var got []string
|
||||
for _, de := range must.Get(os.ReadDir(dir)) {
|
||||
got = append(got, de.Name())
|
||||
}
|
||||
slices.Sort(got)
|
||||
slices.Sort(want)
|
||||
if diff := cmp.Diff(got, want); diff != "" {
|
||||
t.Fatalf("directory mismatch (-got +want):\n%s", diff)
|
||||
}
|
||||
}
|
||||
|
||||
clock := tstest.NewClock(tstest.ClockOpts{Start: time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)})
|
||||
advance := func(d time.Duration) {
|
||||
t.Helper()
|
||||
t.Logf("advance: %v", d)
|
||||
clock.Advance(d)
|
||||
}
|
||||
|
||||
eventsChan := make(chan string, 1000)
|
||||
checkEvents := func(want ...string) {
|
||||
t.Helper()
|
||||
tm := time.NewTimer(10 * time.Second)
|
||||
defer tm.Stop()
|
||||
var got []string
|
||||
for range want {
|
||||
select {
|
||||
case event := <-eventsChan:
|
||||
t.Logf("event: %s", event)
|
||||
got = append(got, event)
|
||||
case <-tm.C:
|
||||
t.Fatalf("timed out waiting for event: got %v, want %v", got, want)
|
||||
}
|
||||
}
|
||||
slices.Sort(got)
|
||||
slices.Sort(want)
|
||||
if diff := cmp.Diff(got, want); diff != "" {
|
||||
t.Fatalf("events mismatch (-got +want):\n%s", diff)
|
||||
}
|
||||
}
|
||||
eventHook := func(event string) { eventsChan <- event }
|
||||
|
||||
var m manager
|
||||
var fd fileDeleter
|
||||
m.opts.Logf = t.Logf
|
||||
m.opts.Clock = tstime.DefaultClock{Clock: clock}
|
||||
m.opts.Dir = dir
|
||||
m.opts.State = must.Get(mem.New(nil, ""))
|
||||
must.Do(m.opts.State.WriteState(ipn.TaildropReceivedKey, []byte{1}))
|
||||
fd.Init(&m, eventHook)
|
||||
defer fd.Shutdown()
|
||||
insert := func(name string) {
|
||||
t.Helper()
|
||||
t.Logf("insert: %v", name)
|
||||
fd.Insert(name)
|
||||
}
|
||||
remove := func(name string) {
|
||||
t.Helper()
|
||||
t.Logf("remove: %v", name)
|
||||
fd.Remove(name)
|
||||
}
|
||||
|
||||
checkEvents("start full-scan")
|
||||
checkEvents("end full-scan", "start waitAndDelete")
|
||||
checkDirectory("foo.partial", "bar.partial", "buzz.deleted")
|
||||
|
||||
advance(deleteDelay / 2)
|
||||
checkDirectory("foo.partial", "bar.partial", "buzz.deleted")
|
||||
advance(deleteDelay / 2)
|
||||
checkEvents("deleted foo.partial", "deleted bar.partial", "deleted buzz.deleted")
|
||||
checkEvents("end waitAndDelete")
|
||||
checkDirectory()
|
||||
|
||||
must.Do(touchFile(filepath.Join(dir, "one.partial")))
|
||||
insert("one.partial")
|
||||
checkEvents("start waitAndDelete")
|
||||
advance(deleteDelay / 4)
|
||||
must.Do(touchFile(filepath.Join(dir, "two.partial")))
|
||||
insert("two.partial")
|
||||
advance(deleteDelay / 4)
|
||||
must.Do(touchFile(filepath.Join(dir, "three.partial")))
|
||||
insert("three.partial")
|
||||
advance(deleteDelay / 4)
|
||||
must.Do(touchFile(filepath.Join(dir, "four.partial")))
|
||||
insert("four.partial")
|
||||
|
||||
advance(deleteDelay / 4)
|
||||
checkEvents("deleted one.partial")
|
||||
checkDirectory("two.partial", "three.partial", "four.partial")
|
||||
checkEvents("end waitAndDelete", "start waitAndDelete")
|
||||
|
||||
advance(deleteDelay / 4)
|
||||
checkEvents("deleted two.partial")
|
||||
checkDirectory("three.partial", "four.partial")
|
||||
checkEvents("end waitAndDelete", "start waitAndDelete")
|
||||
|
||||
advance(deleteDelay / 4)
|
||||
checkEvents("deleted three.partial")
|
||||
checkDirectory("four.partial")
|
||||
checkEvents("end waitAndDelete", "start waitAndDelete")
|
||||
|
||||
advance(deleteDelay / 4)
|
||||
checkEvents("deleted four.partial")
|
||||
checkDirectory()
|
||||
checkEvents("end waitAndDelete")
|
||||
|
||||
insert("wuzz.partial")
|
||||
checkEvents("start waitAndDelete")
|
||||
remove("wuzz.partial")
|
||||
checkEvents("end waitAndDelete")
|
||||
}
|
||||
|
||||
// Test that the asynchronous full scan of the taildrop directory does not occur
|
||||
// on a cold start if taildrop has never received any files.
|
||||
func TestDeleterInitWithoutTaildrop(t *testing.T) {
|
||||
var m manager
|
||||
var fd fileDeleter
|
||||
m.opts.Logf = t.Logf
|
||||
m.opts.Dir = t.TempDir()
|
||||
m.opts.State = must.Get(mem.New(nil, ""))
|
||||
fd.Init(&m, func(event string) { t.Errorf("unexpected event: %v", event) })
|
||||
fd.Shutdown()
|
||||
}
|
@@ -22,7 +22,6 @@ import (
|
||||
"tailscale.com/ipn/ipnext"
|
||||
"tailscale.com/ipn/ipnstate"
|
||||
"tailscale.com/tailcfg"
|
||||
"tailscale.com/taildrop"
|
||||
"tailscale.com/tstime"
|
||||
"tailscale.com/types/empty"
|
||||
"tailscale.com/types/logger"
|
||||
@@ -72,7 +71,7 @@ type Extension struct {
|
||||
selfUID tailcfg.UserID
|
||||
capFileSharing bool
|
||||
fileWaiters set.HandleSet[context.CancelFunc] // of wake-up funcs
|
||||
mgr atomic.Pointer[taildrop.Manager] // mutex held to write; safe to read without lock;
|
||||
mgr atomic.Pointer[manager] // mutex held to write; safe to read without lock;
|
||||
// outgoingFiles keeps track of Taildrop outgoing files keyed to their OutgoingFile.ID
|
||||
outgoingFiles map[string]*ipn.OutgoingFile
|
||||
}
|
||||
@@ -113,7 +112,7 @@ func (e *Extension) onSelfChange(self tailcfg.NodeView) {
|
||||
osshare.SetFileSharingEnabled(e.capFileSharing, e.logf)
|
||||
}
|
||||
|
||||
func (e *Extension) setMgrLocked(mgr *taildrop.Manager) {
|
||||
func (e *Extension) setMgrLocked(mgr *manager) {
|
||||
if old := e.mgr.Swap(mgr); old != nil {
|
||||
old.Shutdown()
|
||||
}
|
||||
@@ -141,7 +140,7 @@ func (e *Extension) onChangeProfile(profile ipn.LoginProfileView, _ ipn.PrefsVie
|
||||
if fileRoot == "" {
|
||||
e.logf("no Taildrop directory configured")
|
||||
}
|
||||
e.setMgrLocked(taildrop.ManagerOptions{
|
||||
e.setMgrLocked(managerOptions{
|
||||
Logf: e.logf,
|
||||
Clock: tstime.DefaultClock{Clock: e.sb.Clock()},
|
||||
State: e.stateStore,
|
||||
@@ -191,10 +190,10 @@ func (e *Extension) hasCapFileSharing() bool {
|
||||
return e.capFileSharing
|
||||
}
|
||||
|
||||
// manager returns the active taildrop.Manager, or nil.
|
||||
// manager returns the active Manager, or nil.
|
||||
//
|
||||
// Methods on a nil Manager are safe to call.
|
||||
func (e *Extension) manager() *taildrop.Manager {
|
||||
func (e *Extension) manager() *manager {
|
||||
return e.mgr.Load()
|
||||
}
|
||||
|
||||
|
@@ -24,7 +24,6 @@ import (
|
||||
"tailscale.com/ipn/ipnlocal"
|
||||
"tailscale.com/ipn/localapi"
|
||||
"tailscale.com/tailcfg"
|
||||
"tailscale.com/taildrop"
|
||||
"tailscale.com/util/clientmetric"
|
||||
"tailscale.com/util/httphdr"
|
||||
"tailscale.com/util/mak"
|
||||
@@ -320,7 +319,7 @@ func singleFilePut(
|
||||
default:
|
||||
resumeStart := time.Now()
|
||||
dec := json.NewDecoder(resp.Body)
|
||||
offset, remainingBody, err = taildrop.ResumeReader(body, func() (out taildrop.BlockChecksum, err error) {
|
||||
offset, remainingBody, err = resumeReader(body, func() (out blockChecksum, err error) {
|
||||
err = dec.Decode(&out)
|
||||
return out, err
|
||||
})
|
||||
|
@@ -14,7 +14,6 @@ import (
|
||||
|
||||
"tailscale.com/ipn/ipnlocal"
|
||||
"tailscale.com/tailcfg"
|
||||
"tailscale.com/taildrop"
|
||||
"tailscale.com/tstime"
|
||||
"tailscale.com/util/clientmetric"
|
||||
"tailscale.com/util/httphdr"
|
||||
@@ -49,7 +48,7 @@ func handlePeerPut(h ipnlocal.PeerAPIHandler, w http.ResponseWriter, r *http.Req
|
||||
// extensionForPut is the subset of taildrop extension that taildrop
|
||||
// file put needs. This is pulled out for testability.
|
||||
type extensionForPut interface {
|
||||
manager() *taildrop.Manager
|
||||
manager() *manager
|
||||
hasCapFileSharing() bool
|
||||
Clock() tstime.Clock
|
||||
}
|
||||
@@ -67,11 +66,11 @@ func handlePeerPutWithBackend(h ipnlocal.PeerAPIHandler, ext extensionForPut, w
|
||||
}
|
||||
|
||||
if !canPutFile(h) {
|
||||
http.Error(w, taildrop.ErrNoTaildrop.Error(), http.StatusForbidden)
|
||||
http.Error(w, ErrNoTaildrop.Error(), http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
if !ext.hasCapFileSharing() {
|
||||
http.Error(w, taildrop.ErrNoTaildrop.Error(), http.StatusForbidden)
|
||||
http.Error(w, ErrNoTaildrop.Error(), http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
rawPath := r.URL.EscapedPath()
|
||||
@@ -82,13 +81,13 @@ func handlePeerPutWithBackend(h ipnlocal.PeerAPIHandler, ext extensionForPut, w
|
||||
}
|
||||
baseName, err := url.PathUnescape(prefix)
|
||||
if err != nil {
|
||||
http.Error(w, taildrop.ErrInvalidFileName.Error(), http.StatusBadRequest)
|
||||
http.Error(w, ErrInvalidFileName.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
enc := json.NewEncoder(w)
|
||||
switch r.Method {
|
||||
case "GET":
|
||||
id := taildrop.ClientID(h.Peer().StableID())
|
||||
id := clientID(h.Peer().StableID())
|
||||
if prefix == "" {
|
||||
// List all the partial files.
|
||||
files, err := taildropMgr.PartialFiles(id)
|
||||
@@ -128,7 +127,7 @@ func handlePeerPutWithBackend(h ipnlocal.PeerAPIHandler, ext extensionForPut, w
|
||||
}
|
||||
case "PUT":
|
||||
t0 := ext.Clock().Now()
|
||||
id := taildrop.ClientID(h.Peer().StableID())
|
||||
id := clientID(h.Peer().StableID())
|
||||
|
||||
var offset int64
|
||||
if rangeHdr := r.Header.Get("Range"); rangeHdr != "" {
|
||||
@@ -139,17 +138,17 @@ func handlePeerPutWithBackend(h ipnlocal.PeerAPIHandler, ext extensionForPut, w
|
||||
}
|
||||
offset = ranges[0].Start
|
||||
}
|
||||
n, err := taildropMgr.PutFile(taildrop.ClientID(fmt.Sprint(id)), baseName, r.Body, offset, r.ContentLength)
|
||||
n, err := taildropMgr.PutFile(clientID(fmt.Sprint(id)), baseName, r.Body, offset, r.ContentLength)
|
||||
switch err {
|
||||
case nil:
|
||||
d := ext.Clock().Since(t0).Round(time.Second / 10)
|
||||
h.Logf("got put of %s in %v from %v/%v", approxSize(n), d, h.RemoteAddr().Addr(), h.Peer().ComputedName)
|
||||
io.WriteString(w, "{}\n")
|
||||
case taildrop.ErrNoTaildrop:
|
||||
case ErrNoTaildrop:
|
||||
http.Error(w, err.Error(), http.StatusForbidden)
|
||||
case taildrop.ErrInvalidFileName:
|
||||
case ErrInvalidFileName:
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
case taildrop.ErrFileExists:
|
||||
case ErrFileExists:
|
||||
http.Error(w, err.Error(), http.StatusConflict)
|
||||
default:
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
|
@@ -21,7 +21,6 @@ import (
|
||||
"tailscale.com/client/tailscale/apitype"
|
||||
"tailscale.com/ipn/ipnlocal"
|
||||
"tailscale.com/tailcfg"
|
||||
"tailscale.com/taildrop"
|
||||
"tailscale.com/tstest"
|
||||
"tailscale.com/tstime"
|
||||
"tailscale.com/types/logger"
|
||||
@@ -54,10 +53,10 @@ type fakeExtension struct {
|
||||
logf logger.Logf
|
||||
capFileSharing bool
|
||||
clock tstime.Clock
|
||||
taildrop *taildrop.Manager
|
||||
taildrop *manager
|
||||
}
|
||||
|
||||
func (lb *fakeExtension) manager() *taildrop.Manager {
|
||||
func (lb *fakeExtension) manager() *manager {
|
||||
return lb.taildrop
|
||||
}
|
||||
func (lb *fakeExtension) Clock() tstime.Clock { return lb.clock }
|
||||
@@ -66,7 +65,7 @@ func (lb *fakeExtension) hasCapFileSharing() bool {
|
||||
}
|
||||
|
||||
type peerAPITestEnv struct {
|
||||
taildrop *taildrop.Manager
|
||||
taildrop *manager
|
||||
ph *peerAPIHandler
|
||||
rr *httptest.ResponseRecorder
|
||||
logBuf tstest.MemLogger
|
||||
@@ -477,7 +476,7 @@ func TestHandlePeerAPI(t *testing.T) {
|
||||
}
|
||||
|
||||
var e peerAPITestEnv
|
||||
e.taildrop = taildrop.ManagerOptions{
|
||||
e.taildrop = managerOptions{
|
||||
Logf: e.logBuf.Logf,
|
||||
Dir: rootDir,
|
||||
}.New()
|
||||
@@ -526,7 +525,7 @@ func TestHandlePeerAPI(t *testing.T) {
|
||||
// a bit. So test that we work around that sufficiently.
|
||||
func TestFileDeleteRace(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
taildropMgr := taildrop.ManagerOptions{
|
||||
taildropMgr := managerOptions{
|
||||
Logf: t.Logf,
|
||||
Dir: dir,
|
||||
}.New()
|
||||
|
153
feature/taildrop/resume.go
Normal file
153
feature/taildrop/resume.go
Normal file
@@ -0,0 +1,153 @@
|
||||
// Copyright (c) Tailscale Inc & AUTHORS
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
package taildrop
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/fs"
|
||||
"os"
|
||||
"strings"
|
||||
)
|
||||
|
||||
var (
|
||||
blockSize = int64(64 << 10)
|
||||
hashAlgorithm = "sha256"
|
||||
)
|
||||
|
||||
// blockChecksum represents the checksum for a single block.
|
||||
type blockChecksum struct {
|
||||
Checksum checksum `json:"checksum"`
|
||||
Algorithm string `json:"algo"` // always "sha256" for now
|
||||
Size int64 `json:"size"` // always (64<<10) for now
|
||||
}
|
||||
|
||||
// checksum is an opaque checksum that is comparable.
|
||||
type checksum struct{ cs [sha256.Size]byte }
|
||||
|
||||
func hash(b []byte) checksum {
|
||||
return checksum{sha256.Sum256(b)}
|
||||
}
|
||||
func (cs checksum) String() string {
|
||||
return hex.EncodeToString(cs.cs[:])
|
||||
}
|
||||
func (cs checksum) AppendText(b []byte) ([]byte, error) {
|
||||
return hex.AppendEncode(b, cs.cs[:]), nil
|
||||
}
|
||||
func (cs checksum) MarshalText() ([]byte, error) {
|
||||
return hex.AppendEncode(nil, cs.cs[:]), nil
|
||||
}
|
||||
func (cs *checksum) UnmarshalText(b []byte) error {
|
||||
if len(b) != 2*len(cs.cs) {
|
||||
return fmt.Errorf("invalid hex length: %d", len(b))
|
||||
}
|
||||
_, err := hex.Decode(cs.cs[:], b)
|
||||
return err
|
||||
}
|
||||
|
||||
// PartialFiles returns a list of partial files in [Handler.Dir]
|
||||
// that were sent (or is actively being sent) by the provided id.
|
||||
func (m *manager) PartialFiles(id clientID) (ret []string, err error) {
|
||||
if m == nil || m.opts.Dir == "" {
|
||||
return nil, ErrNoTaildrop
|
||||
}
|
||||
|
||||
suffix := id.partialSuffix()
|
||||
if err := rangeDir(m.opts.Dir, func(de fs.DirEntry) bool {
|
||||
if name := de.Name(); strings.HasSuffix(name, suffix) {
|
||||
ret = append(ret, name)
|
||||
}
|
||||
return true
|
||||
}); err != nil {
|
||||
return ret, redactError(err)
|
||||
}
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
// HashPartialFile returns a function that hashes the next block in the file,
|
||||
// starting from the beginning of the file.
|
||||
// It returns (BlockChecksum{}, io.EOF) when the stream is complete.
|
||||
// It is the caller's responsibility to call close.
|
||||
func (m *manager) HashPartialFile(id clientID, baseName string) (next func() (blockChecksum, error), close func() error, err error) {
|
||||
if m == nil || m.opts.Dir == "" {
|
||||
return nil, nil, ErrNoTaildrop
|
||||
}
|
||||
noopNext := func() (blockChecksum, error) { return blockChecksum{}, io.EOF }
|
||||
noopClose := func() error { return nil }
|
||||
|
||||
dstFile, err := joinDir(m.opts.Dir, baseName)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
f, err := os.Open(dstFile + id.partialSuffix())
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return noopNext, noopClose, nil
|
||||
}
|
||||
return nil, nil, redactError(err)
|
||||
}
|
||||
|
||||
b := make([]byte, blockSize) // TODO: Pool this?
|
||||
next = func() (blockChecksum, error) {
|
||||
switch n, err := io.ReadFull(f, b); {
|
||||
case err != nil && err != io.EOF && err != io.ErrUnexpectedEOF:
|
||||
return blockChecksum{}, redactError(err)
|
||||
case n == 0:
|
||||
return blockChecksum{}, io.EOF
|
||||
default:
|
||||
return blockChecksum{hash(b[:n]), hashAlgorithm, int64(n)}, nil
|
||||
}
|
||||
}
|
||||
close = f.Close
|
||||
return next, close, nil
|
||||
}
|
||||
|
||||
// resumeReader reads and discards the leading content of r
|
||||
// that matches the content based on the checksums that exist.
|
||||
// It returns the number of bytes consumed,
|
||||
// and returns an [io.Reader] representing the remaining content.
|
||||
func resumeReader(r io.Reader, hashNext func() (blockChecksum, error)) (int64, io.Reader, error) {
|
||||
if hashNext == nil {
|
||||
return 0, r, nil
|
||||
}
|
||||
|
||||
var offset int64
|
||||
b := make([]byte, 0, blockSize)
|
||||
for {
|
||||
// Obtain the next block checksum from the remote peer.
|
||||
cs, err := hashNext()
|
||||
switch {
|
||||
case err == io.EOF:
|
||||
return offset, io.MultiReader(bytes.NewReader(b), r), nil
|
||||
case err != nil:
|
||||
return offset, io.MultiReader(bytes.NewReader(b), r), err
|
||||
case cs.Algorithm != hashAlgorithm || cs.Size < 0 || cs.Size > blockSize:
|
||||
return offset, io.MultiReader(bytes.NewReader(b), r), fmt.Errorf("invalid block size or hashing algorithm")
|
||||
}
|
||||
|
||||
// Read the contents of the next block.
|
||||
n, err := io.ReadFull(r, b[:cs.Size])
|
||||
b = b[:n]
|
||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||
err = nil
|
||||
}
|
||||
if len(b) == 0 || err != nil {
|
||||
// This should not occur in practice.
|
||||
// It implies that an error occurred reading r,
|
||||
// or that the partial file on the remote side is fully complete.
|
||||
return offset, io.MultiReader(bytes.NewReader(b), r), err
|
||||
}
|
||||
|
||||
// Compare the local and remote block checksums.
|
||||
// If it mismatches, then resume from this point.
|
||||
if cs.Checksum != hash(b) {
|
||||
return offset, io.MultiReader(bytes.NewReader(b), r), nil
|
||||
}
|
||||
offset += int64(len(b))
|
||||
b = b[:0]
|
||||
}
|
||||
}
|
74
feature/taildrop/resume_test.go
Normal file
74
feature/taildrop/resume_test.go
Normal file
@@ -0,0 +1,74 @@
|
||||
// Copyright (c) Tailscale Inc & AUTHORS
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
package taildrop
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"math/rand"
|
||||
"os"
|
||||
"testing"
|
||||
"testing/iotest"
|
||||
|
||||
"tailscale.com/util/must"
|
||||
)
|
||||
|
||||
func TestResume(t *testing.T) {
|
||||
oldBlockSize := blockSize
|
||||
defer func() { blockSize = oldBlockSize }()
|
||||
blockSize = 256
|
||||
|
||||
m := managerOptions{Logf: t.Logf, Dir: t.TempDir()}.New()
|
||||
defer m.Shutdown()
|
||||
|
||||
rn := rand.New(rand.NewSource(0))
|
||||
want := make([]byte, 12345)
|
||||
must.Get(io.ReadFull(rn, want))
|
||||
|
||||
t.Run("resume-noexist", func(t *testing.T) {
|
||||
r := io.Reader(bytes.NewReader(want))
|
||||
|
||||
next, close, err := m.HashPartialFile("", "foo")
|
||||
must.Do(err)
|
||||
defer close()
|
||||
offset, r, err := resumeReader(r, next)
|
||||
must.Do(err)
|
||||
must.Do(close()) // Windows wants the file handle to be closed to rename it.
|
||||
|
||||
must.Get(m.PutFile("", "foo", r, offset, -1))
|
||||
got := must.Get(os.ReadFile(must.Get(joinDir(m.opts.Dir, "foo"))))
|
||||
if !bytes.Equal(got, want) {
|
||||
t.Errorf("content mismatches")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("resume-retry", func(t *testing.T) {
|
||||
rn := rand.New(rand.NewSource(0))
|
||||
for i := 0; true; i++ {
|
||||
r := io.Reader(bytes.NewReader(want))
|
||||
|
||||
next, close, err := m.HashPartialFile("", "bar")
|
||||
must.Do(err)
|
||||
defer close()
|
||||
offset, r, err := resumeReader(r, next)
|
||||
must.Do(err)
|
||||
must.Do(close()) // Windows wants the file handle to be closed to rename it.
|
||||
|
||||
numWant := rn.Int63n(min(int64(len(want))-offset, 1000) + 1)
|
||||
if offset < int64(len(want)) {
|
||||
r = io.MultiReader(io.LimitReader(r, numWant), iotest.ErrReader(io.ErrClosedPipe))
|
||||
}
|
||||
if _, err := m.PutFile("", "bar", r, offset, -1); err == nil {
|
||||
break
|
||||
}
|
||||
if i > 1000 {
|
||||
t.Fatalf("too many iterations to complete the test")
|
||||
}
|
||||
}
|
||||
got := must.Get(os.ReadFile(must.Get(joinDir(m.opts.Dir, "bar"))))
|
||||
if !bytes.Equal(got, want) {
|
||||
t.Errorf("content mismatches")
|
||||
}
|
||||
})
|
||||
}
|
178
feature/taildrop/retrieve.go
Normal file
178
feature/taildrop/retrieve.go
Normal file
@@ -0,0 +1,178 @@
|
||||
// Copyright (c) Tailscale Inc & AUTHORS
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
package taildrop
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"io/fs"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"tailscale.com/client/tailscale/apitype"
|
||||
"tailscale.com/logtail/backoff"
|
||||
)
|
||||
|
||||
// HasFilesWaiting reports whether any files are buffered in [Handler.Dir].
|
||||
// This always returns false when [Handler.DirectFileMode] is false.
|
||||
func (m *manager) HasFilesWaiting() (has bool) {
|
||||
if m == nil || m.opts.Dir == "" || m.opts.DirectFileMode {
|
||||
return false
|
||||
}
|
||||
|
||||
// Optimization: this is usually empty, so avoid opening
|
||||
// the directory and checking. We can't cache the actual
|
||||
// has-files-or-not values as the macOS/iOS client might
|
||||
// in the future use+delete the files directly. So only
|
||||
// keep this negative cache.
|
||||
totalReceived := m.totalReceived.Load()
|
||||
if totalReceived == m.emptySince.Load() {
|
||||
return false
|
||||
}
|
||||
|
||||
// Check whether there is at least one one waiting file.
|
||||
err := rangeDir(m.opts.Dir, func(de fs.DirEntry) bool {
|
||||
name := de.Name()
|
||||
if isPartialOrDeleted(name) || !de.Type().IsRegular() {
|
||||
return true
|
||||
}
|
||||
_, err := os.Stat(filepath.Join(m.opts.Dir, name+deletedSuffix))
|
||||
if os.IsNotExist(err) {
|
||||
has = true
|
||||
return false
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
// If there are no more waiting files, record totalReceived as emptySince
|
||||
// so that we can short-circuit the expensive directory traversal
|
||||
// if no files have been received after the start of this call.
|
||||
if err == nil && !has {
|
||||
m.emptySince.Store(totalReceived)
|
||||
}
|
||||
return has
|
||||
}
|
||||
|
||||
// WaitingFiles returns the list of files that have been sent by a
|
||||
// peer that are waiting in [Handler.Dir].
|
||||
// This always returns nil when [Handler.DirectFileMode] is false.
|
||||
func (m *manager) WaitingFiles() (ret []apitype.WaitingFile, err error) {
|
||||
if m == nil || m.opts.Dir == "" {
|
||||
return nil, ErrNoTaildrop
|
||||
}
|
||||
if m.opts.DirectFileMode {
|
||||
return nil, nil
|
||||
}
|
||||
if err := rangeDir(m.opts.Dir, func(de fs.DirEntry) bool {
|
||||
name := de.Name()
|
||||
if isPartialOrDeleted(name) || !de.Type().IsRegular() {
|
||||
return true
|
||||
}
|
||||
_, err := os.Stat(filepath.Join(m.opts.Dir, name+deletedSuffix))
|
||||
if os.IsNotExist(err) {
|
||||
fi, err := de.Info()
|
||||
if err != nil {
|
||||
return true
|
||||
}
|
||||
ret = append(ret, apitype.WaitingFile{
|
||||
Name: filepath.Base(name),
|
||||
Size: fi.Size(),
|
||||
})
|
||||
}
|
||||
return true
|
||||
}); err != nil {
|
||||
return nil, redactError(err)
|
||||
}
|
||||
sort.Slice(ret, func(i, j int) bool { return ret[i].Name < ret[j].Name })
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
// DeleteFile deletes a file of the given baseName from [Handler.Dir].
|
||||
// This method is only allowed when [Handler.DirectFileMode] is false.
|
||||
func (m *manager) DeleteFile(baseName string) error {
|
||||
if m == nil || m.opts.Dir == "" {
|
||||
return ErrNoTaildrop
|
||||
}
|
||||
if m.opts.DirectFileMode {
|
||||
return errors.New("deletes not allowed in direct mode")
|
||||
}
|
||||
path, err := joinDir(m.opts.Dir, baseName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var bo *backoff.Backoff
|
||||
logf := m.opts.Logf
|
||||
t0 := m.opts.Clock.Now()
|
||||
for {
|
||||
err := os.Remove(path)
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
err = redactError(err)
|
||||
// Put a retry loop around deletes on Windows.
|
||||
//
|
||||
// Windows file descriptor closes are effectively asynchronous,
|
||||
// as a bunch of hooks run on/after close,
|
||||
// and we can't necessarily delete the file for a while after close,
|
||||
// as we need to wait for everybody to be done with it.
|
||||
// On Windows, unlike Unix, a file can't be deleted if it's open anywhere.
|
||||
// So try a few times but ultimately just leave a "foo.jpg.deleted"
|
||||
// marker file to note that it's deleted and we clean it up later.
|
||||
if runtime.GOOS == "windows" {
|
||||
if bo == nil {
|
||||
bo = backoff.NewBackoff("delete-retry", logf, 1*time.Second)
|
||||
}
|
||||
if m.opts.Clock.Since(t0) < 5*time.Second {
|
||||
bo.BackOff(context.Background(), err)
|
||||
continue
|
||||
}
|
||||
if err := touchFile(path + deletedSuffix); err != nil {
|
||||
logf("peerapi: failed to leave deleted marker: %v", err)
|
||||
}
|
||||
m.deleter.Insert(baseName + deletedSuffix)
|
||||
}
|
||||
logf("peerapi: failed to DeleteFile: %v", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func touchFile(path string) error {
|
||||
f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0666)
|
||||
if err != nil {
|
||||
return redactError(err)
|
||||
}
|
||||
return f.Close()
|
||||
}
|
||||
|
||||
// OpenFile opens a file of the given baseName from [Handler.Dir].
|
||||
// This method is only allowed when [Handler.DirectFileMode] is false.
|
||||
func (m *manager) OpenFile(baseName string) (rc io.ReadCloser, size int64, err error) {
|
||||
if m == nil || m.opts.Dir == "" {
|
||||
return nil, 0, ErrNoTaildrop
|
||||
}
|
||||
if m.opts.DirectFileMode {
|
||||
return nil, 0, errors.New("opens not allowed in direct mode")
|
||||
}
|
||||
path, err := joinDir(m.opts.Dir, baseName)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
if _, err := os.Stat(path + deletedSuffix); err == nil {
|
||||
return nil, 0, redactError(&fs.PathError{Op: "open", Path: path, Err: fs.ErrNotExist})
|
||||
}
|
||||
f, err := os.Open(path)
|
||||
if err != nil {
|
||||
return nil, 0, redactError(err)
|
||||
}
|
||||
fi, err := f.Stat()
|
||||
if err != nil {
|
||||
f.Close()
|
||||
return nil, 0, redactError(err)
|
||||
}
|
||||
return f, fi.Size(), nil
|
||||
}
|
252
feature/taildrop/send.go
Normal file
252
feature/taildrop/send.go
Normal file
@@ -0,0 +1,252 @@
|
||||
// Copyright (c) Tailscale Inc & AUTHORS
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
package taildrop
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"errors"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"tailscale.com/envknob"
|
||||
"tailscale.com/ipn"
|
||||
"tailscale.com/tstime"
|
||||
"tailscale.com/version/distro"
|
||||
)
|
||||
|
||||
type incomingFileKey struct {
|
||||
id clientID
|
||||
name string // e.g., "foo.jpeg"
|
||||
}
|
||||
|
||||
type incomingFile struct {
|
||||
clock tstime.DefaultClock
|
||||
|
||||
started time.Time
|
||||
size int64 // or -1 if unknown; never 0
|
||||
w io.Writer // underlying writer
|
||||
sendFileNotify func() // called when done
|
||||
partialPath string // non-empty in direct mode
|
||||
finalPath string // not used in direct mode
|
||||
|
||||
mu sync.Mutex
|
||||
copied int64
|
||||
done bool
|
||||
lastNotify time.Time
|
||||
}
|
||||
|
||||
func (f *incomingFile) Write(p []byte) (n int, err error) {
|
||||
n, err = f.w.Write(p)
|
||||
|
||||
var needNotify bool
|
||||
defer func() {
|
||||
if needNotify {
|
||||
f.sendFileNotify()
|
||||
}
|
||||
}()
|
||||
if n > 0 {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
f.copied += int64(n)
|
||||
now := f.clock.Now()
|
||||
if f.lastNotify.IsZero() || now.Sub(f.lastNotify) > time.Second {
|
||||
f.lastNotify = now
|
||||
needNotify = true
|
||||
}
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
// PutFile stores a file into [manager.Dir] from a given client id.
|
||||
// The baseName must be a base filename without any slashes.
|
||||
// The length is the expected length of content to read from r,
|
||||
// it may be negative to indicate that it is unknown.
|
||||
// It returns the length of the entire file.
|
||||
//
|
||||
// If there is a failure reading from r, then the partial file is not deleted
|
||||
// for some period of time. The [manager.PartialFiles] and [manager.HashPartialFile]
|
||||
// methods may be used to list all partial files and to compute the hash for a
|
||||
// specific partial file. This allows the client to determine whether to resume
|
||||
// a partial file. While resuming, PutFile may be called again with a non-zero
|
||||
// offset to specify where to resume receiving data at.
|
||||
func (m *manager) PutFile(id clientID, baseName string, r io.Reader, offset, length int64) (int64, error) {
|
||||
switch {
|
||||
case m == nil || m.opts.Dir == "":
|
||||
return 0, ErrNoTaildrop
|
||||
case !envknob.CanTaildrop():
|
||||
return 0, ErrNoTaildrop
|
||||
case distro.Get() == distro.Unraid && !m.opts.DirectFileMode:
|
||||
return 0, ErrNotAccessible
|
||||
}
|
||||
dstPath, err := joinDir(m.opts.Dir, baseName)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
redactAndLogError := func(action string, err error) error {
|
||||
err = redactError(err)
|
||||
m.opts.Logf("put %v error: %v", action, err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Check whether there is an in-progress transfer for the file.
|
||||
partialPath := dstPath + id.partialSuffix()
|
||||
inFileKey := incomingFileKey{id, baseName}
|
||||
inFile, loaded := m.incomingFiles.LoadOrInit(inFileKey, func() *incomingFile {
|
||||
inFile := &incomingFile{
|
||||
clock: m.opts.Clock,
|
||||
started: m.opts.Clock.Now(),
|
||||
size: length,
|
||||
sendFileNotify: m.opts.SendFileNotify,
|
||||
}
|
||||
if m.opts.DirectFileMode {
|
||||
inFile.partialPath = partialPath
|
||||
inFile.finalPath = dstPath
|
||||
}
|
||||
return inFile
|
||||
})
|
||||
if loaded {
|
||||
return 0, ErrFileExists
|
||||
}
|
||||
defer m.incomingFiles.Delete(inFileKey)
|
||||
m.deleter.Remove(filepath.Base(partialPath)) // avoid deleting the partial file while receiving
|
||||
|
||||
// Create (if not already) the partial file with read-write permissions.
|
||||
f, err := os.OpenFile(partialPath, os.O_CREATE|os.O_RDWR, 0666)
|
||||
if err != nil {
|
||||
return 0, redactAndLogError("Create", err)
|
||||
}
|
||||
defer func() {
|
||||
f.Close() // best-effort to cleanup dangling file handles
|
||||
if err != nil {
|
||||
m.deleter.Insert(filepath.Base(partialPath)) // mark partial file for eventual deletion
|
||||
}
|
||||
}()
|
||||
inFile.w = f
|
||||
|
||||
// Record that we have started to receive at least one file.
|
||||
// This is used by the deleter upon a cold-start to scan the directory
|
||||
// for any files that need to be deleted.
|
||||
if m.opts.State != nil {
|
||||
if b, _ := m.opts.State.ReadState(ipn.TaildropReceivedKey); len(b) == 0 {
|
||||
if err := m.opts.State.WriteState(ipn.TaildropReceivedKey, []byte{1}); err != nil {
|
||||
m.opts.Logf("WriteState error: %v", err) // non-fatal error
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// A positive offset implies that we are resuming an existing file.
|
||||
// Seek to the appropriate offset and truncate the file.
|
||||
if offset != 0 {
|
||||
currLength, err := f.Seek(0, io.SeekEnd)
|
||||
if err != nil {
|
||||
return 0, redactAndLogError("Seek", err)
|
||||
}
|
||||
if offset < 0 || offset > currLength {
|
||||
return 0, redactAndLogError("Seek", err)
|
||||
}
|
||||
if _, err := f.Seek(offset, io.SeekStart); err != nil {
|
||||
return 0, redactAndLogError("Seek", err)
|
||||
}
|
||||
if err := f.Truncate(offset); err != nil {
|
||||
return 0, redactAndLogError("Truncate", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Copy the contents of the file.
|
||||
copyLength, err := io.Copy(inFile, r)
|
||||
if err != nil {
|
||||
return 0, redactAndLogError("Copy", err)
|
||||
}
|
||||
if length >= 0 && copyLength != length {
|
||||
return 0, redactAndLogError("Copy", errors.New("copied an unexpected number of bytes"))
|
||||
}
|
||||
if err := f.Close(); err != nil {
|
||||
return 0, redactAndLogError("Close", err)
|
||||
}
|
||||
fileLength := offset + copyLength
|
||||
|
||||
inFile.mu.Lock()
|
||||
inFile.done = true
|
||||
inFile.mu.Unlock()
|
||||
|
||||
// File has been successfully received, rename the partial file
|
||||
// to the final destination filename. If a file of that name already exists,
|
||||
// then try multiple times with variations of the filename.
|
||||
computePartialSum := sync.OnceValues(func() ([sha256.Size]byte, error) {
|
||||
return sha256File(partialPath)
|
||||
})
|
||||
maxRetries := 10
|
||||
for ; maxRetries > 0; maxRetries-- {
|
||||
// Atomically rename the partial file as the destination file if it doesn't exist.
|
||||
// Otherwise, it returns the length of the current destination file.
|
||||
// The operation is atomic.
|
||||
dstLength, err := func() (int64, error) {
|
||||
m.renameMu.Lock()
|
||||
defer m.renameMu.Unlock()
|
||||
switch fi, err := os.Stat(dstPath); {
|
||||
case os.IsNotExist(err):
|
||||
return -1, os.Rename(partialPath, dstPath)
|
||||
case err != nil:
|
||||
return -1, err
|
||||
default:
|
||||
return fi.Size(), nil
|
||||
}
|
||||
}()
|
||||
if err != nil {
|
||||
return 0, redactAndLogError("Rename", err)
|
||||
}
|
||||
if dstLength < 0 {
|
||||
break // we successfully renamed; so stop
|
||||
}
|
||||
|
||||
// Avoid the final rename if a destination file has the same contents.
|
||||
//
|
||||
// Note: this is best effort and copying files from iOS from the Media Library
|
||||
// results in processing on the iOS side which means the size and shas of the
|
||||
// same file can be different.
|
||||
if dstLength == fileLength {
|
||||
partialSum, err := computePartialSum()
|
||||
if err != nil {
|
||||
return 0, redactAndLogError("Rename", err)
|
||||
}
|
||||
dstSum, err := sha256File(dstPath)
|
||||
if err != nil {
|
||||
return 0, redactAndLogError("Rename", err)
|
||||
}
|
||||
if dstSum == partialSum {
|
||||
if err := os.Remove(partialPath); err != nil {
|
||||
return 0, redactAndLogError("Remove", err)
|
||||
}
|
||||
break // we successfully found a content match; so stop
|
||||
}
|
||||
}
|
||||
|
||||
// Choose a new destination filename and try again.
|
||||
dstPath = nextFilename(dstPath)
|
||||
inFile.finalPath = dstPath
|
||||
}
|
||||
if maxRetries <= 0 {
|
||||
return 0, errors.New("too many retries trying to rename partial file")
|
||||
}
|
||||
m.totalReceived.Add(1)
|
||||
m.opts.SendFileNotify()
|
||||
return fileLength, nil
|
||||
}
|
||||
|
||||
func sha256File(file string) (out [sha256.Size]byte, err error) {
|
||||
h := sha256.New()
|
||||
f, err := os.Open(file)
|
||||
if err != nil {
|
||||
return out, err
|
||||
}
|
||||
defer f.Close()
|
||||
if _, err := io.Copy(h, f); err != nil {
|
||||
return out, err
|
||||
}
|
||||
return [sha256.Size]byte(h.Sum(nil)), nil
|
||||
}
|
337
feature/taildrop/taildrop.go
Normal file
337
feature/taildrop/taildrop.go
Normal file
@@ -0,0 +1,337 @@
|
||||
// Copyright (c) Tailscale Inc & AUTHORS
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
// Package taildrop contains the implementation of the Taildrop
|
||||
// functionality including sending and retrieving files.
|
||||
// This package does not validate permissions, the caller should
|
||||
// be responsible for ensuring correct authorization.
|
||||
//
|
||||
// For related documentation see: http://go/taildrop-how-does-it-work
|
||||
package taildrop
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"hash/adler32"
|
||||
"io"
|
||||
"io/fs"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"unicode"
|
||||
"unicode/utf8"
|
||||
|
||||
"tailscale.com/ipn"
|
||||
"tailscale.com/syncs"
|
||||
"tailscale.com/tstime"
|
||||
"tailscale.com/types/logger"
|
||||
"tailscale.com/util/multierr"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrNoTaildrop = errors.New("Taildrop disabled; no storage directory")
|
||||
ErrInvalidFileName = errors.New("invalid filename")
|
||||
ErrFileExists = errors.New("file already exists")
|
||||
ErrNotAccessible = errors.New("Taildrop folder not configured or accessible")
|
||||
)
|
||||
|
||||
const (
|
||||
// partialSuffix is the suffix appended to files while they're
|
||||
// still in the process of being transferred.
|
||||
partialSuffix = ".partial"
|
||||
|
||||
// deletedSuffix is the suffix for a deleted marker file
|
||||
// that's placed next to a file (without the suffix) that we
|
||||
// tried to delete, but Windows wouldn't let us. These are
|
||||
// only written on Windows (and in tests), but they're not
|
||||
// permitted to be uploaded directly on any platform, like
|
||||
// partial files.
|
||||
deletedSuffix = ".deleted"
|
||||
)
|
||||
|
||||
// clientID is an opaque identifier for file resumption.
|
||||
// A client can only list and resume partial files for its own ID.
|
||||
// It must contain any filesystem specific characters (e.g., slashes).
|
||||
type clientID string // e.g., "n12345CNTRL"
|
||||
|
||||
func (id clientID) partialSuffix() string {
|
||||
if id == "" {
|
||||
return partialSuffix
|
||||
}
|
||||
return "." + string(id) + partialSuffix // e.g., ".n12345CNTRL.partial"
|
||||
}
|
||||
|
||||
// managerOptions are options to configure the [manager].
|
||||
type managerOptions struct {
|
||||
Logf logger.Logf // may be nil
|
||||
Clock tstime.DefaultClock // may be nil
|
||||
State ipn.StateStore // may be nil
|
||||
|
||||
// Dir is the directory to store received files.
|
||||
// This main either be the final location for the files
|
||||
// or just a temporary staging directory (see DirectFileMode).
|
||||
Dir string
|
||||
|
||||
// DirectFileMode reports whether we are writing files
|
||||
// directly to a download directory, rather than writing them to
|
||||
// a temporary staging directory.
|
||||
//
|
||||
// The following methods:
|
||||
// - HasFilesWaiting
|
||||
// - WaitingFiles
|
||||
// - DeleteFile
|
||||
// - OpenFile
|
||||
// have no purpose in DirectFileMode.
|
||||
// They are only used to check whether files are in the staging directory,
|
||||
// copy them out, and then delete them.
|
||||
DirectFileMode bool
|
||||
|
||||
// SendFileNotify is called periodically while a file is actively
|
||||
// receiving the contents for the file. There is a final call
|
||||
// to the function when reception completes.
|
||||
// It is not called if nil.
|
||||
SendFileNotify func()
|
||||
}
|
||||
|
||||
// manager manages the state for receiving and managing taildropped files.
|
||||
type manager struct {
|
||||
opts managerOptions
|
||||
|
||||
// incomingFiles is a map of files actively being received.
|
||||
incomingFiles syncs.Map[incomingFileKey, *incomingFile]
|
||||
// deleter managers asynchronous deletion of files.
|
||||
deleter fileDeleter
|
||||
|
||||
// renameMu is used to protect os.Rename calls so that they are atomic.
|
||||
renameMu sync.Mutex
|
||||
|
||||
// totalReceived counts the cumulative total of received files.
|
||||
totalReceived atomic.Int64
|
||||
// emptySince specifies that there were no waiting files
|
||||
// since this value of totalReceived.
|
||||
emptySince atomic.Int64
|
||||
}
|
||||
|
||||
// New initializes a new taildrop manager.
|
||||
// It may spawn asynchronous goroutines to delete files,
|
||||
// so the Shutdown method must be called for resource cleanup.
|
||||
func (opts managerOptions) New() *manager {
|
||||
if opts.Logf == nil {
|
||||
opts.Logf = logger.Discard
|
||||
}
|
||||
if opts.SendFileNotify == nil {
|
||||
opts.SendFileNotify = func() {}
|
||||
}
|
||||
m := &manager{opts: opts}
|
||||
m.deleter.Init(m, func(string) {})
|
||||
m.emptySince.Store(-1) // invalidate this cache
|
||||
return m
|
||||
}
|
||||
|
||||
// Dir returns the directory.
|
||||
func (m *manager) Dir() string {
|
||||
return m.opts.Dir
|
||||
}
|
||||
|
||||
// Shutdown shuts down the Manager.
|
||||
// It blocks until all spawned goroutines have stopped running.
|
||||
func (m *manager) Shutdown() {
|
||||
if m != nil {
|
||||
m.deleter.shutdown()
|
||||
m.deleter.group.Wait()
|
||||
}
|
||||
}
|
||||
|
||||
func validFilenameRune(r rune) bool {
|
||||
switch r {
|
||||
case '/':
|
||||
return false
|
||||
case '\\', ':', '*', '"', '<', '>', '|':
|
||||
// Invalid stuff on Windows, but we reject them everywhere
|
||||
// for now.
|
||||
// TODO(bradfitz): figure out a better plan. We initially just
|
||||
// wrote things to disk URL path-escaped, but that's gross
|
||||
// when debugging, and just moves the problem to callers.
|
||||
// So now we put the UTF-8 filenames on disk directly as
|
||||
// sent.
|
||||
return false
|
||||
}
|
||||
return unicode.IsGraphic(r)
|
||||
}
|
||||
|
||||
func isPartialOrDeleted(s string) bool {
|
||||
return strings.HasSuffix(s, deletedSuffix) || strings.HasSuffix(s, partialSuffix)
|
||||
}
|
||||
|
||||
func joinDir(dir, baseName string) (fullPath string, err error) {
|
||||
if !utf8.ValidString(baseName) {
|
||||
return "", ErrInvalidFileName
|
||||
}
|
||||
if strings.TrimSpace(baseName) != baseName {
|
||||
return "", ErrInvalidFileName
|
||||
}
|
||||
if len(baseName) > 255 {
|
||||
return "", ErrInvalidFileName
|
||||
}
|
||||
// TODO: validate unicode normalization form too? Varies by platform.
|
||||
clean := path.Clean(baseName)
|
||||
if clean != baseName ||
|
||||
clean == "." || clean == ".." ||
|
||||
isPartialOrDeleted(clean) {
|
||||
return "", ErrInvalidFileName
|
||||
}
|
||||
for _, r := range baseName {
|
||||
if !validFilenameRune(r) {
|
||||
return "", ErrInvalidFileName
|
||||
}
|
||||
}
|
||||
if !filepath.IsLocal(baseName) {
|
||||
return "", ErrInvalidFileName
|
||||
}
|
||||
return filepath.Join(dir, baseName), nil
|
||||
}
|
||||
|
||||
// rangeDir iterates over the contents of a directory, calling fn for each entry.
|
||||
// It continues iterating while fn returns true.
|
||||
// It reports the number of entries seen.
|
||||
func rangeDir(dir string, fn func(fs.DirEntry) bool) error {
|
||||
f, err := os.Open(dir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
for {
|
||||
des, err := f.ReadDir(10)
|
||||
for _, de := range des {
|
||||
if !fn(de) {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// IncomingFiles returns a list of active incoming files.
|
||||
func (m *manager) IncomingFiles() []ipn.PartialFile {
|
||||
// Make sure we always set n.IncomingFiles non-nil so it gets encoded
|
||||
// in JSON to clients. They distinguish between empty and non-nil
|
||||
// to know whether a Notify should be able about files.
|
||||
files := make([]ipn.PartialFile, 0)
|
||||
for k, f := range m.incomingFiles.All() {
|
||||
f.mu.Lock()
|
||||
files = append(files, ipn.PartialFile{
|
||||
Name: k.name,
|
||||
Started: f.started,
|
||||
DeclaredSize: f.size,
|
||||
Received: f.copied,
|
||||
PartialPath: f.partialPath,
|
||||
FinalPath: f.finalPath,
|
||||
Done: f.done,
|
||||
})
|
||||
f.mu.Unlock()
|
||||
}
|
||||
|
||||
sort.Slice(files, func(i, j int) bool {
|
||||
return files[i].Started.Before(files[j].Started)
|
||||
})
|
||||
|
||||
return files
|
||||
}
|
||||
|
||||
type redactedError struct {
|
||||
msg string
|
||||
inner error
|
||||
}
|
||||
|
||||
func (re *redactedError) Error() string {
|
||||
return re.msg
|
||||
}
|
||||
|
||||
func (re *redactedError) Unwrap() error {
|
||||
return re.inner
|
||||
}
|
||||
|
||||
func redactString(s string) string {
|
||||
hash := adler32.Checksum([]byte(s))
|
||||
|
||||
const redacted = "redacted"
|
||||
var buf [len(redacted) + len(".12345678")]byte
|
||||
b := append(buf[:0], []byte(redacted)...)
|
||||
b = append(b, '.')
|
||||
b = strconv.AppendUint(b, uint64(hash), 16)
|
||||
return string(b)
|
||||
}
|
||||
|
||||
func redactError(root error) error {
|
||||
// redactStrings is a list of sensitive strings that were redacted.
|
||||
// It is not sufficient to just snub out sensitive fields in Go errors
|
||||
// since some wrapper errors like fmt.Errorf pre-cache the error string,
|
||||
// which would unfortunately remain unaffected.
|
||||
var redactStrings []string
|
||||
|
||||
// Redact sensitive fields in known Go error types.
|
||||
var unknownErrors int
|
||||
multierr.Range(root, func(err error) bool {
|
||||
switch err := err.(type) {
|
||||
case *os.PathError:
|
||||
redactStrings = append(redactStrings, err.Path)
|
||||
err.Path = redactString(err.Path)
|
||||
case *os.LinkError:
|
||||
redactStrings = append(redactStrings, err.New, err.Old)
|
||||
err.New = redactString(err.New)
|
||||
err.Old = redactString(err.Old)
|
||||
default:
|
||||
unknownErrors++
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
// If there are no redacted strings or no unknown error types,
|
||||
// then we can return the possibly modified root error verbatim.
|
||||
// Otherwise, we must replace redacted strings from any wrappers.
|
||||
if len(redactStrings) == 0 || unknownErrors == 0 {
|
||||
return root
|
||||
}
|
||||
|
||||
// Stringify and replace any paths that we found above, then return
|
||||
// the error wrapped in a type that uses the newly-redacted string
|
||||
// while also allowing Unwrap()-ing to the inner error type(s).
|
||||
s := root.Error()
|
||||
for _, toRedact := range redactStrings {
|
||||
s = strings.ReplaceAll(s, toRedact, redactString(toRedact))
|
||||
}
|
||||
return &redactedError{msg: s, inner: root}
|
||||
}
|
||||
|
||||
var (
|
||||
rxExtensionSuffix = regexp.MustCompile(`(\.[a-zA-Z0-9]{0,3}[a-zA-Z][a-zA-Z0-9]{0,3})*$`)
|
||||
rxNumberSuffix = regexp.MustCompile(` \([0-9]+\)`)
|
||||
)
|
||||
|
||||
// nextFilename returns the next filename in a sequence.
|
||||
// It is used for construction a new filename if there is a conflict.
|
||||
//
|
||||
// For example, "Foo.jpg" becomes "Foo (1).jpg" and
|
||||
// "Foo (1).jpg" becomes "Foo (2).jpg".
|
||||
func nextFilename(name string) string {
|
||||
ext := rxExtensionSuffix.FindString(strings.TrimPrefix(name, "."))
|
||||
name = strings.TrimSuffix(name, ext)
|
||||
var n uint64
|
||||
if rxNumberSuffix.MatchString(name) {
|
||||
i := strings.LastIndex(name, " (")
|
||||
if n, _ = strconv.ParseUint(name[i+len("( "):len(name)-len(")")], 10, 64); n > 0 {
|
||||
name = name[:i]
|
||||
}
|
||||
}
|
||||
return name + " (" + strconv.FormatUint(n+1, 10) + ")" + ext
|
||||
}
|
69
feature/taildrop/taildrop_test.go
Normal file
69
feature/taildrop/taildrop_test.go
Normal file
@@ -0,0 +1,69 @@
|
||||
// Copyright (c) Tailscale Inc & AUTHORS
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
package taildrop
|
||||
|
||||
import (
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestJoinDir(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
tests := []struct {
|
||||
in string
|
||||
want string // just relative to m.Dir
|
||||
wantOk bool
|
||||
}{
|
||||
{"", "", false},
|
||||
{"foo", "foo", true},
|
||||
{"./foo", "", false},
|
||||
{"../foo", "", false},
|
||||
{"foo/bar", "", false},
|
||||
{"😋", "😋", true},
|
||||
{"\xde\xad\xbe\xef", "", false},
|
||||
{"foo.partial", "", false},
|
||||
{"foo.deleted", "", false},
|
||||
{strings.Repeat("a", 1024), "", false},
|
||||
{"foo:bar", "", false},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
got, gotErr := joinDir(dir, tt.in)
|
||||
got, _ = filepath.Rel(dir, got)
|
||||
gotOk := gotErr == nil
|
||||
if got != tt.want || gotOk != tt.wantOk {
|
||||
t.Errorf("joinDir(%q) = (%v, %v), want (%v, %v)", tt.in, got, gotOk, tt.want, tt.wantOk)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestNextFilename(t *testing.T) {
|
||||
tests := []struct {
|
||||
in string
|
||||
want string
|
||||
want2 string
|
||||
}{
|
||||
{"foo", "foo (1)", "foo (2)"},
|
||||
{"foo(1)", "foo(1) (1)", "foo(1) (2)"},
|
||||
{"foo.tar", "foo (1).tar", "foo (2).tar"},
|
||||
{"foo.tar.gz", "foo (1).tar.gz", "foo (2).tar.gz"},
|
||||
{".bashrc", ".bashrc (1)", ".bashrc (2)"},
|
||||
{"fizz buzz.torrent", "fizz buzz (1).torrent", "fizz buzz (2).torrent"},
|
||||
{"rawr 2023.12.15.txt", "rawr 2023.12.15 (1).txt", "rawr 2023.12.15 (2).txt"},
|
||||
{"IMG_7934.JPEG", "IMG_7934 (1).JPEG", "IMG_7934 (2).JPEG"},
|
||||
{"my song.mp3", "my song (1).mp3", "my song (2).mp3"},
|
||||
{"archive.7z", "archive (1).7z", "archive (2).7z"},
|
||||
{"foo/bar/fizz", "foo/bar/fizz (1)", "foo/bar/fizz (2)"},
|
||||
{"新完全マスター N2 文法.pdf", "新完全マスター N2 文法 (1).pdf", "新完全マスター N2 文法 (2).pdf"},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
if got := nextFilename(tt.in); got != tt.want {
|
||||
t.Errorf("NextFilename(%q) = %q, want %q", tt.in, got, tt.want)
|
||||
}
|
||||
if got2 := nextFilename(tt.want); got2 != tt.want2 {
|
||||
t.Errorf("NextFilename(%q) = %q, want %q", tt.want, got2, tt.want2)
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user