Use new archiver code for backup

Alexander Neumann
2018-04-22 11:57:20 +02:00
parent c703d21d55
commit 0e78ac92d8
9 changed files with 863 additions and 337 deletions

internal/ui/backup.go (new file, 343 lines)

@@ -0,0 +1,343 @@
package ui

import (
    "context"
    "fmt"
    "os"
    "sort"
    "sync"
    "time"

    "github.com/restic/restic/internal/archiver"
    "github.com/restic/restic/internal/restic"
    "github.com/restic/restic/internal/ui/termstatus"
)

type counter struct {
    Files, Dirs uint
    Bytes       uint64
}

type fileWorkerMessage struct {
    filename string
    done     bool
}

// Backup reports progress for the `backup` command.
type Backup struct {
    *Message
    *StdioWrapper

    MinUpdatePause time.Duration

    term  *termstatus.Terminal
    v     uint
    start time.Time

    totalBytes uint64

    totalCh     chan counter
    processedCh chan counter
    errCh       chan struct{}
    workerCh    chan fileWorkerMessage

    summary struct {
        sync.Mutex
        Files, Dirs struct {
            New       uint
            Changed   uint
            Unchanged uint
        }
        archiver.ItemStats
    }
}

// NewBackup returns a new backup progress reporter.
func NewBackup(term *termstatus.Terminal, verbosity uint) *Backup {
    return &Backup{
        Message:      NewMessage(term, verbosity),
        StdioWrapper: NewStdioWrapper(term),
        term:         term,
        v:            verbosity,
        start:        time.Now(),

        // limit to 60fps by default
        MinUpdatePause: time.Second / 60,

        totalCh:     make(chan counter),
        processedCh: make(chan counter),
        errCh:       make(chan struct{}),
        workerCh:    make(chan fileWorkerMessage),
    }
}
// Run regularly updates the status lines. It should be called in a separate
// goroutine.
func (b *Backup) Run(ctx context.Context) error {
    var (
        lastUpdate       time.Time
        total, processed counter
        errors           uint
        started          bool
        currentFiles     = make(map[string]struct{})
        secondsRemaining uint64
    )

    t := time.NewTicker(time.Second)
    defer t.Stop()

    for {
        select {
        case <-ctx.Done():
            return nil
        case t, ok := <-b.totalCh:
            if ok {
                total = t
                started = true
            } else {
                // scan has finished
                b.totalCh = nil
                b.totalBytes = total.Bytes
            }
        case s := <-b.processedCh:
            processed.Files += s.Files
            processed.Dirs += s.Dirs
            processed.Bytes += s.Bytes
            started = true
        case <-b.errCh:
            errors++
            started = true
        case m := <-b.workerCh:
            if m.done {
                delete(currentFiles, m.filename)
            } else {
                currentFiles[m.filename] = struct{}{}
            }
        case <-t.C:
            if !started {
                continue
            }
            if b.totalCh == nil {
                secs := float64(time.Since(b.start) / time.Second)
                todo := float64(total.Bytes - processed.Bytes)
                secondsRemaining = uint64(secs / float64(processed.Bytes) * todo)
            }
        }

        // limit update frequency
        if time.Since(lastUpdate) < b.MinUpdatePause {
            continue
        }
        lastUpdate = time.Now()

        b.update(total, processed, errors, currentFiles, secondsRemaining)
    }
}
// update updates the status lines.
func (b *Backup) update(total, processed counter, errors uint, currentFiles map[string]struct{}, secs uint64) {
    var status string
    if total.Files == 0 && total.Dirs == 0 {
        // no total count available yet
        status = fmt.Sprintf("[%s] %v files, %s, %d errors",
            formatDuration(time.Since(b.start)),
            processed.Files, formatBytes(processed.Bytes), errors,
        )
    } else {
        var eta string
        if secs > 0 {
            eta = fmt.Sprintf(" ETA %s", formatSeconds(secs))
        }

        // include totals
        status = fmt.Sprintf("[%s] %s %v files %s, total %v files %v, %d errors%s",
            formatDuration(time.Since(b.start)),
            formatPercent(processed.Bytes, total.Bytes),
            processed.Files,
            formatBytes(processed.Bytes),
            total.Files,
            formatBytes(total.Bytes),
            errors,
            eta,
        )
    }

    lines := make([]string, 0, len(currentFiles)+1)
    for filename := range currentFiles {
        lines = append(lines, filename)
    }
    sort.Sort(sort.StringSlice(lines))
    lines = append([]string{status}, lines...)

    b.term.SetStatus(lines)
}

// ScannerError is the error callback function for the scanner, it prints the
// error in verbose mode and returns nil.
func (b *Backup) ScannerError(item string, fi os.FileInfo, err error) error {
    b.V("scan: %v\n", err)
    return nil
}

// Error is the error callback function for the archiver, it prints the error and returns nil.
func (b *Backup) Error(item string, fi os.FileInfo, err error) error {
    b.E("error: %v\n", err)
    b.errCh <- struct{}{}
    return nil
}

// StartFile is called when a file is being processed by a worker.
func (b *Backup) StartFile(filename string) {
    b.workerCh <- fileWorkerMessage{
        filename: filename,
    }
}

// CompleteBlob is called for all saved blobs for files.
func (b *Backup) CompleteBlob(filename string, bytes uint64) {
    b.processedCh <- counter{Bytes: bytes}
}

func formatPercent(numerator uint64, denominator uint64) string {
    if denominator == 0 {
        return ""
    }

    percent := 100.0 * float64(numerator) / float64(denominator)
    if percent > 100 {
        percent = 100
    }

    return fmt.Sprintf("%3.2f%%", percent)
}

func formatSeconds(sec uint64) string {
    hours := sec / 3600
    sec -= hours * 3600
    min := sec / 60
    sec -= min * 60
    if hours > 0 {
        return fmt.Sprintf("%d:%02d:%02d", hours, min, sec)
    }

    return fmt.Sprintf("%d:%02d", min, sec)
}

func formatDuration(d time.Duration) string {
    sec := uint64(d / time.Second)
    return formatSeconds(sec)
}

func formatBytes(c uint64) string {
    b := float64(c)
    switch {
    case c > 1<<40:
        return fmt.Sprintf("%.3f TiB", b/(1<<40))
    case c > 1<<30:
        return fmt.Sprintf("%.3f GiB", b/(1<<30))
    case c > 1<<20:
        return fmt.Sprintf("%.3f MiB", b/(1<<20))
    case c > 1<<10:
        return fmt.Sprintf("%.3f KiB", b/(1<<10))
    default:
        return fmt.Sprintf("%d B", c)
    }
}
// CompleteItemFn is the status callback function for the archiver when a
// file/dir has been saved successfully.
func (b *Backup) CompleteItemFn(item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration) {
    b.summary.Lock()
    b.summary.ItemStats.Add(s)
    b.summary.Unlock()

    if current == nil {
        return
    }

    switch current.Type {
    case "file":
        b.processedCh <- counter{Files: 1}
        b.workerCh <- fileWorkerMessage{
            filename: item,
            done:     true,
        }
    case "dir":
        b.processedCh <- counter{Dirs: 1}
    }

    if current.Type == "dir" {
        if previous == nil {
            b.VV("new %v, saved in %.3fs (%v added, %v metadata)", item, d.Seconds(), formatBytes(s.DataSize), formatBytes(s.TreeSize))
            b.summary.Lock()
            b.summary.Dirs.New++
            b.summary.Unlock()
            return
        }

        if previous.Equals(*current) {
            b.VV("unchanged %v", item)
            b.summary.Lock()
            b.summary.Dirs.Unchanged++
            b.summary.Unlock()
        } else {
            b.VV("modified %v, saved in %.3fs (%v added, %v metadata)", item, d.Seconds(), formatBytes(s.DataSize), formatBytes(s.TreeSize))
            b.summary.Lock()
            b.summary.Dirs.Changed++
            b.summary.Unlock()
        }
    } else if current.Type == "file" {
        b.workerCh <- fileWorkerMessage{
            done:     true,
            filename: item,
        }

        if previous == nil {
            b.VV("new %v, saved in %.3fs (%v added)", item, d.Seconds(), formatBytes(s.DataSize))
            b.summary.Lock()
            b.summary.Files.New++
            b.summary.Unlock()
            return
        }

        if previous.Equals(*current) {
            b.VV("unchanged %v", item)
            b.summary.Lock()
            b.summary.Files.Unchanged++
            b.summary.Unlock()
        } else {
            b.VV("modified %v, saved in %.3fs (%v added)", item, d.Seconds(), formatBytes(s.DataSize))
            b.summary.Lock()
            b.summary.Files.Changed++
            b.summary.Unlock()
        }
    }
}

// ReportTotal sets the total stats up to now
func (b *Backup) ReportTotal(item string, s archiver.ScanStats) {
    b.totalCh <- counter{Files: s.Files, Dirs: s.Dirs, Bytes: s.Bytes}

    if item == "" {
        b.V("scan finished in %.3fs", time.Since(b.start).Seconds())
        close(b.totalCh)
        return
    }
}

// Finish prints the finishing messages.
func (b *Backup) Finish() {
    b.V("processed %s in %s", formatBytes(b.totalBytes), formatDuration(time.Since(b.start)))
    b.V("\n")
    b.V("Files: %5d new, %5d changed, %5d unmodified\n", b.summary.Files.New, b.summary.Files.Changed, b.summary.Files.Unchanged)
    b.V("Dirs: %5d new, %5d changed, %5d unmodified\n", b.summary.Dirs.New, b.summary.Dirs.Changed, b.summary.Dirs.Unchanged)
    b.VV("Data Blobs: %5d new\n", b.summary.ItemStats.DataBlobs)
    b.VV("Tree Blobs: %5d new\n", b.summary.ItemStats.TreeBlobs)
    b.V("Added: %-5s\n", formatBytes(b.summary.ItemStats.DataSize+b.summary.ItemStats.TreeSize))
    b.V("\n")
}
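
The reporter above is driven entirely through its exported callbacks: the scanner reports running totals via ReportTotal, workers call StartFile, CompleteBlob and CompleteItemFn, and Run consumes the events and repaints the status lines. The following is a minimal sketch of that call order, not part of this commit; it assumes it lives inside the restic source tree (the internal packages are not importable from elsewhere), and the file name, sizes and duration are made up. The real backup command passes these methods to the scanner and archiver as callbacks instead of calling them directly.

package main

import (
    "context"
    "time"

    "github.com/restic/restic/internal/archiver"
    "github.com/restic/restic/internal/restic"
    "github.com/restic/restic/internal/ui"
    "github.com/restic/restic/internal/ui/termstatus"
)

// driveBackupProgress is a hypothetical driver for the ui.Backup reporter.
func driveBackupProgress(ctx context.Context, term *termstatus.Terminal) {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel() // stops Run when the function returns

    p := ui.NewBackup(term, 2) // verbosity 2: V() messages are printed
    go p.Run(ctx)              // consume events and repaint the status lines

    // The scanner reports running totals; an empty item marks the end of the scan.
    p.ReportTotal("/home/user/file.txt", archiver.ScanStats{Files: 1, Bytes: 42})
    p.ReportTotal("", archiver.ScanStats{Files: 1, Bytes: 42})

    // A worker announces the file, then its saved blobs, then the finished item.
    p.StartFile("/home/user/file.txt")
    p.CompleteBlob("/home/user/file.txt", 42)
    p.CompleteItemFn("/home/user/file.txt", nil, &restic.Node{Type: "file"},
        archiver.ItemStats{}, time.Millisecond)

    p.Finish() // print the summary counters
}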

internal/ui/message.go (new file, 45 lines)

@@ -0,0 +1,45 @@
package ui

import "github.com/restic/restic/internal/ui/termstatus"

// Message reports progress with messages of different verbosity.
type Message struct {
    term *termstatus.Terminal
    v    uint
}

// NewMessage returns a message progress reporter with underlying terminal
// term.
func NewMessage(term *termstatus.Terminal, verbosity uint) *Message {
    return &Message{
        term: term,
        v:    verbosity,
    }
}

// E reports an error
func (m *Message) E(msg string, args ...interface{}) {
    m.term.Errorf(msg, args...)
}

// P prints a message if verbosity >= 1, this is used for normal messages which
// are not errors.
func (m *Message) P(msg string, args ...interface{}) {
    if m.v >= 1 {
        m.term.Printf(msg, args...)
    }
}

// V prints a message if verbosity >= 2, this is used for verbose messages.
func (m *Message) V(msg string, args ...interface{}) {
    if m.v >= 2 {
        m.term.Printf(msg, args...)
    }
}

// VV prints a message if verbosity >= 3, this is used for debug messages.
func (m *Message) VV(msg string, args ...interface{}) {
    if m.v >= 3 {
        m.term.Printf(msg, args...)
    }
}
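
The four levels form a simple hierarchy: E always goes to the error stream, while P, V and VV require increasing verbosity. A short sketch of the behaviour for a reporter created with verbosity 2; the verbosityDemo helper is hypothetical and written as if it sat in package ui next to the code above, so the termstatus import is already available.

// verbosityDemo shows which levels print for verbosity 2 (not part of this commit).
func verbosityDemo(term *termstatus.Terminal) {
    m := NewMessage(term, 2)

    m.E("always printed via term.Errorf\n")    // errors ignore the verbosity
    m.P("printed, verbosity >= 1\n")           // normal messages
    m.V("printed, verbosity >= 2\n")           // verbose messages
    m.VV("suppressed, needs verbosity >= 3\n") // debug messages
}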

internal/ui/stdio_wrapper.go (new file, 86 lines)

@@ -0,0 +1,86 @@
package ui

import (
    "bytes"
    "io"

    "github.com/restic/restic/internal/ui/termstatus"
)

// StdioWrapper provides stdout and stderr integration with termstatus.
type StdioWrapper struct {
    stdout *lineWriter
    stderr *lineWriter
}

// NewStdioWrapper initializes a new stdio wrapper that can be used in place of
// os.Stdout or os.Stderr.
func NewStdioWrapper(term *termstatus.Terminal) *StdioWrapper {
    return &StdioWrapper{
        stdout: newLineWriter(term.Print),
        stderr: newLineWriter(term.Error),
    }
}

// Stdout returns a writer that is line buffered and can be used in place of
// os.Stdout. On Close(), the remaining bytes are written, followed by a line
// break.
func (w *StdioWrapper) Stdout() io.WriteCloser {
    return w.stdout
}

// Stderr returns a writer that is line buffered and can be used in place of
// os.Stderr. On Close(), the remaining bytes are written, followed by a line
// break.
func (w *StdioWrapper) Stderr() io.WriteCloser {
    return w.stderr
}

type lineWriter struct {
    buf   *bytes.Buffer
    print func(string)
}

var _ io.WriteCloser = &lineWriter{}

func newLineWriter(print func(string)) *lineWriter {
    return &lineWriter{buf: bytes.NewBuffer(nil), print: print}
}

func (w *lineWriter) Write(data []byte) (n int, err error) {
    n, err = w.buf.Write(data)
    if err != nil {
        return n, err
    }

    // look for line breaks
    buf := w.buf.Bytes()
    skip := 0
    for i := 0; i < len(buf); {
        if buf[i] == '\n' {
            // found line
            w.print(string(buf[:i+1]))
            buf = buf[i+1:]
            skip += i + 1
            i = 0
            continue
        }

        i++
    }

    _ = w.buf.Next(skip)

    return n, err
}

func (w *lineWriter) Flush() error {
    if w.buf.Len() > 0 {
        w.print(string(append(w.buf.Bytes(), '\n')))
    }
    return nil
}

func (w *lineWriter) Close() error {
    return w.Flush()
}
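
Since os.Stdout and os.Stderr are *os.File values, the wrapper is not assigned to them directly; instead the Stdout() and Stderr() writers are handed to code that accepts an io.Writer, and Close() flushes a trailing partial line with a final newline. A minimal usage sketch under that assumption; the runWrapped helper is hypothetical and presumes it sits in package ui with fmt added to the imports.

// runWrapped routes regular output through termstatus so it does not
// interleave with the status lines drawn by the terminal (not part of this commit).
func runWrapped(term *termstatus.Terminal) {
    w := NewStdioWrapper(term)
    stdout := w.Stdout()
    defer stdout.Close() // flush any buffered, unterminated line

    fmt.Fprintln(stdout, "complete line, printed immediately")
    fmt.Fprint(stdout, "partial line, printed when Close flushes")
}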

internal/ui/stdio_wrapper_test.go (new file, 95 lines)

@@ -0,0 +1,95 @@
package ui

import (
    "testing"

    "github.com/google/go-cmp/cmp"
)

func TestStdioWrapper(t *testing.T) {
    var tests = []struct {
        inputs  [][]byte
        outputs []string
    }{
        {
            inputs: [][]byte{
                []byte("foo"),
            },
            outputs: []string{
                "foo\n",
            },
        },
        {
            inputs: [][]byte{
                []byte("foo"),
                []byte("bar"),
                []byte("\n"),
                []byte("baz"),
            },
            outputs: []string{
                "foobar\n",
                "baz\n",
            },
        },
        {
            inputs: [][]byte{
                []byte("foo"),
                []byte("bar\nbaz\n"),
                []byte("bump\n"),
            },
            outputs: []string{
                "foobar\n",
                "baz\n",
                "bump\n",
            },
        },
        {
            inputs: [][]byte{
                []byte("foo"),
                []byte("bar\nbaz\n"),
                []byte("bum"),
                []byte("p\nx"),
                []byte("x"),
                []byte("x"),
                []byte("z"),
            },
            outputs: []string{
                "foobar\n",
                "baz\n",
                "bump\n",
                "xxxz\n",
            },
        },
    }

    for _, test := range tests {
        t.Run("", func(t *testing.T) {
            var lines []string
            print := func(s string) {
                lines = append(lines, s)
            }

            w := newLineWriter(print)

            for _, data := range test.inputs {
                n, err := w.Write(data)
                if err != nil {
                    t.Fatal(err)
                }

                if n != len(data) {
                    t.Errorf("invalid length returned by Write, want %d, got %d", len(data), n)
                }
            }

            err := w.Close()
            if err != nil {
                t.Fatal(err)
            }

            if !cmp.Equal(test.outputs, lines) {
                t.Error(cmp.Diff(test.outputs, lines))
            }
        })
    }
}