Skip to content

Commit b50119b

Browse files
authored
fix(mono): add panic recover for most of Mono operations (#30)
* fix(mono): add panic recover for most of the Mono operations * fix: lint
1 parent 4fb36b5 commit b50119b

15 files changed

+386
-54
lines changed

flux/op_filter.go

+15-2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55

66
"github.com/jjeffcaii/reactor-go"
77
"github.com/jjeffcaii/reactor-go/internal"
8+
"github.com/pkg/errors"
89
)
910

1011
type fluxFilter struct {
@@ -42,11 +43,23 @@ func (p *filterSubscriber) OnError(err error) {
4243
}
4344

4445
func (p *filterSubscriber) OnNext(v Any) {
46+
if p.f == nil {
47+
p.OnError(errors.New("the Filter predicate is nil"))
48+
return
49+
}
50+
4551
defer func() {
46-
if err := internal.TryRecoverError(recover()); err != nil {
47-
p.OnError(err)
52+
rec := recover()
53+
if rec == nil {
54+
return
55+
}
56+
if e, ok := rec.(error); ok {
57+
p.OnError(errors.WithStack(e))
58+
} else {
59+
p.OnError(errors.Errorf("%v", rec))
4860
}
4961
}()
62+
5063
if p.f(v) {
5164
p.actual.OnNext(v)
5265
return

internal/misc.go

-15
Original file line numberDiff line numberDiff line change
@@ -2,26 +2,11 @@ package internal
22

33
import (
44
"errors"
5-
"fmt"
65
)
76

87
var ErrCallOnSubscribeDuplicated = errors.New("call OnSubscribe duplicated")
98
var EmptySubscription = emptySubscription{}
109

11-
func TryRecoverError(re interface{}) error {
12-
if re == nil {
13-
return nil
14-
}
15-
switch e := re.(type) {
16-
case error:
17-
return e
18-
case string:
19-
return errors.New(e)
20-
default:
21-
return fmt.Errorf("%s", e)
22-
}
23-
}
24-
2510
type emptySubscription struct {
2611
}
2712

internal/misc_test.go

-8
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,12 @@
11
package internal_test
22

33
import (
4-
"errors"
54
"testing"
65

76
"github.com/jjeffcaii/reactor-go/internal"
87
"github.com/stretchr/testify/assert"
98
)
109

11-
func TestTryRecoverError(t *testing.T) {
12-
fakeErr := errors.New("fake error")
13-
assert.Equal(t, fakeErr, internal.TryRecoverError(fakeErr))
14-
assert.Error(t, internal.TryRecoverError("fake error"))
15-
assert.Error(t, internal.TryRecoverError(123))
16-
}
17-
1810
func TestEmptySubscription(t *testing.T) {
1911
assert.NotPanics(t, func() {
2012
internal.EmptySubscription.Cancel()

internal/subscribers/switch_if_empty.go

+10-4
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package subscribers
22

33
import (
44
"context"
5+
"errors"
56

67
"github.com/jjeffcaii/reactor-go"
78
)
@@ -54,12 +55,17 @@ func (s *SwitchIfEmptySubscriber) OnSubscribe(ctx context.Context, su reactor.Su
5455
}
5556

5657
func (s *SwitchIfEmptySubscriber) OnComplete() {
57-
if !s.nextOnce {
58-
s.nextOnce = true
59-
s.other.SubscribeWith(s.ctx, s)
60-
} else {
58+
if s.nextOnce {
6159
s.actual.OnComplete()
60+
return
6261
}
62+
s.nextOnce = true
63+
if s.other == nil {
64+
s.actual.OnError(errors.New("the alternative SwitchIfEmpty Mono is nil"))
65+
} else {
66+
s.other.SubscribeWith(s.ctx, s)
67+
}
68+
6369
}
6470

6571
func NewSwitchIfEmptySubscriber(alternative reactor.RawPublisher, actual reactor.Subscriber) *SwitchIfEmptySubscriber {

internal/subscribers/switch_value_if_error.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@ func (s *SwitchValueIfErrorSubscriber) Cancel() {
4141
func (s *SwitchValueIfErrorSubscriber) OnError(err error) {
4242
if atomic.AddInt32(&s.errorCalls, 1) == 1 {
4343
hooks.Global().OnErrorDrop(err)
44-
s.actual.OnNext(s.v)
44+
if s.v != nil {
45+
s.actual.OnNext(s.v)
46+
}
4547
s.actual.OnComplete()
4648
} else {
4749
s.actual.OnError(err)

mono/mono_create.go

+17-8
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,14 @@ package mono
22

33
import (
44
"context"
5-
"errors"
65
"sync"
76
"sync/atomic"
87

98
"github.com/jjeffcaii/reactor-go"
109
"github.com/jjeffcaii/reactor-go/hooks"
11-
"github.com/jjeffcaii/reactor-go/internal"
10+
"github.com/pkg/errors"
1211
)
1312

14-
var _errRunSinkFailed = errors.New("execute creation func failed")
15-
1613
var _sinkPool = sync.Pool{
1714
New: func() interface{} {
1815
return new(sink)
@@ -49,8 +46,14 @@ func newMonoCreate(gen func(context.Context, Sink)) monoCreate {
4946
return monoCreate{
5047
sinker: func(ctx context.Context, sink Sink) {
5148
defer func() {
52-
if e := recover(); e != nil {
53-
sink.Error(_errRunSinkFailed)
49+
rec := recover()
50+
if rec == nil {
51+
return
52+
}
53+
if e, ok := rec.(error); ok {
54+
sink.Error(errors.WithStack(e))
55+
} else {
56+
sink.Error(errors.Errorf("%v", rec))
5457
}
5558
}()
5659

@@ -103,8 +106,14 @@ func (s *sink) Error(err error) {
103106

104107
func (s *sink) Next(v Any) {
105108
defer func() {
106-
if err := internal.TryRecoverError(recover()); err != nil {
107-
s.Error(err)
109+
rec := recover()
110+
if rec == nil {
111+
return
112+
}
113+
if e, ok := rec.(error); ok {
114+
s.Error(errors.WithStack(e))
115+
} else {
116+
s.Error(errors.Errorf("%v", rec))
108117
}
109118
}()
110119
s.actual.OnNext(v)

mono/mono_just.go

+9-4
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import (
66
"sync/atomic"
77

88
"github.com/jjeffcaii/reactor-go"
9-
"github.com/jjeffcaii/reactor-go/internal"
9+
"github.com/pkg/errors"
1010
)
1111

1212
var _justSubscriptionPool = sync.Pool{
@@ -64,10 +64,15 @@ func (j *justSubscription) Request(n int) {
6464
return
6565
}
6666
defer func() {
67-
if err := internal.TryRecoverError(recover()); err != nil {
68-
j.actual.OnError(err)
69-
} else {
67+
rec := recover()
68+
if rec == nil {
7069
j.actual.OnComplete()
70+
return
71+
}
72+
if e, ok := rec.(error); ok {
73+
j.actual.OnError(errors.WithStack(e))
74+
} else {
75+
j.actual.OnError(errors.Errorf("%v", rec))
7176
}
7277
}()
7378
j.actual.OnNext(j.parent.value)

mono/mono_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package mono_test
22

33
import (
44
"context"
5-
"errors"
65
"fmt"
76
"sync/atomic"
87
"testing"
@@ -12,6 +11,7 @@ import (
1211
"github.com/jjeffcaii/reactor-go/hooks"
1312
"github.com/jjeffcaii/reactor-go/mono"
1413
"github.com/jjeffcaii/reactor-go/scheduler"
14+
"github.com/pkg/errors"
1515
"github.com/stretchr/testify/assert"
1616
)
1717

@@ -144,7 +144,7 @@ func testPanic(m mono.Mono, t *testing.T) {
144144
in.DoOnError(func(e error) {
145145
catches = e
146146
}).Subscribe(context.Background())
147-
assert.Equal(t, fakeErr, catches, "not that error")
147+
assert.Equal(t, fakeErr, errors.Cause(catches), "not that error")
148148
}
149149
checker(m.DoOnNext(func(v Any) error {
150150
return fakeErr

mono/mono_zip_test.go

+38
Original file line numberDiff line numberDiff line change
@@ -121,3 +121,41 @@ func TestZip_context(t *testing.T) {
121121
assert.Error(t, err)
122122
assert.True(t, reactor.IsCancelledError(err))
123123
}
124+
125+
func TestZip_EdgeCase(t *testing.T) {
126+
var (
127+
nextCnt = new(int32)
128+
completeCnt = new(int32)
129+
errorCnt = new(int32)
130+
)
131+
mono.Zip(mono.JustOneshot("1"), mono.JustOneshot("2")).
132+
FlatMap(func(any reactor.Any) mono.Mono {
133+
if any != nil {
134+
return mono.Zip(mono.JustOneshot("333"), mono.JustOneshot("44444444")).
135+
Filter(func(any reactor.Any) bool {
136+
panic("fake panic")
137+
}).
138+
Map(func(any reactor.Any) (reactor.Any, error) {
139+
panic("ddddddd")
140+
})
141+
}
142+
return mono.JustOneshot("dddd")
143+
}).
144+
Subscribe(context.Background(),
145+
reactor.OnNext(func(v reactor.Any) error {
146+
atomic.AddInt32(nextCnt, 1)
147+
return nil
148+
}),
149+
reactor.OnError(func(e error) {
150+
atomic.AddInt32(errorCnt, 1)
151+
t.Logf("%v", e)
152+
}),
153+
reactor.OnComplete(func() {
154+
atomic.AddInt32(completeCnt, 1)
155+
}),
156+
)
157+
158+
assert.Equal(t, int32(0), atomic.LoadInt32(nextCnt), "next count should be zero")
159+
assert.Equal(t, int32(1), atomic.LoadInt32(errorCnt), "error count should be 1")
160+
assert.Equal(t, int32(0), atomic.LoadInt32(completeCnt), "complete count should be zero")
161+
}

mono/op_filter.go

+9-2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55

66
"github.com/jjeffcaii/reactor-go"
77
"github.com/jjeffcaii/reactor-go/internal"
8+
"github.com/pkg/errors"
89
)
910

1011
type filterSubscriber struct {
@@ -46,8 +47,14 @@ func (f *filterSubscriber) OnError(err error) {
4647

4748
func (f *filterSubscriber) OnNext(v Any) {
4849
defer func() {
49-
if err := internal.TryRecoverError(recover()); err != nil {
50-
f.OnError(err)
50+
rec := recover()
51+
if rec == nil {
52+
return
53+
}
54+
if e, ok := rec.(error); ok {
55+
f.OnError(errors.WithStack(e))
56+
} else {
57+
f.OnError(errors.Errorf("%v", rec))
5158
}
5259
}()
5360
if f.predicate(v) {

mono/op_flatmap.go

+26-2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"sync/atomic"
66

77
"github.com/jjeffcaii/reactor-go"
8+
"github.com/pkg/errors"
89
)
910

1011
const (
@@ -71,11 +72,34 @@ func (p *flatMapSubscriber) OnNext(v Any) {
7172
if atomic.LoadInt32(&p.stat) != 0 {
7273
return
7374
}
74-
m := p.mapper(v)
75+
nextMono, err := p.computeNextMono(v)
76+
if err != nil {
77+
p.actual.OnError(err)
78+
return
79+
}
7580
inner := &innerFlatMapSubscriber{
7681
parent: p,
7782
}
78-
m.SubscribeWith(p.ctx, inner)
83+
nextMono.SubscribeWith(p.ctx, inner)
84+
}
85+
86+
func (p *flatMapSubscriber) computeNextMono(v Any) (next Mono, err error) {
87+
defer func() {
88+
rec := recover()
89+
if rec == nil {
90+
return
91+
}
92+
if e, ok := rec.(error); ok {
93+
err = errors.WithStack(e)
94+
} else {
95+
err = errors.Errorf("%v", rec)
96+
}
97+
}()
98+
next = p.mapper(v)
99+
if next == nil {
100+
err = errors.New("the FlatMap result is nil")
101+
}
102+
return
79103
}
80104

81105
func (p *flatMapSubscriber) OnSubscribe(ctx context.Context, s reactor.Subscription) {

mono/op_map.go

+16-2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"sync"
66

77
"github.com/jjeffcaii/reactor-go"
8+
"github.com/pkg/errors"
89
)
910

1011
var _mapSubscriberPool = sync.Pool{
@@ -68,12 +69,25 @@ func (m *mapSubscriber) OnError(err error) {
6869

6970
func (m *mapSubscriber) OnNext(v Any) {
7071
if m == nil || m.actual == nil || m.t == nil {
72+
// TODO:
7173
return
7274
}
73-
if o, err := m.t(v); err != nil {
75+
defer func() {
76+
rec := recover()
77+
if rec == nil {
78+
return
79+
}
80+
if e, ok := rec.(error); ok {
81+
m.actual.OnError(errors.WithStack(e))
82+
} else {
83+
m.actual.OnError(errors.Errorf("%v", rec))
84+
}
85+
}()
86+
87+
if transformed, err := m.t(v); err != nil {
7488
m.actual.OnError(err)
7589
} else {
76-
m.actual.OnNext(o)
90+
m.actual.OnNext(transformed)
7791
}
7892
}
7993

0 commit comments

Comments
 (0)