Skip to content

Commit 0c66f30

Browse files
authored
add oneshot mono api. (#11)
* add oneshot mono api.
1 parent fde7184 commit 0c66f30

9 files changed

+392
-80
lines changed

Diff for: mono/mono_bench_test.go

+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package mono_test
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/jjeffcaii/reactor-go/mono"
8+
)
9+
10+
func BenchmarkJust(b *testing.B) {
11+
b.RunParallel(func(pb *testing.PB) {
12+
for pb.Next() {
13+
mono.Just(1).Subscribe(context.Background())
14+
}
15+
})
16+
}
17+
18+
func BenchmarkCreate(b *testing.B) {
19+
gen := func(i context.Context, sink mono.Sink) {
20+
sink.Success(1)
21+
}
22+
b.ResetTimer()
23+
b.RunParallel(func(pb *testing.PB) {
24+
for pb.Next() {
25+
mono.Create(gen).Subscribe(context.Background())
26+
}
27+
})
28+
}
29+
30+
func BenchmarkJustOneshot(b *testing.B) {
31+
b.RunParallel(func(pb *testing.PB) {
32+
for pb.Next() {
33+
mono.JustOneshot(1).Subscribe(context.Background())
34+
}
35+
})
36+
}
37+
38+
func BenchmarkCreateOneshot(b *testing.B) {
39+
gen := func(i context.Context, sink mono.Sink) {
40+
sink.Success(1)
41+
}
42+
b.ResetTimer()
43+
b.RunParallel(func(pb *testing.PB) {
44+
for pb.Next() {
45+
mono.CreateOneshot(gen).Subscribe(context.Background())
46+
}
47+
})
48+
}

Diff for: mono/mono_error.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,14 @@ import (
88
)
99

1010
type monoError struct {
11-
e error
11+
inner error
1212
}
1313

14-
func (p monoError) SubscribeWith(ctx context.Context, s reactor.Subscriber) {
14+
func (e monoError) SubscribeWith(ctx context.Context, s reactor.Subscriber) {
1515
s.OnSubscribe(ctx, internal.EmptySubscription)
16-
s.OnError(p.e)
16+
s.OnError(e.inner)
1717
}
1818

1919
func newMonoError(e error) monoError {
20-
return monoError{e: e}
20+
return monoError{inner: e}
2121
}

Diff for: mono/mono_test.go

+125-43
Original file line numberDiff line numberDiff line change
@@ -290,49 +290,6 @@ func testContextDone(m mono.Mono, t *testing.T) {
290290
assert.Error(t, err, "should catch error")
291291
}
292292

293-
func BenchmarkNative(b *testing.B) {
294-
var sum int64
295-
b.ResetTimer()
296-
b.RunParallel(func(pb *testing.PB) {
297-
for pb.Next() {
298-
var v Any = int64(1)
299-
atomic.AddInt64(&sum, v.(int64))
300-
}
301-
})
302-
}
303-
304-
func BenchmarkJust(b *testing.B) {
305-
var sum int64
306-
m := mono.Just(int64(1))
307-
b.ResetTimer()
308-
b.RunParallel(func(pb *testing.PB) {
309-
s := reactor.NewSubscriber(reactor.OnNext(func(v Any) error {
310-
atomic.AddInt64(&sum, v.(int64))
311-
return nil
312-
}))
313-
for pb.Next() {
314-
m.SubscribeWith(context.Background(), s)
315-
}
316-
})
317-
}
318-
319-
func BenchmarkCreate(b *testing.B) {
320-
var sum int64
321-
m := mono.Create(func(i context.Context, sink mono.Sink) {
322-
sink.Success(int64(1))
323-
})
324-
b.ResetTimer()
325-
b.RunParallel(func(pb *testing.PB) {
326-
s := reactor.NewSubscriber(reactor.OnNext(func(v Any) error {
327-
atomic.AddInt64(&sum, v.(int64))
328-
return nil
329-
}))
330-
for pb.Next() {
331-
m.SubscribeWith(context.Background(), s)
332-
}
333-
})
334-
}
335-
336293
func TestError(t *testing.T) {
337294
mockErr := errors.New("this is a mock error")
338295
var sig reactor.SignalType
@@ -438,3 +395,128 @@ func TestBlock(t *testing.T) {
438395
assert.NoError(t, err)
439396
assert.Nil(t, v)
440397
}
398+
399+
func TestOneshot(t *testing.T) {
400+
for _, m := range []mono.Mono{
401+
mono.JustOneshot(1),
402+
mono.CreateOneshot(func(ctx context.Context, s mono.Sink) {
403+
s.Success(1)
404+
}),
405+
} {
406+
result, err := m.
407+
Map(func(any reactor.Any) (reactor.Any, error) {
408+
return any.(int) * 2, nil
409+
}).
410+
DoOnNext(func(v reactor.Any) error {
411+
assert.Equal(t, 2, v.(int))
412+
return nil
413+
}).
414+
DoOnError(func(e error) {
415+
assert.FailNow(t, "unreachable")
416+
}).
417+
Block(context.Background())
418+
assert.NoError(t, err)
419+
assert.Equal(t, 2, result)
420+
}
421+
422+
n, err := mono.JustOneshot(1).
423+
FlatMap(func(any reactor.Any) mono.Mono {
424+
return mono.Just(any.(int) * 3)
425+
}).
426+
Block(context.Background())
427+
assert.NoError(t, err)
428+
assert.Equal(t, 3, n)
429+
430+
discardCalls := new(int32)
431+
432+
mono.
433+
CreateOneshot(func(ctx context.Context, s mono.Sink) {
434+
s.Success(111)
435+
}).
436+
Filter(func(any reactor.Any) bool {
437+
return any.(int) > 222
438+
}).
439+
DoOnDiscard(func(v reactor.Any) {
440+
assert.Equal(t, 111, v)
441+
atomic.AddInt32(discardCalls, 1)
442+
}).
443+
SwitchIfEmpty(mono.Just(333)).
444+
DoOnNext(func(v reactor.Any) error {
445+
assert.Equal(t, 333, v)
446+
return nil
447+
}).
448+
Subscribe(context.Background())
449+
450+
assert.Equal(t, int32(1), atomic.LoadInt32(discardCalls))
451+
452+
_, err = mono.
453+
CreateOneshot(func(ctx context.Context, s mono.Sink) {
454+
time.Sleep(100 * time.Millisecond)
455+
s.Success(1)
456+
}).
457+
Timeout(50 * time.Millisecond).
458+
Block(context.Background())
459+
assert.Error(t, err)
460+
assert.True(t, reactor.IsCancelledError(err))
461+
462+
done := make(chan struct{})
463+
now := time.Now()
464+
mono.Just(1).
465+
DoFinally(func(s reactor.SignalType) {
466+
close(done)
467+
}).
468+
DelayElement(100*time.Millisecond).
469+
Subscribe(context.Background(), reactor.OnNext(func(v reactor.Any) error {
470+
assert.True(t, time.Since(now)/1e6 >= 100)
471+
return nil
472+
}))
473+
<-done
474+
}
475+
476+
func TestErrorOneshot(t *testing.T) {
477+
fakeErr := errors.New("fake error")
478+
finallyCalls := new(int32)
479+
subscribeCalls := new(int32)
480+
_, err := mono.
481+
ErrorOneshot(fakeErr).
482+
DoFinally(func(s reactor.SignalType) {
483+
atomic.AddInt32(finallyCalls, 1)
484+
}).
485+
DoOnSubscribe(func(ctx context.Context, su reactor.Subscription) {
486+
atomic.AddInt32(subscribeCalls, 1)
487+
}).
488+
DoOnCancel(func() {
489+
assert.FailNow(t, "unreachable")
490+
}).
491+
DoOnNext(func(v reactor.Any) error {
492+
assert.FailNow(t, "unreachable")
493+
return nil
494+
}).
495+
DoOnError(func(e error) {
496+
assert.Equal(t, fakeErr, e)
497+
}).
498+
SubscribeOn(scheduler.Parallel()).
499+
Block(context.Background())
500+
assert.Error(t, err, "should return error")
501+
assert.Equal(t, fakeErr, err)
502+
assert.Equal(t, int32(1), atomic.LoadInt32(finallyCalls))
503+
assert.Equal(t, int32(1), atomic.LoadInt32(subscribeCalls))
504+
}
505+
506+
func TestIsSubscribeOnParallel(t *testing.T) {
507+
assert.False(t, mono.IsSubscribeAsync(mono.Just(1)))
508+
assert.True(t, mono.IsSubscribeAsync(mono.Just(1).SubscribeOn(scheduler.Parallel())))
509+
assert.True(t, mono.IsSubscribeAsync(mono.Just(1).SubscribeOn(scheduler.Single())))
510+
assert.True(t, mono.IsSubscribeAsync(mono.JustOneshot(1).SubscribeOn(scheduler.Elastic())))
511+
}
512+
513+
func TestJust(t *testing.T) {
514+
assert.Panics(t, func() {
515+
mono.Just(nil)
516+
})
517+
assert.Panics(t, func() {
518+
mono.JustOneshot(nil)
519+
})
520+
assert.NotNil(t, mono.Just(1))
521+
assert.NotNil(t, mono.JustOneshot(1))
522+
}

Diff for: mono/processor_test.go

+17
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package mono_test
22

33
import (
44
"context"
5+
"errors"
56
"testing"
67
"time"
78

@@ -51,3 +52,19 @@ func TestProcessor_Context(t *testing.T) {
5152
Subscribe(ctx)
5253
<-done
5354
}
55+
56+
func TestProcessor_Error(t *testing.T) {
57+
fakeErr := errors.New("fake error")
58+
p := mono.CreateProcessor()
59+
done := make(chan error, 1)
60+
p.
61+
DoOnError(func(e error) {
62+
done <- e
63+
}).
64+
Subscribe(context.Background())
65+
66+
time.Sleep(100 * time.Millisecond)
67+
p.Error(fakeErr)
68+
e := <-done
69+
assert.Equal(t, fakeErr, e)
70+
}

Diff for: mono/utils.go

+17-3
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,20 @@ package mono
22

33
import (
44
"context"
5-
"errors"
65
"time"
76
)
87

98
var empty = wrap(newMonoJust(nil))
10-
var errJustNilValue = errors.New("require non nil value")
9+
var _errJustNilValue = "require non nil value"
1110

1211
func Error(e error) Mono {
1312
return wrap(newMonoError(e))
1413
}
1514

15+
func ErrorOneshot(e error) Mono {
16+
return borrowOneshotWrapper(newMonoError(e))
17+
}
18+
1619
func Empty() Mono {
1720
return empty
1821
}
@@ -26,15 +29,26 @@ func JustOrEmpty(v Any) Mono {
2629

2730
func Just(v Any) Mono {
2831
if v == nil {
29-
panic(errJustNilValue)
32+
panic(_errJustNilValue)
3033
}
3134
return wrap(newMonoJust(v))
3235
}
3336

37+
func JustOneshot(v Any) Mono {
38+
if v == nil {
39+
panic(_errJustNilValue)
40+
}
41+
return borrowOneshotWrapper(newMonoJust(v))
42+
}
43+
3444
func Create(gen func(ctx context.Context, s Sink)) Mono {
3545
return wrap(newMonoCreate(gen))
3646
}
3747

48+
func CreateOneshot(gen func(ctx context.Context, s Sink)) Mono {
49+
return borrowOneshotWrapper(newMonoCreate(gen))
50+
}
51+
3852
func Delay(delay time.Duration) Mono {
3953
return wrap(newMonoDelay(delay))
4054
}

Diff for: mono/wrapper.go

+4-30
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"time"
77

88
"github.com/jjeffcaii/reactor-go"
9-
"github.com/jjeffcaii/reactor-go/internal/subscribers"
109
"github.com/jjeffcaii/reactor-go/scheduler"
1110
)
1211

@@ -69,7 +68,7 @@ func (p wrapper) DoOnSubscribe(fn reactor.FnOnSubscribe) Mono {
6968
}
7069

7170
func (p wrapper) DelayElement(delay time.Duration) Mono {
72-
return wrap(newMonoDelayElement(p.RawPublisher, delay, scheduler.Elastic()))
71+
return wrap(newMonoDelayElement(p.RawPublisher, delay, scheduler.Parallel()))
7372
}
7473

7574
func (p wrapper) Timeout(timeout time.Duration) Mono {
@@ -80,40 +79,15 @@ func (p wrapper) Timeout(timeout time.Duration) Mono {
8079
}
8180

8281
func (p wrapper) Block(ctx context.Context) (Any, error) {
83-
done := make(chan struct{})
84-
vchan := make(chan reactor.Any, 1)
85-
echan := make(chan error, 1)
86-
b := subscribers.NewBlockSubscriber(done, vchan, echan)
87-
p.SubscribeWith(ctx, b)
88-
<-done
89-
90-
defer close(vchan)
91-
defer close(echan)
92-
93-
select {
94-
case value := <-vchan:
95-
return value, nil
96-
case err := <-echan:
97-
return nil, err
98-
default:
99-
return nil, nil
100-
}
82+
return block(ctx, p.RawPublisher)
10183
}
10284

10385
func (p wrapper) Success(v Any) {
104-
p.mustProcessor().Success(v)
86+
mustProcessor(p.RawPublisher).Success(v)
10587
}
10688

10789
func (p wrapper) Error(e error) {
108-
p.mustProcessor().Error(e)
109-
}
110-
111-
func (p wrapper) mustProcessor() *processor {
112-
pp, ok := p.RawPublisher.(*processor)
113-
if !ok {
114-
panic(errNotProcessor)
115-
}
116-
return pp
90+
mustProcessor(p.RawPublisher).Error(e)
11791
}
11892

11993
func wrap(r reactor.RawPublisher) wrapper {

0 commit comments

Comments
 (0)