Merge pull request #5451 from greatroar/concurrency

Concurrency simplifications
Michael Eischer, committed via GitHub, 2025-09-24 22:22:40 +02:00
3 changed files with 43 additions and 64 deletions
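
All three files below make the same simplification: hand-rolled worker pools (a feeder channel plus a fixed number of worker goroutines) are replaced with errgroup's SetLimit, which bounds how many goroutines run concurrently while wg.Wait() keeps returning the first error.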

internal/dump/common.go

@@ -104,9 +104,11 @@ func (d *Dumper) WriteNode(ctx context.Context, node *restic.Node) error {
 func (d *Dumper) writeNode(ctx context.Context, w io.Writer, node *restic.Node) error {
 	wg, ctx := errgroup.WithContext(ctx)
-	limit := d.repo.Connections() - 1 // See below for the -1.
+	limit := int(d.repo.Connections())
+	wg.SetLimit(1 + limit) // +1 for the writer.
 	blobs := make(chan (<-chan []byte), limit)
 
+	// Writer.
 	wg.Go(func() error {
 		for ch := range blobs {
 			select {
@@ -122,7 +124,6 @@ func (d *Dumper) writeNode(ctx context.Context, w io.Writer, node *restic.Node)
 	})
 
 	// Start short-lived goroutines to load blobs.
-	// There will be at most 1+cap(blobs) calling LoadBlob at any moment.
 loop:
 	for _, id := range node.Content {
 		// This needs to be buffered, so that loaders can quit

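wg.SetLimit(n) makes each subsequent wg.Go block until fewer than n goroutines from the group are running, so the loader count above stays bounded without the old hand-counted -1. A minimal runnable sketch of the same writer-plus-loaders shape, where fetch, the limit of 4, and the integer IDs are invented stand-ins rather than restic's API:

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// fetch stands in for an IO-bound call such as LoadBlob.
func fetch(_ context.Context, id int) ([]byte, error) {
	return []byte(fmt.Sprintf("blob-%d", id)), nil
}

func main() {
	wg, ctx := errgroup.WithContext(context.Background())
	limit := 4             // e.g. repo.Connections()
	wg.SetLimit(1 + limit) // +1 so the writer never competes with loaders

	blobs := make(chan (<-chan []byte), limit)

	// Writer: prints results in submission order.
	wg.Go(func() error {
		for ch := range blobs {
			fmt.Printf("%s\n", <-ch)
		}
		return nil
	})

	for id := 0; id < 10; id++ {
		ch := make(chan []byte, 1) // buffered so a loader can exit early
		blobs <- ch
		wg.Go(func() error { // captures id per iteration (Go 1.22+)
			buf, err := fetch(ctx, id)
			if err != nil {
				return err
			}
			ch <- buf
			return nil
		})
	}
	close(blobs)

	if err := wg.Wait(); err != nil {
		fmt.Println("error:", err)
	}
}

Because blobs has capacity limit and the writer permanently occupies one slot, at most limit loaders run at a time while the writer drains their results in order; that is what the +1 in SetLimit accounts for.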
internal/index/master_index.go

@@ -459,27 +459,24 @@ func (mi *MasterIndex) Rewrite(ctx context.Context, repo restic.Unpacked[restic.
 		return nil
 	})
 
-	// a worker receives an index from ch, and saves the index
-	worker := func() error {
-		for idx := range saveCh {
-			idx.Finalize()
-			if len(idx.packs) == 0 {
-				continue
-			}
-			if _, err := idx.SaveIndex(wgCtx, repo); err != nil {
-				return err
-			}
-		}
-		return nil
-	}
+	var savers errgroup.Group
 	// encoding an index can take quite some time such that this can be CPU- or IO-bound
 	// do not add repo.Connections() here as there are already the loader goroutines.
-	workerCount := runtime.GOMAXPROCS(0)
-	// run workers on ch
-	for i := 0; i < workerCount; i++ {
-		wg.Go(worker)
+	savers.SetLimit(runtime.GOMAXPROCS(0))
+
+	for idx := range saveCh {
+		savers.Go(func() error {
+			idx.Finalize()
+			if len(idx.packs) == 0 {
+				return nil
+			}
+			_, err := idx.SaveIndex(wgCtx, repo)
+			return err
+		})
 	}
+	wg.Go(savers.Wait)
 
 	err := wg.Wait()
 	p.Done()
 	if err != nil {
@@ -514,6 +511,8 @@ func (mi *MasterIndex) SaveFallback(ctx context.Context, repo restic.SaverRemove
 	obsolete := restic.NewIDSet()
 	wg, wgCtx := errgroup.WithContext(ctx)
+	// keep concurrency bounded as we're on a fallback path
+	wg.SetLimit(1 + int(repo.Connections()))
 
 	ch := make(chan *Index)
 	wg.Go(func() error {
@@ -553,23 +552,14 @@ func (mi *MasterIndex) SaveFallback(ctx context.Context, repo restic.SaverRemove
 		return nil
 	})
 
-	// a worker receives an index from ch, and saves the index
-	worker := func() error {
-		for idx := range ch {
+	for idx := range ch {
+		wg.Go(func() error {
 			idx.Finalize()
-			if _, err := idx.SaveIndex(wgCtx, repo); err != nil {
-				return err
-			}
-		}
-		return nil
-	}
-	// keep concurrency bounded as we're on a fallback path
-	workerCount := int(repo.Connections())
-	// run workers on ch
-	for i := 0; i < workerCount; i++ {
-		wg.Go(worker)
+			_, err := idx.SaveIndex(wgCtx, repo)
+			return err
+		})
 	}
 
 	err := wg.Wait()
 	p.Done()
 	// the index no longer matches to stored state

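The Rewrite hunk above nests a plain errgroup.Group (no context of its own) inside the outer context-carrying group, and folds its result into the outer one with wg.Go(savers.Wait) — which compiles because Group.Wait already has the func() error signature wg.Go expects. A hedged standalone sketch of that shape, with a toy save standing in for idx.SaveIndex and integers standing in for indexes:

package main

import (
	"context"
	"fmt"
	"runtime"

	"golang.org/x/sync/errgroup"
)

func save(_ context.Context, idx int) error {
	fmt.Println("saved index", idx)
	return nil
}

func main() {
	wg, wgCtx := errgroup.WithContext(context.Background())

	saveCh := make(chan int)
	wg.Go(func() error { // producer, analogous to the index loader
		defer close(saveCh)
		for i := 0; i < 8; i++ {
			select {
			case saveCh <- i:
			case <-wgCtx.Done():
				return wgCtx.Err()
			}
		}
		return nil
	})

	// Separate limit for the CPU-bound save step, so it does not
	// add to the loader goroutines already counted in wg.
	var savers errgroup.Group
	savers.SetLimit(runtime.GOMAXPROCS(0))

	for idx := range saveCh { // idx captured per iteration (Go 1.22+)
		savers.Go(func() error { return save(wgCtx, idx) })
	}
	wg.Go(savers.Wait) // outer Wait now also waits for the savers

	if err := wg.Wait(); err != nil {
		fmt.Println("error:", err)
	}
}

Keeping savers separate is what lets the save step get a GOMAXPROCS cap of its own, matching the diff's comment about not adding repo.Connections() on top of the loaders.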
internal/restic/parallel.go

@@ -52,43 +52,31 @@ func ParallelList(ctx context.Context, r Lister, t FileType, parallelism uint, f
 	return wg.Wait()
 }
 
-// ParallelRemove deletes the given fileList of fileType in parallel
-// if callback returns an error, then it will abort.
+// ParallelRemove deletes the given fileList of fileType in parallel.
+// If report returns an error, it aborts.
 func ParallelRemove[FT FileTypes](ctx context.Context, repo RemoverUnpacked[FT], fileList IDSet, fileType FT, report func(id ID, err error) error, bar *progress.Counter) error {
-	fileChan := make(chan ID)
 	wg, ctx := errgroup.WithContext(ctx)
-	wg.Go(func() error {
-		defer close(fileChan)
-		for id := range fileList {
-			select {
-			case fileChan <- id:
-			case <-ctx.Done():
-				return ctx.Err()
-			}
-		}
-		return nil
-	})
+	wg.SetLimit(int(repo.Connections())) // deleting files is IO-bound
 
 	bar.SetMax(uint64(len(fileList)))
-	// deleting files is IO-bound
-	workerCount := repo.Connections()
-	for i := 0; i < int(workerCount); i++ {
+
+loop:
+	for id := range fileList {
+		select {
+		case <-ctx.Done():
+			break loop
+		default:
+		}
 		wg.Go(func() error {
-			for id := range fileChan {
-				err := repo.RemoveUnpacked(ctx, fileType, id)
-				if err == nil {
-					// increment counter only if no error
-					bar.Add(1)
-				}
-				if report != nil {
-					err = report(id, err)
-				}
-				if err != nil {
-					return err
-				}
+			err := repo.RemoveUnpacked(ctx, fileType, id)
+			if err == nil {
+				bar.Add(1)
 			}
-			return nil
+			if report != nil {
+				err = report(id, err)
+			}
+			return err
 		})
 	}
 	return wg.Wait()
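
The rewritten ParallelRemove submits one wg.Go per ID and lets SetLimit do the throttling, instead of feeding a channel drained by repo.Connections() workers. The non-blocking select on ctx.Done() only stops submitting early; error propagation is unchanged, since errgroup cancels ctx on the first failure and wg.Wait() returns that error. A minimal sketch under the same assumptions, where remove and the limit of 2 are invented placeholders:

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// remove stands in for an IO-bound deletion such as RemoveUnpacked.
func remove(_ context.Context, id string) error {
	fmt.Println("removed", id)
	return nil
}

func main() {
	ids := []string{"a", "b", "c", "d"}

	wg, ctx := errgroup.WithContext(context.Background())
	wg.SetLimit(2) // e.g. repo.Connections(); the work is IO-bound

loop:
	for _, id := range ids {
		// Stop spawning once a worker has failed and cancelled ctx;
		// wg.Wait() below still reports that first error.
		select {
		case <-ctx.Done():
			break loop
		default:
		}
		wg.Go(func() error { return remove(ctx, id) }) // id captured per iteration (Go 1.22+)
	}
	if err := wg.Wait(); err != nil {
		fmt.Println("error:", err)
	}
}

With a limit set, wg.Go blocks the submitting loop rather than piling up goroutines, so one goroutine per file costs no more than the old fixed pool.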