Skip to content

Commit 2cc8518

Browse files
committed
make pools reusable
1 parent 8427ccd commit 2cc8518

8 files changed

Lines changed: 76 additions & 6 deletions

pool/error_pool.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,16 @@ func (p *ErrorPool) Go(f func() error) {
3535
// returning any errors from tasks.
3636
func (p *ErrorPool) Wait() error {
3737
p.pool.Wait()
38-
if len(p.errs) == 0 {
38+
39+
errs := p.errs
40+
p.errs = nil // reset errs
41+
42+
if len(errs) == 0 {
3943
return nil
4044
} else if p.onlyFirstError {
41-
return p.errs[0]
45+
return errs[0]
4246
} else {
43-
return multierror.Join(p.errs...)
47+
return multierror.Join(errs...)
4448
}
4549
}
4650

pool/error_pool_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,4 +117,19 @@ func TestErrorPool(t *testing.T) {
117117
})
118118
}
119119
})
120+
121+
t.Run("reuse", func(t *testing.T) {
122+
// Test for https://github.com/sourcegraph/conc/issues/128
123+
p := pool.New().WithErrors()
124+
125+
p.Go(func() error { return err1 })
126+
wait1 := p.Wait()
127+
require.ErrorIs(t, wait1, err1)
128+
129+
p.Go(func() error { return err2 })
130+
wait2 := p.Wait()
131+
// On reuse, only the new error should be returned
132+
require.ErrorIs(t, wait2, err2)
133+
require.NotErrorIs(t, wait1, err2)
134+
})
120135
}

pool/result_context_pool.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@ func (p *ResultContextPool[T]) Go(f func(context.Context) (T, error)) {
3232
// returns an error if any of the tasks errored.
3333
func (p *ResultContextPool[T]) Wait() ([]T, error) {
3434
err := p.contextPool.Wait()
35-
return p.agg.collect(p.collectErrored), err
35+
results := p.agg.collect(p.collectErrored)
36+
p.agg = resultAggregator[T]{}
37+
return results, err
3638
}
3739

3840
// WithCollectErrored configures the pool to still collect the result of a task

pool/result_context_pool_test.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,4 +228,20 @@ func TestResultContextPool(t *testing.T) {
228228
})
229229
}
230230
})
231+
232+
t.Run("reuse", func(t *testing.T) {
233+
// Test for https://github.com/sourcegraph/conc/issues/128
234+
p := pool.NewWithResults[int]().WithContext(context.Background())
235+
236+
p.Go(func(context.Context) (int, error) { return 1, err1 })
237+
results1, errs1 := p.Wait()
238+
require.Empty(t, results1)
239+
require.ErrorIs(t, errs1, err1)
240+
241+
p.Go(func(context.Context) (int, error) { return 2, err2 })
242+
results2, errs2 := p.Wait()
243+
require.Empty(t, results2)
244+
require.ErrorIs(t, errs2, err2)
245+
require.NotErrorIs(t, errs2, err1)
246+
})
231247
}

pool/result_error_pool.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@ func (p *ResultErrorPool[T]) Go(f func() (T, error)) {
3434
// returning the results and any errors from tasks.
3535
func (p *ResultErrorPool[T]) Wait() ([]T, error) {
3636
err := p.errorPool.Wait()
37-
return p.agg.collect(p.collectErrored), err
37+
results := p.agg.collect(p.collectErrored)
38+
p.agg = resultAggregator[T]{} // reset for reuse
39+
return results, err
3840
}
3941

4042
// WithCollectErrored configures the pool to still collect the result of a task

pool/result_error_pool_test.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,4 +130,20 @@ func TestResultErrorPool(t *testing.T) {
130130
})
131131
}
132132
})
133+
134+
t.Run("reuse", func(t *testing.T) {
135+
// Test for https://github.com/sourcegraph/conc/issues/128
136+
p := pool.NewWithResults[int]().WithErrors()
137+
138+
p.Go(func() (int, error) { return 1, err1 })
139+
results1, errs1 := p.Wait()
140+
require.Empty(t, results1)
141+
require.ErrorIs(t, errs1, err1)
142+
143+
p.Go(func() (int, error) { return 2, err2 })
144+
results2, errs2 := p.Wait()
145+
require.Empty(t, results2)
146+
require.ErrorIs(t, errs2, err2)
147+
require.NotErrorIs(t, errs2, err1)
148+
})
133149
}

pool/result_pool.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ func (p *ResultPool[T]) Go(f func() T) {
4040
// a slice of results from tasks that did not panic.
4141
func (p *ResultPool[T]) Wait() []T {
4242
p.pool.Wait()
43-
return p.agg.collect(true)
43+
results := p.agg.collect(true)
44+
p.agg = resultAggregator[T]{} // reset for reuse
45+
return results
4446
}
4547

4648
// MaxGoroutines returns the maximum size of the pool.

pool/result_pool_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,4 +113,17 @@ func TestResultGroup(t *testing.T) {
113113
})
114114
}
115115
})
116+
117+
t.Run("reuse", func(t *testing.T) {
118+
// Test for https://github.com/sourcegraph/conc/issues/128
119+
p := pool.NewWithResults[int]()
120+
121+
p.Go(func() int { return 1 })
122+
results1 := p.Wait()
123+
require.Equal(t, []int{1}, results1)
124+
125+
p.Go(func() int { return 2 })
126+
results2 := p.Wait()
127+
require.Equal(t, []int{2}, results2)
128+
})
116129
}

0 commit comments

Comments
 (0)