ipn{,/ipnlocal}, client/tailscale: move Taildrop recv notifications to LocalAPI HTTP method

Updates #6417

Change-Id: Iec544c477a0e5e9f1c6bf23555afec06255e2e22
Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
This commit is contained in:
Brad Fitzpatrick 2022-11-20 13:25:54 -08:00 committed by Brad Fitzpatrick
parent f053f16460
commit 0f7da5c7dc
5 changed files with 96 additions and 28 deletions

View File

@ -426,8 +426,20 @@ func (lc *LocalClient) IDToken(ctx context.Context, aud string) (*tailcfg.TokenR
return decodeJSON[*tailcfg.TokenResponse](body)
}
// WaitingFiles returns the list of received Taildrop files that have been
// received by the Tailscale daemon in its staging/cache directory but not yet
// transferred by the user's CLI or GUI client and written to a user's home
// directory somewhere.
func (lc *LocalClient) WaitingFiles(ctx context.Context) ([]apitype.WaitingFile, error) {
body, err := lc.get200(ctx, "/localapi/v0/files/")
return lc.AwaitWaitingFiles(ctx, 0)
}
// AwaitWaitingFiles is like WaitingFiles but takes a duration to await for an answer.
// If the duration is 0, it will return immediately. The duration is respected at second
// granularity only. If no files are available, it returns (nil, nil).
func (lc *LocalClient) AwaitWaitingFiles(ctx context.Context, d time.Duration) ([]apitype.WaitingFile, error) {
path := "/localapi/v0/files/?waitsec=" + fmt.Sprint(int(d.Seconds()))
body, err := lc.get200(ctx, path)
if err != nil {
return nil, err
}

View File

@ -26,7 +26,6 @@
"golang.org/x/time/rate"
"tailscale.com/client/tailscale/apitype"
"tailscale.com/envknob"
"tailscale.com/ipn"
"tailscale.com/net/tsaddr"
"tailscale.com/tailcfg"
"tailscale.com/util/quarantine"
@ -529,30 +528,16 @@ func wipeInbox(ctx context.Context) error {
}
func waitForFile(ctx context.Context) error {
c, bc, pumpCtx, cancel := connect(ctx)
defer cancel()
fileWaiting := make(chan bool, 1)
notifyError := make(chan error, 1)
bc.SetNotifyCallback(func(n ipn.Notify) {
if n.ErrMessage != nil {
notifyError <- fmt.Errorf("Notify.ErrMessage: %v", *n.ErrMessage)
}
if n.FilesWaiting != nil {
select {
case fileWaiting <- true:
default:
}
}
})
go pump(pumpCtx, bc, c)
select {
case <-fileWaiting:
for {
ff, err := localClient.AwaitWaitingFiles(ctx, time.Hour)
if len(ff) > 0 {
return nil
case <-pumpCtx.Done():
return pumpCtx.Err()
case <-ctx.Done():
return ctx.Err()
case err := <-notifyError:
}
if err := ctx.Err(); err != nil {
return err
}
if err != nil && !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
return err
}
}
}

View File

@ -76,6 +76,8 @@ type Notify struct {
// FilesWaiting if non-nil means that files are buffered in
// the Tailscale daemon and ready for local transfer to the
// user's preferred storage location.
//
// Deprecated: use LocalClient.AwaitWaitingFiles instead.
FilesWaiting *empty.Message `json:",omitempty"`
// IncomingFiles, if non-nil, specifies which files are in the
@ -83,6 +85,8 @@ type Notify struct {
// Notify should not update the state of file transfers. A non-nil
// but empty IncomingFiles means that no files are in the middle
// of being transferred.
//
// Deprecated: use LocalClient.AwaitWaitingFiles instead.
IncomingFiles []PartialFile `json:",omitempty"`
// LocalTCPPort, if non-nil, informs the UI frontend which

View File

@ -180,6 +180,7 @@ type LocalBackend struct {
peerAPIListeners []*peerAPIListener
loginFlags controlclient.LoginFlags
incomingFiles map[*incomingFile]bool
fileWaiters map[*mapSetHandle]context.CancelFunc // handle => func to call on file received
lastStatusTime time.Time // status.AsOf value of the last processed status update
// directFileRoot, if non-empty, means to write received files
// directly to this directory, without staging them in an
@ -1709,6 +1710,9 @@ func (b *LocalBackend) sendFileNotify() {
var n ipn.Notify
b.mu.Lock()
for _, wakeWaiter := range b.fileWaiters {
wakeWaiter()
}
notifyFunc := b.notify
apiSrv := b.peerAPIServer
if notifyFunc == nil || apiSrv == nil {
@ -3579,6 +3583,20 @@ func (b *LocalBackend) TestOnlyPublicKeys() (machineKey key.MachinePublic, nodeK
return mk, nk
}
// mapSetHandle is a minimal (but non-zero) value whose address serves as a map
// key for sets of non-comparable values that can't be map keys themselves.
type mapSetHandle byte
func (b *LocalBackend) setFileWaiter(handle *mapSetHandle, wakeWaiter context.CancelFunc) {
b.mu.Lock()
defer b.mu.Unlock()
if wakeWaiter == nil {
delete(b.fileWaiters, handle)
} else {
mak.Set(&b.fileWaiters, handle, wakeWaiter)
}
}
func (b *LocalBackend) WaitingFiles() ([]apitype.WaitingFile, error) {
b.mu.Lock()
apiSrv := b.peerAPIServer
@ -3586,6 +3604,42 @@ func (b *LocalBackend) WaitingFiles() ([]apitype.WaitingFile, error) {
return apiSrv.WaitingFiles()
}
// AwaitWaitingFiles is like WaitingFiles but blocks while ctx is not done,
// waiting for any files to be available.
//
// On return, exactly one of the results will be non-empty or non-nil,
// respectively.
func (b *LocalBackend) AwaitWaitingFiles(ctx context.Context) ([]apitype.WaitingFile, error) {
if ff, err := b.WaitingFiles(); err != nil || len(ff) > 0 {
return ff, err
}
for {
gotFile, gotFileCancel := context.WithCancel(context.Background())
defer gotFileCancel()
handle := new(mapSetHandle)
b.setFileWaiter(handle, gotFileCancel)
defer b.setFileWaiter(handle, nil)
// Now that we've registered ourselves, check again, in case
// of race. Otherwise there's a small window where we could
// miss a file arrival and wait forever.
if ff, err := b.WaitingFiles(); err != nil || len(ff) > 0 {
return ff, err
}
select {
case <-gotFile.Done():
if ff, err := b.WaitingFiles(); err != nil || len(ff) > 0 {
return ff, err
}
case <-ctx.Done():
return nil, ctx.Err()
}
}
}
func (b *LocalBackend) DeleteFile(name string) error {
b.mu.Lock()
apiSrv := b.peerAPIServer

View File

@ -7,6 +7,7 @@
import (
"bytes"
"context"
"crypto/rand"
"encoding/hex"
"encoding/json"
@ -680,8 +681,20 @@ func (h *Handler) serveFiles(w http.ResponseWriter, r *http.Request) {
http.Error(w, "want GET to list files", 400)
return
}
wfs, err := h.b.WaitingFiles()
ctx := r.Context()
if s := r.FormValue("waitsec"); s != "" && s != "0" {
d, err := strconv.Atoi(s)
if err != nil {
http.Error(w, "invalid waitsec", http.StatusBadRequest)
return
}
deadline := time.Now().Add(time.Duration(d) * time.Second)
var cancel context.CancelFunc
ctx, cancel = context.WithDeadline(ctx, deadline)
defer cancel()
}
wfs, err := h.b.AwaitWaitingFiles(ctx)
if err != nil && ctx.Err() == nil {
http.Error(w, err.Error(), 500)
return
}