mirror of
https://github.com/restic/restic.git
synced 2025-12-03 23:11:47 +00:00
Moves files
This commit is contained in:
2
internal/pipe/doc.go
Normal file
2
internal/pipe/doc.go
Normal file
@@ -0,0 +1,2 @@
|
||||
// Package pipe implements walking a directory in a deterministic order.
|
||||
package pipe
|
||||
292
internal/pipe/pipe.go
Normal file
292
internal/pipe/pipe.go
Normal file
@@ -0,0 +1,292 @@
|
||||
package pipe
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
|
||||
"restic/errors"
|
||||
|
||||
"restic/debug"
|
||||
"restic/fs"
|
||||
)
|
||||
|
||||
type Result interface{}
|
||||
|
||||
type Job interface {
|
||||
Path() string
|
||||
Fullpath() string
|
||||
Error() error
|
||||
Info() os.FileInfo
|
||||
|
||||
Result() chan<- Result
|
||||
}
|
||||
|
||||
type Entry struct {
|
||||
basedir string
|
||||
path string
|
||||
info os.FileInfo
|
||||
error error
|
||||
result chan<- Result
|
||||
|
||||
// points to the old node if available, interface{} is used to prevent
|
||||
// circular import
|
||||
Node interface{}
|
||||
}
|
||||
|
||||
func (e Entry) Path() string { return e.path }
|
||||
func (e Entry) Fullpath() string { return filepath.Join(e.basedir, e.path) }
|
||||
func (e Entry) Error() error { return e.error }
|
||||
func (e Entry) Info() os.FileInfo { return e.info }
|
||||
func (e Entry) Result() chan<- Result { return e.result }
|
||||
|
||||
type Dir struct {
|
||||
basedir string
|
||||
path string
|
||||
error error
|
||||
info os.FileInfo
|
||||
|
||||
Entries [](<-chan Result)
|
||||
result chan<- Result
|
||||
}
|
||||
|
||||
func (e Dir) Path() string { return e.path }
|
||||
func (e Dir) Fullpath() string { return filepath.Join(e.basedir, e.path) }
|
||||
func (e Dir) Error() error { return e.error }
|
||||
func (e Dir) Info() os.FileInfo { return e.info }
|
||||
func (e Dir) Result() chan<- Result { return e.result }
|
||||
|
||||
// readDirNames reads the directory named by dirname and returns
|
||||
// a sorted list of directory entries.
|
||||
// taken from filepath/path.go
|
||||
func readDirNames(dirname string) ([]string, error) {
|
||||
f, err := fs.Open(dirname)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "Open")
|
||||
}
|
||||
names, err := f.Readdirnames(-1)
|
||||
f.Close()
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "Readdirnames")
|
||||
}
|
||||
sort.Strings(names)
|
||||
return names, nil
|
||||
}
|
||||
|
||||
// SelectFunc returns true for all items that should be included (files and
|
||||
// dirs). If false is returned, files are ignored and dirs are not even walked.
|
||||
type SelectFunc func(item string, fi os.FileInfo) bool
|
||||
|
||||
func walk(ctx context.Context, basedir, dir string, selectFunc SelectFunc, jobs chan<- Job, res chan<- Result) (excluded bool) {
|
||||
debug.Log("start on %q, basedir %q", dir, basedir)
|
||||
|
||||
relpath, err := filepath.Rel(basedir, dir)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
info, err := fs.Lstat(dir)
|
||||
if err != nil {
|
||||
err = errors.Wrap(err, "Lstat")
|
||||
debug.Log("error for %v: %v, res %p", dir, err, res)
|
||||
select {
|
||||
case jobs <- Dir{basedir: basedir, path: relpath, info: info, error: err, result: res}:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if !selectFunc(dir, info) {
|
||||
debug.Log("file %v excluded by filter, res %p", dir, res)
|
||||
excluded = true
|
||||
return
|
||||
}
|
||||
|
||||
if !info.IsDir() {
|
||||
debug.Log("sending file job for %v, res %p", dir, res)
|
||||
select {
|
||||
case jobs <- Entry{info: info, basedir: basedir, path: relpath, result: res}:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
debug.RunHook("pipe.readdirnames", dir)
|
||||
names, err := readDirNames(dir)
|
||||
if err != nil {
|
||||
debug.Log("Readdirnames(%v) returned error: %v, res %p", dir, err, res)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case jobs <- Dir{basedir: basedir, path: relpath, info: info, error: err, result: res}:
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Insert breakpoint to allow testing behaviour with vanishing files
|
||||
// between Readdir() and lstat()
|
||||
debug.RunHook("pipe.walk1", relpath)
|
||||
|
||||
entries := make([]<-chan Result, 0, len(names))
|
||||
|
||||
for _, name := range names {
|
||||
subpath := filepath.Join(dir, name)
|
||||
|
||||
fi, statErr := fs.Lstat(subpath)
|
||||
if !selectFunc(subpath, fi) {
|
||||
debug.Log("file %v excluded by filter", subpath)
|
||||
continue
|
||||
}
|
||||
|
||||
ch := make(chan Result, 1)
|
||||
entries = append(entries, ch)
|
||||
|
||||
if statErr != nil {
|
||||
statErr = errors.Wrap(statErr, "Lstat")
|
||||
debug.Log("sending file job for %v, err %v, res %p", subpath, err, res)
|
||||
select {
|
||||
case jobs <- Entry{info: fi, error: statErr, basedir: basedir, path: filepath.Join(relpath, name), result: ch}:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// Insert breakpoint to allow testing behaviour with vanishing files
|
||||
// between walk and open
|
||||
debug.RunHook("pipe.walk2", filepath.Join(relpath, name))
|
||||
|
||||
walk(ctx, basedir, subpath, selectFunc, jobs, ch)
|
||||
}
|
||||
|
||||
debug.Log("sending dirjob for %q, basedir %q, res %p", dir, basedir, res)
|
||||
select {
|
||||
case jobs <- Dir{basedir: basedir, path: relpath, info: info, Entries: entries, result: res}:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// cleanupPath is used to clean a path. For a normal path, a slice with just
|
||||
// the path is returned. For special cases such as "." and "/" the list of
|
||||
// names within those paths is returned.
|
||||
func cleanupPath(path string) ([]string, error) {
|
||||
path = filepath.Clean(path)
|
||||
if filepath.Dir(path) != path {
|
||||
return []string{path}, nil
|
||||
}
|
||||
|
||||
paths, err := readDirNames(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for i, p := range paths {
|
||||
paths[i] = filepath.Join(path, p)
|
||||
}
|
||||
|
||||
return paths, nil
|
||||
}
|
||||
|
||||
// Walk sends a Job for each file and directory it finds below the paths. When
|
||||
// the channel done is closed, processing stops.
|
||||
func Walk(ctx context.Context, walkPaths []string, selectFunc SelectFunc, jobs chan<- Job, res chan<- Result) {
|
||||
var paths []string
|
||||
|
||||
for _, p := range walkPaths {
|
||||
ps, err := cleanupPath(p)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Readdirnames(%v): %v, skipping\n", p, err)
|
||||
debug.Log("Readdirnames(%v) returned error: %v, skipping", p, err)
|
||||
continue
|
||||
}
|
||||
|
||||
paths = append(paths, ps...)
|
||||
}
|
||||
|
||||
debug.Log("start on %v", paths)
|
||||
defer func() {
|
||||
debug.Log("output channel closed")
|
||||
close(jobs)
|
||||
}()
|
||||
|
||||
entries := make([]<-chan Result, 0, len(paths))
|
||||
for _, path := range paths {
|
||||
debug.Log("start walker for %v", path)
|
||||
ch := make(chan Result, 1)
|
||||
excluded := walk(ctx, filepath.Dir(path), path, selectFunc, jobs, ch)
|
||||
|
||||
if excluded {
|
||||
debug.Log("walker for %v done, it was excluded by the filter", path)
|
||||
continue
|
||||
}
|
||||
|
||||
entries = append(entries, ch)
|
||||
debug.Log("walker for %v done", path)
|
||||
}
|
||||
|
||||
debug.Log("sending root node, res %p", res)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case jobs <- Dir{Entries: entries, result: res}:
|
||||
}
|
||||
|
||||
debug.Log("walker done")
|
||||
}
|
||||
|
||||
// Split feeds all elements read from inChan to dirChan and entChan.
|
||||
func Split(inChan <-chan Job, dirChan chan<- Dir, entChan chan<- Entry) {
|
||||
debug.Log("start")
|
||||
defer debug.Log("done")
|
||||
|
||||
inCh := inChan
|
||||
dirCh := dirChan
|
||||
entCh := entChan
|
||||
|
||||
var (
|
||||
dir Dir
|
||||
ent Entry
|
||||
)
|
||||
|
||||
// deactivate sending until we received at least one job
|
||||
dirCh = nil
|
||||
entCh = nil
|
||||
for {
|
||||
select {
|
||||
case job, ok := <-inCh:
|
||||
if !ok {
|
||||
// channel is closed
|
||||
return
|
||||
}
|
||||
|
||||
if job == nil {
|
||||
panic("nil job received")
|
||||
}
|
||||
|
||||
// disable receiving until the current job has been sent
|
||||
inCh = nil
|
||||
|
||||
switch j := job.(type) {
|
||||
case Dir:
|
||||
dir = j
|
||||
dirCh = dirChan
|
||||
case Entry:
|
||||
ent = j
|
||||
entCh = entChan
|
||||
default:
|
||||
panic(fmt.Sprintf("unknown job type %v", j))
|
||||
}
|
||||
case dirCh <- dir:
|
||||
// disable sending, re-enable receiving
|
||||
dirCh = nil
|
||||
inCh = inChan
|
||||
case entCh <- ent:
|
||||
// disable sending, re-enable receiving
|
||||
entCh = nil
|
||||
inCh = inChan
|
||||
}
|
||||
}
|
||||
}
|
||||
600
internal/pipe/pipe_test.go
Normal file
600
internal/pipe/pipe_test.go
Normal file
@@ -0,0 +1,600 @@
|
||||
package pipe_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"restic/debug"
|
||||
"restic/pipe"
|
||||
. "restic/test"
|
||||
)
|
||||
|
||||
type stats struct {
|
||||
dirs, files int
|
||||
}
|
||||
|
||||
func acceptAll(string, os.FileInfo) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func statPath(path string) (stats, error) {
|
||||
var s stats
|
||||
|
||||
// count files and directories with filepath.Walk()
|
||||
err := filepath.Walk(TestWalkerPath, func(p string, fi os.FileInfo, err error) error {
|
||||
if fi == nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if fi.IsDir() {
|
||||
s.dirs++
|
||||
} else {
|
||||
s.files++
|
||||
}
|
||||
|
||||
return err
|
||||
})
|
||||
|
||||
return s, err
|
||||
}
|
||||
|
||||
const maxWorkers = 100
|
||||
|
||||
func TestPipelineWalkerWithSplit(t *testing.T) {
|
||||
if TestWalkerPath == "" {
|
||||
t.Skipf("walkerpath not set, skipping TestPipelineWalker")
|
||||
}
|
||||
|
||||
var err error
|
||||
if !filepath.IsAbs(TestWalkerPath) {
|
||||
TestWalkerPath, err = filepath.Abs(TestWalkerPath)
|
||||
OK(t, err)
|
||||
}
|
||||
|
||||
before, err := statPath(TestWalkerPath)
|
||||
OK(t, err)
|
||||
|
||||
t.Logf("walking path %s with %d dirs, %d files", TestWalkerPath,
|
||||
before.dirs, before.files)
|
||||
|
||||
// account for top level dir
|
||||
before.dirs++
|
||||
|
||||
after := stats{}
|
||||
m := sync.Mutex{}
|
||||
|
||||
worker := func(wg *sync.WaitGroup, done <-chan struct{}, entCh <-chan pipe.Entry, dirCh <-chan pipe.Dir) {
|
||||
defer wg.Done()
|
||||
for {
|
||||
select {
|
||||
case e, ok := <-entCh:
|
||||
if !ok {
|
||||
// channel is closed
|
||||
return
|
||||
}
|
||||
|
||||
m.Lock()
|
||||
after.files++
|
||||
m.Unlock()
|
||||
|
||||
e.Result() <- true
|
||||
|
||||
case dir, ok := <-dirCh:
|
||||
if !ok {
|
||||
// channel is closed
|
||||
return
|
||||
}
|
||||
|
||||
// wait for all content
|
||||
for _, ch := range dir.Entries {
|
||||
<-ch
|
||||
}
|
||||
|
||||
m.Lock()
|
||||
after.dirs++
|
||||
m.Unlock()
|
||||
|
||||
dir.Result() <- true
|
||||
case <-done:
|
||||
// pipeline was cancelled
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
done := make(chan struct{})
|
||||
entCh := make(chan pipe.Entry)
|
||||
dirCh := make(chan pipe.Dir)
|
||||
|
||||
for i := 0; i < maxWorkers; i++ {
|
||||
wg.Add(1)
|
||||
go worker(&wg, done, entCh, dirCh)
|
||||
}
|
||||
|
||||
jobs := make(chan pipe.Job, 200)
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
pipe.Split(jobs, dirCh, entCh)
|
||||
close(entCh)
|
||||
close(dirCh)
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
resCh := make(chan pipe.Result, 1)
|
||||
pipe.Walk(context.TODO(), []string{TestWalkerPath}, acceptAll, jobs, resCh)
|
||||
|
||||
// wait for all workers to terminate
|
||||
wg.Wait()
|
||||
|
||||
// wait for top-level blob
|
||||
<-resCh
|
||||
|
||||
t.Logf("walked path %s with %d dirs, %d files", TestWalkerPath,
|
||||
after.dirs, after.files)
|
||||
|
||||
Assert(t, before == after, "stats do not match, expected %v, got %v", before, after)
|
||||
}
|
||||
|
||||
func TestPipelineWalker(t *testing.T) {
|
||||
if TestWalkerPath == "" {
|
||||
t.Skipf("walkerpath not set, skipping TestPipelineWalker")
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.TODO())
|
||||
defer cancel()
|
||||
|
||||
var err error
|
||||
if !filepath.IsAbs(TestWalkerPath) {
|
||||
TestWalkerPath, err = filepath.Abs(TestWalkerPath)
|
||||
OK(t, err)
|
||||
}
|
||||
|
||||
before, err := statPath(TestWalkerPath)
|
||||
OK(t, err)
|
||||
|
||||
t.Logf("walking path %s with %d dirs, %d files", TestWalkerPath,
|
||||
before.dirs, before.files)
|
||||
|
||||
// account for top level dir
|
||||
before.dirs++
|
||||
|
||||
after := stats{}
|
||||
m := sync.Mutex{}
|
||||
|
||||
worker := func(ctx context.Context, wg *sync.WaitGroup, jobs <-chan pipe.Job) {
|
||||
defer wg.Done()
|
||||
for {
|
||||
select {
|
||||
case job, ok := <-jobs:
|
||||
if !ok {
|
||||
// channel is closed
|
||||
return
|
||||
}
|
||||
Assert(t, job != nil, "job is nil")
|
||||
|
||||
switch j := job.(type) {
|
||||
case pipe.Dir:
|
||||
// wait for all content
|
||||
for _, ch := range j.Entries {
|
||||
<-ch
|
||||
}
|
||||
|
||||
m.Lock()
|
||||
after.dirs++
|
||||
m.Unlock()
|
||||
|
||||
j.Result() <- true
|
||||
case pipe.Entry:
|
||||
m.Lock()
|
||||
after.files++
|
||||
m.Unlock()
|
||||
|
||||
j.Result() <- true
|
||||
}
|
||||
|
||||
case <-ctx.Done():
|
||||
// pipeline was cancelled
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
jobs := make(chan pipe.Job)
|
||||
|
||||
for i := 0; i < maxWorkers; i++ {
|
||||
wg.Add(1)
|
||||
go worker(ctx, &wg, jobs)
|
||||
}
|
||||
|
||||
resCh := make(chan pipe.Result, 1)
|
||||
pipe.Walk(ctx, []string{TestWalkerPath}, acceptAll, jobs, resCh)
|
||||
|
||||
// wait for all workers to terminate
|
||||
wg.Wait()
|
||||
|
||||
// wait for top-level blob
|
||||
<-resCh
|
||||
|
||||
t.Logf("walked path %s with %d dirs, %d files", TestWalkerPath,
|
||||
after.dirs, after.files)
|
||||
|
||||
Assert(t, before == after, "stats do not match, expected %v, got %v", before, after)
|
||||
}
|
||||
|
||||
func createFile(filename, data string) error {
|
||||
f, err := os.Create(filename)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer f.Close()
|
||||
|
||||
_, err = f.Write([]byte(data))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestPipeWalkerError(t *testing.T) {
|
||||
dir, err := ioutil.TempDir("", "restic-test-")
|
||||
OK(t, err)
|
||||
|
||||
base := filepath.Base(dir)
|
||||
|
||||
var testjobs = []struct {
|
||||
path []string
|
||||
err bool
|
||||
}{
|
||||
{[]string{base, "a", "file_a"}, false},
|
||||
{[]string{base, "a"}, false},
|
||||
{[]string{base, "b"}, true},
|
||||
{[]string{base, "c", "file_c"}, false},
|
||||
{[]string{base, "c"}, false},
|
||||
{[]string{base}, false},
|
||||
{[]string{}, false},
|
||||
}
|
||||
|
||||
OK(t, os.Mkdir(filepath.Join(dir, "a"), 0755))
|
||||
OK(t, os.Mkdir(filepath.Join(dir, "b"), 0755))
|
||||
OK(t, os.Mkdir(filepath.Join(dir, "c"), 0755))
|
||||
|
||||
OK(t, createFile(filepath.Join(dir, "a", "file_a"), "file a"))
|
||||
OK(t, createFile(filepath.Join(dir, "b", "file_b"), "file b"))
|
||||
OK(t, createFile(filepath.Join(dir, "c", "file_c"), "file c"))
|
||||
|
||||
ranHook := false
|
||||
testdir := filepath.Join(dir, "b")
|
||||
|
||||
// install hook that removes the dir right before readdirnames()
|
||||
debug.Hook("pipe.readdirnames", func(context interface{}) {
|
||||
path := context.(string)
|
||||
|
||||
if path != testdir {
|
||||
return
|
||||
}
|
||||
|
||||
t.Logf("in hook, removing test file %v", testdir)
|
||||
ranHook = true
|
||||
|
||||
OK(t, os.RemoveAll(testdir))
|
||||
})
|
||||
|
||||
ctx, cancel := context.WithCancel(context.TODO())
|
||||
|
||||
ch := make(chan pipe.Job)
|
||||
resCh := make(chan pipe.Result, 1)
|
||||
|
||||
go pipe.Walk(ctx, []string{dir}, acceptAll, ch, resCh)
|
||||
|
||||
i := 0
|
||||
for job := range ch {
|
||||
if i == len(testjobs) {
|
||||
t.Errorf("too many jobs received")
|
||||
break
|
||||
}
|
||||
|
||||
p := filepath.Join(testjobs[i].path...)
|
||||
if p != job.Path() {
|
||||
t.Errorf("job %d has wrong path: expected %q, got %q", i, p, job.Path())
|
||||
}
|
||||
|
||||
if testjobs[i].err {
|
||||
if job.Error() == nil {
|
||||
t.Errorf("job %d expected error but got nil", i)
|
||||
}
|
||||
} else {
|
||||
if job.Error() != nil {
|
||||
t.Errorf("job %d expected no error but got %v", i, job.Error())
|
||||
}
|
||||
}
|
||||
|
||||
i++
|
||||
}
|
||||
|
||||
if i != len(testjobs) {
|
||||
t.Errorf("expected %d jobs, got %d", len(testjobs), i)
|
||||
}
|
||||
|
||||
cancel()
|
||||
|
||||
Assert(t, ranHook, "hook did not run")
|
||||
OK(t, os.RemoveAll(dir))
|
||||
}
|
||||
|
||||
func BenchmarkPipelineWalker(b *testing.B) {
|
||||
if TestWalkerPath == "" {
|
||||
b.Skipf("walkerpath not set, skipping BenchPipelineWalker")
|
||||
}
|
||||
|
||||
var max time.Duration
|
||||
m := sync.Mutex{}
|
||||
|
||||
fileWorker := func(ctx context.Context, wg *sync.WaitGroup, ch <-chan pipe.Entry) {
|
||||
defer wg.Done()
|
||||
for {
|
||||
select {
|
||||
case e, ok := <-ch:
|
||||
if !ok {
|
||||
// channel is closed
|
||||
return
|
||||
}
|
||||
|
||||
// simulate backup
|
||||
//time.Sleep(10 * time.Millisecond)
|
||||
|
||||
e.Result() <- true
|
||||
case <-ctx.Done():
|
||||
// pipeline was cancelled
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
dirWorker := func(ctx context.Context, wg *sync.WaitGroup, ch <-chan pipe.Dir) {
|
||||
defer wg.Done()
|
||||
for {
|
||||
select {
|
||||
case dir, ok := <-ch:
|
||||
if !ok {
|
||||
// channel is closed
|
||||
return
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
|
||||
// wait for all content
|
||||
for _, ch := range dir.Entries {
|
||||
<-ch
|
||||
}
|
||||
|
||||
d := time.Since(start)
|
||||
m.Lock()
|
||||
if d > max {
|
||||
max = d
|
||||
}
|
||||
m.Unlock()
|
||||
|
||||
dir.Result() <- true
|
||||
case <-ctx.Done():
|
||||
// pipeline was cancelled
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.TODO())
|
||||
defer cancel()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
max = 0
|
||||
entCh := make(chan pipe.Entry, 200)
|
||||
dirCh := make(chan pipe.Dir, 200)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
b.Logf("starting %d workers", maxWorkers)
|
||||
for i := 0; i < maxWorkers; i++ {
|
||||
wg.Add(2)
|
||||
go dirWorker(ctx, &wg, dirCh)
|
||||
go fileWorker(ctx, &wg, entCh)
|
||||
}
|
||||
|
||||
jobs := make(chan pipe.Job, 200)
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
pipe.Split(jobs, dirCh, entCh)
|
||||
close(entCh)
|
||||
close(dirCh)
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
resCh := make(chan pipe.Result, 1)
|
||||
pipe.Walk(ctx, []string{TestWalkerPath}, acceptAll, jobs, resCh)
|
||||
|
||||
// wait for all workers to terminate
|
||||
wg.Wait()
|
||||
|
||||
// wait for final result
|
||||
<-resCh
|
||||
|
||||
b.Logf("max duration for a dir: %v", max)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPipelineWalkerMultiple(t *testing.T) {
|
||||
if TestWalkerPath == "" {
|
||||
t.Skipf("walkerpath not set, skipping TestPipelineWalker")
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.TODO())
|
||||
defer cancel()
|
||||
|
||||
paths, err := filepath.Glob(filepath.Join(TestWalkerPath, "*"))
|
||||
OK(t, err)
|
||||
|
||||
before, err := statPath(TestWalkerPath)
|
||||
OK(t, err)
|
||||
|
||||
t.Logf("walking paths %v with %d dirs, %d files", paths,
|
||||
before.dirs, before.files)
|
||||
|
||||
after := stats{}
|
||||
m := sync.Mutex{}
|
||||
|
||||
worker := func(ctx context.Context, wg *sync.WaitGroup, jobs <-chan pipe.Job) {
|
||||
defer wg.Done()
|
||||
for {
|
||||
select {
|
||||
case job, ok := <-jobs:
|
||||
if !ok {
|
||||
// channel is closed
|
||||
return
|
||||
}
|
||||
Assert(t, job != nil, "job is nil")
|
||||
|
||||
switch j := job.(type) {
|
||||
case pipe.Dir:
|
||||
// wait for all content
|
||||
for _, ch := range j.Entries {
|
||||
<-ch
|
||||
}
|
||||
|
||||
m.Lock()
|
||||
after.dirs++
|
||||
m.Unlock()
|
||||
|
||||
j.Result() <- true
|
||||
case pipe.Entry:
|
||||
m.Lock()
|
||||
after.files++
|
||||
m.Unlock()
|
||||
|
||||
j.Result() <- true
|
||||
}
|
||||
|
||||
case <-ctx.Done():
|
||||
// pipeline was cancelled
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
jobs := make(chan pipe.Job)
|
||||
|
||||
for i := 0; i < maxWorkers; i++ {
|
||||
wg.Add(1)
|
||||
go worker(ctx, &wg, jobs)
|
||||
}
|
||||
|
||||
resCh := make(chan pipe.Result, 1)
|
||||
pipe.Walk(ctx, paths, acceptAll, jobs, resCh)
|
||||
|
||||
// wait for all workers to terminate
|
||||
wg.Wait()
|
||||
|
||||
// wait for top-level blob
|
||||
<-resCh
|
||||
|
||||
t.Logf("walked %d paths with %d dirs, %d files", len(paths), after.dirs, after.files)
|
||||
|
||||
Assert(t, before == after, "stats do not match, expected %v, got %v", before, after)
|
||||
}
|
||||
|
||||
func dirsInPath(path string) int {
|
||||
if path == "/" || path == "." || path == "" {
|
||||
return 0
|
||||
}
|
||||
|
||||
n := 0
|
||||
for dir := path; dir != "/" && dir != "."; dir = filepath.Dir(dir) {
|
||||
n++
|
||||
}
|
||||
|
||||
return n
|
||||
}
|
||||
|
||||
func TestPipeWalkerRoot(t *testing.T) {
|
||||
if runtime.GOOS == "windows" {
|
||||
t.Skipf("not running TestPipeWalkerRoot on %s", runtime.GOOS)
|
||||
return
|
||||
}
|
||||
|
||||
cwd, err := os.Getwd()
|
||||
OK(t, err)
|
||||
|
||||
testPaths := []string{
|
||||
string(filepath.Separator),
|
||||
".",
|
||||
cwd,
|
||||
}
|
||||
|
||||
for _, path := range testPaths {
|
||||
testPipeWalkerRootWithPath(path, t)
|
||||
}
|
||||
}
|
||||
|
||||
func testPipeWalkerRootWithPath(path string, t *testing.T) {
|
||||
pattern := filepath.Join(path, "*")
|
||||
rootPaths, err := filepath.Glob(pattern)
|
||||
OK(t, err)
|
||||
|
||||
for i, p := range rootPaths {
|
||||
rootPaths[i], err = filepath.Rel(path, p)
|
||||
OK(t, err)
|
||||
}
|
||||
|
||||
t.Logf("paths in %v (pattern %q) expanded to %v items", path, pattern, len(rootPaths))
|
||||
|
||||
jobCh := make(chan pipe.Job)
|
||||
var jobs []pipe.Job
|
||||
|
||||
worker := func(wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
for job := range jobCh {
|
||||
jobs = append(jobs, job)
|
||||
}
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
go worker(&wg)
|
||||
|
||||
filter := func(p string, fi os.FileInfo) bool {
|
||||
p, err := filepath.Rel(path, p)
|
||||
OK(t, err)
|
||||
return dirsInPath(p) <= 1
|
||||
}
|
||||
|
||||
resCh := make(chan pipe.Result, 1)
|
||||
pipe.Walk(context.TODO(), []string{path}, filter, jobCh, resCh)
|
||||
|
||||
wg.Wait()
|
||||
|
||||
t.Logf("received %d jobs", len(jobs))
|
||||
|
||||
for i, job := range jobs[:len(jobs)-1] {
|
||||
path := job.Path()
|
||||
if path == "." || path == ".." || path == string(filepath.Separator) {
|
||||
t.Errorf("job %v has invalid path %q", i, path)
|
||||
}
|
||||
}
|
||||
|
||||
lastPath := jobs[len(jobs)-1].Path()
|
||||
if lastPath != "" {
|
||||
t.Errorf("last job has non-empty path %q", lastPath)
|
||||
}
|
||||
|
||||
if len(jobs) < len(rootPaths) {
|
||||
t.Errorf("want at least %v jobs, got %v for path %v\n", len(rootPaths), len(jobs), path)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user