internal/restic: Simplify ParallelRemove

This commit is contained in:
greatroar
2025-07-18 12:24:45 +02:00
parent 95a36b55f4
commit f7f6459eb9

View File

@@ -52,43 +52,31 @@ func ParallelList(ctx context.Context, r Lister, t FileType, parallelism uint, f
return wg.Wait()
}
// ParallelRemove deletes the given fileList of fileType in parallel
// if callback returns an error, then it will abort.
// ParallelRemove deletes the given fileList of fileType in parallel.
// If report returns an error, it aborts.
func ParallelRemove[FT FileTypes](ctx context.Context, repo RemoverUnpacked[FT], fileList IDSet, fileType FT, report func(id ID, err error) error, bar *progress.Counter) error {
fileChan := make(chan ID)
wg, ctx := errgroup.WithContext(ctx)
wg.Go(func() error {
defer close(fileChan)
for id := range fileList {
select {
case fileChan <- id:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
})
wg.SetLimit(int(repo.Connections())) // deleting files is IO-bound
bar.SetMax(uint64(len(fileList)))
// deleting files is IO-bound
workerCount := repo.Connections()
for i := 0; i < int(workerCount); i++ {
loop:
for id := range fileList {
select {
case <-ctx.Done():
break loop
default:
}
wg.Go(func() error {
for id := range fileChan {
err := repo.RemoveUnpacked(ctx, fileType, id)
if err == nil {
// increment counter only if no error
bar.Add(1)
}
if report != nil {
err = report(id, err)
}
if err != nil {
return err
}
err := repo.RemoveUnpacked(ctx, fileType, id)
if err == nil {
bar.Add(1)
}
return nil
if report != nil {
err = report(id, err)
}
return err
})
}
return wg.Wait()