Skip to content

Commit cd3e478

Browse files
authored
Merge pull request #58 from libp2p/feature/paralel-sequential-routers
2 parents dfd16c1 + 568a01c commit cd3e478

7 files changed

+1430
-0
lines changed

compconfig.go

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package routinghelpers
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/libp2p/go-libp2p/core/routing"
8+
"github.com/multiformats/go-multihash"
9+
)
10+
11+
type ParallelRouter struct {
12+
Timeout time.Duration
13+
IgnoreError bool
14+
Router routing.Routing
15+
ExecuteAfter time.Duration
16+
}
17+
18+
type SequentialRouter struct {
19+
Timeout time.Duration
20+
IgnoreError bool
21+
Router routing.Routing
22+
}
23+
24+
type ProvideManyRouter interface {
25+
ProvideMany(ctx context.Context, keys []multihash.Multihash) error
26+
Ready() bool
27+
}

compparallel.go

+323
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,323 @@
1+
package routinghelpers
2+
3+
import (
4+
"context"
5+
"errors"
6+
"sync"
7+
"sync/atomic"
8+
"time"
9+
10+
"github.com/hashicorp/go-multierror"
11+
"github.com/ipfs/go-cid"
12+
"github.com/libp2p/go-libp2p/core/peer"
13+
"github.com/libp2p/go-libp2p/core/routing"
14+
"github.com/multiformats/go-multihash"
15+
)
16+
17+
var _ routing.Routing = &composableParallel{}
18+
var _ ProvideManyRouter = &composableParallel{}
19+
20+
type composableParallel struct {
21+
routers []*ParallelRouter
22+
}
23+
24+
// NewComposableParallel creates a Router that will execute methods from provided Routers in parallel.
25+
// On all methods, If IgnoreError flag is set, that Router will not stop the entire execution.
26+
// On all methods, If ExecuteAfter is set, that Router will be executed after the timer.
27+
// Router specific timeout will start counting AFTER the ExecuteAfter timer.
28+
func NewComposableParallel(routers []*ParallelRouter) *composableParallel {
29+
return &composableParallel{
30+
routers: routers,
31+
}
32+
}
33+
34+
// Provide will call all Routers in parallel.
35+
func (r *composableParallel) Provide(ctx context.Context, cid cid.Cid, provide bool) error {
36+
return executeParallel(ctx, r.routers,
37+
func(ctx context.Context, r routing.Routing) error {
38+
return r.Provide(ctx, cid, provide)
39+
},
40+
)
41+
}
42+
43+
// ProvideMany will call all supported Routers in parallel.
44+
func (r *composableParallel) ProvideMany(ctx context.Context, keys []multihash.Multihash) error {
45+
return executeParallel(ctx, r.routers,
46+
func(ctx context.Context, r routing.Routing) error {
47+
pm, ok := r.(ProvideManyRouter)
48+
if !ok {
49+
return nil
50+
}
51+
return pm.ProvideMany(ctx, keys)
52+
},
53+
)
54+
}
55+
56+
// Ready will call all supported ProvideMany Routers SEQUENTIALLY.
57+
// If some of them are not ready, this method will return false.
58+
func (r *composableParallel) Ready() bool {
59+
for _, ro := range r.routers {
60+
pm, ok := ro.Router.(ProvideManyRouter)
61+
if !ok {
62+
continue
63+
}
64+
65+
if !pm.Ready() {
66+
return false
67+
}
68+
}
69+
70+
return true
71+
}
72+
73+
// FindProvidersAsync will execute all Routers in parallel, iterating results from them in unspecified order.
74+
// If count is set, only that amount of elements will be returned without any specification about from what router is obtained.
75+
// To gather providers from a set of Routers first, you can use the ExecuteAfter timer to delay some Router execution.
76+
func (r *composableParallel) FindProvidersAsync(ctx context.Context, cid cid.Cid, count int) <-chan peer.AddrInfo {
77+
var totalCount int64
78+
ch, _ := getChannelOrErrorParallel(
79+
ctx,
80+
r.routers,
81+
func(ctx context.Context, r routing.Routing) (<-chan peer.AddrInfo, error) {
82+
return r.FindProvidersAsync(ctx, cid, count), nil
83+
},
84+
func() bool {
85+
return atomic.AddInt64(&totalCount, 1) > int64(count) && count != 0
86+
},
87+
)
88+
89+
return ch
90+
}
91+
92+
// FindPeer will execute all Routers in parallel, getting the first AddrInfo found and cancelling all other Router calls.
93+
func (r *composableParallel) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) {
94+
return getValueOrErrorParallel(ctx, r.routers,
95+
func(ctx context.Context, r routing.Routing) (peer.AddrInfo, bool, error) {
96+
addr, err := r.FindPeer(ctx, id)
97+
return addr, addr.ID == "", err
98+
},
99+
)
100+
}
101+
102+
// PutValue will execute all Routers in parallel. If a Router fails and IgnoreError flag is not set, the whole execution will fail.
103+
// Some Puts before the failure might be successful, even if we return an error.
104+
func (r *composableParallel) PutValue(ctx context.Context, key string, val []byte, opts ...routing.Option) error {
105+
return executeParallel(ctx, r.routers,
106+
func(ctx context.Context, r routing.Routing) error {
107+
return r.PutValue(ctx, key, val, opts...)
108+
},
109+
)
110+
}
111+
112+
// GetValue will execute all Routers in parallel. The first value found will be returned, cancelling all other executions.
113+
func (r *composableParallel) GetValue(ctx context.Context, key string, opts ...routing.Option) ([]byte, error) {
114+
return getValueOrErrorParallel(ctx, r.routers,
115+
func(ctx context.Context, r routing.Routing) ([]byte, bool, error) {
116+
val, err := r.GetValue(ctx, key, opts...)
117+
return val, len(val) == 0, err
118+
})
119+
}
120+
121+
func (r *composableParallel) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) {
122+
return getChannelOrErrorParallel(
123+
ctx,
124+
r.routers,
125+
func(ctx context.Context, r routing.Routing) (<-chan []byte, error) {
126+
return r.SearchValue(ctx, key, opts...)
127+
},
128+
func() bool { return false },
129+
)
130+
}
131+
132+
func (r *composableParallel) Bootstrap(ctx context.Context) error {
133+
return executeParallel(ctx, r.routers,
134+
func(ctx context.Context, r routing.Routing) error {
135+
return r.Bootstrap(ctx)
136+
})
137+
}
138+
139+
func getValueOrErrorParallel[T any](
140+
ctx context.Context,
141+
routers []*ParallelRouter,
142+
f func(context.Context, routing.Routing) (T, bool, error),
143+
) (value T, err error) {
144+
outCh := make(chan T)
145+
errCh := make(chan error)
146+
147+
// global cancel context to stop early other router's execution.
148+
ctx, cancelAll := context.WithCancel(ctx)
149+
defer cancelAll()
150+
var wg sync.WaitGroup
151+
for _, r := range routers {
152+
wg.Add(1)
153+
go func(r *ParallelRouter) {
154+
defer wg.Done()
155+
tim := time.NewTimer(r.ExecuteAfter)
156+
defer tim.Stop()
157+
select {
158+
case <-ctx.Done():
159+
case <-tim.C:
160+
ctx, cancel := context.WithTimeout(ctx, r.Timeout)
161+
defer cancel()
162+
value, empty, err := f(ctx, r.Router)
163+
if err != nil &&
164+
!errors.Is(err, routing.ErrNotFound) &&
165+
!r.IgnoreError {
166+
select {
167+
case <-ctx.Done():
168+
case errCh <- err:
169+
}
170+
return
171+
}
172+
if empty {
173+
return
174+
}
175+
select {
176+
case <-ctx.Done():
177+
return
178+
case outCh <- value:
179+
}
180+
}
181+
}(r)
182+
}
183+
184+
// goroutine closing everything when finishing execution
185+
go func() {
186+
wg.Wait()
187+
close(outCh)
188+
close(errCh)
189+
}()
190+
191+
select {
192+
case out, ok := <-outCh:
193+
if !ok {
194+
return value, routing.ErrNotFound
195+
}
196+
return out, nil
197+
case err, ok := <-errCh:
198+
if !ok {
199+
return value, routing.ErrNotFound
200+
}
201+
return value, err
202+
case <-ctx.Done():
203+
return value, ctx.Err()
204+
}
205+
}
206+
207+
func executeParallel(
208+
ctx context.Context,
209+
routers []*ParallelRouter,
210+
f func(context.Context, routing.Routing,
211+
) error) error {
212+
var wg sync.WaitGroup
213+
errCh := make(chan error)
214+
for _, r := range routers {
215+
wg.Add(1)
216+
go func(r *ParallelRouter) {
217+
defer wg.Done()
218+
tim := time.NewTimer(r.ExecuteAfter)
219+
defer tim.Stop()
220+
select {
221+
case <-ctx.Done():
222+
if !r.IgnoreError {
223+
errCh <- ctx.Err()
224+
}
225+
case <-tim.C:
226+
ctx, cancel := context.WithTimeout(ctx, r.Timeout)
227+
defer cancel()
228+
err := f(ctx, r.Router)
229+
if err != nil &&
230+
!r.IgnoreError {
231+
errCh <- err
232+
}
233+
}
234+
}(r)
235+
}
236+
237+
go func() {
238+
wg.Wait()
239+
close(errCh)
240+
}()
241+
242+
var errOut error
243+
for err := range errCh {
244+
errOut = multierror.Append(errOut, err)
245+
}
246+
247+
return errOut
248+
}
249+
250+
func getChannelOrErrorParallel[T any](
251+
ctx context.Context,
252+
routers []*ParallelRouter,
253+
f func(context.Context, routing.Routing) (<-chan T, error),
254+
shouldStop func() bool,
255+
) (chan T, error) {
256+
outCh := make(chan T)
257+
errCh := make(chan error)
258+
var wg sync.WaitGroup
259+
ctx, cancelAll := context.WithCancel(ctx)
260+
for _, r := range routers {
261+
wg.Add(1)
262+
go func(r *ParallelRouter) {
263+
defer wg.Done()
264+
tim := time.NewTimer(r.ExecuteAfter)
265+
defer tim.Stop()
266+
select {
267+
case <-ctx.Done():
268+
return
269+
case <-tim.C:
270+
ctx, cancel := context.WithTimeout(ctx, r.Timeout)
271+
defer cancel()
272+
valueChan, err := f(ctx, r.Router)
273+
if err != nil && !r.IgnoreError {
274+
select {
275+
case <-ctx.Done():
276+
case errCh <- err:
277+
}
278+
return
279+
}
280+
for {
281+
select {
282+
case <-ctx.Done():
283+
return
284+
case val, ok := <-valueChan:
285+
if !ok {
286+
return
287+
}
288+
289+
if shouldStop() {
290+
return
291+
}
292+
293+
select {
294+
case <-ctx.Done():
295+
return
296+
case outCh <- val:
297+
}
298+
}
299+
}
300+
}
301+
}(r)
302+
}
303+
304+
// goroutine closing everything when finishing execution
305+
go func() {
306+
wg.Wait()
307+
close(outCh)
308+
close(errCh)
309+
cancelAll()
310+
}()
311+
312+
select {
313+
case err, ok := <-errCh:
314+
if !ok {
315+
return nil, routing.ErrNotFound
316+
}
317+
return nil, err
318+
case <-ctx.Done():
319+
return nil, ctx.Err()
320+
default:
321+
return outCh, nil
322+
}
323+
}

0 commit comments

Comments
 (0)