diff --git a/internal/repository/index/master_index.go b/internal/repository/index/master_index.go index a41119b58..c0a5095e3 100644 --- a/internal/repository/index/master_index.go +++ b/internal/repository/index/master_index.go @@ -459,27 +459,24 @@ func (mi *MasterIndex) Rewrite(ctx context.Context, repo restic.Unpacked[restic. return nil }) - // a worker receives an index from ch, and saves the index - worker := func() error { - for idx := range saveCh { - idx.Finalize() - if len(idx.packs) == 0 { - continue - } - if _, err := idx.SaveIndex(wgCtx, repo); err != nil { - return err - } - } - return nil - } - + var savers errgroup.Group // encoding an index can take quite some time such that this can be CPU- or IO-bound // do not add repo.Connections() here as there are already the loader goroutines. - workerCount := runtime.GOMAXPROCS(0) - // run workers on ch - for i := 0; i < workerCount; i++ { - wg.Go(worker) + savers.SetLimit(runtime.GOMAXPROCS(0)) + + for idx := range saveCh { + savers.Go(func() error { + idx.Finalize() + if len(idx.packs) == 0 { + return nil + } + _, err := idx.SaveIndex(wgCtx, repo) + return err + }) } + + wg.Go(savers.Wait) + err := wg.Wait() p.Done() if err != nil { @@ -514,6 +511,8 @@ func (mi *MasterIndex) SaveFallback(ctx context.Context, repo restic.SaverRemove obsolete := restic.NewIDSet() wg, wgCtx := errgroup.WithContext(ctx) + // keep concurrency bounded as we're on a fallback path + wg.SetLimit(1 + int(repo.Connections())) ch := make(chan *Index) wg.Go(func() error { @@ -553,23 +552,14 @@ func (mi *MasterIndex) SaveFallback(ctx context.Context, repo restic.SaverRemove return nil }) - // a worker receives an index from ch, and saves the index - worker := func() error { - for idx := range ch { + for idx := range ch { + wg.Go(func() error { idx.Finalize() - if _, err := idx.SaveIndex(wgCtx, repo); err != nil { - return err - } - } - return nil + _, err := idx.SaveIndex(wgCtx, repo) + return err + }) } - // keep concurrency bounded as we're on a fallback path - workerCount := int(repo.Connections()) - // run workers on ch - for i := 0; i < workerCount; i++ { - wg.Go(worker) - } err := wg.Wait() p.Done() // the index no longer matches to stored state