mirror of
https://github.com/restic/restic.git
synced 2025-12-12 03:22:07 +00:00
internal/repository/index: Simplify MasterIndex concurrency
This commit is contained in:
@@ -459,27 +459,24 @@ func (mi *MasterIndex) Rewrite(ctx context.Context, repo restic.Unpacked[restic.
|
|||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
// a worker receives an index from ch, and saves the index
|
var savers errgroup.Group
|
||||||
worker := func() error {
|
|
||||||
for idx := range saveCh {
|
|
||||||
idx.Finalize()
|
|
||||||
if len(idx.packs) == 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if _, err := idx.SaveIndex(wgCtx, repo); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// encoding an index can take quite some time such that this can be CPU- or IO-bound
|
// encoding an index can take quite some time such that this can be CPU- or IO-bound
|
||||||
// do not add repo.Connections() here as there are already the loader goroutines.
|
// do not add repo.Connections() here as there are already the loader goroutines.
|
||||||
workerCount := runtime.GOMAXPROCS(0)
|
savers.SetLimit(runtime.GOMAXPROCS(0))
|
||||||
// run workers on ch
|
|
||||||
for i := 0; i < workerCount; i++ {
|
for idx := range saveCh {
|
||||||
wg.Go(worker)
|
savers.Go(func() error {
|
||||||
|
idx.Finalize()
|
||||||
|
if len(idx.packs) == 0 {
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
_, err := idx.SaveIndex(wgCtx, repo)
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Go(savers.Wait)
|
||||||
|
|
||||||
err := wg.Wait()
|
err := wg.Wait()
|
||||||
p.Done()
|
p.Done()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -514,6 +511,8 @@ func (mi *MasterIndex) SaveFallback(ctx context.Context, repo restic.SaverRemove
|
|||||||
|
|
||||||
obsolete := restic.NewIDSet()
|
obsolete := restic.NewIDSet()
|
||||||
wg, wgCtx := errgroup.WithContext(ctx)
|
wg, wgCtx := errgroup.WithContext(ctx)
|
||||||
|
// keep concurrency bounded as we're on a fallback path
|
||||||
|
wg.SetLimit(1 + int(repo.Connections()))
|
||||||
|
|
||||||
ch := make(chan *Index)
|
ch := make(chan *Index)
|
||||||
wg.Go(func() error {
|
wg.Go(func() error {
|
||||||
@@ -553,23 +552,14 @@ func (mi *MasterIndex) SaveFallback(ctx context.Context, repo restic.SaverRemove
|
|||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
// a worker receives an index from ch, and saves the index
|
|
||||||
worker := func() error {
|
|
||||||
for idx := range ch {
|
for idx := range ch {
|
||||||
|
wg.Go(func() error {
|
||||||
idx.Finalize()
|
idx.Finalize()
|
||||||
if _, err := idx.SaveIndex(wgCtx, repo); err != nil {
|
_, err := idx.SaveIndex(wgCtx, repo)
|
||||||
return err
|
return err
|
||||||
}
|
})
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// keep concurrency bounded as we're on a fallback path
|
|
||||||
workerCount := int(repo.Connections())
|
|
||||||
// run workers on ch
|
|
||||||
for i := 0; i < workerCount; i++ {
|
|
||||||
wg.Go(worker)
|
|
||||||
}
|
|
||||||
err := wg.Wait()
|
err := wg.Wait()
|
||||||
p.Done()
|
p.Done()
|
||||||
// the index no longer matches to stored state
|
// the index no longer matches to stored state
|
||||||
|
|||||||
Reference in New Issue
Block a user