tailscale/taildrop/resume_test.go
Joe Tsai 3f27087e9d
taildrop: switch hashing to be streaming based (#9861)
While the previous logic was correct, it did not perform well.
Resuming is a dance between the client and server, where
1. the client requests hashes for a partial file,
2. the server then computes those hashes,
3. the client computes hashes locally and compares them, and
4. the process repeats from step 1 while the partial file still has data.

While step 2 is running, the client is sitting idle.
While step 3 is running, the server is sitting idle.

By streaming over the block hash immediately after the server
computes it, the client can start checking the hash,
while the server works on the next hash (in a pipelined manner).
This performs dramatically better and also uses less memory
as we don't need to hold a list of hashes, but only need to
handle one hash at a time.
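
The pipelining described above can be sketched roughly as follows. This is a minimal illustration, not the taildrop implementation: the names `streamHashes`, `matchingPrefix`, and `resumeOffset`, the 4-byte block size, and the choice of SHA-256 are all assumptions made for the example. The producer sends each block hash as soon as it is computed, and the consumer holds only one hash at a time.

```go
package main

import (
	"bytes"
	"crypto/sha256"
	"io"
)

// blockSize is a hypothetical block length chosen to keep the example small.
const blockSize = 4

// streamHashes hashes r one block at a time and sends each hash on the
// returned channel as soon as it is computed, so the consumer can start
// comparing while later blocks are still being hashed.
func streamHashes(r io.Reader) <-chan [sha256.Size]byte {
	ch := make(chan [sha256.Size]byte)
	go func() {
		defer close(ch)
		buf := make([]byte, blockSize)
		for {
			n, err := io.ReadFull(r, buf)
			if n > 0 {
				ch <- sha256.Sum256(buf[:n])
			}
			if err != nil {
				return // io.EOF or io.ErrUnexpectedEOF ends the stream
			}
		}
	}()
	return ch
}

// matchingPrefix consumes remote hashes one at a time and returns how many
// leading bytes of local are confirmed to match. It stops at the first
// mismatch and never stores more than the current hash.
func matchingPrefix(local []byte, remote <-chan [sha256.Size]byte) int {
	offset := 0
	for h := range remote {
		end := offset + blockSize
		if end > len(local) {
			end = len(local)
		}
		if end == offset || sha256.Sum256(local[offset:end]) != h {
			for range remote {
				// Drain so the sending goroutine can exit.
			}
			break
		}
		offset = end
	}
	return offset
}

// resumeOffset reports where a transfer of local could resume, given the
// bytes of a partial copy held by the other side.
func resumeOffset(local, partial []byte) int {
	return matchingPrefix(local, streamHashes(bytes.NewReader(partial)))
}
```

For example, `resumeOffset([]byte("abcdefghij"), []byte("abcdXfgh"))` confirms only the first block and returns 4, so a sender would resume from byte 4.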

There are two detriments to this approach:
* The HTTP API relies on a JSON stream,
  which is not a standard REST-like pattern.
  However, since we implement both client and server,
  this is fine.
* While the stream is ongoing, we hold an open file handle
  on the server side while the file is being hashed.
  On really slow streams, this could hold a file open indefinitely.
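
As an illustration of the JSON stream mentioned in the first point, the sketch below writes one JSON value per block and decodes the values one at a time on the other end. The `blockChecksum` type, its field names, and the helper functions are hypothetical and are not taken from the taildrop API; only the `encoding/json` streaming calls are standard.

```go
package main

import (
	"bytes"
	"encoding/json"
	"errors"
	"io"
)

// blockChecksum is a hypothetical wire format for a single block hash.
type blockChecksum struct {
	Checksum string `json:"checksum"`
	Size     int64  `json:"size"`
}

// writeChecksums pulls checksums from next until it reports io.EOF and
// encodes each one as its own JSON value, one per line.
func writeChecksums(w io.Writer, next func() (blockChecksum, error)) error {
	enc := json.NewEncoder(w)
	for {
		cs, err := next()
		if errors.Is(err, io.EOF) {
			return nil
		}
		if err != nil {
			return err
		}
		if err := enc.Encode(cs); err != nil {
			return err
		}
	}
}

// readChecksums decodes JSON values one at a time until the stream ends,
// calling visit for each, so only one checksum is held in memory at once.
func readChecksums(r io.Reader, visit func(blockChecksum) error) error {
	dec := json.NewDecoder(r)
	for {
		var cs blockChecksum
		if err := dec.Decode(&cs); err != nil {
			if errors.Is(err, io.EOF) {
				return nil
			}
			return err
		}
		if err := visit(cs); err != nil {
			return err
		}
	}
}

// roundTrip sends in through an in-memory buffer and collects what the
// reading side observes. It exists only to exercise the two functions.
func roundTrip(in []blockChecksum) ([]blockChecksum, error) {
	var buf bytes.Buffer
	i := 0
	next := func() (blockChecksum, error) {
		if i == len(in) {
			return blockChecksum{}, io.EOF
		}
		i++
		return in[i-1], nil
	}
	if err := writeChecksums(&buf, next); err != nil {
		return nil, err
	}
	var out []blockChecksum
	err := readChecksums(&buf, func(cs blockChecksum) error {
		out = append(out, cs)
		return nil
	})
	return out, err
}
```

In a real HTTP handler the writer would be the response body rather than a buffer, which is where the open-file-handle concern in the second point comes from: the file stays open for as long as the peer keeps reading the stream.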

Updates tailscale/corp#14772

Signed-off-by: Joe Tsai <joetsai@digital-static.net>
Co-authored-by: Rhea Ghosh <rhea@tailscale.com>
2023-10-17 17:53:40 -07:00


// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause

package taildrop

import (
	"bytes"
	"io"
	"math/rand"
	"os"
	"testing"
	"testing/iotest"

	"tailscale.com/util/must"
)

func TestResume(t *testing.T) {
	// Override the package block size for the duration of the test so that
	// the 12345-byte payload spans many blocks.
	oldBlockSize := blockSize
	defer func() { blockSize = oldBlockSize }()
	blockSize = 256

	m := ManagerOptions{Logf: t.Logf, Dir: t.TempDir()}.New()
	defer m.Shutdown()

	// A fixed seed keeps the payload deterministic across runs.
	rn := rand.New(rand.NewSource(0))
	want := make([]byte, 12345)
	must.Get(io.ReadFull(rn, want))

	// No partial file exists yet for "foo"; a single PutFile call must
	// leave the complete payload on disk.
	t.Run("resume-noexist", func(t *testing.T) {
		r := io.Reader(bytes.NewReader(want))

		next, close, err := m.HashPartialFile("", "foo")
		must.Do(err)
		defer close()
		offset, r, err := ResumeReader(r, next)
		must.Do(err)
		must.Get(m.PutFile("", "foo", r, offset, -1))
		got := must.Get(os.ReadFile(must.Get(joinDir(m.opts.Dir, "foo"))))
		if !bytes.Equal(got, want) {
			t.Errorf("content mismatches")
		}
	})

	// Repeatedly interrupt the transfer after a random number of bytes and
	// resume it until one PutFile call completes without error.
	t.Run("resume-retry", func(t *testing.T) {
		rn := rand.New(rand.NewSource(0))
		for {
			r := io.Reader(bytes.NewReader(want))

			next, close, err := m.HashPartialFile("", "foo")
			must.Do(err)
			// Deferred in a loop: these calls run when the subtest returns.
			defer close()
			offset, r, err := ResumeReader(r, next)
			must.Do(err)

			// Send at most 1000 more bytes, then inject a read error,
			// unless the whole payload has already been transferred.
			numWant := rn.Int63n(min(int64(len(want))-offset, 1000) + 1)
			if offset < int64(len(want)) {
				r = io.MultiReader(io.LimitReader(r, numWant), iotest.ErrReader(io.ErrClosedPipe))
			}
			if _, err := m.PutFile("", "foo", r, offset, -1); err == nil {
				break
			}
		}
		got := must.Get(os.ReadFile(must.Get(joinDir(m.opts.Dir, "foo"))))
		if !bytes.Equal(got, want) {
			t.Errorf("content mismatches")
		}
	})
}