Add walk for trees, restructure

This commit is contained in:
Alexander Neumann
2015-03-02 14:48:47 +01:00
parent c8be54564f
commit f167366be5
12 changed files with 564 additions and 171 deletions

View File

@@ -5,6 +5,8 @@ import (
"os"
"path/filepath"
"sort"
"github.com/restic/restic/debug"
)
type Entry struct {
@@ -48,14 +50,15 @@ func isFile(fi os.FileInfo) bool {
return fi.Mode()&(os.ModeType|os.ModeCharDevice) == 0
}
func walk(path string, done chan struct{}, entCh chan<- Entry, dirCh chan<- Dir, res chan<- interface{}) error {
func walk(path string, done chan struct{}, jobs chan<- interface{}, res chan<- interface{}) error {
info, err := os.Lstat(path)
if err != nil {
return err
}
if !info.IsDir() {
return fmt.Errorf("path is not a directory, cannot walk: %s", path)
jobs <- Entry{Info: info, Path: path, Result: res}
return nil
}
names, err := readDirNames(path)
@@ -78,26 +81,95 @@ func walk(path string, done chan struct{}, entCh chan<- Entry, dirCh chan<- Dir,
}
if isDir(fi) {
err = walk(subpath, done, entCh, dirCh, ch)
err = walk(subpath, done, jobs, ch)
if err != nil {
return err
}
} else {
entCh <- Entry{Info: fi, Path: subpath, Result: ch}
jobs <- Entry{Info: fi, Path: subpath, Result: ch}
}
}
dirCh <- Dir{Path: path, Info: info, Entries: entries, Result: res}
jobs <- Dir{Path: path, Info: info, Entries: entries, Result: res}
return nil
}
// Walk takes a path and sends a Job for each file and directory it finds below
// the path. When the channel done is closed, processing stops.
func Walk(path string, done chan struct{}, entCh chan<- Entry, dirCh chan<- Dir) (<-chan interface{}, error) {
// Walk sends a Job for each file and directory it finds below the paths. When
// the channel done is closed, processing stops.
func Walk(paths []string, done chan struct{}, jobs chan<- interface{}) (<-chan interface{}, error) {
resCh := make(chan interface{}, 1)
err := walk(path, done, entCh, dirCh, resCh)
close(entCh)
close(dirCh)
return resCh, err
defer func() {
close(resCh)
close(jobs)
debug.Log("pipe.Walk", "output channel closed")
}()
entries := make([]<-chan interface{}, 0, len(paths))
for _, path := range paths {
debug.Log("pipe.Walk", "start walker for %v", path)
ch := make(chan interface{}, 1)
entries = append(entries, ch)
err := walk(path, done, jobs, ch)
if err != nil {
return nil, err
}
debug.Log("pipe.Walk", "walker for %v done", path)
}
resCh <- Dir{Entries: entries}
return resCh, nil
}
// Split feeds all elements read from inChan to dirChan and entChan.
func Split(inChan <-chan interface{}, dirChan chan<- Dir, entChan chan<- Entry) {
debug.Log("pipe.Split", "start")
defer debug.Log("pipe.Split", "done")
inCh := inChan
dirCh := dirChan
entCh := entChan
var (
dir Dir
ent Entry
)
// deactivate sending until we received at least one job
dirCh = nil
entCh = nil
for {
select {
case job, ok := <-inCh:
if !ok {
// channel is closed
return
}
if job == nil {
panic("nil job received")
}
// disable receiving until the current job has been sent
inCh = nil
switch j := job.(type) {
case Dir:
dir = j
dirCh = dirChan
case Entry:
ent = j
entCh = entChan
default:
panic(fmt.Sprintf("unknown job type %v", j))
}
case dirCh <- dir:
// disable sending, re-enable receiving
dirCh = nil
inCh = inChan
case entCh <- ent:
// disable sending, re-enable receiving
entCh = nil
inCh = inChan
}
}
}

View File

@@ -43,9 +43,9 @@ func statPath(path string) (stats, error) {
return s, err
}
func TestPipelineWalker(t *testing.T) {
func TestPipelineWalkerWithSplit(t *testing.T) {
if *testWalkerPath == "" {
t.Skipf("walkerpah not set, skipping TestPipelineWalker")
t.Skipf("walkerpath not set, skipping TestPipelineWalker")
}
before, err := statPath(*testWalkerPath)
@@ -106,7 +106,92 @@ func TestPipelineWalker(t *testing.T) {
go worker(&wg, done, entCh, dirCh)
}
resCh, err := pipe.Walk(*testWalkerPath, done, entCh, dirCh)
jobs := make(chan interface{}, 200)
wg.Add(1)
go func() {
pipe.Split(jobs, dirCh, entCh)
close(entCh)
close(dirCh)
wg.Done()
}()
resCh, err := pipe.Walk([]string{*testWalkerPath}, done, jobs)
ok(t, err)
// wait for all workers to terminate
wg.Wait()
// wait for top-level blob
<-resCh
t.Logf("walked path %s with %d dirs, %d files", *testWalkerPath,
after.dirs, after.files)
assert(t, before == after, "stats do not match, expected %v, got %v", before, after)
}
func TestPipelineWalker(t *testing.T) {
if *testWalkerPath == "" {
t.Skipf("walkerpath not set, skipping TestPipelineWalker")
}
before, err := statPath(*testWalkerPath)
ok(t, err)
t.Logf("walking path %s with %d dirs, %d files", *testWalkerPath,
before.dirs, before.files)
after := stats{}
m := sync.Mutex{}
worker := func(wg *sync.WaitGroup, done <-chan struct{}, jobs <-chan interface{}) {
defer wg.Done()
for {
select {
case job, ok := <-jobs:
if !ok {
// channel is closed
return
}
assert(t, job != nil, "job is nil")
switch j := job.(type) {
case pipe.Dir:
// wait for all content
for _, ch := range j.Entries {
<-ch
}
m.Lock()
after.dirs++
m.Unlock()
j.Result <- true
case pipe.Entry:
m.Lock()
after.files++
m.Unlock()
j.Result <- true
}
case <-done:
// pipeline was cancelled
return
}
}
}
var wg sync.WaitGroup
done := make(chan struct{})
jobs := make(chan interface{})
for i := 0; i < *maxWorkers; i++ {
wg.Add(1)
go worker(&wg, done, jobs)
}
resCh, err := pipe.Walk([]string{*testWalkerPath}, done, jobs)
ok(t, err)
// wait for all workers to terminate
@@ -123,7 +208,7 @@ func TestPipelineWalker(t *testing.T) {
func BenchmarkPipelineWalker(b *testing.B) {
if *testWalkerPath == "" {
b.Skipf("walkerpah not set, skipping BenchPipelineWalker")
b.Skipf("walkerpath not set, skipping BenchPipelineWalker")
}
var max time.Duration
@@ -196,7 +281,16 @@ func BenchmarkPipelineWalker(b *testing.B) {
go fileWorker(&wg, done, entCh)
}
resCh, err := pipe.Walk(*testWalkerPath, done, entCh, dirCh)
jobs := make(chan interface{}, 200)
wg.Add(1)
go func() {
pipe.Split(jobs, dirCh, entCh)
close(entCh)
close(dirCh)
wg.Done()
}()
resCh, err := pipe.Walk([]string{*testWalkerPath}, done, jobs)
ok(b, err)
// wait for all workers to terminate