Skip to content

Commit d799146

Browse files
committed
Requested changes
Signed-off-by: Antonio Navarro Perez <[email protected]>
1 parent 584edb3 commit d799146

File tree

1 file changed

+37
-24
lines changed

1 file changed

+37
-24
lines changed

compparallel.go

+37-24
Original file line numberDiff line numberDiff line change
@@ -28,18 +28,19 @@ func NewComposableParallel(routers []*ParallelRouter) *ComposableParallel {
2828
func (r *ComposableParallel) Provide(ctx context.Context, cid cid.Cid, provide bool) error {
2929
var wg sync.WaitGroup
3030
errCh := make(chan error)
31-
closeCh := make(chan bool)
3231
for _, r := range r.routers {
3332
r := r
3433
wg.Add(1)
3534
go func() {
3635
defer wg.Done()
36+
tim := time.NewTimer(r.ExecuteAfter)
37+
defer tim.Stop()
3738
select {
3839
case <-ctx.Done():
39-
if ctx.Err() != nil && !r.IgnoreError {
40+
if !r.IgnoreError {
4041
errCh <- ctx.Err()
4142
}
42-
case <-time.After(r.ExecuteAfter):
43+
case <-tim.C:
4344
ctx, cancel := context.WithTimeout(ctx, r.Timeout)
4445
defer cancel()
4546
err := r.Router.Provide(ctx, cid, provide)
@@ -48,22 +49,18 @@ func (r *ComposableParallel) Provide(ctx context.Context, cid cid.Cid, provide b
4849
!r.IgnoreError {
4950
errCh <- err
5051
}
51-
case <-closeCh:
52-
return
5352
}
5453
}()
5554
}
5655

5756
go func() {
5857
wg.Wait()
59-
close(closeCh)
58+
close(errCh)
6059
}()
6160

6261
var errOut error
63-
select {
64-
case err := <-errCh:
62+
for err := range errCh {
6563
errOut = multierror.Append(errOut, err)
66-
case <-closeCh:
6764
}
6865

6966
return errOut
@@ -72,13 +69,18 @@ func (r *ComposableParallel) Provide(ctx context.Context, cid cid.Cid, provide b
7269
func (r *ComposableParallel) FindProvidersAsync(ctx context.Context, cid cid.Cid, count int) <-chan peer.AddrInfo {
7370
addrChanOut := make(chan peer.AddrInfo)
7471
var totalCount int64
72+
var wg sync.WaitGroup
7573
for _, r := range r.routers {
7674
r := r
75+
wg.Add(1)
7776
go func() {
77+
wg.Done()
78+
tim := time.NewTimer(r.ExecuteAfter)
79+
defer tim.Stop()
7880
select {
7981
case <-ctx.Done():
8082
return
81-
case <-time.After(r.ExecuteAfter):
83+
case <-tim.C:
8284
ctx, cancel := context.WithTimeout(ctx, r.Timeout)
8385
defer cancel()
8486
addrChan := r.Router.FindProvidersAsync(ctx, cid, count)
@@ -95,13 +97,23 @@ func (r *ComposableParallel) FindProvidersAsync(ctx context.Context, cid cid.Cid
9597
return
9698
}
9799

98-
addrChanOut <- addr
100+
select {
101+
case <-ctx.Done():
102+
return
103+
case addrChanOut <- addr:
104+
}
105+
99106
}
100107
}
101108
}
102109
}()
103110
}
104111

112+
go func() {
113+
wg.Wait()
114+
close(addrChanOut)
115+
}()
116+
105117
return addrChanOut
106118
}
107119

@@ -116,12 +128,14 @@ func (r *ComposableParallel) FindPeer(ctx context.Context, id peer.ID) (peer.Add
116128
wg.Add(1)
117129
go func() {
118130
defer wg.Done()
131+
tim := time.NewTimer(r.ExecuteAfter)
132+
defer tim.Stop()
119133
select {
120134
case <-ctx.Done():
121-
if ctx.Err() != nil && !r.IgnoreError {
135+
if !r.IgnoreError {
122136
errCh <- ctx.Err()
123137
}
124-
case <-time.After(r.ExecuteAfter):
138+
case <-tim.C:
125139
ctx, cancel := context.WithTimeout(ctx, r.Timeout)
126140
defer cancel()
127141
addr, err := r.Router.FindPeer(ctx, id)
@@ -163,18 +177,19 @@ func (r *ComposableParallel) FindPeer(ctx context.Context, id peer.ID) (peer.Add
163177
func (r *ComposableParallel) PutValue(ctx context.Context, key string, val []byte, opts ...routing.Option) error {
164178
var wg sync.WaitGroup
165179
errCh := make(chan error)
166-
closeCh := make(chan bool)
167180
for _, r := range r.routers {
168181
r := r
169182
wg.Add(1)
170183
go func() {
171184
defer wg.Done()
185+
tim := time.NewTimer(r.ExecuteAfter)
186+
defer tim.Stop()
172187
select {
173188
case <-ctx.Done():
174-
if ctx.Err() != nil && !r.IgnoreError {
189+
if !r.IgnoreError {
175190
errCh <- ctx.Err()
176191
}
177-
case <-time.After(r.ExecuteAfter):
192+
case <-tim.C:
178193
ctx, cancel := context.WithTimeout(ctx, r.Timeout)
179194
defer cancel()
180195
err := r.Router.PutValue(ctx, key, val, opts...)
@@ -183,22 +198,18 @@ func (r *ComposableParallel) PutValue(ctx context.Context, key string, val []byt
183198
!r.IgnoreError {
184199
errCh <- err
185200
}
186-
case <-closeCh:
187-
return
188201
}
189202
}()
190203
}
191204

192205
go func() {
193206
wg.Wait()
194-
close(closeCh)
207+
close(errCh)
195208
}()
196209

197210
var errOut error
198-
select {
199-
case err := <-errCh:
211+
for err := range errCh {
200212
errOut = multierror.Append(errOut, err)
201-
case <-closeCh:
202213
}
203214

204215
return errOut
@@ -215,12 +226,14 @@ func (r *ComposableParallel) GetValue(ctx context.Context, key string, opts ...r
215226
wg.Add(1)
216227
go func() {
217228
defer wg.Done()
229+
tim := time.NewTimer(r.ExecuteAfter)
230+
defer tim.Stop()
218231
select {
219232
case <-ctx.Done():
220-
if ctx.Err() != nil && !r.IgnoreError {
233+
if !r.IgnoreError {
221234
errCh <- ctx.Err()
222235
}
223-
case <-time.After(r.ExecuteAfter):
236+
case <-tim.C:
224237
ctx, cancel := context.WithTimeout(ctx, r.Timeout)
225238
defer cancel()
226239
val, err := r.Router.GetValue(ctx, key, opts...)

0 commit comments

Comments
 (0)