Skip to content

Commit 468ccce

Browse files
authored
feat(mono/zip): Zip operator refinement and support ZipWith (#31)
* feat(mono/zip): Zip operator refinement and support ZipWith * refactor: rename some mono files * fix: ut
1 parent b50119b commit 468ccce

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+1532
-796
lines changed

error.go

-3
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,6 @@ func IsCancelledError(err error) bool {
1818
if _, ok := err.(contextError); ok {
1919
return true
2020
}
21-
if _, ok := err.(*contextError); ok {
22-
return true
23-
}
2421
return false
2522
}
2623

error_test.go

+11-1
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,15 @@ import (
1111
func TestContextError(t *testing.T) {
1212
fakeErr := errors.New("fake error")
1313
err := reactor.NewContextError(fakeErr)
14-
assert.True(t, errors.Cause(err) == fakeErr)
14+
assert.True(t, errors.Cause(err) == fakeErr, "should cause by fakeErr")
15+
assert.Equal(t, fakeErr.Error(), err.Error(), "bad error string")
16+
}
17+
18+
func TestIsCancelledError(t *testing.T) {
19+
fakeErr := errors.New("fake error")
20+
assert.False(t, reactor.IsCancelledError(nil))
21+
assert.False(t, reactor.IsCancelledError(fakeErr))
22+
assert.True(t, reactor.IsCancelledError(reactor.ErrSubscribeCancelled))
23+
ctxErr := reactor.NewContextError(errors.New("fake context error"))
24+
assert.True(t, reactor.IsCancelledError(ctxErr))
1525
}

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module github.com/jjeffcaii/reactor-go
33
go 1.12
44

55
require (
6+
github.com/golang/mock v1.4.4
67
github.com/panjf2000/ants/v2 v2.4.3
78
github.com/pkg/errors v0.9.1
89
github.com/stretchr/testify v1.6.1

go.sum

+8
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8
22
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
33
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
44
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
5+
github.com/golang/mock v1.4.4 h1:l75CXGRSwbaYNpl/Z2X1XIIAMSCquvXgpVZDhwEIJsc=
6+
github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4=
57
github.com/panjf2000/ants/v2 v2.4.3 h1:wHghL17YKFanB62QjPQ9o+DuM4q7WrQ7zAhoX8+eBXU=
68
github.com/panjf2000/ants/v2 v2.4.3/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A=
79
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
@@ -13,6 +15,12 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy
1315
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
1416
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
1517
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
18+
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
19+
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
20+
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
21+
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
22+
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
23+
golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
1624
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
1725
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
1826
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=

internal/subscribers/do_finally.go

+3
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ func (d *DoFinallySubscriber) Request(n int) {
4848
}
4949

5050
func (d *DoFinallySubscriber) Cancel() {
51+
if d.s == nil {
52+
return
53+
}
5154
d.s.Cancel()
5255
d.runFinally(reactor.SignalTypeCancel)
5356
}

internal/subscribers/switch_if_empty.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@ import (
77
"github.com/jjeffcaii/reactor-go"
88
)
99

10+
var (
11+
_ reactor.Subscriber = (*SwitchIfEmptySubscriber)(nil)
12+
_ reactor.Subscription = (*SwitchIfEmptySubscriber)(nil)
13+
)
14+
1015
type SwitchIfEmptySubscriber struct {
1116
ctx context.Context
1217
actual reactor.Subscriber
@@ -19,7 +24,7 @@ type SwitchIfEmptySubscriber struct {
1924

2025
func (s *SwitchIfEmptySubscriber) Request(n int) {
2126
if n < 1 {
22-
panic(reactor.ErrNegativeRequest)
27+
return
2328
}
2429
s.requested = n
2530
if s.su != nil {

internal/types.go

-8
This file was deleted.

mono/op_context.go mono/context.go

-4
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,6 @@ type monoContext struct {
1212
kv []Any
1313
}
1414

15-
func (p *monoContext) Parent() reactor.RawPublisher {
16-
return p.source
17-
}
18-
1915
func (p *monoContext) SubscribeWith(ctx context.Context, s reactor.Subscriber) {
2016
for i := 0; i < len(p.kv); i += 2 {
2117
ctx = context.WithValue(ctx, p.kv[i], p.kv[i+1])

mono/mono_create.go mono/create.go

+32-27
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package mono
22

33
import (
44
"context"
5+
"math"
56
"sync"
67
"sync/atomic"
78

@@ -10,10 +11,27 @@ import (
1011
"github.com/pkg/errors"
1112
)
1213

13-
var _sinkPool = sync.Pool{
14-
New: func() interface{} {
15-
return new(sink)
16-
},
14+
var globalSinkPool sinkPool
15+
16+
type sinkPool struct {
17+
inner sync.Pool
18+
}
19+
20+
func (p *sinkPool) get() *sink {
21+
if exist, _ := p.inner.Get().(*sink); exist != nil {
22+
atomic.StoreInt32(&exist.stat, 0)
23+
return exist
24+
}
25+
return &sink{}
26+
}
27+
28+
func (p *sinkPool) put(s *sink) {
29+
if s == nil {
30+
return
31+
}
32+
s.actual = nil
33+
atomic.StoreInt32(&s.stat, math.MinInt32)
34+
p.inner.Put(s)
1735
}
1836

1937
type Sink interface {
@@ -30,18 +48,6 @@ type sink struct {
3048
stat int32
3149
}
3250

33-
func borrowSink(sub reactor.Subscriber) *sink {
34-
s := _sinkPool.Get().(*sink)
35-
atomic.StoreInt32(&s.stat, 0)
36-
s.actual = sub
37-
return s
38-
}
39-
40-
func returnSink(s *sink) {
41-
s.actual = nil
42-
_sinkPool.Put(s)
43-
}
44-
4551
func newMonoCreate(gen func(context.Context, Sink)) monoCreate {
4652
return monoCreate{
4753
sinker: func(ctx context.Context, sink Sink) {
@@ -79,29 +85,27 @@ func (s *sink) Success(v Any) {
7985
}
8086

8187
func (s *sink) Request(n int) {
82-
if n < 1 {
83-
panic(reactor.ErrNegativeRequest)
84-
}
88+
// ignore
8589
}
8690

8791
func (s *sink) Cancel() {
8892
atomic.CompareAndSwapInt32(&s.stat, 0, statCancel)
8993
}
9094

9195
func (s *sink) Complete() {
92-
defer returnSink(s)
96+
defer globalSinkPool.put(s)
9397
if atomic.CompareAndSwapInt32(&s.stat, 0, statComplete) {
9498
s.actual.OnComplete()
9599
}
96100
}
97101

98102
func (s *sink) Error(err error) {
99-
defer returnSink(s)
100-
if atomic.CompareAndSwapInt32(&s.stat, 0, statError) {
101-
s.actual.OnError(err)
103+
defer globalSinkPool.put(s)
104+
if !atomic.CompareAndSwapInt32(&s.stat, 0, statError) {
105+
hooks.Global().OnErrorDrop(err)
102106
return
103107
}
104-
hooks.Global().OnErrorDrop(err)
108+
s.actual.OnError(err)
105109
}
106110

107111
func (s *sink) Next(v Any) {
@@ -120,7 +124,8 @@ func (s *sink) Next(v Any) {
120124
}
121125

122126
func (m monoCreate) SubscribeWith(ctx context.Context, s reactor.Subscriber) {
123-
sink := borrowSink(s)
124-
s.OnSubscribe(ctx, sink)
125-
m.sinker(ctx, sink)
127+
sk := globalSinkPool.get()
128+
sk.actual = s
129+
s.OnSubscribe(ctx, sk)
130+
m.sinker(ctx, sk)
126131
}

mono/create_test.go

+53
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package mono
2+
3+
import (
4+
"context"
5+
"errors"
6+
"testing"
7+
8+
"github.com/golang/mock/gomock"
9+
"github.com/jjeffcaii/reactor-go"
10+
)
11+
12+
func TestCreate_Panic(t *testing.T) {
13+
ctrl := gomock.NewController(t)
14+
defer ctrl.Finish()
15+
s := NewMockSubscriber(ctrl)
16+
17+
s.EXPECT().OnSubscribe(gomock.Any(), gomock.Any()).Do(MockRequestInfinite).Times(2)
18+
s.EXPECT().OnNext(gomock.Any()).Times(0)
19+
s.EXPECT().OnError(gomock.Any()).Times(2)
20+
s.EXPECT().OnComplete().Times(0)
21+
22+
newMonoCreate(func(ctx context.Context, s Sink) {
23+
panic("fake panic")
24+
}).SubscribeWith(context.Background(), s)
25+
26+
newMonoCreate(func(ctx context.Context, s Sink) {
27+
panic(errors.New("fake error"))
28+
}).SubscribeWith(context.Background(), s)
29+
}
30+
31+
func TestCreate_OnNextPanic(t *testing.T) {
32+
var cnt int
33+
onError := reactor.OnError(func(e error) {
34+
cnt++
35+
})
36+
m := newMonoCreate(func(ctx context.Context, s Sink) {
37+
s.Success(1)
38+
})
39+
m.SubscribeWith(context.Background(),
40+
reactor.NewSubscriber(
41+
reactor.OnNext(func(v reactor.Any) error {
42+
panic("fake panic")
43+
}),
44+
onError,
45+
))
46+
m.SubscribeWith(context.Background(),
47+
reactor.NewSubscriber(
48+
reactor.OnNext(func(v reactor.Any) error {
49+
panic(errors.New("fake error"))
50+
}),
51+
onError,
52+
))
53+
}

mono/mono_delay.go mono/delay.go

File renamed without changes.

mono/delay_element.go

-4
Original file line numberDiff line numberDiff line change
@@ -105,10 +105,6 @@ type monoDelayElement struct {
105105
sc scheduler.Scheduler
106106
}
107107

108-
func (p *monoDelayElement) Parent() reactor.RawPublisher {
109-
return p.source
110-
}
111-
112108
func (p *monoDelayElement) SubscribeWith(ctx context.Context, actual reactor.Subscriber) {
113109
actual = newDelayElementSubscriber(actual, p.delay, p.sc)
114110
p.source.SubscribeWith(ctx, actual)

mono/do_finally.go

-4
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,6 @@ type monoDoFinally struct {
1212
onFinally reactor.FnOnFinally
1313
}
1414

15-
func (m monoDoFinally) Parent() reactor.RawPublisher {
16-
return m.source
17-
}
18-
1915
func (m monoDoFinally) SubscribeWith(ctx context.Context, s reactor.Subscriber) {
2016
m.source.SubscribeWith(ctx, subscribers.NewDoFinallySubscriber(s, m.onFinally))
2117
}

mono/do_finally_test.go

+12-2
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,24 @@ import (
44
"context"
55
"testing"
66

7+
"github.com/golang/mock/gomock"
78
"github.com/jjeffcaii/reactor-go"
89
"github.com/jjeffcaii/reactor-go/mono"
910
"github.com/stretchr/testify/assert"
1011
)
1112

1213
func TestDoFinally(t *testing.T) {
14+
const fakeValue = 42
15+
ctrl := gomock.NewController(t)
16+
defer ctrl.Finish()
17+
sub := mono.NewMockSubscriber(ctrl)
18+
sub.EXPECT().OnSubscribe(gomock.Any(), gomock.Any()).Do(mono.MockRequestInfinite).Times(1)
19+
sub.EXPECT().OnNext(gomock.Any()).Times(1)
20+
sub.EXPECT().OnError(gomock.Any()).Times(0)
21+
sub.EXPECT().OnComplete().Times(1)
22+
1323
var seq []int
14-
mono.Just(77778888).
24+
mono.Just(fakeValue).
1525
DoOnNext(func(v reactor.Any) error {
1626
seq = append(seq, 1)
1727
return nil
@@ -23,6 +33,6 @@ func TestDoFinally(t *testing.T) {
2333
seq = append(seq, 2)
2434
return nil
2535
}).
26-
Subscribe(context.Background())
36+
SubscribeWith(context.Background(), sub)
2737
assert.Equal(t, []int{1, 2, 3}, seq, "wrong execute order")
2838
}

mono/empty.go

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package mono
2+
3+
import (
4+
"context"
5+
6+
"github.com/jjeffcaii/reactor-go"
7+
"github.com/jjeffcaii/reactor-go/internal"
8+
)
9+
10+
type monoEmpty struct {
11+
}
12+
13+
func newMonoEmpty() monoEmpty {
14+
return monoEmpty{}
15+
}
16+
17+
func (m monoEmpty) SubscribeWith(ctx context.Context, actual reactor.Subscriber) {
18+
actual.OnSubscribe(ctx, internal.EmptySubscription)
19+
actual.OnComplete()
20+
}

mono/mono_error.go mono/error.go

File renamed without changes.

mono/op_filter.go mono/filter.go

-4
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,6 @@ type monoFilter struct {
1919
f reactor.Predicate
2020
}
2121

22-
func (m monoFilter) Parent() reactor.RawPublisher {
23-
return m.s
24-
}
25-
2622
func newFilterSubscriber(actual reactor.Subscriber, predicate reactor.Predicate) *filterSubscriber {
2723
return &filterSubscriber{
2824
actual: actual,

mono/filter_test.go

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package mono
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/golang/mock/gomock"
8+
"github.com/jjeffcaii/reactor-go"
9+
)
10+
11+
func TestMonoFilter(t *testing.T) {
12+
ctrl := gomock.NewController(t)
13+
defer ctrl.Finish()
14+
sub := NewMockSubscriber(ctrl)
15+
16+
sub.EXPECT().OnNext(gomock.Eq(42)).Times(1)
17+
sub.EXPECT().OnComplete().Times(1)
18+
sub.EXPECT().OnError(gomock.Any()).Times(0)
19+
sub.EXPECT().OnSubscribe(gomock.Any(), gomock.Any()).Do(MockRequestInfinite).Times(1)
20+
21+
predicate := func(any reactor.Any) bool {
22+
return any.(int) > 0
23+
}
24+
newMonoFilter(newMonoJust(42), predicate).SubscribeWith(context.Background(), sub)
25+
26+
sub = NewMockSubscriber(ctrl)
27+
sub.EXPECT().OnNext(gomock.Any()).Times(0)
28+
sub.EXPECT().OnComplete().Times(1)
29+
sub.EXPECT().OnError(gomock.Any()).Times(0)
30+
sub.EXPECT().OnSubscribe(gomock.Any(), gomock.Any()).Do(MockRequestInfinite).Times(1)
31+
32+
newMonoFilter(newMonoJust(-42), predicate).SubscribeWith(context.Background(), sub)
33+
}

0 commit comments

Comments
 (0)